libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
recover_order.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "common.hpp"
11#include "data_types.hpp"
12#include "errors.hpp"
13#include "int_arith.hpp"
14#include "introspect.hpp"
15#include "processor.hpp"
16#include "type_list.hpp"
17#include "variant_event.hpp"
18
19#include <algorithm>
20#include <limits>
21#include <stdexcept>
22#include <type_traits>
23#include <utility>
24#include <vector>
25
26namespace tcspc {
27
28namespace internal {
29
30template <typename EventList, typename DataTypes, typename Downstream>
31class recover_order {
33
34 using abstime_type = typename DataTypes::abstime_type;
35 abstime_type window_size;
36
37 // We just use a sorted vector, because the intended use cases do not
38 // require buffering large numbers of events.
39 // Always in ascending abstime order:
40 std::vector<variant_or_single_event<EventList>> buf;
41
42 // For error checking
43 abstime_type last_emitted_time = std::numeric_limits<abstime_type>::min();
44
45 Downstream downstream;
46
47 public:
48 explicit recover_order(arg::time_window<abstime_type> time_window,
49 Downstream downstream)
50 : window_size(time_window.value), downstream(std::move(downstream)) {
51 if (window_size < 0)
52 throw std::invalid_argument(
53 "recover_order time_window must not be negative");
54 }
55
56 [[nodiscard]] auto introspect_node() const -> processor_info {
57 return processor_info(this, "recover_order");
58 }
59
60 [[nodiscard]] auto introspect_graph() const -> processor_graph {
61 return downstream.introspect_graph().push_entry_point(this);
62 }
63
64 template <typename Event>
65 requires convertible_to_type_list_member<std::remove_cvref_t<Event>,
66 EventList>
67 void handle(Event &&event) {
68 static_assert(std::is_same_v<decltype(event.abstime), abstime_type>);
69 if (event.abstime < last_emitted_time) {
70 throw data_validation_error(
71 "recover_order encountered event outside of time window");
72 }
73
74 // We perform a sliding-window version of insertion sort, enabled by
75 // the known time bound of out-of-order events.
76
77 // Both finding the events that are ready to emit and finding the
78 // position to insert the new one could be done with log complexity
79 // using std::lower_bound() and std::upper_bound(), but we expect the
80 // buffer to be small in the anticipated use cases, so prefer to do
81 // simple linear searches. (This choice could be made compile-time
82 // selectable.)
83
84 auto const cutoff = pairing_cutoff(event.abstime, window_size);
85 auto keep_it =
86 std::find_if_not(buf.begin(), buf.end(), [&](auto const &v) {
87 return visit_variant_or_single_event(
88 [&](auto const &e) { return e.abstime < cutoff; }, v);
89 });
90
91 std::for_each(buf.begin(), keep_it, [&](auto &v) {
92 visit_variant_or_single_event(
93 [&]<typename E>(E &&e) {
94 last_emitted_time = e.abstime;
95 downstream.handle(std::forward<E>(e));
96 },
97 std::move(v));
98 });
99 buf.erase(buf.begin(), keep_it);
100
101 auto ins_it =
102 std::find_if(buf.rbegin(), buf.rend(), [&](auto const &v) {
103 return visit_variant_or_single_event(
104 [&](auto const &e) { return e.abstime < event.abstime; },
105 v);
106 });
107 if (ins_it == buf.rend())
108 buf.insert(buf.begin(), std::forward<Event>(event));
109 else
110 buf.insert(ins_it.base(), std::forward<Event>(event));
111 }
112
113 // Do not allow other events.
114
115 void flush() {
116 std::for_each(buf.begin(), buf.end(), [&](auto &v) {
117 visit_variant_or_single_event(
118 [&]<typename E>(E &&e) {
119 downstream.handle(std::forward<E>(e));
120 },
121 std::move(v));
122 });
123 buf.clear();
124 downstream.flush();
125 }
126};
127
128} // namespace internal
129
155template <typename EventList, typename DataTypes = default_data_types,
156 typename Downstream>
159 Downstream downstream) {
160 static_assert(type_list_size_v<EventList> > 0,
161 "recover_order requires non-empty event list");
162 return internal::recover_order<EventList, DataTypes, Downstream>(
163 time_window, std::move(downstream));
164}
165
166} // namespace tcspc
constexpr bool is_processor_of_list_v
Trait variable to check whether a processor handles a list of event types and flush.
Definition processor.hpp:246
auto recover_order(arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that sorts events by abstime, provided that they are out of order only within a bounded time window.
Definition recover_order.hpp:157
constexpr std::size_t type_list_size_v
Helper variable template for tcspc::type_list_size.
Definition type_list.hpp:97
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for time window parameter.
Definition arg_wrappers.hpp:417