libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
batch_unbatch.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "bucket.hpp"
11#include "common.hpp"
12#include "introspect.hpp"
13#include "processor.hpp"
14
15#include <cstddef>
16#include <iterator>
17#include <memory>
18#include <stdexcept>
19#include <type_traits>
20#include <utility>
21
22namespace tcspc {
23
24namespace internal {
25
26template <typename Event, typename Downstream> class batch {
27 static_assert(processor<Downstream, bucket<Event>>);
28
29 std::shared_ptr<bucket_source<Event>> bsource;
30 std::size_t bsize;
31
32 bucket<Event> cur_bucket;
33 std::size_t n_filled = 0;
34
35 Downstream downstream;
36
37 public:
38 explicit batch(std::shared_ptr<bucket_source<Event>> buffer_provider,
39 arg::batch_size<std::size_t> batch_size,
40 Downstream downstream)
41 : bsource(std::move(buffer_provider)), bsize(batch_size.value),
42 downstream(std::move(downstream)) {
43 if (bsize == 0)
44 throw std::invalid_argument(
45 "batch processor batch_size must not be zero");
46 }
47
48 [[nodiscard]] auto introspect_node() const -> processor_info {
49 return processor_info(this, "batch");
50 }
51
52 [[nodiscard]] auto introspect_graph() const -> processor_graph {
53 return downstream.introspect_graph().push_entry_point(this);
54 }
55
56 template <typename E>
57 requires std::convertible_to<std::remove_cvref_t<E>, Event>
58 void handle(E &&event) {
59 if (cur_bucket.empty())
60 cur_bucket = bsource->bucket_of_size(bsize);
61
62 cur_bucket[n_filled] = std::forward<E>(event);
63 ++n_filled;
64
65 if (n_filled == bsize) {
66 downstream.handle(std::move(cur_bucket));
67 cur_bucket = {};
68 n_filled = 0;
69 }
70 }
71
72 void flush() {
73 if (n_filled > 0) {
74 cur_bucket.shrink(0, n_filled);
75 downstream.handle(std::move(cur_bucket));
76 }
77 downstream.flush();
78 }
79};
80
81template <typename ContainerEvent, typename Downstream> class unbatch {
82 using element_type = typename std::iterator_traits<
83 decltype(std::declval<ContainerEvent>().end())>::value_type;
84 static_assert(
85 std::is_same_v<
86 typename std::iterator_traits<
87 decltype(std::declval<ContainerEvent>().begin())>::value_type,
88 element_type>,
89 "ContainerEvent begin() and end() must return compatible iterators");
90
91 static_assert(processor<Downstream, element_type>);
92
93 Downstream downstream;
94
95 public:
96 explicit unbatch(Downstream downstream)
97 : downstream(std::move(downstream)) {}
98
99 [[nodiscard]] auto introspect_node() const -> processor_info {
100 return processor_info(this, "unbatch");
101 }
102
103 [[nodiscard]] auto introspect_graph() const -> processor_graph {
104 return downstream.introspect_graph().push_entry_point(this);
105 }
106
107 // Should we mark this LIBTCSPC_NOINLINE? It would be good to increase the
108 // chances that the downstream call will be inlined. But preliminary tests
109 // (Apple clang 14 arm64) suggest that when the downstream is simple enough
110 // to inline, it will be inlined, together with this loop, into upstream;
111 // conversely, if the downstream is too complex to inline, it won't be
112 // inlined even if this function is marked noinline. There may be
113 // borderline cases where this doesn't hold, but it is probably best to
114 // leave it to the compiler.
115 template <typename E>
116 requires std::convertible_to<std::remove_cvref_t<E>, ContainerEvent>
117 void handle(E &&event) {
118 if constexpr (std::is_lvalue_reference_v<E>) {
119 for (auto const &e : event)
120 downstream.handle(e);
121 } else {
122 for (auto &e : event)
123 downstream.handle(std::move(e));
124 }
125 }
126
127 template <typename E>
128 requires(
129 not std::convertible_to<std::remove_cvref_t<E>, ContainerEvent> and
130 handler_for<Downstream, std::remove_cvref_t<E>>)
131 void handle(E &&event) {
132 downstream.handle(std::forward<E>(event));
133 }
134
135 void flush() { downstream.flush(); }
136};
137
138} // namespace internal
139
174template <typename Event, typename Downstream>
175auto batch(std::shared_ptr<bucket_source<Event>> buffer_provider,
176 arg::batch_size<std::size_t> batch_size, Downstream downstream) {
177 return internal::batch<Event, Downstream>(
178 std::move(buffer_provider), batch_size, std::move(downstream));
179}
180
204template <typename ContainerEvent, typename Downstream>
205auto unbatch(Downstream downstream) {
206 return internal::unbatch<ContainerEvent, Downstream>(
207 std::move(downstream));
208}
209
244template <typename Event, typename Downstream>
246 Downstream downstream) {
247 return batch<Event>(
249 batch_size, unbatch<bucket<Event>>(std::move(downstream)));
250}
251
252} // namespace tcspc
static auto create(arg::max_bucket_count<> max_bucket_count=arg::max_bucket_count{std::numeric_limits< std::size_t >::max()}, arg::max_recycled_size<> max_recycled_size=arg::max_recycled_size<>{ 0}) -> std::shared_ptr< bucket_source< T > >
Create an instance.
Definition bucket.hpp:754
auto unbatch(Downstream downstream)
Create a processor transforming batches of events to individual events.
Definition batch_unbatch.hpp:205
auto batch(std::shared_ptr< bucket_source< Event > > buffer_provider, arg::batch_size< std::size_t > batch_size, Downstream downstream)
Create a processor that batches events into buckets for buffering.
Definition batch_unbatch.hpp:175
auto process_in_batches(arg::batch_size< std::size_t > batch_size, Downstream downstream)
Create a processor that buffers events up to equally sized batches and passes them downstream in a timely manner.
Definition batch_unbatch.hpp:245
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for batch size parameter.
Definition arg_wrappers.hpp:47
Function argument wrapper for maximum bucket count.
Definition arg_wrappers.hpp:227
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505