libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
write_binary_stream.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "arg_wrappers.hpp"
10#include "bucket.hpp"
11#include "errors.hpp"
12#include "introspect.hpp"
13
14#include <algorithm>
15#include <cerrno>
16#include <cstddef>
17#include <cstdint>
18#include <cstdio>
19#include <fstream>
20#include <functional>
21#include <ios>
22#include <memory>
23#include <optional>
24#include <ostream>
25#include <span>
26#include <stdexcept>
27#include <string>
28#include <system_error>
29#include <type_traits>
30#include <utility>
31
32// When editing this file, maintain partial symmetry with
33// read_binary_stream.hpp.
34
35namespace tcspc {
36
37namespace internal {
38
39class null_output_stream {
40 std::uint64_t bytes_written = 0;
41
42 public:
43 static auto is_error() noexcept -> bool { return false; }
44 [[nodiscard]] auto tell() const noexcept -> std::optional<std::uint64_t> {
45 return bytes_written;
46 }
47 void write(std::span<std::byte const> buffer) noexcept {
48 bytes_written += buffer.size();
49 }
50};
51
52// We turn off ostream exceptions in the constructor.
53// NOLINTBEGIN(bugprone-exception-escape)
54template <typename OStream> class ostream_output_stream {
55 static_assert(std::is_base_of_v<std::ostream, OStream>);
56 OStream stream;
57
58 public:
59 explicit ostream_output_stream(OStream stream)
60 : stream(std::move(stream)) {
61 this->stream.exceptions(std::ios::goodbit);
62 }
63
64 auto is_error() noexcept -> bool { return stream.fail(); }
65
66 [[nodiscard]] auto tell() noexcept -> std::optional<std::uint64_t> {
67 if (stream.fail())
68 return std::nullopt; // Do not affect flags.
69 std::int64_t const pos = stream.tellp();
70 if (pos >= 0)
71 return std::uint64_t(pos);
72 stream.clear();
73 return std::nullopt;
74 }
75
76 void write(std::span<std::byte const> buffer) noexcept {
77 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
78 stream.write(reinterpret_cast<char const *>(buffer.data()),
79 static_cast<std::streamsize>(buffer.size()));
80 }
81};
82// NOLINTEND(bugprone-exception-escape)
83
84class cfile_output_stream {
85 std::FILE *fp;
86 bool should_close;
87
88 public:
89 explicit cfile_output_stream(std::FILE *stream, bool close_on_destruction)
90 : fp(stream), should_close(close_on_destruction && fp != nullptr) {}
91
92 cfile_output_stream(cfile_output_stream const &) = delete;
93 auto operator=(cfile_output_stream const &) = delete;
94
95 cfile_output_stream(cfile_output_stream &&other) noexcept
96 : fp(std::exchange(other.fp, nullptr)),
97 should_close(std::exchange(other.should_close, false)) {}
98
99 auto operator=(cfile_output_stream &&rhs) noexcept
100 -> cfile_output_stream & {
101 if (should_close)
102 (void)std::fclose(fp); // NOLINT(cppcoreguidelines-owning-memory)
103 fp = std::exchange(rhs.fp, nullptr);
104 should_close = std::exchange(rhs.should_close, false);
105 return *this;
106 }
107
108 ~cfile_output_stream() {
109 if (should_close)
110 (void)std::fclose(fp); // NOLINT(cppcoreguidelines-owning-memory)
111 }
112
113 auto is_error() noexcept -> bool {
114 return fp == nullptr || std::ferror(fp) != 0;
115 }
116
117 [[nodiscard]] auto tell() noexcept -> std::optional<std::uint64_t> {
118 if (fp == nullptr)
119 return std::nullopt;
120 std::int64_t pos =
121#ifdef _WIN32
122 ::_ftelli64(fp);
123#else
124 std::ftell(fp);
125#endif
126 if (pos >= 0)
127 return std::uint64_t(pos);
128 return std::nullopt;
129 }
130
131 void write(std::span<std::byte const> buffer) noexcept {
132 // Errors are checked separately by is_error(); ignore here.
133 if (fp == nullptr)
134 return;
135 (void)std::fwrite(buffer.data(), 1, buffer.size(), fp);
136 }
137};
138
139// For benchmarking only
140inline auto
141unbuffered_binary_ofstream_output_stream(std::string const &filename,
142 arg::truncate<bool> truncate,
143 arg::append<bool> append) {
144 std::ofstream stream;
145
146 // Set to unbuffered.
147 stream.rdbuf()->pubsetbuf(nullptr, 0);
148
149 stream.open(filename,
150 std::ios::binary |
151 (truncate.value ? std::ios::trunc : std::ios::openmode{}) |
152 (append.value ? std::ios::ate : std::ios::openmode{}));
153 if (stream.fail())
154 throw input_output_error("failed to open output file: " + filename);
155 return internal::ostream_output_stream(std::move(stream));
156}
157
158// For benchmarking only
159inline auto binary_ofstream_output_stream(std::string const &filename,
160 arg::truncate<bool> truncate,
161 arg::append<bool> append) {
162 std::ofstream stream;
163 stream.open(filename,
164 std::ios::binary |
165 (truncate.value ? std::ios::trunc : std::ios::openmode{}) |
166 (append.value ? std::ios::ate : std::ios::openmode{}));
167 if (stream.fail())
168 throw input_output_error("failed to open output file: " + filename);
169 return internal::ostream_output_stream(std::move(stream));
170}
171
172inline auto unbuffered_binary_cfile_output_stream(std::string const &filename,
173 arg::truncate<bool> truncate,
174 arg::append<bool> append) {
175 char const *mode = std::invoke([&] {
176 if (truncate.value)
177 return "wb";
178 if (append.value)
179 return "ab";
180 return "wbx";
181 });
182#ifdef _WIN32 // Avoid requiring _CRT_SECURE_NO_WARNINGS.
183 std::FILE *fp{};
184 (void)fopen_s(&fp, filename.c_str(), mode);
185#else
186 errno = 0; // ISO C does not require fopen to set errno on error.
187 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
188 std::FILE *fp = std::fopen(filename.c_str(), mode);
189#endif
190 if (fp == nullptr) {
191 if (errno != 0)
192 throw std::system_error(errno, std::generic_category());
193 throw input_output_error("failed to open output file: " + filename);
194 }
195 if (std::setvbuf(fp, nullptr, _IONBF, 0) != 0)
196 throw input_output_error(
197 "failed to disable buffering for output file: " + filename);
198 return internal::cfile_output_stream(fp, true);
199}
200
201// For benchmarking only
202inline auto binary_cfile_output_stream(std::string const &filename,
203 arg::truncate<bool> truncate,
204 arg::append<bool> append) {
205 char const *mode = std::invoke([&] {
206 if (truncate.value)
207 return "wb";
208 if (append.value)
209 return "ab";
210 return "wbx";
211 });
212#ifdef _WIN32 // Avoid requiring _CRT_SECURE_NO_WARNINGS.
213 std::FILE *fp{};
214 (void)fopen_s(&fp, filename.c_str(), mode);
215#else
216 errno = 0; // ISO C does not require fopen to set errno on error.
217 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
218 std::FILE *fp = std::fopen(filename.c_str(), mode);
219#endif
220 if (fp == nullptr) {
221 if (errno != 0)
222 throw std::system_error(errno, std::generic_category());
223 throw input_output_error("failed to open output file: " + filename);
224 }
225 return internal::cfile_output_stream(fp, true);
226}
227
228} // namespace internal
229
237inline auto null_output_stream() { return internal::null_output_stream(); }
238
257inline auto
258binary_file_output_stream(std::string const &filename,
259 arg::truncate<bool> truncate = arg::truncate{false},
260 arg::append<bool> append = arg::append{false}) {
261 return internal::unbuffered_binary_cfile_output_stream(filename, truncate,
262 append);
263}
264
279template <typename OStream> inline auto ostream_output_stream(OStream stream) {
280 static_assert(std::is_base_of_v<std::ostream, OStream>);
281 return internal::ostream_output_stream(std::move(stream));
282}
283
302inline auto owning_cfile_output_stream(std::FILE *fp) {
303 return internal::cfile_output_stream(fp, true);
304}
305
328inline auto borrowed_cfile_output_stream(std::FILE *fp) {
329 return internal::cfile_output_stream(fp, false);
330}
331
332namespace internal {
333
334template <typename OutputStream> class write_binary_stream {
335 OutputStream strm;
336 std::shared_ptr<bucket_source<std::byte>> bsource;
337 std::size_t write_granularity;
338
339 std::uint64_t total_bytes_written = 0;
340
341 // If not empty, buffer to use next, containing a partial event:
342 bucket<std::byte> buffer;
343 std::size_t bytes_buffered = 0;
344
345 void handle_span(std::span<std::byte const> event_span) {
346 auto first_block_size = write_granularity;
347 if (total_bytes_written == 0) {
348 // Align second and subsequent writes to write_granularity if
349 // current offset is available. This may or may not improve
350 // write performance (when the write_granularity is a multiple
351 // of the page size or block size), but shouldn't hurt.
352 std::optional<std::uint64_t> pos = strm.tell();
353 if (pos.has_value()) {
354 first_block_size =
355 write_granularity - *pos % write_granularity;
356 }
357 }
358
359 if (bytes_buffered > 0 || first_block_size < write_granularity) {
360 auto const bytes_available =
361 std::min(bytes_buffered + event_span.size(), first_block_size);
362 if (buffer.empty())
363 buffer = bsource->bucket_of_size(write_granularity);
364 auto const dest_span =
365 buffer.first(bytes_available).subspan(bytes_buffered);
366 auto const src_span = event_span.first(
367 std::min(event_span.size(), dest_span.size()));
368 std::copy(src_span.begin(), src_span.end(), dest_span.begin());
369 if (bytes_available == first_block_size) {
370 strm.write(buffer.first(bytes_available));
371 buffer = {};
372 bytes_buffered = 0;
373 if (strm.is_error())
374 throw input_output_error("failed to write output");
375 total_bytes_written += bytes_available;
376 } else {
377 bytes_buffered = bytes_available;
378 }
379 event_span = event_span.subspan(src_span.size());
380 }
381
382 auto const direct_write_size =
383 event_span.size() / write_granularity * write_granularity;
384 if (direct_write_size > 0) {
385 strm.write(event_span.first(direct_write_size));
386 if (strm.is_error())
387 throw input_output_error("failed to write output");
388 total_bytes_written += direct_write_size;
389 event_span = event_span.subspan(direct_write_size);
390 }
391
392 if (not event_span.empty()) {
393 buffer = bsource->bucket_of_size(write_granularity);
394 bytes_buffered = event_span.size();
395 std::copy(event_span.begin(), event_span.end(), buffer.begin());
396 }
397 }
398
399 public:
400 explicit write_binary_stream(
401 OutputStream stream,
402 std::shared_ptr<bucket_source<std::byte>> buffer_provider,
403 arg::granularity<std::size_t> granularity)
404 : strm(std::move(stream)), bsource(std::move(buffer_provider)),
405 write_granularity(granularity.value) {
406 if (not bsource)
407 throw std::invalid_argument(
408 "write_binary_stream buffer_provider must not be null");
409 if (write_granularity <= 0)
410 throw std::invalid_argument(
411 "write_binary_stream granularity must be positive");
412 }
413
414 [[nodiscard]] auto introspect_node() const -> processor_info {
415 return processor_info(this, "write_binary_stream");
416 }
417
418 [[nodiscard]] auto introspect_graph() const -> processor_graph {
419 return processor_graph().push_entry_point(this);
420 }
421
422 template <typename Span,
423 typename = std::void_t<
424 decltype(std::span<std::byte const>(std::declval<Span>()))>>
425 void handle(Span const &event) {
426 handle_span(std::span<std::byte const>(event));
427 }
428
429 void flush() {
430 if (bytes_buffered > 0) {
431 strm.write(std::span(buffer).first(bytes_buffered));
432 buffer = {};
433 if (strm.is_error())
434 throw input_output_error("failed to write output");
435 }
436 }
437};
438
439} // namespace internal
440
493template <typename OutputStream>
495 OutputStream stream,
496 std::shared_ptr<bucket_source<std::byte>> buffer_provider,
497 arg::granularity<std::size_t> granularity) {
498 // Support direct passing of C++ iostreams stream.
499 if constexpr (std::is_base_of_v<std::ostream, OutputStream>) {
500 auto wrapped = ostream_output_stream(std::move(stream));
501 return internal::write_binary_stream<decltype(wrapped)>(
502 std::move(wrapped), std::move(buffer_provider), granularity);
503 } else {
504 return internal::write_binary_stream<OutputStream>(
505 std::move(stream), std::move(buffer_provider), granularity);
506 }
507}
508
509} // namespace tcspc
auto buffer(arg::threshold< std::size_t > threshold, access_tracker< buffer_access > &&tracker, Downstream downstream)
Create a processor that buffers events and emits them on a different thread.
Definition buffer.hpp:342
auto append(Event event, Downstream downstream)
Create a processor that inserts an event at the end of the stream.
Definition prepend_append.hpp:143
auto write_binary_stream(OutputStream stream, std::shared_ptr< bucket_source< std::byte > > buffer_provider, arg::granularity< std::size_t > granularity)
Create a sink that writes bytes to a binary stream, such as a file.
Definition write_binary_stream.hpp:494
auto binary_file_output_stream(std::string const &filename, arg::truncate< bool > truncate=arg::truncate{false}, arg::append< bool > append=arg::append{false})
Create a binary output stream for the given file.
Definition write_binary_stream.hpp:258
auto borrowed_cfile_output_stream(std::FILE *fp)
Create an output stream from a non-owned C file pointer.
Definition write_binary_stream.hpp:328
auto owning_cfile_output_stream(std::FILE *fp)
Create an output stream from a C file pointer, taking ownership.
Definition write_binary_stream.hpp:302
auto null_output_stream()
Create an output stream that discards all written bytes.
Definition write_binary_stream.hpp:237
auto ostream_output_stream(OStream stream)
Create an output stream from an std::ostream instance.
Definition write_binary_stream.hpp:279
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for granularity parameter.
Definition arg_wrappers.hpp:147
Function argument wrapper for truncate parameter.
Definition arg_wrappers.hpp:427
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505