#include "arg_wrappers.hpp"
#include "context.hpp"
#include "introspect.hpp"
#include "processor.hpp"
#include "vector_queue.hpp"

#include <chrono>
#include <concepts>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <type_traits>
#include <utility>
// Cache-line size used to pad/align shared state against false sharing.
// std::hardware_destructive_interference_size is not reliably provided by
// all toolchains, so use a per-architecture constant instead.
#if (defined(__APPLE__) && defined(__arm64__)) || \
    (defined(__powerpc64__) || defined(__ppc64__) || defined(__PPC64__))
// Apple silicon and 64-bit POWER use 128-byte destructive interference.
inline constexpr std::size_t destructive_interference_size = 128;
#else
// Common case (x86-64, most AArch64 Linux targets): 64-byte cache lines.
inline constexpr std::size_t destructive_interference_size = 64;
#endif
/**
 * @brief Access object mediating control of a buffer processor: halting and
 * pumping are exposed as type-erased callables so the controlling thread
 * does not need the processor's concrete type.
 */
class buffer_access {
    std::function<void()> halt_fn; // invoked by halt()
    std::function<void()> pump_fn; // invoked by pump()

  public:
    /**
     * @brief Construct from halt and pump callables.
     * @param halt_func callable invoked to halt pumping
     * @param pump_func callable invoked to pump buffered events
     */
    template <typename HaltFunc, typename PumpFunc>
    explicit buffer_access(HaltFunc halt_func, PumpFunc pump_func)
        : halt_fn(halt_func), pump_fn(pump_func) {}

    /** @brief Halt pumping of the buffer. */
    void halt() noexcept { halt_fn(); }

    /**
     * @brief Pump buffered events downstream.
     * NOTE(review): the extracted listing lost this definition line; it is
     * reconstructed as a plain forward to pump_fn — confirm against the
     * original buffer.hpp (definition documented at buffer.hpp:108).
     */
    void pump() { pump_fn(); }
};
113template <
typename Event,
bool LatencyLimited,
typename Downstream>
115 static_assert(processor<Downstream, Event>);
117 using clock_type = std::chrono::steady_clock;
118 using queue_type = vector_queue<Event>;
120 std::size_t threshold;
121 clock_type::duration max_latency;
124 std::condition_variable has_data_condition;
125 queue_type shared_queue;
126 clock_type::time_point oldest_enqueued_time;
127 bool upstream_flushed =
false;
128 bool upstream_halted =
false;
129 bool downstream_threw =
false;
133#pragma warning(disable : 4324)
146 alignas(destructive_interference_size) queue_type emit_queue;
152 Downstream downstream;
156 access_tracker<buffer_access> trk;
158 void halt() noexcept {
160 auto const lock = std::lock_guard(mutex);
161 upstream_halted =
true;
163 has_data_condition.notify_one();
168 auto lock = std::unique_lock(mutex);
170 throw std::logic_error(
171 "buffer may not be pumped a second time");
176 if constexpr (LatencyLimited) {
177 has_data_condition.wait(lock, [&] {
178 return not shared_queue.empty() || upstream_flushed ||
182 auto const deadline = oldest_enqueued_time + max_latency;
183 has_data_condition.wait_until(lock, deadline, [&] {
184 return shared_queue.size() >= threshold ||
185 upstream_flushed || upstream_halted;
188 has_data_condition.wait(lock, [&] {
189 return shared_queue.size() >= threshold ||
190 upstream_flushed || upstream_halted;
194 if (not upstream_flushed && upstream_halted)
195 throw source_halted();
196 if (shared_queue.empty() && upstream_flushed) {
198 return downstream.flush();
201 emit_queue.swap(shared_queue);
203 while (!emit_queue.empty()) {
204 downstream.handle(std::move(emit_queue.front()));
209 }
catch (source_halted
const &) {
212 auto const lock = std::lock_guard(mutex);
213 downstream_threw =
true;
219 template <
typename Rep,
typename Period>
220 explicit buffer(arg::threshold<std::size_t> threshold,
221 std::chrono::duration<Rep, Period> latency_limit,
222 access_tracker<buffer_access> &&tracker,
223 Downstream downstream)
224 : threshold(threshold.value >= 0 ? threshold.value : 1),
226 std::chrono::duration_cast<clock_type::duration>(latency_limit)),
227 downstream(std::move(downstream)), trk(std::move(tracker)) {
229 if (max_latency > std::chrono::hours(24)) {
230 throw std::invalid_argument(
231 "buffer latency limit must not be greater than 24 h");
234 trk.register_access_factory([](
auto &tracker) {
236 return buffer_access([self] { self->halt(); },
237 [self] { self->pump(); });
242 explicit buffer(arg::threshold<std::size_t> threshold,
243 access_tracker<buffer_access> &&tracker,
244 Downstream downstream)
245 : buffer(threshold, std::chrono::hours(24), std::move(tracker),
246 std::move(downstream)) {}
249 [[nodiscard]]
auto introspect_node() const -> processor_info {
250 return processor_info(
this,
"buffer");
253 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
254 return downstream.introspect_graph().push_entry_point(
this);
257 template <
typename E>
258 requires std::convertible_to<std::remove_cvref_t<E>, Event>
259 void handle(E &&event) {
260 bool should_notify{};
262 auto const lock = std::lock_guard(mutex);
263 if (downstream_threw)
264 throw end_of_processing(
265 "ending upstream of buffer upon end of downstream processing");
267 shared_queue.push(std::forward<E>(event));
268 should_notify = shared_queue.size() == threshold;
269 if constexpr (LatencyLimited) {
270 if (shared_queue.size() == 1) {
271 oldest_enqueued_time = clock_type::now();
272 should_notify =
true;
277 has_data_condition.notify_one();
282 auto const lock = std::lock_guard(mutex);
283 if (downstream_threw)
284 throw end_of_processing(
285 "ending upstream of buffer upon end of downstream processing");
286 upstream_flushed =
true;
288 has_data_condition.notify_one();
341template <
typename Event,
typename Downstream>
344 return internal::buffer<Event, false, Downstream>(
345 threshold, std::move(tracker), std::move(downstream));
404template <
typename Event,
typename Rep,
typename Period,
typename Downstream>
406 std::chrono::duration<Rep, Period> latency_limit,
408 Downstream downstream) {
409 return internal::buffer<Event, true, Downstream>(
410 threshold, latency_limit, std::move(tracker), std::move(downstream));
Tracker that mediates access to objects via a tcspc::context.
Definition context.hpp:39
void halt() noexcept
Halt pumping of the buffer.
Definition buffer.hpp:83
void pump()
Pump buffered events downstream.
Definition buffer.hpp:108
#define LIBTCSPC_OBJECT_FROM_TRACKER(obj_type, tracker_field_name, tracker)
Recover the object address from a tcspc::access_tracker embedded in the object.
Definition context.hpp:253
auto buffer(arg::threshold< std::size_t > threshold, access_tracker< buffer_access > &&tracker, Downstream downstream)
Create a processor that buffers events and emits them on a different thread.
Definition buffer.hpp:342
auto real_time_buffer(arg::threshold< std::size_t > threshold, std::chrono::duration< Rep, Period > latency_limit, access_tracker< buffer_access > &&tracker, Downstream downstream)
Create a processor that buffers events and emits them on a different thread, with limited latency.
Definition buffer.hpp:405
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for threshold parameter.
Definition arg_wrappers.hpp:397