libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
merge.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "common.hpp"
11#include "data_types.hpp"
12#include "errors.hpp"
13#include "introspect.hpp"
14#include "processor.hpp"
15#include "type_list.hpp"
16#include "variant_event.hpp"
17#include "vector_queue.hpp"
18
19#include <algorithm>
20#include <array>
21#include <cassert>
22#include <cstddef>
23#include <exception>
24#include <memory>
25#include <tuple>
26#include <type_traits>
27#include <utility>
28
29namespace tcspc {
30
31namespace internal {
32
33// Internal implementation of merge processor. This processor is owned by the
34// two input processors via shared_ptr.
35template <typename EventList, typename DataTypes, typename Downstream>
36class merge_impl {
37 static_assert(type_list_like<EventList>);
39
40 // When events have equal abstime, those originating from input 0 are
41 // emitted before those originating from input1. Within the same input, the
42 // order is preserved.
43 // As long as we follow that rule and also ensure never to buffer events
44 // that can be emitted, we only ever need to buffer events from one or the
45 // other input at any given time.
46 bool pending_on_1 = false; // Pending on input 0 if false
47 std::array<bool, 2> input_flushed{false, false};
48 bool ended_with_exception = false;
49 vector_queue<variant_or_single_event<EventList>> pending;
50 std::size_t max_buffered;
51
52 Downstream downstream;
53
54 template <unsigned InputChannel>
55 [[nodiscard]] auto is_other_flushed() const noexcept -> bool {
56 return input_flushed[1 - InputChannel];
57 }
58
59 template <unsigned InputChannel>
60 [[nodiscard]] auto is_pending_on_other() const noexcept -> bool {
61 return pending_on_1 == (InputChannel == 0);
62 }
63
64 template <unsigned InputChannel> void set_pending_on() noexcept {
65 pending_on_1 = (InputChannel == 1);
66 }
67
68 // Emit pending while predicate is true.
69 // Pred: bool(abstime_type const &)
70 template <typename Pred> void emit_pending(Pred predicate) {
71 auto emit_if_true = [&](auto const &e) {
72 bool p = predicate(e.abstime);
73 if (p)
74 downstream.handle(e);
75 return p;
76 };
77 while (!pending.empty() &&
78 visit_variant_or_single_event(emit_if_true, pending.front()))
79 pending.pop();
80 }
81
82 public:
83 explicit merge_impl(arg::max_buffered<std::size_t> max_buffered,
84 Downstream downstream)
85 : max_buffered(max_buffered.value), downstream(std::move(downstream)) {
86 }
87
88 merge_impl(merge_impl const &) = delete;
89 auto operator=(merge_impl const &) = delete;
90 merge_impl(merge_impl &&) = delete;
91 auto operator=(merge_impl &&) = delete;
92 ~merge_impl() = default;
93
94 [[nodiscard]] auto introspect_node() const -> processor_info {
95 return processor_info(this, "merge_impl");
96 }
97
98 [[nodiscard]] auto introspect_graph() const -> processor_graph {
99 return downstream.introspect_graph().push_entry_point(this);
100 }
101
102 template <unsigned InputChannel, typename Event>
103 void handle(Event const &event) {
104 static_assert(convertible_to_type_list_member<Event, EventList>);
105 static_assert(std::is_same_v<decltype(event.abstime),
106 typename DataTypes::abstime_type>);
107 if (ended_with_exception)
108 return;
109 try {
110 if (is_pending_on_other<InputChannel>()) {
111 // Emit any older events pending on the other input.
112 auto cutoff = event.abstime;
113 // Emit events from input 0 before events from input 1 when
114 // they have equal abstime.
115 if constexpr (InputChannel == 0)
116 --cutoff;
117 emit_pending([=](auto t) { return t <= cutoff; });
118
119 // If events still pending on the other input, they are newer
120 // (or not older), so we can emit the current event first.
121 if (not pending.empty())
122 return downstream.handle(event);
123
124 // If we are still here, we have no more events pending from
125 // the other input, but will now enqueue the current event on
126 // this input.
127 set_pending_on<InputChannel>();
128 }
129 // If we got here, no events from the other input are pending. If
130 // the other input is also flushed, we have no need to buffer.
131 if (is_other_flushed<InputChannel>()) {
132 assert(pending.empty());
133 return downstream.handle(event);
134 }
135 if (pending.size() == max_buffered)
136 throw buffer_overflow_error("merge buffer capacity exceeded");
137 pending.push(event);
138 } catch (std::exception const &) {
139 ended_with_exception = true;
140 throw;
141 }
142 }
143
144 template <unsigned InputChannel> void flush() {
145 input_flushed[InputChannel] = true;
146 if (ended_with_exception)
147 return;
148 if (is_other_flushed<InputChannel>()) {
149 // Since the other input was flushed, events on this input have not
150 // been buffered. But there may still be events pending on the
151 // other input.
152 emit_pending([](auto /* t */) { return true; });
153 downstream.flush();
154 } else if (is_pending_on_other<InputChannel>()) {
155 // Since this input won't have any more events, no need to buffer
156 // the other any more.
157 emit_pending([](auto /* t */) { return true; });
158 }
159 }
160};
161
162template <unsigned InputChannel, typename EventList, typename DataTypes,
163 typename Downstream>
164class merge_input {
165 std::shared_ptr<merge_impl<EventList, DataTypes, Downstream>> impl;
166
167 public:
168 explicit merge_input(
169 std::shared_ptr<merge_impl<EventList, DataTypes, Downstream>> impl)
170 : impl(std::move(impl)) {}
171
172 // Movable but not copyable
173 merge_input(merge_input const &) = delete;
174 auto operator=(merge_input const &) = delete;
175 merge_input(merge_input &&) noexcept = default;
176 auto operator=(merge_input &&) noexcept -> merge_input & = default;
177 ~merge_input() = default;
178
179 [[nodiscard]] auto introspect_node() const -> processor_info {
180 return processor_info(this, "merge_input");
181 }
182
183 [[nodiscard]] auto introspect_graph() const -> processor_graph {
184 return impl->introspect_graph().push_entry_point(this);
185 }
186
187 template <typename Event>
188 requires convertible_to_type_list_member<std::remove_cvref_t<Event>,
189 EventList>
190 void handle(Event &&event) {
191 static_assert(std::is_same_v<decltype(event.abstime),
192 typename DataTypes::abstime_type>);
193 impl->template handle<InputChannel>(std::forward<Event>(event));
194 }
195
196 void flush() { impl->template flush<InputChannel>(); }
197};
198
199} // namespace internal
200
239template <typename EventList, typename DataTypes = default_data_types,
240 typename Downstream>
242 Downstream downstream) {
243 auto p = std::make_shared<
244 internal::merge_impl<EventList, DataTypes, Downstream>>(
245 max_buffered, std::move(downstream));
246 return std::pair{
247 internal::merge_input<0, EventList, DataTypes, Downstream>(p),
248 internal::merge_input<1, EventList, DataTypes, Downstream>(p)};
249}
250
299template <std::size_t N, typename EventList,
300 typename DataTypes = default_data_types, typename Downstream>
302 Downstream downstream) {
303 if constexpr (N == 0) {
304 return std::tuple{};
305 } else if constexpr (N == 1) {
306 return std::tuple{std::move(downstream)};
307 } else {
308 auto [final_in0, final_in1] =
309 merge<EventList, DataTypes>(max_buffered, std::move(downstream));
310
311 std::size_t const left = N / 2;
312 std::size_t const right = N - left;
313 if constexpr (left == 1) {
314 if constexpr (right == 1) {
315 return std::tuple{std::move(final_in0), std::move(final_in1)};
316 } else {
317 return std::tuple_cat(std::tuple{std::move(final_in0)},
319 max_buffered, std::move(final_in1)));
320 }
321 } else {
322 return std::tuple_cat(merge_n<left, EventList, DataTypes>(
323 max_buffered, std::move(final_in0)),
325 max_buffered, std::move(final_in1)));
326 }
327 }
328}
329
330namespace internal {
331
332// Internal implementation of N-way unsorted merge processor. This processor is
333// owned by the N input processors via shared_ptr.
334template <std::size_t N, typename Downstream> class merge_unsorted_impl {
335 static_assert(processor<Downstream>);
336
337 Downstream downstream;
338
339 // Cold data.
340 bool ended_with_exception = false;
341 std::array<bool, N> input_flushed{};
342
343 public:
344 explicit merge_unsorted_impl(Downstream downstream)
345 : downstream(std::move(downstream)) {}
346
347 merge_unsorted_impl(merge_unsorted_impl const &) = delete;
348 auto operator=(merge_unsorted_impl const &) = delete;
349 merge_unsorted_impl(merge_unsorted_impl &&) = delete;
350 auto operator=(merge_unsorted_impl &&) = delete;
351 ~merge_unsorted_impl() = default;
352
353 [[nodiscard]] auto introspect_node() const -> processor_info {
354 return processor_info(this, "merge_unsorted_impl");
355 }
356
357 [[nodiscard]] auto introspect_graph() const -> processor_graph {
358 return downstream.introspect_graph().push_entry_point(this);
359 }
360
361 template <typename Event> void handle(Event &&event) {
362 static_assert(handler_for<Downstream, std::remove_cvref_t<Event>>);
363 if (ended_with_exception)
364 return;
365 try {
366 downstream.handle(std::forward<Event>(event));
367 } catch (std::exception const &) {
368 ended_with_exception = true;
369 throw;
370 }
371 }
372
373 void flush(std::size_t input_channel) {
374 input_flushed[input_channel] = true;
375 if (ended_with_exception)
376 return;
377 if (std::all_of(input_flushed.begin(), input_flushed.end(),
378 [](auto f) { return f; }))
379 downstream.flush();
380 }
381};
382
383template <std::size_t N, typename Downstream> class merge_unsorted_input {
384 std::shared_ptr<merge_unsorted_impl<N, Downstream>> impl;
385
386 // Cold data.
387 std::size_t chan;
388
389 public:
390 explicit merge_unsorted_input(
391 std::shared_ptr<merge_unsorted_impl<N, Downstream>> impl,
392 std::size_t channel)
393 : impl(std::move(impl)), chan(channel) {}
394
395 // Movable but not copyable
396 merge_unsorted_input(merge_unsorted_input const &) = delete;
397 auto operator=(merge_unsorted_input const &) = delete;
398 merge_unsorted_input(merge_unsorted_input &&) noexcept = default;
399 auto operator=(merge_unsorted_input &&) noexcept
400 -> merge_unsorted_input & = default;
401 ~merge_unsorted_input() = default;
402
403 [[nodiscard]] auto introspect_node() const -> processor_info {
404 return processor_info(this, "merge_unsorted_input");
405 }
406
407 [[nodiscard]] auto introspect_graph() const -> processor_graph {
408 return impl->introspect_graph().push_entry_point(this);
409 }
410
411 template <typename Event>
412 requires handler_for<Downstream, std::remove_cvref_t<Event>>
413 void handle(Event &&event) {
414 impl->handle(std::forward<Event>(event));
415 }
416
417 void flush() { impl->flush(chan); }
418};
419
420template <std::size_t N, typename Downstream, std::size_t... Indices>
421auto make_merge_unsorted_inputs(
422 std::shared_ptr<merge_unsorted_impl<N, Downstream>> impl,
423 std::index_sequence<Indices...> /* indices */) {
424 using input_type = merge_unsorted_input<N, Downstream>;
425 return std::array<input_type, N>{(input_type(impl, Indices))...};
426}
427
428} // namespace internal
429
456template <std::size_t N = 2, typename Downstream>
457auto merge_n_unsorted(Downstream downstream) {
458 auto impl = std::make_shared<internal::merge_unsorted_impl<N, Downstream>>(
459 std::move(downstream));
460 return internal::make_merge_unsorted_inputs(std::move(impl),
461 std::make_index_sequence<N>());
462}
463
464} // namespace tcspc
constexpr bool is_processor_of_list_v
Trait variable to check whether a processor handles a list of event types and flush.
Definition processor.hpp:246
auto merge_n_unsorted(Downstream downstream)
Create a processor that merges a given number of event streams without sorting by abstime.
Definition merge.hpp:457
auto merge_n(arg::max_buffered< std::size_t > max_buffered, Downstream downstream)
Create a processor that merges a given number of event streams.
Definition merge.hpp:301
auto merge(arg::max_buffered< std::size_t > max_buffered, Downstream downstream)
Create a pair of processors that merge two event streams.
Definition merge.hpp:241
constexpr auto visit_variant_or_single_event(Visitor visitor, Event &&event)
Apply a visitor to an event that is not a tcspc::variant_event.
Definition variant_event.hpp:127
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for maximum buffered parameter.
Definition arg_wrappers.hpp:237
The default data type set.
Definition data_types.hpp:24