libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
copy_to_buckets.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "bucket.hpp"
11#include "common.hpp"
12#include "core.hpp"
13#include "errors.hpp"
14#include "introspect.hpp"
15#include "processor.hpp"
16
17#include <algorithm>
18#include <cstddef>
19#include <exception>
20#include <memory>
21#include <span>
22#include <stdexcept>
23#include <type_traits>
24#include <utility>
25
26namespace tcspc {
27
28namespace internal {
29
30template <typename T, typename Downstream> class copy_to_buckets {
31 static_assert(processor<Downstream, bucket<T>>);
32
33 std::shared_ptr<bucket_source<T>> bsource;
34
35 Downstream downstream;
36
37 public:
38 explicit copy_to_buckets(std::shared_ptr<bucket_source<T>> buffer_provider,
39 Downstream downstream)
40 : bsource(std::move(buffer_provider)),
41 downstream(std::move(downstream)) {
42 if (not bsource)
43 throw std::invalid_argument(
44 "copy_to_buckets buffer_provider must not be null");
45 }
46
47 [[nodiscard]] auto introspect_node() const -> processor_info {
48 return processor_info(this, "copy_to_buckets");
49 }
50
51 [[nodiscard]] auto introspect_graph() const -> processor_graph {
52 return downstream.introspect_graph().push_entry_point(this);
53 }
54
55 template <typename Event>
56 requires std::is_constructible_v<std::span<T const>,
57 std::remove_cvref_t<Event>>
58 void handle(Event &&event) {
59 auto const event_span = std::span<T const>(event);
60 auto b = bsource->bucket_of_size(event_span.size());
61 std::copy(event_span.begin(), event_span.end(), b.begin());
62 downstream.handle(std::move(b));
63 }
64
65 template <typename Event>
66 requires(not std::is_constructible_v<std::span<T const>,
67 std::remove_cvref_t<Event>> and
68 handler_for<Downstream, std::remove_cvref_t<Event>>)
69 void handle(Event &&event) {
70 downstream.handle(std::forward<Event>(event));
71 }
72
73 void flush() { downstream.flush(); }
74};
75
76template <typename T, typename LiveDownstream, typename BatchDownstream>
77class copy_to_full_buckets {
78 static_assert(processor<LiveDownstream, bucket<T const>>);
79 static_assert(processor<BatchDownstream, bucket<T>>);
80
81 std::shared_ptr<bucket_source<T>> bsource;
82 std::size_t bsize;
83
84 bucket<T> bkt;
85 std::size_t filled = 0;
86
87 LiveDownstream live_downstream;
88 BatchDownstream batch_downstream;
89
90 // Mutates 'b' only when throwing.
91 void emit_live(bucket<T> &b, std::size_t start, std::size_t count) {
92 if (count > 0) {
93 try {
94 auto v = bsource->shared_view_of(b);
95 v.shrink(start, count);
96 live_downstream.handle(std::move(v));
97 } catch (end_of_processing const &) {
98 b.shrink(0, start + count);
99 batch_downstream.handle(std::move(b));
100 batch_downstream.flush();
101 throw;
102 }
103 }
104 }
105
106 void emit_batch(bucket<T> &&b) {
107 try {
108 batch_downstream.handle(std::move(b));
109 } catch (end_of_processing const &) {
110 live_downstream.flush();
111 throw;
112 }
113 }
114
115 void flush_batch() {
116 if (not bkt.empty() && filled > 0) {
117 bkt.shrink(0, filled);
118 batch_downstream.handle(std::move(bkt));
119 }
120 batch_downstream.flush();
121 }
122
123 public:
124 explicit copy_to_full_buckets(
125 std::shared_ptr<bucket_source<T>> buffer_provider,
126 arg::batch_size<std::size_t> batch_size,
127 LiveDownstream live_downstream, BatchDownstream batch_downstream)
128 : bsource(std::move(buffer_provider)), bsize(batch_size.value),
129 live_downstream(std::move(live_downstream)),
130 batch_downstream(std::move(batch_downstream)) {
131 if (not bsource)
132 throw std::invalid_argument(
133 "copy_to_full_buckets buffer_provider must not be null");
134 if constexpr (not std::is_same_v<LiveDownstream, null_sink>) {
135 if (not bsource->supports_shared_views())
136 throw std::invalid_argument(
137 "copy_to_full_buckets buffer_provider must support shared views");
138 }
139 if (bsize == 0)
140 throw std::invalid_argument(
141 "copy_to_full_buckets batch size must be positive");
142 }
143
144 [[nodiscard]] auto introspect_node() const -> processor_info {
145 return processor_info(this, "copy_to_full_buckets");
146 }
147
148 [[nodiscard]] auto introspect_graph() const -> processor_graph {
150 live_downstream.introspect_graph().push_entry_point(this),
151 batch_downstream.introspect_graph().push_entry_point(this));
152 }
153
154 template <typename Event>
155 requires std::is_constructible_v<std::span<T const>,
156 std::remove_cvref_t<Event>>
157 void handle(Event &&event) {
158 auto src = std::span<T const>(event);
159 while (not src.empty()) {
160 if (filled == 0 && bkt.empty())
161 bkt = bsource->bucket_of_size(bsize);
162 auto const dest = std::span(bkt).subspan(filled);
163 auto const copy_size = std::min(src.size(), dest.size());
164 std::copy_n(src.begin(), copy_size, dest.begin());
165 if constexpr (not std::is_same_v<LiveDownstream, null_sink>)
166 emit_live(bkt, filled, copy_size);
167 filled += copy_size;
168 if (filled == bsize) {
169 emit_batch(std::move(bkt));
170 bkt = {};
171 filled = 0;
172 }
173 src = src.subspan(copy_size);
174 }
175 }
176
177 template <typename Event>
178 requires(not std::is_constructible_v<std::span<T const>,
179 std::remove_cvref_t<Event>> and
180 handler_for<LiveDownstream, std::remove_cvref_t<Event>>)
181 void handle(Event &&event) {
182 try {
183 live_downstream.handle(std::forward<Event>(event));
184 } catch (end_of_processing const &) {
185 flush_batch();
186 throw;
187 }
188 }
189
190 void flush() {
191 std::exception_ptr end;
192 try {
193 live_downstream.flush();
194 } catch (end_of_processing const &) {
195 end = std::current_exception();
196 }
197 flush_batch();
198 if (end)
199 std::rethrow_exception(end);
200 }
201};
202
203} // namespace internal
204
237template <typename T, typename Downstream>
238auto copy_to_buckets(std::shared_ptr<bucket_source<T>> buffer_provider,
239 Downstream downstream) {
240 return internal::copy_to_buckets<T, Downstream>(std::move(buffer_provider),
241 std::move(downstream));
242}
243
302template <typename T, typename LiveDownstream, typename BatchDownstream>
303auto copy_to_full_buckets(std::shared_ptr<bucket_source<T>> buffer_provider,
305 LiveDownstream live_downstream,
306 BatchDownstream batch_downstream) {
307 return internal::copy_to_full_buckets<T, LiveDownstream, BatchDownstream>(
308 std::move(buffer_provider), batch_size, std::move(live_downstream),
309 std::move(batch_downstream));
310}
311
312}; // namespace tcspc
auto merge_processor_graphs(processor_graph const &a, processor_graph const &b) -> processor_graph
Create a new processor graph by merging two existing ones.
Definition introspect.hpp:357
auto copy_to_full_buckets(std::shared_ptr< bucket_source< T > > buffer_provider, arg::batch_size< std::size_t > batch_size, LiveDownstream live_downstream, BatchDownstream batch_downstream)
Create a processor that copies data into buckets, ensuring that each bucket is filled to a fixed size...
Definition copy_to_buckets.hpp:303
auto copy_to_buckets(std::shared_ptr< bucket_source< T > > buffer_provider, Downstream downstream)
Create a processor that copies batches of data into buckets.
Definition copy_to_buckets.hpp:238
auto count(access_tracker< count_access > &&tracker, Downstream downstream)
Create a processor that counts events of a given type.
Definition count.hpp:313
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for batch size parameter.
Definition arg_wrappers.hpp:47
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505