9#include "arg_wrappers.hpp"
11#include "data_types.hpp"
13#include "introspect.hpp"
14#include "processor.hpp"
15#include "type_list.hpp"
16#include "variant_event.hpp"
17#include "vector_queue.hpp"
35template <
typename EventList,
typename DataTypes,
typename Downstream>
37 static_assert(type_list_like<EventList>);
46 bool pending_on_1 =
false;
47 std::array<bool, 2> input_flushed{
false,
false};
48 bool ended_with_exception =
false;
49 vector_queue<variant_or_single_event<EventList>> pending;
50 std::size_t max_buffered;
52 Downstream downstream;
54 template <
unsigned InputChannel>
55 [[nodiscard]]
auto is_other_flushed() const noexcept ->
bool {
56 return input_flushed[1 - InputChannel];
59 template <
unsigned InputChannel>
60 [[nodiscard]]
auto is_pending_on_other() const noexcept ->
bool {
61 return pending_on_1 == (InputChannel == 0);
64 template <
unsigned InputChannel>
void set_pending_on() noexcept {
65 pending_on_1 = (InputChannel == 1);
70 template <
typename Pred>
void emit_pending(Pred predicate) {
71 auto emit_if_true = [&](
auto const &e) {
72 bool p = predicate(e.abstime);
77 while (!pending.empty() &&
83 explicit merge_impl(arg::max_buffered<std::size_t> max_buffered,
84 Downstream downstream)
85 : max_buffered(max_buffered.value), downstream(std::move(downstream)) {
88 merge_impl(merge_impl
const &) =
delete;
89 auto operator=(merge_impl
const &) =
delete;
90 merge_impl(merge_impl &&) =
delete;
91 auto operator=(merge_impl &&) =
delete;
92 ~merge_impl() =
default;
94 [[nodiscard]]
auto introspect_node() const -> processor_info {
95 return processor_info(
this,
"merge_impl");
98 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
99 return downstream.introspect_graph().push_entry_point(
this);
102 template <
unsigned InputChannel,
typename Event>
103 void handle(Event
const &event) {
104 static_assert(convertible_to_type_list_member<Event, EventList>);
105 static_assert(std::is_same_v<
decltype(
event.abstime),
106 typename DataTypes::abstime_type>);
107 if (ended_with_exception)
110 if (is_pending_on_other<InputChannel>()) {
112 auto cutoff =
event.abstime;
115 if constexpr (InputChannel == 0)
117 emit_pending([=](
auto t) {
return t <= cutoff; });
121 if (not pending.empty())
122 return downstream.handle(event);
127 set_pending_on<InputChannel>();
131 if (is_other_flushed<InputChannel>()) {
132 assert(pending.empty());
133 return downstream.handle(event);
135 if (pending.size() == max_buffered)
136 throw buffer_overflow_error(
"merge buffer capacity exceeded");
138 }
catch (std::exception
const &) {
139 ended_with_exception =
true;
144 template <
unsigned InputChannel>
void flush() {
145 input_flushed[InputChannel] =
true;
146 if (ended_with_exception)
148 if (is_other_flushed<InputChannel>()) {
152 emit_pending([](
auto ) {
return true; });
154 }
else if (is_pending_on_other<InputChannel>()) {
157 emit_pending([](
auto ) {
return true; });
162template <
unsigned InputChannel,
typename EventList,
typename DataTypes,
165 std::shared_ptr<merge_impl<EventList, DataTypes, Downstream>> impl;
168 explicit merge_input(
169 std::shared_ptr<merge_impl<EventList, DataTypes, Downstream>> impl)
170 : impl(std::move(impl)) {}
173 merge_input(merge_input
const &) =
delete;
174 auto operator=(merge_input
const &) =
delete;
175 merge_input(merge_input &&) noexcept = default;
176 auto operator=(merge_input &&) noexcept -> merge_input & = default;
177 ~merge_input() = default;
179 [[nodiscard]] auto introspect_node() const -> processor_info {
180 return processor_info(
this,
"merge_input");
183 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
184 return impl->introspect_graph().push_entry_point(
this);
187 template <
typename Event>
188 requires convertible_to_type_list_member<std::remove_cvref_t<Event>,
190 void handle(Event &&event) {
191 static_assert(std::is_same_v<
decltype(
event.abstime),
192 typename DataTypes::abstime_type>);
193 impl->template handle<InputChannel>(std::forward<Event>(event));
196 void flush() { impl->template flush<InputChannel>(); }
242 Downstream downstream) {
243 auto p = std::make_shared<
244 internal::merge_impl<EventList, DataTypes, Downstream>>(
245 max_buffered, std::move(downstream));
247 internal::merge_input<0, EventList, DataTypes, Downstream>(p),
248 internal::merge_input<1, EventList, DataTypes, Downstream>(p)};
299template <std::size_t N,
typename EventList,
300 typename DataTypes = default_data_types,
typename Downstream>
302 Downstream downstream) {
303 if constexpr (N == 0) {
305 }
else if constexpr (N == 1) {
306 return std::tuple{std::move(downstream)};
308 auto [final_in0, final_in1] =
311 std::size_t
const left = N / 2;
312 std::size_t
const right = N - left;
313 if constexpr (left == 1) {
314 if constexpr (right == 1) {
315 return std::tuple{std::move(final_in0), std::move(final_in1)};
317 return std::tuple_cat(std::tuple{std::move(final_in0)},
319 max_buffered, std::move(final_in1)));
323 max_buffered, std::move(final_in0)),
325 max_buffered, std::move(final_in1)));
334template <std::
size_t N,
typename Downstream>
class merge_unsorted_impl {
335 static_assert(processor<Downstream>);
337 Downstream downstream;
340 bool ended_with_exception =
false;
341 std::array<bool, N> input_flushed{};
344 explicit merge_unsorted_impl(Downstream downstream)
345 : downstream(std::move(downstream)) {}
347 merge_unsorted_impl(merge_unsorted_impl
const &) =
delete;
348 auto operator=(merge_unsorted_impl
const &) =
delete;
349 merge_unsorted_impl(merge_unsorted_impl &&) =
delete;
350 auto operator=(merge_unsorted_impl &&) =
delete;
351 ~merge_unsorted_impl() =
default;
353 [[nodiscard]]
auto introspect_node() const -> processor_info {
354 return processor_info(
this,
"merge_unsorted_impl");
357 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
358 return downstream.introspect_graph().push_entry_point(
this);
361 template <
typename Event>
void handle(Event &&event) {
362 static_assert(handler_for<Downstream, std::remove_cvref_t<Event>>);
363 if (ended_with_exception)
366 downstream.handle(std::forward<Event>(event));
367 }
catch (std::exception
const &) {
368 ended_with_exception =
true;
373 void flush(std::size_t input_channel) {
374 input_flushed[input_channel] =
true;
375 if (ended_with_exception)
377 if (std::all_of(input_flushed.begin(), input_flushed.end(),
378 [](
auto f) { return f; }))
383template <std::
size_t N,
typename Downstream>
class merge_unsorted_input {
384 std::shared_ptr<merge_unsorted_impl<N, Downstream>> impl;
390 explicit merge_unsorted_input(
391 std::shared_ptr<merge_unsorted_impl<N, Downstream>> impl,
393 : impl(std::move(impl)), chan(channel) {}
396 merge_unsorted_input(merge_unsorted_input
const &) =
delete;
397 auto operator=(merge_unsorted_input
const &) =
delete;
398 merge_unsorted_input(merge_unsorted_input &&) noexcept = default;
399 auto operator=(merge_unsorted_input &&) noexcept
400 -> merge_unsorted_input & = default;
401 ~merge_unsorted_input() = default;
403 [[nodiscard]] auto introspect_node() const -> processor_info {
404 return processor_info(
this,
"merge_unsorted_input");
407 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
408 return impl->introspect_graph().push_entry_point(
this);
411 template <
typename Event>
412 requires handler_for<Downstream, std::remove_cvref_t<Event>>
413 void handle(Event &&event) {
414 impl->handle(std::forward<Event>(event));
417 void flush() { impl->flush(chan); }
420template <std::size_t N,
typename Downstream, std::size_t... Indices>
421auto make_merge_unsorted_inputs(
422 std::shared_ptr<merge_unsorted_impl<N, Downstream>> impl,
423 std::index_sequence<Indices...> ) {
424 using input_type = merge_unsorted_input<N, Downstream>;
425 return std::array<input_type, N>{(input_type(impl, Indices))...};
456template <std::
size_t N = 2,
typename Downstream>
458 auto impl = std::make_shared<internal::merge_unsorted_impl<N, Downstream>>(
459 std::move(downstream));
460 return internal::make_merge_unsorted_inputs(std::move(impl),
461 std::make_index_sequence<N>());
constexpr bool is_processor_of_list_v
Trait variable to check whether a processor handles a list of event types and flush.
Definition processor.hpp:246
auto merge_n_unsorted(Downstream downstream)
Create a processor that merges a given number of event streams without sorting by abstime.
Definition merge.hpp:457
auto merge_n(arg::max_buffered< std::size_t > max_buffered, Downstream downstream)
Create a processor that merges a given number of event streams.
Definition merge.hpp:301
auto merge(arg::max_buffered< std::size_t > max_buffered, Downstream downstream)
Create a pair of processors that merge two event streams.
Definition merge.hpp:241
constexpr auto visit_variant_or_single_event(Visitor visitor, Event &&event)
Apply a visitor to an event that is not a tcspc::variant_event.
Definition variant_event.hpp:127
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for maximum buffered parameter.
Definition arg_wrappers.hpp:237
The default data type set.
Definition data_types.hpp:24