libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
acquire.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "bucket.hpp"
11#include "context.hpp"
12#include "core.hpp"
13#include "errors.hpp"
14#include "introspect.hpp"
15#include "processor.hpp"
16
17#include <chrono>
18#include <condition_variable>
19#include <cstddef>
20#include <exception>
21#include <functional>
22#include <memory>
23#include <mutex>
24#include <optional>
25#include <span>
26#include <stdexcept>
27#include <utility>
28
29namespace tcspc {
30
/**
 * \brief Handle through which an acquisition can be halted.
 *
 * Wraps a type-erased callable supplied at construction; halt() simply
 * invokes it.
 */
class acquire_access {
    std::function<void()> halt_fn;

  public:
    /**
     * \brief Construct from the callable that performs the halt.
     *
     * \tparam Func type of the callable; must be usable as the target of a
     * `std::function<void()>`
     *
     * \param halt_func callable invoked by each call to halt()
     */
    template <typename Func>
    // The parameter is a sink: move it into the std::function rather than
    // copying it a second time.
    explicit acquire_access(Func halt_func) : halt_fn(std::move(halt_func)) {}

    /**
     * \brief Halt the acquisition: stop reading further data.
     *
     * Invokes the callable given at construction. If that callable was
     * empty (for example a null function pointer), `std::function` throws
     * `std::bad_function_call`.
     */
    void halt() { halt_fn(); }
};
61
62namespace internal {
63
// Upper bound on how long the acquisition loop sleeps after a read that did
// not fill a batch. It is chosen to be short enough that (1) its effect is
// unnoticeable in a live display of the data and (2) hardware buffers are
// unlikely to fill up if they started out empty, given the buffer capacity and
// maximum count rates of typical devices.
//
// Declared 'inline' so that every translation unit including this header
// refers to the same object; the constant is odr-used (bound to a reference)
// inside the templates below.
inline constexpr auto slow_acq_sleep = std::chrono::milliseconds(10);
69
70template <typename T, typename Reader, typename Downstream> class acquire {
71 static_assert(processor<Downstream, bucket<T>>);
72
73 Reader reader;
74 std::shared_ptr<bucket_source<T>> bsource;
75 std::size_t bsize;
76
77 std::mutex halt_mutex;
78 std::condition_variable halt_cv;
79 bool halted = false;
80
81 Downstream downstream;
82
83 // Cold data after downstream.
84 access_tracker<acquire_access> trk;
85
86 void halt() {
87 {
88 auto const lock = std::lock_guard(halt_mutex);
89 halted = true;
90 }
91 halt_cv.notify_one();
92 }
93
94 public:
95 explicit acquire(Reader reader,
96 std::shared_ptr<bucket_source<T>> buffer_provider,
97 arg::batch_size<std::size_t> batch_size,
98 access_tracker<acquire_access> tracker,
99 Downstream downstream)
100 : reader(std::move(reader)), bsource(std::move(buffer_provider)),
101 bsize(batch_size.value), downstream(std::move(downstream)),
102 trk(std::move(tracker)) {
103 if (not bsource)
104 throw std::invalid_argument(
105 "acquire buffer_provider must not be null");
106 if (bsize == 0)
107 throw std::invalid_argument("acquire batch size must be positive");
108
109 trk.register_access_factory([](auto &tracker) {
110 auto *self = LIBTCSPC_OBJECT_FROM_TRACKER(acquire, trk, tracker);
111 return acquire_access([self] { self->halt(); });
112 });
113 }
114
115 // Custom move because we have a mutex. Move only works when not running.
116 ~acquire() = default;
117
118 acquire(acquire const &) = delete;
119 auto operator=(acquire const &) = delete;
120
121 acquire(acquire &&other) noexcept
122 : reader(std::move(other.reader)), bsource(std::move(other.bsource)),
123 bsize(other.bsize), halted(other.halted),
124 downstream(std::move(other.downstream)), trk(std::move(other.trk)) {}
125
126 auto operator=(acquire &&rhs) noexcept -> acquire & {
127 reader = std::move(rhs.reader);
128 bsource = std::move(rhs.bsource);
129 bsize = rhs.bsize;
130 halted = rhs.halted;
131 downstream = std::move(rhs.downstream);
132 trk = std::move(rhs.trk);
133 return *this;
134 }
135
136 [[nodiscard]] auto introspect_node() const -> processor_info {
137 return processor_info(this, "acquire");
138 }
139
140 [[nodiscard]] auto introspect_graph() const -> processor_graph {
141 return downstream.introspect_graph().push_entry_point(this);
142 }
143
144 void flush() {
145 bucket<T> b;
146 bool reached_end = false;
147 {
148 auto lock = std::unique_lock(halt_mutex);
149 while (not halted) {
150 lock.unlock();
151 auto const start_time = std::chrono::steady_clock::now();
152 if (b.empty())
153 b = bsource->bucket_of_size(bsize);
154 std::optional<std::size_t> const read = reader(std::span(b));
155 if (not read) {
156 reached_end = true;
157 break;
158 }
159 if (*read > 0) {
160 b.shrink(0, *read);
161 downstream.handle(std::move(b));
162 b = {};
163 }
164 lock.lock();
165 if (*read < bsize) { // Not enough data to fill the batch.
166 halt_cv.wait_until(lock, start_time + slow_acq_sleep,
167 [&] { return halted; });
168 }
169 }
170 }
171 if (reached_end)
172 downstream.flush();
173 else
174 throw acquisition_halted();
175 }
176};
177
178template <typename T, typename Reader, typename LiveDownstream,
179 typename BatchDownstream>
180class acquire_full_buckets {
181 static_assert(processor<LiveDownstream, bucket<T const>>);
182 static_assert(processor<BatchDownstream, bucket<T>>);
183
184 Reader reader;
185 std::shared_ptr<bucket_source<T>> bsource;
186 std::size_t bsize;
187
188 std::mutex halt_mutex;
189 std::condition_variable halt_cv;
190 bool halted = false;
191
192 LiveDownstream live_downstream;
193 BatchDownstream batch_downstream;
194
195 // Cold data after downstream.
196 access_tracker<acquire_access> trk;
197
198 void halt() {
199 {
200 auto const lock = std::lock_guard(halt_mutex);
201 halted = true;
202 }
203 halt_cv.notify_one();
204 }
205
206 // Mutates 'b' only when throwing.
207 void emit_live(bucket<T> &b, std::size_t start, std::size_t count) {
208 if (count > 0) {
209 try {
210 auto v = bsource->shared_view_of(b);
211 v.shrink(start, count);
212 live_downstream.handle(std::move(v));
213 } catch (end_of_processing const &) {
214 b.shrink(0, start + count);
215 batch_downstream.handle(std::move(b));
216 batch_downstream.flush();
217 throw;
218 }
219 }
220 }
221
222 void emit_batch(bucket<T> &&b) {
223 try {
224 batch_downstream.handle(std::move(b));
225 } catch (end_of_processing const &) {
226 live_downstream.flush();
227 throw;
228 }
229 }
230
231 void flush_downstreams(bucket<T> &&b, std::size_t filled) {
232 std::exception_ptr end;
233 try {
234 live_downstream.flush();
235 } catch (end_of_processing const &) {
236 end = std::current_exception();
237 }
238 if (not b.empty() && filled > 0) {
239 b.shrink(0, filled);
240 batch_downstream.handle(std::move(b));
241 }
242 batch_downstream.flush();
243 if (end)
244 std::rethrow_exception(end);
245 }
246
247 public:
248 explicit acquire_full_buckets(
249 Reader reader, std::shared_ptr<bucket_source<T>> buffer_provider,
250 arg::batch_size<std::size_t> batch_size,
251 access_tracker<acquire_access> tracker, LiveDownstream live_downstream,
252 BatchDownstream batch_downstream)
253 : reader(std::move(reader)), bsource(std::move(buffer_provider)),
254 bsize(batch_size.value), live_downstream(std::move(live_downstream)),
255 batch_downstream(std::move(batch_downstream)),
256 trk(std::move(tracker)) {
257 if (not bsource)
258 throw std::invalid_argument(
259 "acquire_full_buckets buffer_provider must not be null");
260 if constexpr (not std::is_same_v<LiveDownstream, null_sink>) {
261 if (not bsource->supports_shared_views())
262 throw std::invalid_argument(
263 "acquire_full_buckets buffer_provider must support shared views");
264 }
265 if (bsize == 0)
266 throw std::invalid_argument(
267 "acquire_full_buckets batch size must be positive");
268
269 trk.register_access_factory([](auto &tracker) {
270 auto *self = LIBTCSPC_OBJECT_FROM_TRACKER(acquire_full_buckets,
271 trk, tracker);
272 return acquire_access([self] { self->halt(); });
273 });
274 }
275
276 // Custom move because we have a mutex. Move only works when not running.
277 ~acquire_full_buckets() = default;
278
279 acquire_full_buckets(acquire_full_buckets const &) = delete;
280 auto operator=(acquire_full_buckets const &) = delete;
281
282 acquire_full_buckets(acquire_full_buckets &&other) noexcept
283 : reader(std::move(other.reader)), bsource(std::move(other.bsource)),
284 bsize(other.bsize), halted(other.halted),
285 live_downstream(std::move(other.live_downstream)),
286 batch_downstream(std::move(other.batch_downstream)),
287 trk(std::move(other.trk)) {}
288
289 auto operator=(acquire_full_buckets &&rhs) noexcept
290 -> acquire_full_buckets & {
291 reader = std::move(rhs.reader);
292 bsource = std::move(rhs.bsource);
293 bsize = rhs.bsize;
294 halted = rhs.halted;
295 live_downstream = std::move(rhs.live_downstream);
296 batch_downstream = std::move(rhs.batch_downstream);
297 trk = std::move(rhs.trk);
298 return *this;
299 }
300
301 [[nodiscard]] auto introspect_node() const -> processor_info {
302 return processor_info(this, "acquire_full_buckets");
303 }
304
305 [[nodiscard]] auto introspect_graph() const -> processor_graph {
307 live_downstream.introspect_graph().push_entry_point(this),
308 batch_downstream.introspect_graph().push_entry_point(this));
309 }
310
311 void flush() {
312 bucket<T> b;
313 std::size_t filled = 0;
314 {
315 auto lock = std::unique_lock(halt_mutex);
316 while (not halted) {
317 lock.unlock();
318 auto const start_time = std::chrono::steady_clock::now();
319 if (b.empty()) {
320 b = bsource->bucket_of_size(bsize);
321 filled = 0;
322 }
323 auto const unfilled = std::span(b).subspan(filled);
324 std::optional<std::size_t> const read = reader(unfilled);
325 if (not read)
326 return flush_downstreams(std::move(b), filled);
327 if constexpr (not std::is_same_v<LiveDownstream, null_sink>)
328 emit_live(b, filled, *read);
329 filled += *read;
330 if (filled == bsize) {
331 emit_batch(std::move(b));
332 b = {};
333 }
334 lock.lock();
335 if (filled < bsize) {
336 halt_cv.wait_until(lock, start_time + slow_acq_sleep,
337 [&] { return halted; });
338 }
339 }
340 }
341 throw acquisition_halted();
342 }
343};
344
345} // namespace internal
346
385template <typename T, typename Reader, typename Downstream>
386auto acquire(Reader reader, std::shared_ptr<bucket_source<T>> buffer_provider,
388 access_tracker<acquire_access> tracker, Downstream downstream) {
389 return internal::acquire<T, Reader, Downstream>(
390 std::move(reader), std::move(buffer_provider), batch_size,
391 std::move(tracker), std::move(downstream));
392}
393
450template <typename T, typename Reader, typename LiveDownstream,
451 typename BatchDownstream>
452auto acquire_full_buckets(Reader reader,
453 std::shared_ptr<bucket_source<T>> buffer_provider,
456 LiveDownstream live_downstream,
457 BatchDownstream batch_downstream) {
458 return internal::acquire_full_buckets<T, Reader, LiveDownstream,
459 BatchDownstream>(
460 std::move(reader), std::move(buffer_provider), batch_size,
461 std::move(tracker), std::move(live_downstream),
462 std::move(batch_downstream));
463}
464
470template <typename T> struct null_reader {
472 auto operator()(std::span<T> /* buffer */) -> std::optional<std::size_t> {
473 return std::nullopt;
474 }
475};
476
482template <typename T> struct stuck_reader {
484 auto operator()(std::span<T> /* buffer */) -> std::optional<std::size_t> {
485 return 0;
486 }
487};
488
489} // namespace tcspc
Tracker that mediates access to objects via a tcspc::context.
Definition context.hpp:39
void halt()
Halt the acquisition: stop reading further data.
Definition acquire.hpp:59
#define LIBTCSPC_OBJECT_FROM_TRACKER(obj_type, tracker_field_name, tracker)
Recover the object address from a tcspc::access_tracker embedded in the object.
Definition context.hpp:253
auto merge_processor_graphs(processor_graph const &a, processor_graph const &b) -> processor_graph
Create a new processor graph by merging two existing ones.
Definition introspect.hpp:357
auto acquire(Reader reader, std::shared_ptr< bucket_source< T > > buffer_provider, arg::batch_size< std::size_t > batch_size, access_tracker< acquire_access > tracker, Downstream downstream)
Create a processor that acquires data into buckets.
Definition acquire.hpp:386
auto acquire_full_buckets(Reader reader, std::shared_ptr< bucket_source< T > > buffer_provider, arg::batch_size< std::size_t > batch_size, access_tracker< acquire_access > tracker, LiveDownstream live_downstream, BatchDownstream batch_downstream)
Create a processor that acquires data into buckets, ensuring that each bucket is filled to a fixed size.
Definition acquire.hpp:452
auto count(access_tracker< count_access > &&tracker, Downstream downstream)
Create a processor that counts events of a given type.
Definition count.hpp:313
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for batch size parameter.
Definition arg_wrappers.hpp:47
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505
Acquisition reader that reads an empty stream.
Definition acquire.hpp:470
auto operator()(std::span< T >) -> std::optional< std::size_t >
Implements the acquisition reader requirement.
Definition acquire.hpp:472
Acquisition reader that waits indefinitely without producing data.
Definition acquire.hpp:482
auto operator()(std::span< T >) -> std::optional< std::size_t >
Implements the acquisition reader requirement.
Definition acquire.hpp:484