libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
pair.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "common.hpp"
11#include "data_types.hpp"
12#include "introspect.hpp"
13#include "processor.hpp"
14#include "time_tagged_events.hpp"
15#include "vector_queue.hpp"
16
17#include <algorithm>
18#include <array>
19#include <bitset>
20#include <cstddef>
21#include <iterator>
22#include <optional>
23#include <stdexcept>
24#include <type_traits>
25#include <utility>
26
27namespace tcspc {
28
29namespace internal {
30
31template <std::size_t NStopChannels, typename DataTypes, typename Downstream>
32class pair_all {
33 static_assert(
34 processor<Downstream, std::array<detection_event<DataTypes>, 2>>);
35
36 typename DataTypes::channel_type start_chan;
37 std::array<typename DataTypes::channel_type, NStopChannels> stop_chans;
38 typename DataTypes::abstime_type window_size;
39
40 // Buffer all starts within time window.
41 internal::vector_queue<typename DataTypes::abstime_type> starts;
42
43 Downstream downstream;
44
45 void expel_old_starts(typename DataTypes::abstime_type earliest_stop) {
46 auto const cutoff = pairing_cutoff(earliest_stop, window_size);
47 while (not starts.empty() && starts.front() < cutoff)
48 starts.pop();
49 }
50
51 public:
52 explicit pair_all(
53 arg::start_channel<typename DataTypes::channel_type> start_channel,
54 std::array<typename DataTypes::channel_type, NStopChannels>
55 stop_channels,
56 arg::time_window<typename DataTypes::abstime_type> time_window,
57 Downstream downstream)
58 : start_chan(start_channel.value), stop_chans(stop_channels),
59 window_size(time_window.value), downstream(std::move(downstream)) {
60 if (window_size < 0)
61 throw std::invalid_argument(
62 "pair_all time_window must not be negative");
63 }
64
65 [[nodiscard]] auto introspect_node() const -> processor_info {
66 return processor_info(this, "pair_all");
67 }
68
69 [[nodiscard]] auto introspect_graph() const -> processor_graph {
70 return downstream.introspect_graph().push_entry_point(this);
71 }
72
73 template <typename DT> void handle(detection_event<DT> const &event) {
74 static_assert(std::is_same_v<typename DT::abstime_type,
75 typename DataTypes::abstime_type>);
76 static_assert(std::is_same_v<typename DT::channel_type,
77 typename DataTypes::channel_type>);
78
79 expel_old_starts(event.abstime);
80 auto chan_index = std::distance(
81 stop_chans.cbegin(),
82 std::find(stop_chans.cbegin(), stop_chans.cend(), event.channel));
83 if (std::size_t(chan_index) < NStopChannels) {
84 starts.for_each([&](auto start_time) {
85 downstream.handle(std::array<detection_event<DataTypes>, 2>{
86 {{start_time, start_chan}, event}});
87 });
88 }
89 if (event.channel == start_chan)
90 starts.push(event.abstime);
91 downstream.handle(event);
92 }
93
94 // NOLINTNEXTLINE(cppcoreguidelines-rvalue-reference-param-not-moved)
95 template <typename DT> void handle(detection_event<DT> &&event) {
96 handle(static_cast<detection_event<DT> const &>(event));
97 }
98
99 template <typename OtherEvent>
100 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
101 void handle(OtherEvent &&event) {
102 downstream.handle(std::forward<OtherEvent>(event));
103 }
104
105 void flush() { downstream.flush(); }
106};
107
108template <std::size_t NStopChannels, typename DataTypes, typename Downstream>
109class pair_one {
110 static_assert(
111 processor<Downstream, std::array<detection_event<DataTypes>, 2>>);
112
113 typename DataTypes::channel_type start_chan;
114 std::array<typename DataTypes::channel_type, NStopChannels> stop_chans;
115 typename DataTypes::abstime_type window_size;
116
117 // Buffer all starts within time window, and mark stop channels that have
118 // been matched.
119 struct start_and_flags {
120 typename DataTypes::abstime_type time;
121 std::bitset<NStopChannels> stopped;
122 };
123 internal::vector_queue<start_and_flags> starts;
124
125 Downstream downstream;
126
127 void expel_old_starts(typename DataTypes::abstime_type earliest_stop) {
128 auto const cutoff = pairing_cutoff(earliest_stop, window_size);
129 while (not starts.empty() &&
130 (starts.front().time < cutoff || starts.front().stopped.all()))
131 starts.pop();
132 }
133
134 public:
135 explicit pair_one(
136 arg::start_channel<typename DataTypes::channel_type> start_channel,
137 std::array<typename DataTypes::channel_type, NStopChannels>
138 stop_channels,
139 arg::time_window<typename DataTypes::abstime_type> time_window,
140 Downstream downstream)
141 : start_chan(start_channel.value), stop_chans(stop_channels),
142 window_size(time_window.value), downstream(std::move(downstream)) {
143 if (window_size < 0)
144 throw std::invalid_argument(
145 "pair_one time_window must not be negative");
146 }
147
148 [[nodiscard]] auto introspect_node() const -> processor_info {
149 return processor_info(this, "pair_one");
150 }
151
152 [[nodiscard]] auto introspect_graph() const -> processor_graph {
153 return downstream.introspect_graph().push_entry_point(this);
154 }
155
156 template <typename DT> void handle(detection_event<DT> const &event) {
157 static_assert(std::is_same_v<typename DT::abstime_type,
158 typename DataTypes::abstime_type>);
159 static_assert(std::is_same_v<typename DT::channel_type,
160 typename DataTypes::channel_type>);
161
162 expel_old_starts(event.abstime);
163 auto const chan_index = static_cast<std::size_t>(std::distance(
164 stop_chans.cbegin(),
165 std::find(stop_chans.cbegin(), stop_chans.cend(), event.channel)));
166 if (chan_index < NStopChannels) {
167 starts.for_each([&](start_and_flags &sf) {
168 if (not sf.stopped[chan_index]) {
169 downstream.handle(
170 std::array<detection_event<DataTypes>, 2>{
171 {{sf.time, start_chan}, event}});
172 sf.stopped[chan_index] = true;
173 }
174 });
175 }
176 if (event.channel == start_chan)
177 starts.push(start_and_flags{event.abstime, {}});
178 downstream.handle(event);
179 }
180
181 // NOLINTNEXTLINE(cppcoreguidelines-rvalue-reference-param-not-moved)
182 template <typename DT> void handle(detection_event<DT> &&event) {
183 handle(static_cast<detection_event<DT> const &>(event));
184 }
185
186 template <typename OtherEvent>
187 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
188 void handle(OtherEvent &&event) {
189 downstream.handle(std::forward<OtherEvent>(event));
190 }
191
192 void flush() { downstream.flush(); }
193};
194
195template <std::size_t NStopChannels, typename DataTypes, typename Downstream>
196class pair_all_between {
197 static_assert(
198 processor<Downstream, std::array<detection_event<DataTypes>, 2>>);
199
200 typename DataTypes::channel_type start_chan;
201 std::array<typename DataTypes::channel_type, NStopChannels> stop_chans;
202 typename DataTypes::abstime_type window_size;
203
204 // Buffer the most recent start within the time window.
205 std::optional<typename DataTypes::abstime_type> start;
206
207 Downstream downstream;
208
209 void expel_old_start(typename DataTypes::abstime_type earliest_stop) {
210 auto const cutoff = pairing_cutoff(earliest_stop, window_size);
211 if (start.has_value() && *start < cutoff)
212 start = std::nullopt;
213 }
214
215 public:
216 explicit pair_all_between(
217 arg::start_channel<typename DataTypes::channel_type> start_channel,
218 std::array<typename DataTypes::channel_type, NStopChannels>
219 stop_channels,
220 arg::time_window<typename DataTypes::abstime_type> time_window,
221 Downstream downstream)
222 : start_chan(start_channel.value), stop_chans(stop_channels),
223 window_size(time_window.value), downstream(std::move(downstream)) {
224 if (window_size < 0)
225 throw std::invalid_argument(
226 "pair_all_between time_window must not be negative");
227 }
228
229 [[nodiscard]] auto introspect_node() const -> processor_info {
230 return processor_info(this, "pair_all_between");
231 }
232
233 [[nodiscard]] auto introspect_graph() const -> processor_graph {
234 return downstream.introspect_graph().push_entry_point(this);
235 }
236
237 template <typename DT> void handle(detection_event<DT> const &event) {
238 static_assert(std::is_same_v<typename DT::abstime_type,
239 typename DataTypes::abstime_type>);
240 static_assert(std::is_same_v<typename DT::channel_type,
241 typename DataTypes::channel_type>);
242
243 expel_old_start(event.abstime);
244 if (start.has_value()) {
245 auto chan_index =
246 std::distance(stop_chans.cbegin(),
247 std::find(stop_chans.cbegin(), stop_chans.cend(),
248 event.channel));
249 if (std::size_t(chan_index) < NStopChannels) {
250 downstream.handle(std::array<detection_event<DataTypes>, 2>{
251 {{*start, start_chan}, event}});
252 }
253 }
254 if (event.channel == start_chan)
255 start = event.abstime;
256 downstream.handle(event);
257 }
258
259 // NOLINTNEXTLINE(cppcoreguidelines-rvalue-reference-param-not-moved)
260 template <typename DT> void handle(detection_event<DT> &&event) {
261 handle(static_cast<detection_event<DT> const &>(event));
262 }
263
264 template <typename OtherEvent>
265 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
266 void handle(OtherEvent &&event) {
267 downstream.handle(std::forward<OtherEvent>(event));
268 }
269
270 void flush() { downstream.flush(); }
271};
272
273template <std::size_t NStopChannels, typename DataTypes, typename Downstream>
274class pair_one_between {
275 static_assert(
276 processor<Downstream, std::array<detection_event<DataTypes>, 2>>);
277
278 typename DataTypes::channel_type start_chan;
279 std::array<typename DataTypes::channel_type, NStopChannels> stop_chans;
280 typename DataTypes::abstime_type window_size;
281
282 // Buffer the most recent start within the time window, and mark stop
283 // channels that have been matched.
284 struct start_and_flags {
285 typename DataTypes::abstime_type time;
286 std::bitset<NStopChannels> stopped;
287 };
288 std::optional<start_and_flags> start;
289
290 Downstream downstream;
291
292 void expel_old_start(typename DataTypes::abstime_type earliest_stop) {
293 auto const cutoff = pairing_cutoff(earliest_stop, window_size);
294 if (start.has_value() &&
295 (start->time < cutoff || start->stopped.all()))
296 start = std::nullopt;
297 }
298
299 public:
300 explicit pair_one_between(
301 arg::start_channel<typename DataTypes::channel_type> start_channel,
302 std::array<typename DataTypes::channel_type, NStopChannels>
303 stop_channels,
304 arg::time_window<typename DataTypes::abstime_type> time_window,
305 Downstream downstream)
306 : start_chan(start_channel.value), stop_chans(stop_channels),
307 window_size(time_window.value), downstream(std::move(downstream)) {
308 if (window_size < 0)
309 throw std::invalid_argument(
310 "pair_one_between time_window must not be negative");
311 }
312
313 [[nodiscard]] auto introspect_node() const -> processor_info {
314 return processor_info(this, "pair_one_between");
315 }
316
317 [[nodiscard]] auto introspect_graph() const -> processor_graph {
318 return downstream.introspect_graph().push_entry_point(this);
319 }
320
321 template <typename DT> void handle(detection_event<DT> const &event) {
322 static_assert(std::is_same_v<typename DT::abstime_type,
323 typename DataTypes::abstime_type>);
324 static_assert(std::is_same_v<typename DT::channel_type,
325 typename DataTypes::channel_type>);
326
327 expel_old_start(event.abstime);
328 if (start.has_value()) {
329 auto const chan_index = static_cast<std::size_t>(
330 std::distance(stop_chans.cbegin(),
331 std::find(stop_chans.cbegin(), stop_chans.cend(),
332 event.channel)));
333 if (chan_index < NStopChannels && not start->stopped[chan_index]) {
334 downstream.handle(std::array<detection_event<DataTypes>, 2>{
335 {{start->time, start_chan}, event}});
336 start->stopped[chan_index] = true;
337 }
338 }
339 if (event.channel == start_chan)
340 start = start_and_flags{event.abstime, {}};
341 downstream.handle(event);
342 }
343
344 // NOLINTNEXTLINE(cppcoreguidelines-rvalue-reference-param-not-moved)
345 template <typename DT> void handle(detection_event<DT> &&event) {
346 handle(static_cast<detection_event<DT> const &>(event));
347 }
348
349 template <typename OtherEvent>
350 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
351 void handle(OtherEvent &&event) {
352 downstream.handle(std::forward<OtherEvent>(event));
353 }
354
355 void flush() { downstream.flush(); }
356};
357
358} // namespace internal
359
398template <std::size_t NStopChannels, typename DataTypes = default_data_types,
399 typename Downstream>
402 std::array<typename DataTypes::channel_type, NStopChannels> stop_channels,
404 Downstream downstream) {
405 return internal::pair_all<NStopChannels, DataTypes, Downstream>(
406 start_channel, stop_channels, time_window, std::move(downstream));
407}
408
449template <std::size_t NStopChannels, typename DataTypes = default_data_types,
450 typename Downstream>
453 std::array<typename DataTypes::channel_type, NStopChannels> stop_channels,
455 Downstream downstream) {
456 return internal::pair_one<NStopChannels, DataTypes, Downstream>(
457 start_channel, stop_channels, time_window, std::move(downstream));
458}
459
499template <std::size_t NStopChannels, typename DataTypes = default_data_types,
500 typename Downstream>
503 std::array<typename DataTypes::channel_type, NStopChannels> stop_channels,
505 Downstream downstream) {
506 return internal::pair_all_between<NStopChannels, DataTypes, Downstream>(
507 start_channel, stop_channels, time_window, std::move(downstream));
508}
509
550template <std::size_t NStopChannels, typename DataTypes = default_data_types,
551 typename Downstream>
554 std::array<typename DataTypes::channel_type, NStopChannels> stop_channels,
556 Downstream downstream) {
557 return internal::pair_one_between<NStopChannels, DataTypes, Downstream>(
558 start_channel, stop_channels, time_window, std::move(downstream));
559}
560
561} // namespace tcspc
auto pair_all_between(arg::start_channel< typename DataTypes::channel_type > start_channel, std::array< typename DataTypes::channel_type, NStopChannels > stop_channels, arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that generates ordered pairs of detection events within a time window,...
Definition pair.hpp:501
auto pair_one(arg::start_channel< typename DataTypes::channel_type > start_channel, std::array< typename DataTypes::channel_type, NStopChannels > stop_channels, arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that generates ordered pairs of detection events within a time window,...
Definition pair.hpp:451
auto pair_all(arg::start_channel< typename DataTypes::channel_type > start_channel, std::array< typename DataTypes::channel_type, NStopChannels > stop_channels, arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that generates all ordered pairs of detection events within a time window.
Definition pair.hpp:400
auto pair_one_between(arg::start_channel< typename DataTypes::channel_type > start_channel, std::array< typename DataTypes::channel_type, NStopChannels > stop_channels, arg::time_window< typename DataTypes::abstime_type > time_window, Downstream downstream)
Create a processor that generates ordered pairs of detection events within a time window,...
Definition pair.hpp:552
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for start channel parameter.
Definition arg_wrappers.hpp:367
Function argument wrapper for time window parameter.
Definition arg_wrappers.hpp:417
The default data type set.
Definition data_types.hpp:24