9#include "arg_wrappers.hpp"
12#include "introspect.hpp"
13#include "processor.hpp"
// Processor that collects individual events of type Event into a
// bucket<Event> and passes each bucket to the downstream processor.
//
// NOTE(review): this text is an extract of a rendered source listing. The
// number at the start of a line is that line's number in the original file,
// and gaps in the numbering mean original lines are not shown here (for
// example the declaration of `bsize`, the access specifier, the condition
// guarding the throw, the template header of handle(), and closing braces).
26template <
typename Event,
typename Downstream>
class batch {
// Downstream must be able to process bucket<Event>.
27 static_assert(processor<Downstream, bucket<Event>>);
// Source from which new buckets are requested (see handle()).
29 std::shared_ptr<bucket_source<Event>> bsource;
// Bucket currently being filled; tested with empty() before each write.
32 bucket<Event> cur_bucket;
// Index in cur_bucket at which the next event is stored.
33 std::size_t n_filled = 0;
// Processor that receives the buckets.
35 Downstream downstream;
// Construct from a bucket source, a batch size, and the downstream
// processor. `bsize` is initialized from batch_size.value.
38 explicit batch(std::shared_ptr<bucket_source<Event>> buffer_provider,
39 arg::batch_size<std::size_t> batch_size,
40 Downstream downstream)
41 : bsource(std::move(buffer_provider)), bsize(batch_size.value),
42 downstream(std::move(downstream)) {
// Throws std::invalid_argument. NOTE(review): the guarding condition
// (original line 43) is not visible here; going by the message it is
// presumably a check for a zero batch size -- confirm against the full file.
44 throw std::invalid_argument(
45 "batch processor batch_size must not be zero");
// Describe this node for introspection: a processor_info built from this
// object's address and the name "batch".
48 [[nodiscard]]
auto introspect_node() const -> processor_info {
49 return processor_info(
this,
"batch");
// Return the downstream's introspection graph with this processor pushed as
// the entry point.
52 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
53 return downstream.introspect_graph().push_entry_point(
this);
// Handle one event whose decayed type is convertible to Event.
// NOTE(review): the `template <...>` line introducing E (original line 56)
// is not visible in this extract.
57 requires std::convertible_to<std::remove_cvref_t<E>, Event>
58 void handle(E &&event) {
// No bucket in progress: request one of bsize elements from the source.
59 if (cur_bucket.empty())
60 cur_bucket = bsource->bucket_of_size(bsize);
// Store the event (forwarded, so rvalues are moved) at the next free slot.
// NOTE(review): the update of n_filled (original lines 63-64) is not shown.
62 cur_bucket[n_filled] = std::forward<E>(event);
// Bucket is full: hand it downstream.
// NOTE(review): what follows the hand-off (original lines 67 onward) is not
// visible; presumably the bucket and n_filled are reset -- confirm.
65 if (n_filled == bsize) {
66 downstream.handle(std::move(cur_bucket));
// NOTE(review): the enclosing function header for the next two statements is
// not visible (original lines 67-73 are missing); presumably this is the
// flush path emitting a partially filled bucket -- confirm. The visible code
// shrinks cur_bucket to the range (0, n_filled) and then passes it
// downstream.
74 cur_bucket.shrink(0, n_filled);
75 downstream.handle(std::move(cur_bucket));
// Processor that takes a container-like event (ContainerEvent) and passes
// each of its elements to the downstream processor individually.
//
// NOTE(review): this text is an extract of a rendered source listing. The
// number at the start of a line is that line's number in the original file,
// and gaps in the numbering mean original lines are not shown here (for
// example the opening of the static_assert, the access specifier, the `else`
// branch keyword, the opening of one requires-clause, and closing braces).
81template <
typename ContainerEvent,
typename Downstream>
class unbatch {
// Element type: the value_type of the iterator returned by
// ContainerEvent's end().
82 using element_type =
typename std::iterator_traits<
83 decltype(std::declval<ContainerEvent>().end())>::value_type;
// Fragment of a static_assert (its opening lines are not visible) involving
// the value_type of the iterator returned by begin(); per its message it
// requires begin() and end() to return compatible iterators.
86 typename std::iterator_traits<
87 decltype(std::declval<ContainerEvent>().begin())>::value_type,
89 "ContainerEvent begin() and end() must return compatible iterators");
// Downstream must be able to process individual elements.
91 static_assert(processor<Downstream, element_type>);
// Processor that receives the individual elements.
93 Downstream downstream;
// Construct from the downstream processor, which is moved into the member.
96 explicit unbatch(Downstream downstream)
97 : downstream(std::move(downstream)) {}
// Describe this node for introspection: a processor_info built from this
// object's address and the name "unbatch".
99 [[nodiscard]]
auto introspect_node() const -> processor_info {
100 return processor_info(
this,
"unbatch");
// Return the downstream's introspection graph with this processor pushed as
// the entry point.
103 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
104 return downstream.introspect_graph().push_entry_point(
this);
// Handle an event whose decayed type is convertible to ContainerEvent by
// passing each element downstream in iteration order.
115 template <
typename E>
116 requires std::convertible_to<std::remove_cvref_t<E>, ContainerEvent>
117 void handle(E &&event) {
// The container was passed as an lvalue: elements are passed downstream by
// const reference, leaving the caller's container unmodified by this loop.
118 if constexpr (std::is_lvalue_reference_v<E>) {
119 for (
auto const &e : event)
120 downstream.handle(e);
// Otherwise (the line carrying the `else` is not visible in this extract):
// the container was passed as an rvalue, so each element is moved downstream.
122 for (
auto &e : event)
123 downstream.handle(std::move(e));
// Handle any other event type: one that is not convertible to
// ContainerEvent and that Downstream has a handler for. Such events are
// forwarded unchanged.
// NOTE(review): the line opening this requires-clause (original line 128) is
// not visible in this extract.
127 template <
typename E>
129 not std::convertible_to<std::remove_cvref_t<E>, ContainerEvent> and
130 handler_for<Downstream, std::remove_cvref_t<E>>)
131 void handle(E &&event) {
132 downstream.handle(std::forward<E>(event));
// Nothing is buffered by the visible code of this class; flush only
// propagates to the downstream processor.
135 void flush() { downstream.flush(); }
// Factory fragment: constructs and returns an internal::batch<Event,
// Downstream> from the (moved) bucket source, the batch size, and the (moved)
// downstream processor.
// NOTE(review): the function's signature lines (original lines 175-176) are
// not visible in this extract; only the template header and the return
// statement are shown.
174template <
typename Event,
typename Downstream>
177 return internal::batch<Event, Downstream>(
178 std::move(buffer_provider), batch_size, std::move(downstream));
// Factory fragment: constructs and returns an internal::unbatch<
// ContainerEvent, Downstream> that takes ownership of the (moved) downstream
// processor.
// NOTE(review): the function's signature line (original line 205) is not
// visible in this extract; only the template header and the return statement
// are shown.
204template <
typename ContainerEvent,
typename Downstream>
206 return internal::unbatch<ContainerEvent, Downstream>(
207 std::move(downstream));
// Fragment of a function template parameterized on Event and Downstream that
// takes the downstream processor by value as its last parameter.
// NOTE(review): the function name, the preceding parameters (original line
// 245), and the entire body are not visible in this extract, so its behavior
// cannot be described from this code alone.
244template <
typename Event,
typename Downstream>
246 Downstream downstream) {
static auto create(arg::max_bucket_count<> max_bucket_count=arg::max_bucket_count{std::numeric_limits< std::size_t >::max()}, arg::max_recycled_size<> max_recycled_size=arg::max_recycled_size<>{ 0}) -> std::shared_ptr< bucket_source< T > >
Create an instance.
Definition bucket.hpp:754
auto unbatch(Downstream downstream)
Create a processor transforming batches of events to individual events.
Definition batch_unbatch.hpp:205
auto batch(std::shared_ptr< bucket_source< Event > > buffer_provider, arg::batch_size< std::size_t > batch_size, Downstream downstream)
Create a processor that batches events into buckets for buffering.
Definition batch_unbatch.hpp:175
auto process_in_batches(arg::batch_size< std::size_t > batch_size, Downstream downstream)
Create a processor that buffers events up to equally sized batches and passes them downstream in a ti...
Definition batch_unbatch.hpp:245
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for batch size parameter.
Definition arg_wrappers.hpp:47
Function argument wrapper for maximum bucket count.
Definition arg_wrappers.hpp:227
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505