libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
batch_unbatch_bin_increment_clusters.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "bin_increment_cluster_encoding.hpp"
11#include "bucket.hpp"
12#include "common.hpp"
13#include "data_types.hpp"
14#include "histogram_events.hpp"
15#include "introspect.hpp"
16#include "processor.hpp"
17
18#include <cassert>
19#include <cstddef>
20#include <functional>
21#include <memory>
22#include <span>
23#include <type_traits>
24#include <utility>
25
26namespace tcspc {
27
28namespace internal {
29
30// Helper for batch_bin_increment_clusters.
31template <typename BinIndex>
32class batch_bin_increment_clusters_encoding_adapter {
33 std::reference_wrapper<bucket<BinIndex>> bkt;
34 std::size_t *siz;
35
36 public:
37 explicit batch_bin_increment_clusters_encoding_adapter(
38 bucket<BinIndex> &storage, std::size_t &usage)
39 : bkt(storage), siz(&usage) {}
40
41 [[nodiscard]] auto available_capacity() const -> std::size_t {
42 return bkt.get().size() - *siz;
43 }
44
45 [[nodiscard]] auto make_space(std::size_t size) -> std::span<BinIndex> {
46 assert(size <= available_capacity());
47 auto const old_size = *siz;
48 *siz += size;
49 return bkt.get().subspan(old_size, size);
50 }
51};
52
53template <typename DataTypes, typename Downstream>
54class batch_bin_increment_clusters {
55 using bin_index_type = typename DataTypes::bin_index_type;
56 static_assert(processor<Downstream, bucket<bin_index_type>>);
57
58 std::shared_ptr<bucket_source<bin_index_type>> bsource;
59
60 bucket<bin_index_type> cur_batch;
61 std::size_t bucket_used_size = 0;
62 std::size_t cur_batch_size = 0;
63 std::size_t bkt_siz;
64 std::size_t batch_siz;
65
66 Downstream downstream;
67
68 void emit_cur_batch() {
69 if (cur_batch_size > 0) {
70 cur_batch.shrink(0, bucket_used_size);
71 downstream.handle(std::move(cur_batch));
72 }
73 cur_batch = {};
74 bucket_used_size = 0;
75 cur_batch_size = 0;
76 }
77
78 public:
79 explicit batch_bin_increment_clusters(
80 std::shared_ptr<bucket_source<typename DataTypes::bin_index_type>>
81 buffer_provider,
82 arg::bucket_size<std::size_t> bucket_size,
83 arg::batch_size<std::size_t> batch_size, Downstream downstream)
84 : bsource(std::move(buffer_provider)), bkt_siz(bucket_size.value),
85 batch_siz(batch_size.value), downstream(std::move(downstream)) {}
86
87 [[nodiscard]] auto introspect_node() const -> processor_info {
88 return processor_info(this, "batch_bin_increment_clusters");
89 }
90
91 [[nodiscard]] auto introspect_graph() const -> processor_graph {
92 return downstream.introspect_graph().push_entry_point(this);
93 }
94
95 template <typename DT>
96 void handle(bin_increment_cluster_event<DT> const &event) {
97 static_assert(std::is_same_v<typename DT::bin_index_type,
98 typename DataTypes::bin_index_type>);
99 std::size_t const encoded_size = encoded_bin_increment_cluster_size<
100 typename DataTypes::bin_index_type>(event.bin_indices.size());
101 if (encoded_size > bkt_siz - bucket_used_size) { // Won't fit.
102 emit_cur_batch();
103
104 // If the cluster will not fit in a single default-sized bucket,
105 // emit a dedicated batch. We do not attempt to minimize internal
106 // fragmentation (i.e., waste of remaining bucket capacity) under
107 // conditions where clusters take up a significant fraction of the
108 // default bucket size; users should avoid operating in a regime
109 // where that happens frequently (though the degradation is only in
110 // performance, not correctness).
111 if (encoded_size > bkt_siz) {
112 auto single_cluster_batch =
113 bsource->bucket_of_size(encoded_size);
114 std::size_t usage = 0;
115 [[maybe_unused]] bool const did_fit =
116 encode_bin_increment_cluster(
117 batch_bin_increment_clusters_encoding_adapter(
118 single_cluster_batch, usage),
119 std::span(event.bin_indices));
120 assert(did_fit);
121 assert(usage == encoded_size);
122 downstream.handle(std::move(single_cluster_batch));
123 return;
124 }
125 }
126
127 if (cur_batch.empty())
128 cur_batch = bsource->bucket_of_size(bkt_siz);
129 [[maybe_unused]] bool const did_fit = encode_bin_increment_cluster(
130 batch_bin_increment_clusters_encoding_adapter(cur_batch,
131 bucket_used_size),
132 std::span(event.bin_indices));
133 assert(did_fit);
134
135 ++cur_batch_size;
136 if (cur_batch_size == batch_siz)
137 emit_cur_batch();
138 }
139
140 // NOLINTBEGIN(cppcoreguidelines-rvalue-reference-param-not-moved)
141 template <typename DT>
142 void handle(bin_increment_cluster_event<DT> &&event) {
143 handle(static_cast<bin_increment_cluster_event<DT> const &>(event));
144 }
145 // NOLINTEND(cppcoreguidelines-rvalue-reference-param-not-moved)
146
147 template <typename Event>
148 requires handler_for<Downstream, std::remove_cvref_t<Event>>
149 void handle(Event &&event) {
150 downstream.handle(std::forward<Event>(event));
151 }
152
153 void flush() {
154 emit_cur_batch();
155 downstream.flush();
156 }
157};
158
159template <typename DataTypes, typename Downstream>
160class unbatch_bin_increment_clusters {
161 using bin_index_type = typename DataTypes::bin_index_type;
162 static_assert(
163 processor<Downstream, bin_increment_cluster_event<DataTypes>>);
164
165 Downstream downstream;
166
167 public:
168 explicit unbatch_bin_increment_clusters(Downstream downstream)
169 : downstream(std::move(downstream)) {}
170
171 [[nodiscard]] auto introspect_node() const -> processor_info {
172 return processor_info(this, "unbatch_bin_increment_clusters");
173 }
174
175 [[nodiscard]] auto introspect_graph() const -> processor_graph {
176 return downstream.introspect_graph().push_entry_point(this);
177 }
178
179 template <typename Event>
180 requires(
181 std::convertible_to<std::remove_cvref_t<Event>,
182 bucket<typename DataTypes::bin_index_type>> or
183 std::convertible_to<
184 std::remove_cvref_t<Event>,
185 bucket<typename DataTypes::bin_index_type const>>)
186 void handle(Event &&event) {
187 bin_increment_cluster_decoder<bin_index_type> const decoder(event);
188 for (auto const cluster_span : decoder) {
189 // The cluster_span is a span<T const>, but we want bucket<T>,
190 // not bucket<T const>. Casting is safe because
191 // `ad_hoc_bucket<T>` emitted as const lvalue reference does
192 // not allow mutation of the referred data.
193 auto const mut_span = std::span<bin_index_type>(
194 const_cast<bin_index_type *>(cluster_span.data()),
195 cluster_span.size());
196 bin_increment_cluster_event<DataTypes> const e{
197 ad_hoc_bucket(mut_span)};
198 downstream.handle(e);
199 }
200 }
201
202 template <typename Event>
203 requires(not std::convertible_to<
204 std::remove_cvref_t<Event>,
205 bucket<typename DataTypes::bin_index_type>> and
206 not std::convertible_to<
207 std::remove_cvref_t<Event>,
208 bucket<typename DataTypes::bin_index_type const>> and
209 handler_for<Downstream, std::remove_cvref_t<Event>>)
210 void handle(Event &&event) {
211 downstream.handle(std::forward<Event>(event));
212 }
213
214 void flush() { downstream.flush(); }
215};
216
217} // namespace internal
218
259template <typename DataTypes = default_data_types, typename Downstream>
262 buffer_provider,
264 arg::batch_size<std::size_t> batch_size, Downstream downstream) {
265 return internal::batch_bin_increment_clusters<DataTypes, Downstream>(
266 std::move(buffer_provider), bucket_size, batch_size,
267 std::move(downstream));
268}
269
296template <typename DataTypes = default_data_types, typename Downstream>
297auto unbatch_bin_increment_clusters(Downstream downstream) {
298 return internal::unbatch_bin_increment_clusters<DataTypes, Downstream>(
299 std::move(downstream));
300}
301
302} // namespace tcspc
auto ad_hoc_bucket(std::span< T > s) -> bucket< T >
Create a tcspc::bucket referencing a span.
Definition bucket.hpp:489
auto unbatch_bin_increment_clusters(Downstream downstream)
Create a processor that splits encoded batches of bin increment clusters into individual clusters.
Definition batch_unbatch_bin_increment_clusters.hpp:297
auto batch_bin_increment_clusters(std::shared_ptr< bucket_source< typename DataTypes::bin_index_type > > buffer_provider, arg::bucket_size< std::size_t > bucket_size, arg::batch_size< std::size_t > batch_size, Downstream downstream)
Create a processor that collects bin increment clusters into encoded batches.
Definition batch_unbatch_bin_increment_clusters.hpp:260
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for batch size parameter.
Definition arg_wrappers.hpp:47
Function argument wrapper for bucket size.
Definition arg_wrappers.hpp:67
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505