9#include "arg_wrappers.hpp"
10#include "bin_increment_cluster_encoding.hpp"
13#include "data_types.hpp"
14#include "histogram_events.hpp"
15#include "introspect.hpp"
16#include "processor.hpp"
31template <
typename BinIndex>
32class batch_bin_increment_clusters_encoding_adapter {
33 std::reference_wrapper<bucket<BinIndex>> bkt;
37 explicit batch_bin_increment_clusters_encoding_adapter(
38 bucket<BinIndex> &storage, std::size_t &usage)
39 : bkt(storage), siz(&usage) {}
41 [[nodiscard]]
auto available_capacity() const -> std::
size_t {
42 return bkt.get().size() - *siz;
45 [[nodiscard]]
auto make_space(std::size_t size) -> std::span<BinIndex> {
46 assert(size <= available_capacity());
47 auto const old_size = *siz;
49 return bkt.get().subspan(old_size, size);
53template <
typename DataTypes,
typename Downstream>
54class batch_bin_increment_clusters {
55 using bin_index_type =
typename DataTypes::bin_index_type;
56 static_assert(processor<Downstream, bucket<bin_index_type>>);
58 std::shared_ptr<bucket_source<bin_index_type>> bsource;
60 bucket<bin_index_type> cur_batch;
61 std::size_t bucket_used_size = 0;
62 std::size_t cur_batch_size = 0;
64 std::size_t batch_siz;
66 Downstream downstream;
68 void emit_cur_batch() {
69 if (cur_batch_size > 0) {
70 cur_batch.shrink(0, bucket_used_size);
71 downstream.handle(std::move(cur_batch));
79 explicit batch_bin_increment_clusters(
80 std::shared_ptr<bucket_source<typename DataTypes::bin_index_type>>
82 arg::bucket_size<std::size_t> bucket_size,
83 arg::batch_size<std::size_t> batch_size, Downstream downstream)
84 : bsource(std::move(buffer_provider)), bkt_siz(bucket_size.value),
85 batch_siz(batch_size.value), downstream(std::move(downstream)) {}
87 [[nodiscard]]
auto introspect_node() const -> processor_info {
88 return processor_info(
this,
"batch_bin_increment_clusters");
91 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
92 return downstream.introspect_graph().push_entry_point(
this);
95 template <
typename DT>
96 void handle(bin_increment_cluster_event<DT>
const &event) {
97 static_assert(std::is_same_v<
typename DT::bin_index_type,
98 typename DataTypes::bin_index_type>);
99 std::size_t
const encoded_size = encoded_bin_increment_cluster_size<
100 typename DataTypes::bin_index_type>(
event.bin_indices.size());
101 if (encoded_size > bkt_siz - bucket_used_size) {
111 if (encoded_size > bkt_siz) {
112 auto single_cluster_batch =
113 bsource->bucket_of_size(encoded_size);
114 std::size_t usage = 0;
115 [[maybe_unused]]
bool const did_fit =
116 encode_bin_increment_cluster(
117 batch_bin_increment_clusters_encoding_adapter(
118 single_cluster_batch, usage),
119 std::span(event.bin_indices));
121 assert(usage == encoded_size);
122 downstream.handle(std::move(single_cluster_batch));
127 if (cur_batch.empty())
128 cur_batch = bsource->bucket_of_size(bkt_siz);
129 [[maybe_unused]]
bool const did_fit = encode_bin_increment_cluster(
130 batch_bin_increment_clusters_encoding_adapter(cur_batch,
132 std::span(event.bin_indices));
136 if (cur_batch_size == batch_siz)
141 template <
typename DT>
142 void handle(bin_increment_cluster_event<DT> &&event) {
143 handle(
static_cast<bin_increment_cluster_event<DT>
const &
>(event));
147 template <
typename Event>
148 requires handler_for<Downstream, std::remove_cvref_t<Event>>
149 void handle(Event &&event) {
150 downstream.handle(std::forward<Event>(event));
159template <
typename DataTypes,
typename Downstream>
160class unbatch_bin_increment_clusters {
161 using bin_index_type =
typename DataTypes::bin_index_type;
163 processor<Downstream, bin_increment_cluster_event<DataTypes>>);
165 Downstream downstream;
168 explicit unbatch_bin_increment_clusters(Downstream downstream)
169 : downstream(std::move(downstream)) {}
171 [[nodiscard]]
auto introspect_node() const -> processor_info {
172 return processor_info(
this,
"unbatch_bin_increment_clusters");
175 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
176 return downstream.introspect_graph().push_entry_point(
this);
179 template <
typename Event>
181 std::convertible_to<std::remove_cvref_t<Event>,
182 bucket<typename DataTypes::bin_index_type>> or
184 std::remove_cvref_t<Event>,
185 bucket<typename DataTypes::bin_index_type const>>)
186 void handle(Event &&event) {
187 bin_increment_cluster_decoder<bin_index_type>
const decoder(event);
188 for (
auto const cluster_span : decoder) {
193 auto const mut_span = std::span<bin_index_type>(
194 const_cast<bin_index_type *
>(cluster_span.data()),
195 cluster_span.size());
196 bin_increment_cluster_event<DataTypes>
const e{
198 downstream.handle(e);
202 template <
typename Event>
203 requires(not std::convertible_to<
204 std::remove_cvref_t<Event>,
205 bucket<typename DataTypes::bin_index_type>> and
206 not std::convertible_to<
207 std::remove_cvref_t<Event>,
208 bucket<typename DataTypes::bin_index_type const>> and
209 handler_for<Downstream, std::remove_cvref_t<Event>>)
210 void handle(Event &&event) {
211 downstream.handle(std::forward<Event>(event));
214 void flush() { downstream.flush(); }
259template <
typename DataTypes = default_data_types,
typename Downstream>
265 return internal::batch_bin_increment_clusters<DataTypes, Downstream>(
266 std::move(buffer_provider), bucket_size, batch_size,
267 std::move(downstream));
296template <
typename DataTypes = default_data_types,
typename Downstream>
298 return internal::unbatch_bin_increment_clusters<DataTypes, Downstream>(
299 std::move(downstream));
auto ad_hoc_bucket(std::span< T > s) -> bucket< T >
Create a tcspc::bucket referencing a span.
Definition bucket.hpp:489
auto unbatch_bin_increment_clusters(Downstream downstream)
Create a processor that splits encoded batches of bin increment clusters into individual clusters.
Definition batch_unbatch_bin_increment_clusters.hpp:297
auto batch_bin_increment_clusters(std::shared_ptr< bucket_source< typename DataTypes::bin_index_type > > buffer_provider, arg::bucket_size< std::size_t > bucket_size, arg::batch_size< std::size_t > batch_size, Downstream downstream)
Create a processor that collects bin increment clusters into encoded batches.
Definition batch_unbatch_bin_increment_clusters.hpp:260
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for batch size parameter.
Definition arg_wrappers.hpp:47
Function argument wrapper for bucket size.
Definition arg_wrappers.hpp:67
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505