libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
batch_unbatch_from_bytes.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "bucket.hpp"
10#include "common.hpp"
11#include "introspect.hpp"
12#include "processor.hpp"
13
14#include <algorithm>
15#include <array>
16#include <cstddef>
17#include <memory>
18#include <span>
19#include <stdexcept>
20#include <type_traits>
21#include <utility>
22
23namespace tcspc {
24
25namespace internal {
26
27template <typename Event, typename Downstream> class batch_from_bytes {
28 static_assert(std::is_trivial_v<Event>);
29 static_assert(processor<Downstream, bucket<Event>>);
30
31 std::shared_ptr<bucket_source<Event>> bsource;
32
33 std::size_t bytes_buffered = 0; // < buf.size()
34 std::array<std::byte, sizeof(Event)> buf;
35
36 Downstream downstream;
37
38 public:
39 explicit batch_from_bytes(
40 std::shared_ptr<bucket_source<Event>> buffer_provider,
41 Downstream downstream)
42 : bsource(std::move(buffer_provider)),
43 downstream(std::move(downstream)) {}
44
45 [[nodiscard]] auto introspect_node() const -> processor_info {
46 return processor_info(this, "batch_from_bytes");
47 }
48
49 [[nodiscard]] auto introspect_graph() const -> processor_graph {
50 return downstream.introspect_graph().push_entry_point(this);
51 }
52
53 template <typename ByteSpan,
54 typename = std::void_t<decltype(std::span<std::byte const>(
55 std::declval<ByteSpan>()))>>
56 void handle(ByteSpan const &event) {
57 auto input_span = std::span<std::byte const>(event);
58 auto const bytes_available = bytes_buffered + input_span.size();
59 if (bytes_available < sizeof(Event)) {
60 std::copy(input_span.begin(), input_span.end(),
61 std::span(buf).subspan(bytes_buffered).begin());
62 bytes_buffered = bytes_available;
63 return;
64 }
65
66 auto const batch_size = bytes_available / sizeof(Event);
67 auto bucket = bsource->bucket_of_size(batch_size);
68 auto const output_span = std::as_writable_bytes(std::span(bucket));
69 auto const input_bulk =
70 input_span.first(output_span.size() - bytes_buffered);
71 auto const remainder = input_span.subspan(input_bulk.size());
72 auto const output_bulk = output_span.subspan(bytes_buffered);
73
74 std::copy_n(buf.begin(), bytes_buffered, output_span.begin());
75 std::copy(input_bulk.begin(), input_bulk.end(), output_bulk.begin());
76 std::copy(remainder.begin(), remainder.end(), buf.begin());
77 bytes_buffered = remainder.size();
78
79 downstream.handle(std::move(bucket));
80 }
81
82 void flush() {
83 if (bytes_buffered > 0)
84 throw std::runtime_error("excess bytes at end of stream");
85 downstream.flush();
86 }
87};
88
89template <typename Event, typename Downstream> class unbatch_from_bytes {
90 static_assert(std::is_trivial_v<Event>);
91 static_assert(processor<Downstream, Event>);
92
93 std::size_t bytes_buffered = 0; // < sizeof(buf)
94 std::array<std::byte, sizeof(Event)> buf;
95
96 Downstream downstream;
97
98 public:
99 explicit unbatch_from_bytes(Downstream downstream)
100 : downstream(std::move(downstream)) {}
101
102 [[nodiscard]] auto introspect_node() const -> processor_info {
103 return processor_info(this, "unbatch_from_bytes");
104 }
105
106 [[nodiscard]] auto introspect_graph() const -> processor_graph {
107 return downstream.introspect_graph().push_entry_point(this);
108 }
109
110 template <typename ByteSpan,
111 typename = std::void_t<decltype(std::span<std::byte const>(
112 std::declval<ByteSpan>()))>>
113 void handle(ByteSpan const &event) {
114 auto input_span = std::span<std::byte const>(event);
115 if (bytes_buffered > 0) {
116 auto const available_bytes = bytes_buffered + input_span.size();
117 if (available_bytes < sizeof(Event)) {
118 std::copy(input_span.begin(), input_span.end(),
119 std::span(buf).subspan(bytes_buffered).begin());
120 bytes_buffered = available_bytes;
121 return;
122 }
123 Event e;
124 auto const output_bytes = std::as_writable_bytes(std::span(&e, 1));
125 auto const bytes_to_fill = sizeof(Event) - bytes_buffered;
126 std::copy_n(buf.begin(), bytes_buffered, output_bytes.begin());
127 std::copy_n(input_span.begin(), bytes_to_fill,
128 output_bytes.subspan(bytes_buffered).begin());
129 downstream.handle(std::as_const(e));
130 input_span = input_span.subspan(bytes_to_fill);
131 }
132
133 auto const n_whole = input_span.size() / sizeof(Event);
134 auto const whole_event_bytes =
135 input_span.first(n_whole * sizeof(Event));
136 auto const remainder = input_span.subspan(whole_event_bytes.size());
137
138 if (is_aligned<Event>(input_span.data())) {
139 auto const *ptr =
140 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
141 reinterpret_cast<Event const *>(input_span.data());
142 for (Event const &e : std::span(ptr, n_whole))
143 downstream.handle(e);
144 } else {
145 for (std::size_t i = 0; i < whole_event_bytes.size();
146 i += sizeof(Event)) {
147 Event e;
148 std::copy_n(whole_event_bytes.subspan(i).begin(),
149 sizeof(Event),
150 std::as_writable_bytes(std::span(&e, 1)).begin());
151 downstream.handle(std::as_const(e));
152 }
153 }
154
155 std::copy(remainder.begin(), remainder.end(), buf.begin());
156 bytes_buffered = remainder.size();
157 }
158
159 void flush() {
160 if (bytes_buffered > 0)
161 throw std::runtime_error("excess bytes at end of stream");
162 downstream.flush();
163 }
164};
165
166} // namespace internal
167
203template <typename Event, typename Downstream>
204auto batch_from_bytes(std::shared_ptr<bucket_source<Event>> buffer_provider,
205 Downstream downstream) {
206 return internal::batch_from_bytes<Event, Downstream>(
207 std::move(buffer_provider), std::move(downstream));
208}
209
240template <typename Event, typename Downstream>
241auto unbatch_from_bytes(Downstream downstream) {
242 return internal::unbatch_from_bytes<Event, Downstream>(
243 std::move(downstream));
244}
245
246} // namespace tcspc
auto batch_from_bytes(std::shared_ptr< bucket_source< Event > > buffer_provider, Downstream downstream)
Create a processor that converts batches of bytes into batches of events.
Definition batch_unbatch_from_bytes.hpp:204
auto unbatch_from_bytes(Downstream downstream)
Create a processor that converts batches of bytes into individual events.
Definition batch_unbatch_from_bytes.hpp:241
libtcspc namespace.
Definition acquire.hpp:29
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505