libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
regulate_time_reached.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "common.hpp"
11#include "data_types.hpp"
12#include "int_arith.hpp"
13#include "introspect.hpp"
14#include "processor.hpp"
15#include "time_tagged_events.hpp"
16
17#include <cstddef>
18#include <limits>
19#include <type_traits>
20#include <utility>
21
22namespace tcspc {
23
24namespace internal {
25
26template <typename DataTypes, typename Downstream>
27class regulate_time_reached {
28 static_assert(processor<Downstream, time_reached_event<DataTypes>>);
29
30 using abstime_type = typename DataTypes::abstime_type;
31
32 abstime_type interval_thresh;
33 std::size_t count_thresh;
34
35 abstime_type exact_reached = std::numeric_limits<abstime_type>::min();
36 abstime_type next_time_thresh = std::numeric_limits<abstime_type>::min();
37 std::size_t emitted_since_prev_time_reached = 0;
38 std::size_t seen_since_prev_time_reached = 0;
39
40 Downstream downstream;
41
42 // Called for all upstream times seen.
43 void handle_time_reached(abstime_type abstime) {
44 ++seen_since_prev_time_reached;
45 if (abstime >= next_time_thresh ||
46 emitted_since_prev_time_reached >= count_thresh) {
47 downstream.handle(time_reached_event<DataTypes>{abstime});
48 next_time_thresh = add_sat(abstime, interval_thresh);
49 emitted_since_prev_time_reached = 0;
50 seen_since_prev_time_reached = 0;
51 }
52 exact_reached = abstime;
53 }
54
55 public:
56 explicit regulate_time_reached(
57 arg::interval_threshold<abstime_type> interval_threshold,
58 arg::count_threshold<std::size_t> count_threshold,
59 Downstream downstream)
60 : interval_thresh(interval_threshold.value),
61 count_thresh(count_threshold.value),
62 downstream(std::move(downstream)) {}
63
64 [[nodiscard]] auto introspect_node() const -> processor_info {
65 return processor_info(this, "regulate_time_reached");
66 }
67
68 [[nodiscard]] auto introspect_graph() const -> processor_graph {
69 return downstream.introspect_graph().push_entry_point(this);
70 }
71
72 template <typename DT> void handle(time_reached_event<DT> const &event) {
73 static_assert(std::is_same_v<typename DT::abstime_type, abstime_type>);
74 handle_time_reached(event.abstime);
75 }
76
77 // NOLINTNEXTLINE(cppcoreguidelines-rvalue-reference-param-not-moved)
78 template <typename DT> void handle(time_reached_event<DT> &&event) {
79 handle(static_cast<time_reached_event<DT> const &>(event));
80 }
81
82 template <typename OtherEvent>
83 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
84 void handle(OtherEvent &&event) {
85 static_assert(std::is_same_v<decltype(event.abstime), abstime_type>);
86 auto const abstime = event.abstime;
87 downstream.handle(std::forward<OtherEvent>(event));
88 ++emitted_since_prev_time_reached;
89 handle_time_reached(abstime);
90 }
91
92 void flush() {
93 // Emit time-reached for last seen event in order to convey the (best
94 // known) stream end time on all downstream paths.
95 // Only do so if we received at least one event and the last emitted
96 // was something other than time-reached.
97 if (exact_reached > std::numeric_limits<abstime_type>::min() &&
98 seen_since_prev_time_reached > 0)
99 downstream.handle(time_reached_event<DataTypes>{exact_reached});
100 downstream.flush();
101 }
102};
103
104} // namespace internal
105
168template <typename DataTypes = default_data_types, typename Downstream>
171 interval_threshold,
172 arg::count_threshold<std::size_t> count_threshold, Downstream downstream) {
173 return internal::regulate_time_reached<DataTypes, Downstream>(
174 interval_threshold, count_threshold, std::move(downstream));
175}
176
177} // namespace tcspc
auto regulate_time_reached(arg::interval_threshold< typename DataTypes::abstime_type > interval_threshold, arg::count_threshold< std::size_t > count_threshold, Downstream downstream)
Create a processor that regulates the frequency of time-reached events.
Definition regulate_time_reached.hpp:169
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for count threshold parameter.
Definition arg_wrappers.hpp:107
Function argument wrapper for interval threshold parameter.
Definition arg_wrappers.hpp:187