37 std::function<void()> halt_fn;
41 template <
typename Func>
42 explicit acquire_access(Func halt_func) : halt_fn(halt_func) {}
68constexpr auto slow_acq_sleep = std::chrono::milliseconds(10);
70template <
typename T,
typename Reader,
typename Downstream>
class acquire {
71 static_assert(processor<Downstream, bucket<T>>);
74 std::shared_ptr<bucket_source<T>> bsource;
77 std::mutex halt_mutex;
78 std::condition_variable halt_cv;
81 Downstream downstream;
84 access_tracker<acquire_access> trk;
88 auto const lock = std::lock_guard(halt_mutex);
95 explicit acquire(Reader reader,
96 std::shared_ptr<bucket_source<T>> buffer_provider,
97 arg::batch_size<std::size_t> batch_size,
98 access_tracker<acquire_access> tracker,
99 Downstream downstream)
100 : reader(std::move(reader)), bsource(std::move(buffer_provider)),
101 bsize(batch_size.value), downstream(std::move(downstream)),
102 trk(std::move(tracker)) {
104 throw std::invalid_argument(
105 "acquire buffer_provider must not be null");
107 throw std::invalid_argument(
"acquire batch size must be positive");
109 trk.register_access_factory([](
auto &tracker) {
111 return acquire_access([self] { self->halt(); });
116 ~acquire() =
default;
118 acquire(acquire
const &) =
delete;
119 auto operator=(acquire
const &) =
delete;
121 acquire(acquire &&other) noexcept
122 : reader(std::move(other.reader)), bsource(std::move(other.bsource)),
123 bsize(other.bsize), halted(other.halted),
124 downstream(std::move(other.downstream)), trk(std::move(other.trk)) {}
126 auto operator=(acquire &&rhs)
noexcept -> acquire & {
127 reader = std::move(rhs.reader);
128 bsource = std::move(rhs.bsource);
131 downstream = std::move(rhs.downstream);
132 trk = std::move(rhs.trk);
136 [[nodiscard]]
auto introspect_node() const -> processor_info {
137 return processor_info(
this,
"acquire");
140 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
141 return downstream.introspect_graph().push_entry_point(
this);
146 bool reached_end =
false;
148 auto lock = std::unique_lock(halt_mutex);
151 auto const start_time = std::chrono::steady_clock::now();
153 b = bsource->bucket_of_size(bsize);
154 std::optional<std::size_t>
const read = reader(std::span(b));
161 downstream.handle(std::move(b));
166 halt_cv.wait_until(lock, start_time + slow_acq_sleep,
167 [&] {
return halted; });
174 throw acquisition_halted();
178template <
typename T,
typename Reader,
typename LiveDownstream,
179 typename BatchDownstream>
180class acquire_full_buckets {
181 static_assert(processor<LiveDownstream, bucket<T const>>);
182 static_assert(processor<BatchDownstream, bucket<T>>);
185 std::shared_ptr<bucket_source<T>> bsource;
188 std::mutex halt_mutex;
189 std::condition_variable halt_cv;
192 LiveDownstream live_downstream;
193 BatchDownstream batch_downstream;
196 access_tracker<acquire_access> trk;
200 auto const lock = std::lock_guard(halt_mutex);
203 halt_cv.notify_one();
207 void emit_live(bucket<T> &b, std::size_t start, std::size_t
count) {
210 auto v = bsource->shared_view_of(b);
211 v.shrink(start,
count);
212 live_downstream.handle(std::move(v));
213 }
catch (end_of_processing
const &) {
214 b.shrink(0, start +
count);
215 batch_downstream.handle(std::move(b));
216 batch_downstream.flush();
222 void emit_batch(bucket<T> &&b) {
224 batch_downstream.handle(std::move(b));
225 }
catch (end_of_processing
const &) {
226 live_downstream.flush();
231 void flush_downstreams(bucket<T> &&b, std::size_t filled) {
232 std::exception_ptr end;
234 live_downstream.flush();
235 }
catch (end_of_processing
const &) {
236 end = std::current_exception();
238 if (not b.empty() && filled > 0) {
240 batch_downstream.handle(std::move(b));
242 batch_downstream.flush();
244 std::rethrow_exception(end);
248 explicit acquire_full_buckets(
249 Reader reader, std::shared_ptr<bucket_source<T>> buffer_provider,
250 arg::batch_size<std::size_t> batch_size,
251 access_tracker<acquire_access> tracker, LiveDownstream live_downstream,
252 BatchDownstream batch_downstream)
253 : reader(std::move(reader)), bsource(std::move(buffer_provider)),
254 bsize(batch_size.value), live_downstream(std::move(live_downstream)),
255 batch_downstream(std::move(batch_downstream)),
256 trk(std::move(tracker)) {
258 throw std::invalid_argument(
259 "acquire_full_buckets buffer_provider must not be null");
260 if constexpr (not std::is_same_v<LiveDownstream, null_sink>) {
261 if (not bsource->supports_shared_views())
262 throw std::invalid_argument(
263 "acquire_full_buckets buffer_provider must support shared views");
266 throw std::invalid_argument(
267 "acquire_full_buckets batch size must be positive");
269 trk.register_access_factory([](
auto &tracker) {
272 return acquire_access([self] { self->halt(); });
277 ~acquire_full_buckets() =
default;
279 acquire_full_buckets(acquire_full_buckets
const &) =
delete;
280 auto operator=(acquire_full_buckets
const &) =
delete;
282 acquire_full_buckets(acquire_full_buckets &&other) noexcept
283 : reader(std::move(other.reader)), bsource(std::move(other.bsource)),
284 bsize(other.bsize), halted(other.halted),
285 live_downstream(std::move(other.live_downstream)),
286 batch_downstream(std::move(other.batch_downstream)),
287 trk(std::move(other.trk)) {}
289 auto operator=(acquire_full_buckets &&rhs)
noexcept
290 -> acquire_full_buckets & {
291 reader = std::move(rhs.reader);
292 bsource = std::move(rhs.bsource);
295 live_downstream = std::move(rhs.live_downstream);
296 batch_downstream = std::move(rhs.batch_downstream);
297 trk = std::move(rhs.trk);
301 [[nodiscard]]
auto introspect_node() const -> processor_info {
302 return processor_info(
this,
"acquire_full_buckets");
305 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
307 live_downstream.introspect_graph().push_entry_point(
this),
308 batch_downstream.introspect_graph().push_entry_point(
this));
313 std::size_t filled = 0;
315 auto lock = std::unique_lock(halt_mutex);
318 auto const start_time = std::chrono::steady_clock::now();
320 b = bsource->bucket_of_size(bsize);
323 auto const unfilled = std::span(b).subspan(filled);
324 std::optional<std::size_t>
const read = reader(unfilled);
326 return flush_downstreams(std::move(b), filled);
327 if constexpr (not std::is_same_v<LiveDownstream, null_sink>)
328 emit_live(b, filled, *read);
330 if (filled == bsize) {
331 emit_batch(std::move(b));
335 if (filled < bsize) {
336 halt_cv.wait_until(lock, start_time + slow_acq_sleep,
337 [&] {
return halted; });
341 throw acquisition_halted();
385template <
typename T,
typename Reader,
typename Downstream>
389 return internal::acquire<T, Reader, Downstream>(
390 std::move(reader), std::move(buffer_provider), batch_size,
391 std::move(tracker), std::move(downstream));
450template <
typename T,
typename Reader,
typename LiveDownstream,
451 typename BatchDownstream>
456 LiveDownstream live_downstream,
457 BatchDownstream batch_downstream) {
458 return internal::acquire_full_buckets<T, Reader, LiveDownstream,
460 std::move(reader), std::move(buffer_provider), batch_size,
461 std::move(tracker), std::move(live_downstream),
462 std::move(batch_downstream));
472 auto operator()(std::span<T> ) -> std::optional<std::size_t> {
484 auto operator()(std::span<T> ) -> std::optional<std::size_t> {
#define LIBTCSPC_OBJECT_FROM_TRACKER(obj_type, tracker_field_name, tracker)
Recover the object address from a tcspc::access_tracker embedded in the object.
Definition context.hpp:253