9#include "arg_wrappers.hpp"
14#include "introspect.hpp"
15#include "processor.hpp"
// Processor that copies each span-convertible event into a bucket obtained
// from a bucket source and passes that bucket downstream; events that are not
// span-convertible are forwarded to the downstream unchanged.
// NOTE(review): this text is an extracted source listing -- original line
// numbers are fused onto the lines and some lines (access specifiers, closing
// braces, at least one guard condition) are not visible. Confirm details
// against the original header before relying on these notes.
30template <
typename T,
typename Downstream>
class copy_to_buckets {
// Downstream is required to satisfy `processor` for bucket<T> events.
31 static_assert(processor<Downstream, bucket<T>>);
// Source from which destination buckets are requested in handle().
33 std::shared_ptr<bucket_source<T>> bsource;
// The processor that receives the filled buckets and forwarded events.
35 Downstream downstream;
// Constructs the processor, moving both arguments into the members.
// Throws std::invalid_argument with a "must not be null" message.
// NOTE(review): the condition guarding the throw is missing from this
// listing -- presumably a null check on bsource, per the message; confirm.
38 explicit copy_to_buckets(std::shared_ptr<bucket_source<T>> buffer_provider,
39 Downstream downstream)
40 : bsource(std::move(buffer_provider)),
41 downstream(std::move(downstream)) {
43 throw std::invalid_argument(
44 "copy_to_buckets buffer_provider must not be null");
// Returns a processor_info built from this object and the name
// "copy_to_buckets".
47 [[nodiscard]]
auto introspect_node() const -> processor_info {
48 return processor_info(
this,
"copy_to_buckets");
// Returns the downstream's graph with this processor pushed as entry point.
51 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
52 return downstream.introspect_graph().push_entry_point(
this);
// Handles any event from which a std::span<T const> can be constructed:
// requests a bucket sized to the span from bsource, copies the span's
// elements into it, and moves the bucket to the downstream.
55 template <
typename Event>
56 requires std::is_constructible_v<std::span<T const>,
57 std::remove_cvref_t<Event>>
58 void handle(Event &&event) {
59 auto const event_span = std::span<T const>(event);
60 auto b = bsource->bucket_of_size(event_span.size());
61 std::copy(event_span.begin(), event_span.end(), b.begin());
62 downstream.handle(std::move(b));
// Handles every other event type that the downstream accepts
// (handler_for<Downstream, Event>) by perfect-forwarding it unchanged.
65 template <
typename Event>
66 requires(not std::is_constructible_v<std::span<T const>,
67 std::remove_cvref_t<Event>> and
68 handler_for<Downstream, std::remove_cvref_t<Event>>)
69 void handle(Event &&event) {
70 downstream.handle(std::forward<Event>(event));
// Propagates flush to the downstream; this processor buffers nothing itself.
73 void flush() { downstream.flush(); }
// Processor that accumulates span-convertible events into buckets of a fixed
// batch size. Each copied portion is offered to a "live" downstream as a
// shared view, and each bucket that reaches the batch size is moved to a
// "batch" downstream.
// NOTE(review): this text is an extracted source listing -- original line
// numbers are fused onto the lines and many lines are not visible (the
// declarations of `bsize` and `bkt`, `try {` openers, any rethrow inside the
// catch blocks, some function headers, closing braces). Confirm every hedged
// note below against the original header.
76template <
typename T,
typename LiveDownstream,
typename BatchDownstream>
77class copy_to_full_buckets {
// The live downstream must process read-only buckets (bucket<T const>); the
// batch downstream must process mutable buckets (bucket<T>).
78 static_assert(processor<LiveDownstream, bucket<T const>>);
79 static_assert(processor<BatchDownstream, bucket<T>>);
// Source of new buckets and of shared views onto them.
81 std::shared_ptr<bucket_source<T>> bsource;
// Number of elements already written to the current bucket `bkt`.
// NOTE(review): `bsize` and `bkt` are used below but their declarations are
// missing from this listing.
85 std::size_t filled = 0;
87 LiveDownstream live_downstream;
88 BatchDownstream batch_downstream;
// Sends a shared view of `b`, narrowed via shrink(start, count), to the live
// downstream. If end_of_processing is caught, `b` is shrunk to
// (0, start + count), moved to the batch downstream, and the batch downstream
// is flushed.
// NOTE(review): the `try {` opener and whatever follows the flush in the
// handler (presumably a rethrow) are not visible here; confirm.
91 void emit_live(bucket<T> &b, std::size_t start, std::size_t
count) {
94 auto v = bsource->shared_view_of(b);
95 v.shrink(start,
count);
96 live_downstream.handle(std::move(v));
97 }
catch (end_of_processing
const &) {
98 b.shrink(0, start +
count);
99 batch_downstream.handle(std::move(b));
100 batch_downstream.flush();
// Moves bucket `b` to the batch downstream. If end_of_processing is caught,
// the live downstream is flushed.
// NOTE(review): the `try {` opener and the remainder of the handler
// (presumably a rethrow) are not visible here; confirm.
106 void emit_batch(bucket<T> &&b) {
108 batch_downstream.handle(std::move(b));
109 }
catch (end_of_processing
const &) {
110 live_downstream.flush();
// NOTE(review): the header of the function containing the next four lines is
// missing from this listing. The visible body sends a partially filled bucket
// (non-empty `bkt` with filled > 0, shrunk to (0, filled)) to the batch
// downstream and then flushes the batch downstream.
116 if (not bkt.empty() && filled > 0) {
117 bkt.shrink(0, filled);
118 batch_downstream.handle(std::move(bkt));
120 batch_downstream.flush();
// Constructs the processor from a bucket source, a batch size wrapper (its
// `.value` initializes `bsize`), and the two downstreams.
// Throws std::invalid_argument in three cases, identified by message:
//  - buffer_provider "must not be null" (guard not visible; presumably a
//    null check on bsource -- confirm);
//  - unless LiveDownstream is null_sink, when the bucket source reports that
//    it does not support shared views;
//  - batch size "must be positive" (guard not visible; presumably a check
//    that bsize is nonzero -- confirm).
124 explicit copy_to_full_buckets(
125 std::shared_ptr<bucket_source<T>> buffer_provider,
126 arg::batch_size<std::size_t> batch_size,
127 LiveDownstream live_downstream, BatchDownstream batch_downstream)
128 : bsource(std::move(buffer_provider)), bsize(batch_size.value),
129 live_downstream(std::move(live_downstream)),
130 batch_downstream(std::move(batch_downstream)) {
132 throw std::invalid_argument(
133 "copy_to_full_buckets buffer_provider must not be null");
134 if constexpr (not std::is_same_v<LiveDownstream, null_sink>) {
135 if (not bsource->supports_shared_views())
136 throw std::invalid_argument(
137 "copy_to_full_buckets buffer_provider must support shared views");
140 throw std::invalid_argument(
141 "copy_to_full_buckets batch size must be positive");
// Returns a processor_info built from this object and the name
// "copy_to_full_buckets".
144 [[nodiscard]]
auto introspect_node() const -> processor_info {
145 return processor_info(
this,
"copy_to_full_buckets");
// Combines the graphs of both downstreams, each with this processor pushed as
// entry point.
// NOTE(review): the line carrying the `return` and the combining call is
// missing; presumably merge_processor_graphs(...) -- confirm.
148 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
150 live_downstream.introspect_graph().push_entry_point(
this),
151 batch_downstream.introspect_graph().push_entry_point(
this));
// Handles any event from which a std::span<T const> can be constructed.
// Repeatedly copies as much of the remaining input as fits after the first
// `filled` elements of the current bucket, requesting a new bucket of `bsize`
// elements when none is in progress. Unless LiveDownstream is null_sink, each
// copied portion is reported through emit_live(); a bucket for which
// filled == bsize is handed to emit_batch().
// NOTE(review): the statements that advance `filled` after the copy and that
// reset state after emit_batch() are not visible in this listing; confirm.
154 template <
typename Event>
155 requires std::is_constructible_v<std::span<T const>,
156 std::remove_cvref_t<Event>>
157 void handle(Event &&event) {
158 auto src = std::span<T const>(event);
159 while (not src.empty()) {
160 if (filled == 0 && bkt.empty())
161 bkt = bsource->bucket_of_size(bsize);
162 auto const dest = std::span(bkt).subspan(filled);
163 auto const copy_size = std::min(src.size(), dest.size());
164 std::copy_n(src.begin(), copy_size, dest.begin());
165 if constexpr (not std::is_same_v<LiveDownstream, null_sink>)
166 emit_live(bkt, filled, copy_size);
168 if (filled == bsize) {
169 emit_batch(std::move(bkt));
173 src = src.subspan(copy_size);
// Handles every other event type that the live downstream accepts by
// perfect-forwarding it to the live downstream only.
// NOTE(review): the `try {` opener and the body of the end_of_processing
// handler are not visible in this listing; confirm what cleanup it performs.
177 template <
typename Event>
178 requires(not std::is_constructible_v<std::span<T const>,
179 std::remove_cvref_t<Event>> and
180 handler_for<LiveDownstream, std::remove_cvref_t<Event>>)
181 void handle(Event &&event) {
183 live_downstream.handle(std::forward<Event>(event));
184 }
catch (end_of_processing
const &) {
// NOTE(review): the header of the function containing the lines below is
// missing (presumably flush(); confirm). The visible body flushes the live
// downstream, captures an end_of_processing exception into `end` instead of
// letting it propagate, and later rethrows the captured exception. The
// statements between the capture and the rethrow are not visible.
191 std::exception_ptr end;
193 live_downstream.flush();
194 }
catch (end_of_processing
const &) {
195 end = std::current_exception();
199 std::rethrow_exception(end);
// Factory: constructs an internal::copy_to_buckets<T, Downstream> from the
// given bucket source and downstream, moving both arguments into it.
// NOTE(review): the first line of the function signature (name and
// `buffer_provider` parameter) is missing from this listing; confirm against
// the original header.
237template <
typename T,
typename Downstream>
239 Downstream downstream) {
240 return internal::copy_to_buckets<T, Downstream>(std::move(buffer_provider),
241 std::move(downstream));
// Factory: constructs an internal::copy_to_full_buckets<T, LiveDownstream,
// BatchDownstream> from the bucket source, the batch size wrapper (passed
// through as-is), and the two downstreams (moved).
// NOTE(review): the first lines of the function signature (name,
// `buffer_provider` and `batch_size` parameters) are missing from this
// listing; confirm against the original header.
302template <
typename T,
typename LiveDownstream,
typename BatchDownstream>
305 LiveDownstream live_downstream,
306 BatchDownstream batch_downstream) {
307 return internal::copy_to_full_buckets<T, LiveDownstream, BatchDownstream>(
308 std::move(buffer_provider), batch_size, std::move(live_downstream),
309 std::move(batch_downstream));
auto merge_processor_graphs(processor_graph const &a, processor_graph const &b) -> processor_graph
Create a new processor graph by merging two existing ones.
Definition introspect.hpp:357
auto copy_to_full_buckets(std::shared_ptr< bucket_source< T > > buffer_provider, arg::batch_size< std::size_t > batch_size, LiveDownstream live_downstream, BatchDownstream batch_downstream)
Create a processor that copies data into buckets, ensuring that each bucket is filled to a fixed size...
Definition copy_to_buckets.hpp:303
auto copy_to_buckets(std::shared_ptr< bucket_source< T > > buffer_provider, Downstream downstream)
Create a processor that copies batches of data into buckets.
Definition copy_to_buckets.hpp:238
auto count(access_tracker< count_access > &&tracker, Downstream downstream)
Create a processor that counts events of a given type.
Definition count.hpp:313
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for batch size parameter.
Definition arg_wrappers.hpp:47
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505