9#include "arg_wrappers.hpp"
11#include "data_types.hpp"
12#include "introspect.hpp"
13#include "processor.hpp"
14#include "time_tagged_events.hpp"
15#include "vector_queue.hpp"
31template <std::
size_t NStopChannels,
typename DataTypes,
typename Downstream>
34 processor<Downstream, std::array<detection_event<DataTypes>, 2>>);
36 typename DataTypes::channel_type start_chan;
37 std::array<typename DataTypes::channel_type, NStopChannels> stop_chans;
38 typename DataTypes::abstime_type window_size;
41 internal::vector_queue<typename DataTypes::abstime_type> starts;
43 Downstream downstream;
45 void expel_old_starts(
typename DataTypes::abstime_type earliest_stop) {
46 auto const cutoff = pairing_cutoff(earliest_stop, window_size);
47 while (not starts.empty() && starts.front() < cutoff)
53 arg::start_channel<typename DataTypes::channel_type> start_channel,
54 std::array<typename DataTypes::channel_type, NStopChannels>
56 arg::time_window<typename DataTypes::abstime_type> time_window,
57 Downstream downstream)
58 : start_chan(start_channel.value), stop_chans(stop_channels),
59 window_size(time_window.value), downstream(std::move(downstream)) {
61 throw std::invalid_argument(
62 "pair_all time_window must not be negative");
65 [[nodiscard]]
auto introspect_node() const -> processor_info {
66 return processor_info(
this,
"pair_all");
69 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
70 return downstream.introspect_graph().push_entry_point(
this);
73 template <
typename DT>
void handle(detection_event<DT>
const &event) {
74 static_assert(std::is_same_v<
typename DT::abstime_type,
75 typename DataTypes::abstime_type>);
76 static_assert(std::is_same_v<
typename DT::channel_type,
77 typename DataTypes::channel_type>);
79 expel_old_starts(event.abstime);
80 auto chan_index = std::distance(
82 std::find(stop_chans.cbegin(), stop_chans.cend(), event.channel));
83 if (std::size_t(chan_index) < NStopChannels) {
84 starts.for_each([&](
auto start_time) {
85 downstream.handle(std::array<detection_event<DataTypes>, 2>{
86 {{start_time, start_chan},
event}});
89 if (event.channel == start_chan)
90 starts.push(event.abstime);
91 downstream.handle(event);
95 template <
typename DT>
void handle(detection_event<DT> &&event) {
96 handle(
static_cast<detection_event<DT>
const &
>(event));
99 template <
typename OtherEvent>
100 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
101 void handle(OtherEvent &&event) {
102 downstream.handle(std::forward<OtherEvent>(event));
105 void flush() { downstream.flush(); }
108template <std::
size_t NStopChannels,
typename DataTypes,
typename Downstream>
111 processor<Downstream, std::array<detection_event<DataTypes>, 2>>);
113 typename DataTypes::channel_type start_chan;
114 std::array<typename DataTypes::channel_type, NStopChannels> stop_chans;
115 typename DataTypes::abstime_type window_size;
119 struct start_and_flags {
120 typename DataTypes::abstime_type time;
121 std::bitset<NStopChannels> stopped;
123 internal::vector_queue<start_and_flags> starts;
125 Downstream downstream;
127 void expel_old_starts(
typename DataTypes::abstime_type earliest_stop) {
128 auto const cutoff = pairing_cutoff(earliest_stop, window_size);
129 while (not starts.empty() &&
130 (starts.front().time < cutoff || starts.front().stopped.all()))
136 arg::start_channel<typename DataTypes::channel_type> start_channel,
137 std::array<typename DataTypes::channel_type, NStopChannels>
139 arg::time_window<typename DataTypes::abstime_type> time_window,
140 Downstream downstream)
141 : start_chan(start_channel.value), stop_chans(stop_channels),
142 window_size(time_window.value), downstream(std::move(downstream)) {
144 throw std::invalid_argument(
145 "pair_one time_window must not be negative");
148 [[nodiscard]]
auto introspect_node() const -> processor_info {
149 return processor_info(
this,
"pair_one");
152 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
153 return downstream.introspect_graph().push_entry_point(
this);
156 template <
typename DT>
void handle(detection_event<DT>
const &event) {
157 static_assert(std::is_same_v<
typename DT::abstime_type,
158 typename DataTypes::abstime_type>);
159 static_assert(std::is_same_v<
typename DT::channel_type,
160 typename DataTypes::channel_type>);
162 expel_old_starts(event.abstime);
163 auto const chan_index =
static_cast<std::size_t
>(std::distance(
165 std::find(stop_chans.cbegin(), stop_chans.cend(), event.channel)));
166 if (chan_index < NStopChannels) {
167 starts.for_each([&](start_and_flags &sf) {
168 if (not sf.stopped[chan_index]) {
170 std::array<detection_event<DataTypes>, 2>{
171 {{sf.time, start_chan},
event}});
172 sf.stopped[chan_index] =
true;
176 if (event.channel == start_chan)
177 starts.push(start_and_flags{
event.abstime, {}});
178 downstream.handle(event);
182 template <
typename DT>
void handle(detection_event<DT> &&event) {
183 handle(
static_cast<detection_event<DT>
const &
>(event));
186 template <
typename OtherEvent>
187 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
188 void handle(OtherEvent &&event) {
189 downstream.handle(std::forward<OtherEvent>(event));
192 void flush() { downstream.flush(); }
195template <std::
size_t NStopChannels,
typename DataTypes,
typename Downstream>
196class pair_all_between {
198 processor<Downstream, std::array<detection_event<DataTypes>, 2>>);
200 typename DataTypes::channel_type start_chan;
201 std::array<typename DataTypes::channel_type, NStopChannels> stop_chans;
202 typename DataTypes::abstime_type window_size;
205 std::optional<typename DataTypes::abstime_type> start;
207 Downstream downstream;
209 void expel_old_start(
typename DataTypes::abstime_type earliest_stop) {
210 auto const cutoff = pairing_cutoff(earliest_stop, window_size);
211 if (start.has_value() && *start < cutoff)
212 start = std::nullopt;
216 explicit pair_all_between(
217 arg::start_channel<typename DataTypes::channel_type> start_channel,
218 std::array<typename DataTypes::channel_type, NStopChannels>
220 arg::time_window<typename DataTypes::abstime_type> time_window,
221 Downstream downstream)
222 : start_chan(start_channel.value), stop_chans(stop_channels),
223 window_size(time_window.value), downstream(std::move(downstream)) {
225 throw std::invalid_argument(
226 "pair_all_between time_window must not be negative");
229 [[nodiscard]]
auto introspect_node() const -> processor_info {
230 return processor_info(
this,
"pair_all_between");
233 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
234 return downstream.introspect_graph().push_entry_point(
this);
237 template <
typename DT>
void handle(detection_event<DT>
const &event) {
238 static_assert(std::is_same_v<
typename DT::abstime_type,
239 typename DataTypes::abstime_type>);
240 static_assert(std::is_same_v<
typename DT::channel_type,
241 typename DataTypes::channel_type>);
243 expel_old_start(event.abstime);
244 if (start.has_value()) {
246 std::distance(stop_chans.cbegin(),
247 std::find(stop_chans.cbegin(), stop_chans.cend(),
249 if (std::size_t(chan_index) < NStopChannels) {
250 downstream.handle(std::array<detection_event<DataTypes>, 2>{
251 {{*start, start_chan},
event}});
254 if (event.channel == start_chan)
255 start =
event.abstime;
256 downstream.handle(event);
260 template <
typename DT>
void handle(detection_event<DT> &&event) {
261 handle(
static_cast<detection_event<DT>
const &
>(event));
264 template <
typename OtherEvent>
265 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
266 void handle(OtherEvent &&event) {
267 downstream.handle(std::forward<OtherEvent>(event));
270 void flush() { downstream.flush(); }
273template <std::
size_t NStopChannels,
typename DataTypes,
typename Downstream>
274class pair_one_between {
276 processor<Downstream, std::array<detection_event<DataTypes>, 2>>);
278 typename DataTypes::channel_type start_chan;
279 std::array<typename DataTypes::channel_type, NStopChannels> stop_chans;
280 typename DataTypes::abstime_type window_size;
284 struct start_and_flags {
285 typename DataTypes::abstime_type time;
286 std::bitset<NStopChannels> stopped;
288 std::optional<start_and_flags> start;
290 Downstream downstream;
292 void expel_old_start(
typename DataTypes::abstime_type earliest_stop) {
293 auto const cutoff = pairing_cutoff(earliest_stop, window_size);
294 if (start.has_value() &&
295 (start->time < cutoff || start->stopped.all()))
296 start = std::nullopt;
300 explicit pair_one_between(
301 arg::start_channel<typename DataTypes::channel_type> start_channel,
302 std::array<typename DataTypes::channel_type, NStopChannels>
304 arg::time_window<typename DataTypes::abstime_type> time_window,
305 Downstream downstream)
306 : start_chan(start_channel.value), stop_chans(stop_channels),
307 window_size(time_window.value), downstream(std::move(downstream)) {
309 throw std::invalid_argument(
310 "pair_one_between time_window must not be negative");
313 [[nodiscard]]
auto introspect_node() const -> processor_info {
314 return processor_info(
this,
"pair_one_between");
317 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
318 return downstream.introspect_graph().push_entry_point(
this);
321 template <
typename DT>
void handle(detection_event<DT>
const &event) {
322 static_assert(std::is_same_v<
typename DT::abstime_type,
323 typename DataTypes::abstime_type>);
324 static_assert(std::is_same_v<
typename DT::channel_type,
325 typename DataTypes::channel_type>);
327 expel_old_start(event.abstime);
328 if (start.has_value()) {
329 auto const chan_index =
static_cast<std::size_t
>(
330 std::distance(stop_chans.cbegin(),
331 std::find(stop_chans.cbegin(), stop_chans.cend(),
333 if (chan_index < NStopChannels && not start->stopped[chan_index]) {
334 downstream.handle(std::array<detection_event<DataTypes>, 2>{
335 {{start->time, start_chan},
event}});
336 start->stopped[chan_index] =
true;
339 if (event.channel == start_chan)
340 start = start_and_flags{
event.abstime, {}};
341 downstream.handle(event);
345 template <
typename DT>
void handle(detection_event<DT> &&event) {
346 handle(
static_cast<detection_event<DT>
const &
>(event));
349 template <
typename OtherEvent>
350 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
351 void handle(OtherEvent &&event) {
352 downstream.handle(std::forward<OtherEvent>(event));
355 void flush() { downstream.flush(); }
402 std::array<typename DataTypes::channel_type, NStopChannels> stop_channels,
404 Downstream downstream) {
405 return internal::pair_all<NStopChannels, DataTypes, Downstream>(
406 start_channel, stop_channels, time_window, std::move(downstream));
449template <std::size_t NStopChannels,
typename DataTypes = default_data_types,
453 std::array<typename DataTypes::channel_type, NStopChannels> stop_channels,
455 Downstream downstream) {
456 return internal::pair_one<NStopChannels, DataTypes, Downstream>(
457 start_channel, stop_channels, time_window, std::move(downstream));
499template <std::size_t NStopChannels,
typename DataTypes = default_data_types,
503 std::array<typename DataTypes::channel_type, NStopChannels> stop_channels,
505 Downstream downstream) {
506 return internal::pair_all_between<NStopChannels, DataTypes, Downstream>(
507 start_channel, stop_channels, time_window, std::move(downstream));
550template <std::size_t NStopChannels,
typename DataTypes = default_data_types,
554 std::array<typename DataTypes::channel_type, NStopChannels> stop_channels,
556 Downstream downstream) {
557 return internal::pair_one_between<NStopChannels, DataTypes, Downstream>(
558 start_channel, stop_channels, time_window, std::move(downstream));
auto pair_all_between(arg::start_channel< typename DataTypes::channel_type > start_channel, std::array< typename DataTypes::channel_type, NStopChannels > stop_channels, arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that generates ordered pairs of detection events within a time window,...
Definition pair.hpp:501
auto pair_one(arg::start_channel< typename DataTypes::channel_type > start_channel, std::array< typename DataTypes::channel_type, NStopChannels > stop_channels, arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that generates ordered pairs of detection events within a time window,...
Definition pair.hpp:451
auto pair_all(arg::start_channel< typename DataTypes::channel_type > start_channel, std::array< typename DataTypes::channel_type, NStopChannels > stop_channels, arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that generates all ordered pairs of detection events within a time window.
Definition pair.hpp:400
auto pair_one_between(arg::start_channel< typename DataTypes::channel_type > start_channel, std::array< typename DataTypes::channel_type, NStopChannels > stop_channels, arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that generates ordered pairs of detection events within a time window,...
Definition pair.hpp:552
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for start channel parameter.
Definition arg_wrappers.hpp:367
Function argument wrapper for time window parameter.
Definition arg_wrappers.hpp:417
The default data type set.
Definition data_types.hpp:24