9#include "arg_wrappers.hpp"
13#include "data_types.hpp"
15#include "histogram_events.hpp"
16#include "histogram_impl.hpp"
17#include "histogram_policy.hpp"
18#include "introspect.hpp"
19#include "processor.hpp"
35class scan_histograms {
38 static constexpr bool emit_concluding =
41 static constexpr bool reset_after_scan =
44 static constexpr bool clear_every_scan =
47 static constexpr bool clear_new_bucket =
52 processor<Downstream, histogram_array_progress_event<DataTypes>,
53 histogram_array_event<DataTypes>>);
54 static_assert(std::is_same_v<ResetEvent, never_event> ||
55 handler_for<Downstream, ResetEvent>);
57 handler_for<Downstream, warning_event>);
61 handler_for<Downstream, concluding_histogram_array_event<DataTypes>>);
69 using internal_overflow_policy = std::conditional_t<
71 saturate_on_internal_overflow, stop_on_internal_overflow>;
73 using bin_index_type =
typename DataTypes::bin_index_type;
74 using bin_type =
typename DataTypes::bin_type;
77 std::conditional_t<emit_concluding ||
80 bin_increment_cluster_journal<bin_index_type>,
81 null_journal<bin_index_type>>;
83 std::shared_ptr<bucket_source<bin_type>> bsource;
84 bucket<bin_type> hist_bucket;
85 multi_histogram_accumulation<bin_index_type, bin_type,
86 internal_overflow_policy>
88 bool saturate_warning_issued =
false;
91 Downstream downstream;
93 LIBTCSPC_NOINLINE
void start_new_round() {
94 auto const size = mhista.num_elements() * mhista.num_bins();
95 hist_bucket = bsource->bucket_of_size(size);
96 if constexpr (clear_new_bucket)
97 std::fill(hist_bucket.begin(), hist_bucket.end(), bin_type{0});
98 mhista =
decltype(mhista)(hist_bucket, mhista, not clear_new_bucket);
101 void reset_without_replay() {
102 if (hist_bucket.empty())
104 if constexpr (emit_concluding) {
105 mhista.roll_back_current_scan(journal);
106 downstream.handle(concluding_histogram_array_event<DataTypes>{
107 std::move(hist_bucket)});
111 if constexpr (overflow_policy ==
113 saturate_warning_issued =
false;
117 mhista.new_scan(journal, clear_every_scan);
118 auto const array_event = histogram_array_event<DataTypes>{
120 downstream.handle(array_event);
121 if constexpr (reset_after_scan)
122 reset_without_replay();
125 [[noreturn]] LIBTCSPC_NOINLINE
void overflow_error() {
126 throw histogram_overflow_error(
"histogram array bin overflowed");
129 [[noreturn]] LIBTCSPC_NOINLINE
void overflow_stop() {
130 if constexpr (emit_concluding) {
131 mhista.roll_back_current_scan(journal);
132 downstream.handle(concluding_histogram_array_event<DataTypes>{
133 std::move(hist_bucket)});
136 throw end_of_processing(
"histogram array bin overflowed");
139 LIBTCSPC_NOINLINE
void saturated_warning() {
140 downstream.handle(warning_event{
"histogram array bin saturated"});
141 saturate_warning_issued =
true;
144 template <
typename DT>
145 LIBTCSPC_NOINLINE
void
146 overflow_reset(bin_increment_cluster_event<DT>
const &event) {
147 if (mhista.scan_index() == 0)
148 throw histogram_overflow_error(
149 "histogram array bin overflowed on first scan");
150 mhista.roll_back_current_scan(journal);
151 if constexpr (emit_concluding)
152 downstream.handle(concluding_histogram_array_event<DataTypes>{
153 std::move(hist_bucket)});
155 mhista.replay(journal);
157 return handle(event);
161 explicit scan_histograms(
162 arg::num_elements<std::size_t> num_elements,
163 arg::num_bins<std::size_t> num_bins,
164 arg::max_per_bin<typename DataTypes::bin_type> max_per_bin,
165 std::shared_ptr<bucket_source<typename DataTypes::bin_type>>
167 Downstream downstream)
168 : bsource(std::move(buffer_provider)),
169 mhista(hist_bucket, max_per_bin, num_bins, num_elements, true),
170 downstream(std::move(downstream)) {
171 if (num_elements.value == 0)
172 throw std::invalid_argument(
173 "scan_histograms must have at least 1 element");
174 if (num_bins.value == 0)
175 throw std::invalid_argument(
176 "scan_histograms must have at least 1 bin per element");
177 if (max_per_bin.value < 0)
178 throw std::invalid_argument(
179 "scan_histograms max_per_bin must not be negative");
182 [[nodiscard]]
auto introspect_node() const -> processor_info {
183 return processor_info(
this,
"scan_histograms");
186 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
187 return downstream.introspect_graph().push_entry_point(
this);
190 template <
typename DT>
191 void handle(bin_increment_cluster_event<DT>
const &event) {
192 static_assert(std::is_same_v<
typename DT::bin_index_type,
193 typename DataTypes::bin_index_type>);
194 if (hist_bucket.empty())
196 auto const element_index = mhista.next_element_index();
197 if (not mhista.apply_increment_cluster(event.bin_indices, journal)) {
198 if constexpr (overflow_policy ==
201 }
else if constexpr (overflow_policy ==
204 }
else if constexpr (overflow_policy ==
206 if (not saturate_warning_issued)
208 }
else if constexpr (overflow_policy ==
210 return overflow_reset(event);
214 auto const progress = histogram_array_progress_event<DataTypes>{
215 (element_index + 1) * mhista.num_bins(),
217 downstream.handle(progress);
219 if (mhista.is_scan_complete())
224 template <
typename DT>
225 void handle(bin_increment_cluster_event<DT> &&event) {
226 handle(
static_cast<bin_increment_cluster_event<DT>
const &
>(event));
230 template <
typename E>
231 requires handler_for<Downstream, std::remove_cvref_t<E>>
232 void handle(E &&event) {
233 if constexpr (std::is_convertible_v<std::remove_cvref_t<E>,
235 reset_without_replay();
236 downstream.handle(std::forward<E>(event));
239 void flush() { downstream.flush(); }
356 Downstream downstream) {
357 return internal::scan_histograms<Policy, ResetEvent, DataTypes,
359 num_elements, num_bins, max_per_bin, std::move(buffer_provider),
360 std::move(downstream));
auto ad_hoc_bucket(std::span< T > s) -> bucket< T >
Create a tcspc::bucket referencing a span.
Definition bucket.hpp:489
histogram_policy
Histogramming policy specifying behavior.
Definition histogram_policy.hpp:29
@ stop_on_overflow
Treat a histogram bin overflow as end of processing.
Definition histogram_policy.hpp:53
@ saturate_on_overflow
Ignore increments that would cause a bin overflow.
Definition histogram_policy.hpp:61
@ reset_on_overflow
Perform a reset when a histogram bin is about to overflow.
Definition histogram_policy.hpp:79
@ overflow_mask
Bitmask for overflow behavior.
Definition histogram_policy.hpp:88
@ reset_after_scan
Automatically reset when the end of a scan has been reached.
Definition histogram_policy.hpp:123
@ no_clear_new_bucket
Do not zero-fill the histogram array at the beginning of a round of accumulation.
Definition histogram_policy.hpp:158
@ emit_concluding_events
Enable generation of tcspc::concluding_histogram_array_event.
Definition histogram_policy.hpp:108
@ default_policy
Default policy with no bit set: equal to error_on_overflow.
Definition histogram_policy.hpp:33
@ clear_every_scan
Clear element histograms before applying bin increment batches, during every scan.
Definition histogram_policy.hpp:140
@ error_on_overflow
Treat a histogram bin overflow as an error.
Definition histogram_policy.hpp:43
auto scan_histograms(arg::num_elements< std::size_t > num_elements, arg::num_bins< std::size_t > num_bins, arg::max_per_bin< typename DataTypes::bin_type > max_per_bin, std::shared_ptr< bucket_source< typename DataTypes::bin_type > > buffer_provider, Downstream downstream)
Create a processor that scans over an array of histograms, updating each with the received bin increm...
Definition scan_histograms.hpp:350
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for maximum bin value.
Definition arg_wrappers.hpp:287
Function argument wrapper for number of bins parameter.
Definition arg_wrappers.hpp:327
Function argument wrapper for number of elements parameter.
Definition arg_wrappers.hpp:337
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505
The default data type set.
Definition data_types.hpp:24
An event type whose instances never occur.
Definition core.hpp:54