9#include "arg_wrappers.hpp"
11#include "data_types.hpp"
13#include "int_arith.hpp"
14#include "introspect.hpp"
15#include "processor.hpp"
16#include "type_list.hpp"
17#include "variant_event.hpp"
30template <
typename EventList,
typename DataTypes,
typename Downstream>
34 using abstime_type =
typename DataTypes::abstime_type;
35 abstime_type window_size;
40 std::vector<variant_or_single_event<EventList>> buf;
43 abstime_type last_emitted_time = std::numeric_limits<abstime_type>::min();
45 Downstream downstream;
48 explicit recover_order(arg::time_window<abstime_type> time_window,
49 Downstream downstream)
50 : window_size(time_window.value), downstream(std::move(downstream)) {
52 throw std::invalid_argument(
53 "recover_order time_window must not be negative");
56 [[nodiscard]]
auto introspect_node() const -> processor_info {
57 return processor_info(
this,
"recover_order");
60 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
61 return downstream.introspect_graph().push_entry_point(
this);
64 template <
typename Event>
65 requires convertible_to_type_list_member<std::remove_cvref_t<Event>,
67 void handle(Event &&event) {
68 static_assert(std::is_same_v<
decltype(
event.abstime), abstime_type>);
69 if (event.abstime < last_emitted_time) {
70 throw data_validation_error(
71 "recover_order encountered event outside of time window");
84 auto const cutoff = pairing_cutoff(event.abstime, window_size);
86 std::find_if_not(buf.begin(), buf.end(), [&](
auto const &v) {
87 return visit_variant_or_single_event(
88 [&](auto const &e) { return e.abstime < cutoff; }, v);
91 std::for_each(buf.begin(), keep_it, [&](
auto &v) {
92 visit_variant_or_single_event(
93 [&]<typename E>(E &&e) {
94 last_emitted_time = e.abstime;
95 downstream.handle(std::forward<E>(e));
99 buf.erase(buf.begin(), keep_it);
102 std::find_if(buf.rbegin(), buf.rend(), [&](
auto const &v) {
103 return visit_variant_or_single_event(
104 [&](auto const &e) { return e.abstime < event.abstime; },
107 if (ins_it == buf.rend())
108 buf.insert(buf.begin(), std::forward<Event>(event));
110 buf.insert(ins_it.base(), std::forward<Event>(event));
116 std::for_each(buf.begin(), buf.end(), [&](
auto &v) {
117 visit_variant_or_single_event(
118 [&]<typename E>(E &&e) {
119 downstream.handle(std::forward<E>(e));
155template <
typename EventList,
typename DataTypes = default_data_types,
159 Downstream downstream) {
161 "recover_order requires non-empty event list");
162 return internal::recover_order<EventList, DataTypes, Downstream>(
163 time_window, std::move(downstream));
constexpr bool is_processor_of_list_v
Trait variable to check whether a processor handles a list of event types and flush.
Definition processor.hpp:246
auto recover_order(arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that sorts events by abstime, provided that they are out of order only within a bo...
Definition recover_order.hpp:157
constexpr std::size_t type_list_size_v
Helper variable template for tcspc::type_list_size.
Definition type_list.hpp:97
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for time window parameter.
Definition arg_wrappers.hpp:417