#include "arg_wrappers.hpp"
#include "data_types.hpp"
#include "int_arith.hpp"
#include "introspect.hpp"
#include "processor.hpp"
#include "time_tagged_events.hpp"

#include <cstddef>
#include <limits>
#include <type_traits>
#include <utility>
26template <
typename DataTypes,
typename Downstream>
27class regulate_time_reached {
28 static_assert(processor<Downstream, time_reached_event<DataTypes>>);
30 using abstime_type =
typename DataTypes::abstime_type;
32 abstime_type interval_thresh;
33 std::size_t count_thresh;
35 abstime_type exact_reached = std::numeric_limits<abstime_type>::min();
36 abstime_type next_time_thresh = std::numeric_limits<abstime_type>::min();
37 std::size_t emitted_since_prev_time_reached = 0;
38 std::size_t seen_since_prev_time_reached = 0;
40 Downstream downstream;
43 void handle_time_reached(abstime_type abstime) {
44 ++seen_since_prev_time_reached;
45 if (abstime >= next_time_thresh ||
46 emitted_since_prev_time_reached >= count_thresh) {
47 downstream.handle(time_reached_event<DataTypes>{abstime});
48 next_time_thresh = add_sat(abstime, interval_thresh);
49 emitted_since_prev_time_reached = 0;
50 seen_since_prev_time_reached = 0;
52 exact_reached = abstime;
56 explicit regulate_time_reached(
57 arg::interval_threshold<abstime_type> interval_threshold,
58 arg::count_threshold<std::size_t> count_threshold,
59 Downstream downstream)
60 : interval_thresh(interval_threshold.value),
61 count_thresh(count_threshold.value),
62 downstream(std::move(downstream)) {}
64 [[nodiscard]]
auto introspect_node() const -> processor_info {
65 return processor_info(
this,
"regulate_time_reached");
68 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
69 return downstream.introspect_graph().push_entry_point(
this);
72 template <
typename DT>
void handle(time_reached_event<DT>
const &event) {
73 static_assert(std::is_same_v<typename DT::abstime_type, abstime_type>);
74 handle_time_reached(event.abstime);
78 template <
typename DT>
void handle(time_reached_event<DT> &&event) {
79 handle(
static_cast<time_reached_event<DT>
const &
>(event));
82 template <
typename OtherEvent>
83 requires handler_for<Downstream, std::remove_cvref_t<OtherEvent>>
84 void handle(OtherEvent &&event) {
85 static_assert(std::is_same_v<
decltype(
event.abstime), abstime_type>);
86 auto const abstime =
event.abstime;
87 downstream.handle(std::forward<OtherEvent>(event));
88 ++emitted_since_prev_time_reached;
89 handle_time_reached(abstime);
97 if (exact_reached > std::numeric_limits<abstime_type>::min() &&
98 seen_since_prev_time_reached > 0)
99 downstream.handle(time_reached_event<DataTypes>{exact_reached});
168template <
typename DataTypes = default_data_types,
typename Downstream>
173 return internal::regulate_time_reached<DataTypes, Downstream>(
174 interval_threshold, count_threshold, std::move(downstream));
auto regulate_time_reached(arg::interval_threshold< typename DataTypes::abstime_type > interval_threshold, arg::count_threshold< std::size_t > count_threshold, Downstream downstream)
Create a processor that regulates the frequency of time-reached events.
Definition regulate_time_reached.hpp:169
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for count threshold parameter.
Definition arg_wrappers.hpp:107
Function argument wrapper for interval threshold parameter.
Definition arg_wrappers.hpp:187