libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
buffer.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "common.hpp"
11#include "context.hpp"
12#include "errors.hpp"
13#include "introspect.hpp"
14#include "processor.hpp"
15#include "vector_queue.hpp"
16
17#include <algorithm>
18#include <chrono>
19#include <condition_variable>
20#include <cstddef>
21#include <functional>
22#include <mutex>
23#include <stdexcept>
24#include <type_traits>
25#include <utility>
26
27namespace tcspc {
28
namespace internal {

// Cache-line size (in bytes) used to separate data accessed by different
// threads, preventing false sharing.
// Avoid std::hardware_destructive_interference_size, because it suffers from
// ABI compatibility requirements and therefore may not have the best value
// (for example, it seems to be 256 on Linux/aarch64). Instead, just default to
// 64 (correct for most x86_64 and many ARM and RISC-V processors) except in
// known cases where a larger value is appropriate.
// Known 128-byte cases: Apple Silicon (arm64 macOS) and 64-bit POWER.
#if (defined(__APPLE__) && defined(__arm64__)) || \
    (defined(__powerpc64__) || defined(__ppc64__) || defined(__PPC64__))
inline constexpr std::size_t destructive_interference_size = 128;
#else
inline constexpr std::size_t destructive_interference_size = 64;
#endif

} // namespace internal
44
/**
 * \brief Access interface for buffer processors.
 *
 * Wraps the two operations a buffer exposes to external code: halting the
 * producer-consumer handoff and pumping buffered events downstream.
 */
class buffer_access {
    // Type-erased callbacks supplied by the owning buffer processor.
    std::function<void()> stop_fn;
    std::function<void()> drain_fn;

  public:
    /**
     * \brief Construct from the halt and pump operations of a buffer.
     *
     * \param halt_func callable invoked by halt()
     * \param pump_func callable invoked by pump()
     */
    template <typename HaltFunc, typename PumpFunc>
    explicit buffer_access(HaltFunc halt_func, PumpFunc pump_func)
        : stop_fn(halt_func), drain_fn(pump_func) {}

    /**
     * \brief Halt pumping of the buffer.
     */
    void halt() noexcept { stop_fn(); } // NOLINT(bugprone-exception-escape)

    /**
     * \brief Pump buffered events downstream; blocks until the stream ends.
     */
    void pump() { drain_fn(); }
};
110
111namespace internal {
112
113template <typename Event, bool LatencyLimited, typename Downstream>
114class buffer {
115 static_assert(processor<Downstream, Event>);
116
117 using clock_type = std::chrono::steady_clock;
118 using queue_type = vector_queue<Event>;
119
120 std::size_t threshold;
121 clock_type::duration max_latency;
122
123 std::mutex mutex;
124 std::condition_variable has_data_condition;
125 queue_type shared_queue;
126 clock_type::time_point oldest_enqueued_time;
127 bool upstream_flushed = false;
128 bool upstream_halted = false;
129 bool downstream_threw = false;
130
131#ifdef _MSC_VER
132#pragma warning(push)
133#pragma warning(disable : 4324) // Structure padded due to alignment specifier
134#endif
135
136 // To reduce lock contention on the shared_queue, we use a second queue
137 // that is accessed only by the emitting thread and is not protected by the
138 // mutex. Events in the shared_queue are transferred in bulk to the
139 // emit_queue while the mutex is held.
140 // This means that the mutex does not need to be acquired between every
141 // event emitted, so the producer will be less likely to block when the
142 // data rate is momentarily high, and the consumer will be less likely to
143 // block while catching up on buffered events.
144 // Furthermore, we ensure that the emit_queue and downstream do not share a
145 // CPU cache line with the shared_queue, to prevent false sharing.
146 alignas(destructive_interference_size) queue_type emit_queue;
147
148#ifdef _MSC_VER
149#pragma warning(pop)
150#endif
151
152 Downstream downstream;
153
154 // Cold data after downstream.
155 bool pumped = false;
156 access_tracker<buffer_access> trk;
157
158 void halt() noexcept {
159 {
160 auto const lock = std::lock_guard(mutex);
161 upstream_halted = true;
162 }
163 has_data_condition.notify_one();
164 }
165
166 void pump() {
167 try {
168 auto lock = std::unique_lock(mutex);
169 if (pumped) {
170 throw std::logic_error(
171 "buffer may not be pumped a second time");
172 }
173 pumped = true;
174
175 for (;;) {
176 if constexpr (LatencyLimited) {
177 has_data_condition.wait(lock, [&] {
178 return not shared_queue.empty() || upstream_flushed ||
179 upstream_halted;
180 });
181 // Won't overflow due to 24 h limit on max_latency:
182 auto const deadline = oldest_enqueued_time + max_latency;
183 has_data_condition.wait_until(lock, deadline, [&] {
184 return shared_queue.size() >= threshold ||
185 upstream_flushed || upstream_halted;
186 });
187 } else {
188 has_data_condition.wait(lock, [&] {
189 return shared_queue.size() >= threshold ||
190 upstream_flushed || upstream_halted;
191 });
192 }
193
194 if (not upstream_flushed && upstream_halted)
195 throw source_halted();
196 if (shared_queue.empty() && upstream_flushed) {
197 lock.unlock();
198 return downstream.flush();
199 }
200
201 emit_queue.swap(shared_queue);
202 lock.unlock();
203 while (!emit_queue.empty()) {
204 downstream.handle(std::move(emit_queue.front()));
205 emit_queue.pop();
206 }
207 lock.lock();
208 }
209 } catch (source_halted const &) {
210 throw;
211 } catch (...) {
212 auto const lock = std::lock_guard(mutex);
213 downstream_threw = true;
214 throw;
215 }
216 }
217
218 public:
219 template <typename Rep, typename Period>
220 explicit buffer(arg::threshold<std::size_t> threshold,
221 std::chrono::duration<Rep, Period> latency_limit,
222 access_tracker<buffer_access> &&tracker,
223 Downstream downstream)
224 : threshold(threshold.value >= 0 ? threshold.value : 1),
225 max_latency(
226 std::chrono::duration_cast<clock_type::duration>(latency_limit)),
227 downstream(std::move(downstream)), trk(std::move(tracker)) {
228 // Limit to avoid integer overflow.
229 if (max_latency > std::chrono::hours(24)) {
230 throw std::invalid_argument(
231 "buffer latency limit must not be greater than 24 h");
232 }
233
234 trk.register_access_factory([](auto &tracker) {
235 auto *self = LIBTCSPC_OBJECT_FROM_TRACKER(buffer, trk, tracker);
236 return buffer_access([self] { self->halt(); },
237 [self] { self->pump(); });
238 });
239 }
240
241 // NOLINTBEGIN(cppcoreguidelines-pro-type-member-init)
242 explicit buffer(arg::threshold<std::size_t> threshold,
243 access_tracker<buffer_access> &&tracker,
244 Downstream downstream)
245 : buffer(threshold, std::chrono::hours(24), std::move(tracker),
246 std::move(downstream)) {}
247 // NOLINTEND(cppcoreguidelines-pro-type-member-init)
248
249 [[nodiscard]] auto introspect_node() const -> processor_info {
250 return processor_info(this, "buffer");
251 }
252
253 [[nodiscard]] auto introspect_graph() const -> processor_graph {
254 return downstream.introspect_graph().push_entry_point(this);
255 }
256
257 template <typename E>
258 requires std::convertible_to<std::remove_cvref_t<E>, Event>
259 void handle(E &&event) {
260 bool should_notify{};
261 {
262 auto const lock = std::lock_guard(mutex);
263 if (downstream_threw)
264 throw end_of_processing(
265 "ending upstream of buffer upon end of downstream processing");
266
267 shared_queue.push(std::forward<E>(event));
268 should_notify = shared_queue.size() == threshold;
269 if constexpr (LatencyLimited) {
270 if (shared_queue.size() == 1) {
271 oldest_enqueued_time = clock_type::now();
272 should_notify = true; // Wake up once to set deadline.
273 }
274 }
275 }
276 if (should_notify)
277 has_data_condition.notify_one();
278 }
279
280 void flush() {
281 {
282 auto const lock = std::lock_guard(mutex);
283 if (downstream_threw)
284 throw end_of_processing(
285 "ending upstream of buffer upon end of downstream processing");
286 upstream_flushed = true;
287 }
288 has_data_condition.notify_one();
289 }
290};
291
292} // namespace internal
293
341template <typename Event, typename Downstream>
343 access_tracker<buffer_access> &&tracker, Downstream downstream) {
344 return internal::buffer<Event, false, Downstream>(
345 threshold, std::move(tracker), std::move(downstream));
346}
347
404template <typename Event, typename Rep, typename Period, typename Downstream>
406 std::chrono::duration<Rep, Period> latency_limit,
408 Downstream downstream) {
409 return internal::buffer<Event, true, Downstream>(
410 threshold, latency_limit, std::move(tracker), std::move(downstream));
411}
412
413} // namespace tcspc
Tracker that mediates access to objects via a tcspc::context.
Definition context.hpp:39
void halt() noexcept
Halt pumping of the buffer.
Definition buffer.hpp:83
void pump()
Pump buffered events downstream.
Definition buffer.hpp:108
#define LIBTCSPC_OBJECT_FROM_TRACKER(obj_type, tracker_field_name, tracker)
Recover the object address from a tcspc::access_tracker embedded in the object.
Definition context.hpp:253
auto buffer(arg::threshold< std::size_t > threshold, access_tracker< buffer_access > &&tracker, Downstream downstream)
Create a processor that buffers events and emits them on a different thread.
Definition buffer.hpp:342
auto real_time_buffer(arg::threshold< std::size_t > threshold, std::chrono::duration< Rep, Period > latency_limit, access_tracker< buffer_access > &&tracker, Downstream downstream)
Create a processor that buffers events and emits them on a different thread, with limited latency.
Definition buffer.hpp:405
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for threshold parameter.
Definition arg_wrappers.hpp:397