libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
read_binary_stream.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "bucket.hpp"
11#include "common.hpp"
12#include "core.hpp"
13#include "errors.hpp"
14#include "int_types.hpp"
15#include "introspect.hpp"
16#include "processor.hpp"
17
18#include <algorithm>
19#include <cerrno>
20#include <cstddef>
21#include <cstdint>
22#include <cstdio>
23#include <fstream>
24#include <ios>
25#include <istream>
26#include <limits>
27#include <memory>
28#include <optional>
29#include <span>
30#include <stdexcept>
31#include <string>
32#include <system_error>
33#include <type_traits>
34#include <utility>
35#include <vector>
36
37// When editing this file, maintain partial symmetry with
38// write_binary_stream.hpp.
39
40namespace tcspc {
41
42namespace internal {
43
44struct null_input_stream {
45 static auto is_error() noexcept -> bool { return false; }
46 static auto is_eof() noexcept -> bool { return true; }
47 static auto is_good() noexcept -> bool { return false; }
48 static auto tell() noexcept -> std::optional<std::uint64_t> { return 0; }
49 static auto skip(std::uint64_t bytes) noexcept -> bool {
50 return bytes == 0;
51 }
52 static auto read(std::span<std::byte> /* buffer */) noexcept
53 -> std::uint64_t {
54 return 0;
55 }
56};
57
58// We turn off istream exceptions in the constructor.
59// NOLINTBEGIN(bugprone-exception-escape)
60template <typename IStream> class istream_input_stream {
61 static_assert(std::is_base_of_v<std::istream, IStream>);
62 IStream stream;
63
64 public:
65 explicit istream_input_stream(IStream stream) : stream(std::move(stream)) {
66 this->stream.exceptions(std::ios::goodbit);
67 }
68
69 auto is_error() noexcept -> bool {
70 auto const flags = stream.rdstate();
71 return ((flags & std::ios::failbit) || (flags & std::ios::badbit)) &&
72 not(flags & std::ios::eofbit);
73 }
74
75 auto is_eof() noexcept -> bool { return stream.eof(); }
76
77 auto is_good() noexcept -> bool { return stream.good(); }
78
79 auto tell() noexcept -> std::optional<std::uint64_t> {
80 if (stream.fail())
81 return std::nullopt; // Do not affect flags.
82 std::int64_t const pos = stream.tellg();
83 if (pos >= 0)
84 return std::uint64_t(pos);
85 stream.clear();
86 return std::nullopt;
87 }
88
89 auto skip(std::uint64_t bytes) noexcept -> bool {
90 if (stream.fail() ||
91 bytes > std::uint64_t(std::numeric_limits<std::streamoff>::max()))
92 return false;
93 stream.seekg(std::streamoff(bytes), std::ios::cur);
94 auto const ret = stream.good();
95 stream.clear();
96 return ret;
97 }
98
99 auto read(std::span<std::byte> buffer) noexcept -> std::uint64_t {
100 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
101 stream.read(reinterpret_cast<char *>(buffer.data()),
102 static_cast<std::streamsize>(buffer.size()));
103 return static_cast<std::uint64_t>(stream.gcount());
104 }
105};
106// NOLINTEND(bugprone-exception-escape)
107
108class cfile_input_stream {
109 std::FILE *fp;
110 bool should_close;
111
112 public:
113 explicit cfile_input_stream(std::FILE *stream, bool close_on_destruction)
114 : fp(stream), should_close(close_on_destruction && fp != nullptr) {}
115
116 cfile_input_stream(cfile_input_stream const &) = delete;
117 auto operator=(cfile_input_stream const &) = delete;
118
119 cfile_input_stream(cfile_input_stream &&other) noexcept
120 : fp(std::exchange(other.fp, nullptr)),
121 should_close(std::exchange(other.should_close, false)) {}
122
123 auto operator=(cfile_input_stream &&rhs) noexcept -> cfile_input_stream & {
124 if (should_close)
125 (void)std::fclose(fp); // NOLINT(cppcoreguidelines-owning-memory)
126 fp = std::exchange(rhs.fp, nullptr);
127 should_close = std::exchange(rhs.should_close, false);
128 return *this;
129 }
130
131 ~cfile_input_stream() {
132 if (should_close)
133 (void)std::fclose(fp); // NOLINT(cppcoreguidelines-owning-memory)
134 }
135
136 auto is_error() noexcept -> bool {
137 return fp == nullptr || std::ferror(fp) != 0;
138 }
139
140 auto is_eof() noexcept -> bool {
141 return fp != nullptr && std::feof(fp) != 0;
142 }
143
144 auto is_good() noexcept -> bool {
145 return fp != nullptr && std::ferror(fp) == 0 && std::feof(fp) == 0;
146 }
147
148 auto tell() noexcept -> std::optional<std::uint64_t> {
149 if (fp == nullptr)
150 return std::nullopt;
151 std::int64_t pos =
152#ifdef _WIN32
153 ::_ftelli64(fp);
154#else
155 std::ftell(fp);
156#endif
157 if (pos >= 0)
158 return std::uint64_t(pos);
159 return std::nullopt;
160 }
161
162 auto skip(std::uint64_t bytes) noexcept -> bool {
163 if (fp == nullptr)
164 return false;
165#ifdef _WIN32
166 if (bytes <= std::uint64_t(std::numeric_limits<__int64>::max()))
167 return ::_fseeki64(fp, __int64(bytes), SEEK_CUR) == 0;
168#else
169 if (bytes <= std::numeric_limits<long>::max())
170 return std::fseek(fp, long(bytes), SEEK_CUR) == 0;
171#endif
172 return false;
173 }
174
175 auto read(std::span<std::byte> buffer) noexcept -> std::uint64_t {
176 if (fp == nullptr)
177 return 0;
178 return std::fread(buffer.data(), 1, buffer.size(), fp);
179 }
180};
181
182template <typename InputStream>
183inline void skip_stream_bytes(InputStream &stream, std::uint64_t bytes) {
184 if (not stream.skip(bytes)) {
185 // Try instead reading and discarding up to 'start', to support
186 // non-seekable streams (e.g., pipes).
187 std::uint64_t bytes_discarded = 0;
188 // For now, use the read size that was found fastest when reading
189 // /dev/zero on an Apple M1 Pro laptop. Could be tuned.
190 static constexpr std::streamsize bufsize = 32768;
191 std::vector<std::byte> buf(bufsize);
192 std::span<std::byte> const bufspan(buf);
193 while (bytes_discarded < bytes) {
194 auto read_size =
195 std::min<std::uint64_t>(bufsize, bytes - bytes_discarded);
196 bytes_discarded += stream.read(bufspan.first(read_size));
197 if (not stream.is_good())
198 break;
199 }
200 }
201}
202
203// For benchmarking only
204inline auto
205unbuffered_binary_ifstream_input_stream(std::string const &filename,
206 arg::start_offset<u64> start_offset) {
207 std::ifstream stream;
208
209 // The standard says that the following makes the stream "unbuffered", but
210 // its definition of unbuffered specifies nothing about input streams. At
211 // least with libc++, this is a huge pessimization:
212 stream.rdbuf()->pubsetbuf(nullptr, 0);
213
214 stream.open(filename, std::ios::binary);
215 if (stream.fail())
216 throw input_output_error("failed to open input file: " + filename);
217 auto ret = internal::istream_input_stream(std::move(stream));
218 skip_stream_bytes(ret, start_offset.value);
219 return ret;
220}
221
222// For benchmarking only
223inline auto binary_ifstream_input_stream(std::string const &filename,
224 arg::start_offset<u64> start_offset) {
225 std::ifstream stream;
226 stream.open(filename, std::ios::binary);
227 if (stream.fail())
228 throw input_output_error("failed to open input file: " + filename);
229 auto ret = internal::istream_input_stream(std::move(stream));
230 skip_stream_bytes(ret, start_offset.value);
231 return ret;
232}
233
234inline auto unbuffered_binary_cfile_input_stream(
235 std::string const &filename,
236 arg::start_offset<std::uint64_t> start_offset) {
237#ifdef _WIN32 // Avoid requiring _CRT_SECURE_NO_WARNINGS.
238 std::FILE *fp{};
239 (void)fopen_s(&fp, filename.c_str(), "rb");
240#else
241 errno = 0; // ISO C does not require fopen to set errno on error.
242 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
243 std::FILE *fp = std::fopen(filename.c_str(), "rb");
244#endif
245 if (fp == nullptr) {
246 if (errno != 0)
247 throw std::system_error(errno, std::generic_category());
248 throw input_output_error("failed to open input file: " + filename);
249 }
250 if (std::setvbuf(fp, nullptr, _IONBF, 0) != 0)
251 throw input_output_error(
252 "failed to disable buffering for input file: " + filename);
253 auto ret = internal::cfile_input_stream(fp, true);
254 skip_stream_bytes(ret, start_offset.value);
255 return ret;
256}
257
258// For benchmarking only
259inline auto binary_cfile_input_stream(std::string const &filename,
260 arg::start_offset<u64> start_offset) {
261#ifdef _WIN32 // Avoid requiring _CRT_SECURE_NO_WARNINGS.
262 std::FILE *fp{};
263 (void)fopen_s(&fp, filename.c_str(), "rb");
264#else
265 errno = 0; // ISO C does not require fopen to set errno on error.
266 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
267 std::FILE *fp = std::fopen(filename.c_str(), "rb");
268#endif
269 if (fp == nullptr) {
270 if (errno != 0)
271 throw std::system_error(errno, std::generic_category());
272 throw input_output_error("failed to open input file: " + filename);
273 }
274 auto ret = internal::cfile_input_stream(fp, true);
275 skip_stream_bytes(ret, start_offset.value);
276 return ret;
277}
278
279} // namespace internal
280
288inline auto null_input_stream() { return internal::null_input_stream(); }
289
305 std::string const &filename,
306 arg::start_offset<u64> start_offset = arg::start_offset{u64(0)}) {
307 // Prefer cfile over ifstream for performance; for cfile, unbuffered
308 // performs better (given our own buffering). See benchmark.
309 return internal::unbuffered_binary_cfile_input_stream(filename,
310 start_offset);
311}
312
330template <typename IStream> inline auto istream_input_stream(IStream stream) {
331 static_assert(std::is_base_of_v<std::istream, IStream>);
332 return internal::istream_input_stream(std::move(stream));
333}
334
353inline auto owning_cfile_input_stream(std::FILE *fp) {
354 return internal::cfile_input_stream(fp, true);
355}
356
379inline auto borrowed_cfile_input_stream(std::FILE *fp) {
380 return internal::cfile_input_stream(fp, false);
381}
382
383namespace internal {
384
385template <typename InputStream, typename Event, typename Downstream>
386class read_binary_stream {
387 static_assert(
388 std::is_trivial_v<Event>,
389 "Event type must be trivial to work with read_binary_stream");
390 static_assert(processor<Downstream, bucket<Event>, warning_event>);
391
392 InputStream stream;
393 std::uint64_t length;
394
395 std::size_t read_granularity;
396 std::shared_ptr<bucket_source<Event>> bsource;
397
398 Downstream downstream;
399
400 LIBTCSPC_NOINLINE auto first_read_size() -> std::uint64_t {
401 auto ret = read_granularity;
402 if (stream.is_good()) {
403 // Align second and subsequent reads to read_granularity if current
404 // offset is available. This may or may not improve read
405 // performance (when the read_granularity is a multiple of the page
406 // size or block size), but shouldn't hurt.
407 std::optional<std::uint64_t> pos = stream.tell();
408 if (pos.has_value())
409 ret -= *pos % read_granularity;
410 }
411 return ret;
412 }
413
414 // Read some multiple (max: max_units) of the read granularity that fits in
415 // dest, subject to first-read size and max length.
416 auto read_units(std::span<std::byte> dest, std::size_t max_units,
417 std::uint64_t &total_bytes_read) -> std::uint64_t {
418 auto bytes_to_read =
419 std::min<std::uint64_t>(dest.size(), length - total_bytes_read);
420 if (total_bytes_read == 0) {
421 bytes_to_read =
422 std::min<std::uint64_t>(bytes_to_read, first_read_size());
423 }
424 if (bytes_to_read > read_granularity) {
425 bytes_to_read = read_granularity *
426 std::min<std::uint64_t>(
427 max_units, bytes_to_read / read_granularity);
428 }
429 auto const bytes_read = stream.read(dest.first(bytes_to_read));
430 total_bytes_read += bytes_read;
431 return bytes_read;
432 }
433
434 public:
435 explicit read_binary_stream(
436 InputStream stream, arg::max_length<std::uint64_t> max_length,
437 std::shared_ptr<bucket_source<Event>> buffer_provider,
438 arg::granularity<std::size_t> granularity, Downstream downstream)
439 : stream(std::move(stream)), length(max_length.value),
440 read_granularity(granularity.value),
441 bsource(std::move(buffer_provider)),
442 downstream(std::move(downstream)) {
443 if (not bsource)
444 throw std::invalid_argument(
445 "read_binary_stream buffer_provider must not be null");
446 if (read_granularity <= 0)
447 throw std::invalid_argument(
448 "read_binary_stream granularity must be positive");
449 }
450
451 [[nodiscard]] auto introspect_node() const -> processor_info {
452 return processor_info(this, "read_binary_stream");
453 }
454
455 [[nodiscard]] auto introspect_graph() const -> processor_graph {
456 return downstream.introspect_graph().push_entry_point(this);
457 }
458
459 void flush() {
460 auto const bucket_size =
461 sizeof(Event) >= read_granularity
462 ? 1
463 : (read_granularity - 1) / sizeof(Event) + 1;
464 auto const bucket_size_bytes = bucket_size * sizeof(Event);
465
466 std::uint64_t total_bytes_read = 0;
467 bucket<Event> bkt;
468 std::size_t remainder_nbytes = 0; // Always < sizeof(Event)
469
470 while (total_bytes_read < length && stream.is_good()) {
471 auto const bytes_left_in_bucket =
472 bucket_size_bytes - remainder_nbytes;
473 if (bytes_left_in_bucket >= read_granularity) {
474 if (bkt.empty())
475 bkt = bsource->bucket_of_size(bucket_size);
476 auto const bytes_read = read_units(
477 std::as_writable_bytes(std::span(bkt))
478 .subspan(remainder_nbytes),
479 std::numeric_limits<std::size_t>::max(), total_bytes_read);
480 auto const available_nbytes = remainder_nbytes + bytes_read;
481 auto const this_batch_size = available_nbytes / sizeof(Event);
482 remainder_nbytes = available_nbytes % sizeof(Event);
483 if (this_batch_size == 0)
484 continue; // Leave incomplete event in current bucket.
485 bucket<Event> bkt2;
486 if (remainder_nbytes > 0) {
487 bkt2 = bsource->bucket_of_size(bucket_size);
488 auto const remainder_span =
489 std::as_bytes(std::span(bkt))
490 .subspan(available_nbytes - remainder_nbytes);
491 std::copy(remainder_span.begin(), remainder_span.end(),
492 std::as_writable_bytes(std::span(bkt2)).begin());
493 }
494 bkt.shrink(0, this_batch_size);
495 downstream.handle(std::move(bkt));
496 bkt = std::move(bkt2);
497 } else { // Top off single event.
498 bucket<Event> bkt2;
499 bkt2 = bsource->bucket_of_size(bucket_size);
500 auto const bytes_read =
501 read_units(std::as_writable_bytes(std::span(bkt2)), 1,
502 total_bytes_read);
503 if (bytes_read < bytes_left_in_bucket)
504 break;
505 auto const top_off_span =
506 std::as_bytes(std::span(bkt2)).first(bytes_left_in_bucket);
507 auto const remainder_span = std::as_bytes(std::span(bkt2))
508 .first(bytes_read)
509 .subspan(bytes_left_in_bucket);
510 std::copy(top_off_span.begin(), top_off_span.end(),
511 std::as_writable_bytes(std::span(bkt))
512 .last(bytes_left_in_bucket)
513 .begin());
514 std::copy(remainder_span.begin(), remainder_span.end(),
515 std::as_writable_bytes(std::span(bkt2)).begin());
516 downstream.handle(std::move(bkt));
517 bkt = std::move(bkt2);
518 remainder_nbytes = remainder_span.size();
519 }
520 }
521
522 if (stream.is_error())
523 throw input_output_error("failed to read input");
524 if (remainder_nbytes > 0) {
525 downstream.handle(warning_event{
526 "bytes fewer than record size remain at end of input"});
527 }
528 downstream.flush();
529 }
530};
531
532} // namespace internal
533
593template <typename Event, typename InputStream, typename Downstream>
594auto read_binary_stream(InputStream stream,
596 std::shared_ptr<bucket_source<Event>> buffer_provider,
598 Downstream downstream) {
599 // Support direct passing of C++ iostreams stream.
600 if constexpr (std::is_base_of_v<std::istream, InputStream>) {
601 auto wrapped = istream_input_stream(std::move(stream));
602 return internal::read_binary_stream<decltype(wrapped), Event,
603 Downstream>(
604 std::move(wrapped), max_length, std::move(buffer_provider),
605 granularity, std::move(downstream));
606 } else {
607 return internal::read_binary_stream<InputStream, Event, Downstream>(
608 std::move(stream), max_length, std::move(buffer_provider),
609 granularity, std::move(downstream));
610 }
611}
612
613} // namespace tcspc
auto buffer(arg::threshold< std::size_t > threshold, access_tracker< buffer_access > &&tracker, Downstream downstream)
Create a processor that buffers events and emits them on a different thread.
Definition buffer.hpp:342
auto read_binary_stream(InputStream stream, arg::max_length< std::uint64_t > max_length, std::shared_ptr< bucket_source< Event > > buffer_provider, arg::granularity< std::size_t > granularity, Downstream downstream)
Create a source that reads batches of events from a binary stream, such as a file.
Definition read_binary_stream.hpp:594
auto null_input_stream()
Create an input stream that contains no bytes.
Definition read_binary_stream.hpp:288
auto istream_input_stream(IStream stream)
Create an input stream from an std::istream instance.
Definition read_binary_stream.hpp:330
auto owning_cfile_input_stream(std::FILE *fp)
Create an input stream from a C file pointer, taking ownership.
Definition read_binary_stream.hpp:353
auto binary_file_input_stream(std::string const &filename, arg::start_offset< u64 > start_offset=arg::start_offset{u64(0)})
Create a binary input stream for the given file.
Definition read_binary_stream.hpp:304
auto borrowed_cfile_input_stream(std::FILE *fp)
Create an input stream from a non-owned C file pointer.
Definition read_binary_stream.hpp:379
std::uint64_t u64
Short name for uint64_t.
Definition int_types.hpp:33
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for granularity parameter.
Definition arg_wrappers.hpp:147
Function argument wrapper for maximum length parameter.
Definition arg_wrappers.hpp:267
Function argument wrapper for start offset parameter.
Definition arg_wrappers.hpp:377
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505