9#include "arg_wrappers.hpp"
12#include "introspect.hpp"
28#include <system_error>
39class null_output_stream {
40 std::uint64_t bytes_written = 0;
/// Report whether the stream is in an error state.
///
/// This implementation always reports "no error".
static auto is_error() noexcept -> bool {
    constexpr bool error_state = false;
    return error_state;
}
44 [[nodiscard]]
auto tell() const noexcept -> std::optional<std::uint64_t> {
// Discard `buffer`: the visible code stores no bytes; it only advances the
// running total in `bytes_written` by the size of the span.
47 void write(std::span<std::byte const>
buffer)
noexcept {
48 bytes_written +=
buffer.size();
54template <
typename OStream>
class ostream_output_stream {
55 static_assert(std::is_base_of_v<std::ostream, OStream>);
// Construct from a stream object, which is moved into the member `stream`.
59 explicit ostream_output_stream(OStream stream)
60 : stream(std::move(stream)) {
// Set the exception mask to goodbit so that no stream state makes the stream
// throw; failures are observed through the state flags instead (is_error()
// checks fail()).
61 this->stream.exceptions(std::ios::goodbit);
64 auto is_error() noexcept ->
bool {
return stream.fail(); }
66 [[nodiscard]]
auto tell() noexcept -> std::optional<std::uint64_t> {
69 std::int64_t
const pos = stream.tellp();
71 return std::uint64_t(pos);
// Write the bytes of `buffer` to the wrapped stream. The byte pointer is
// reinterpreted as `char const *` and the size is converted to
// std::streamsize to match the stream's write() parameters. No result is
// checked here; callers observe failure through is_error().
76 void write(std::span<std::byte const>
buffer)
noexcept {
78 stream.write(
reinterpret_cast<char const *
>(
buffer.data()),
79 static_cast<std::streamsize
>(
buffer.size()));
84class cfile_output_stream {
89 explicit cfile_output_stream(std::FILE *stream,
bool close_on_destruction)
90 : fp(stream), should_close(close_on_destruction && fp != nullptr) {}
92 cfile_output_stream(cfile_output_stream
const &) =
delete;
93 auto operator=(cfile_output_stream
const &) =
delete;
95 cfile_output_stream(cfile_output_stream &&other) noexcept
96 : fp(std::exchange(other.fp,
nullptr)),
97 should_close(std::exchange(other.should_close,
false)) {}
// Move assignment. The current stream is passed to std::fclose (the guard on
// the preceding line, presumably `should_close`, is not visible in this
// excerpt), then the file pointer and close flag are taken from `rhs`, which
// is left with nullptr / false.
// NOTE(review): no self-assignment check is visible here; if `&rhs == this`,
// `fp` would end up holding the pointer that was just closed -- confirm.
99 auto operator=(cfile_output_stream &&rhs)
noexcept
100 -> cfile_output_stream & {
102 (void)std::fclose(fp);
103 fp = std::exchange(rhs.fp,
nullptr);
104 should_close = std::exchange(rhs.should_close,
false);
// Destructor: closes the stream with std::fclose, deliberately discarding the
// return value. The condition guarding the call (presumably `should_close`)
// is on a line not visible in this excerpt.
108 ~cfile_output_stream() {
110 (void)std::fclose(fp);
// Report an error when there is no file pointer, or when std::ferror
// indicates that the stream's error indicator is set.
113 auto is_error() noexcept ->
bool {
114 return fp ==
nullptr || std::ferror(fp) != 0;
117 [[nodiscard]]
auto tell() noexcept -> std::optional<std::uint64_t> {
127 return std::uint64_t(pos);
131 void write(std::span<std::byte const>
buffer)
noexcept {
// Open `filename` through a std::ofstream and return it wrapped in
// internal::ostream_output_stream.
//  - truncate.value adds std::ios::trunc to the open mode.
//  - append.value adds std::ios::ate (not std::ios::app) to the open mode.
// Any other open-mode flags are on a line not visible in this excerpt.
// Throws input_output_error naming the file; the condition guarding the
// throw is likewise not visible here.
141unbuffered_binary_ofstream_output_stream(std::string
const &filename,
142 arg::truncate<bool> truncate,
143 arg::append<bool> append) {
144 std::ofstream stream;
// Request a null, zero-length buffer from the stream buffer before the file
// is opened.
147 stream.rdbuf()->pubsetbuf(
nullptr, 0);
149 stream.open(filename,
151 (truncate.value ? std::ios::trunc : std::ios::openmode{}) |
152 (append.value ? std::ios::ate : std::ios::openmode{}));
154 throw input_output_error(
"failed to open output file: " + filename);
155 return internal::ostream_output_stream(std::move(stream));
// Open `filename` through a std::ofstream (no buffering change is made in
// the visible code) and return it wrapped in internal::ostream_output_stream.
//  - truncate.value adds std::ios::trunc to the open mode.
//  - append.value adds std::ios::ate (not std::ios::app) to the open mode.
// Any other open-mode flags are on a line not visible in this excerpt.
// Throws input_output_error naming the file; the condition guarding the
// throw is likewise not visible here.
159inline auto binary_ofstream_output_stream(std::string
const &filename,
160 arg::truncate<bool> truncate,
161 arg::append<bool> append) {
162 std::ofstream stream;
163 stream.open(filename,
165 (truncate.value ? std::ios::trunc : std::ios::openmode{}) |
166 (append.value ? std::ios::ate : std::ios::openmode{}));
168 throw input_output_error(
"failed to open output file: " + filename);
169 return internal::ostream_output_stream(std::move(stream));
// Open `filename` as a C stream, switch it to unbuffered mode, and return an
// internal::cfile_output_stream constructed with `true` as its
// close_on_destruction argument.
172inline auto unbuffered_binary_cfile_output_stream(std::string
const &filename,
173 arg::truncate<bool> truncate,
174 arg::append<bool> append) {
// The fopen mode string is produced by an immediately-invoked lambda whose
// body is not visible in this excerpt.
175 char const *mode = std::invoke([&] {
// Two alternative open calls follow (fopen_s and std::fopen); the
// preprocessor conditions selecting between them are presumably on lines not
// visible here.
184 (void)fopen_s(&fp, filename.c_str(), mode);
188 std::FILE *fp = std::fopen(filename.c_str(), mode);
// Open-failure reporting: a std::system_error built from errno, or an
// input_output_error naming the file. The conditions choosing between them
// are not visible in this excerpt.
192 throw std::system_error(errno, std::generic_category());
193 throw input_output_error(
"failed to open output file: " + filename);
// _IONBF with a null buffer asks stdio for unbuffered mode.
// NOTE(review): no fclose(fp) is visible before the throw below, so the
// just-opened stream appears to leak if setvbuf fails -- confirm.
195 if (std::setvbuf(fp,
nullptr, _IONBF, 0) != 0)
196 throw input_output_error(
197 "failed to disable buffering for output file: " + filename);
198 return internal::cfile_output_stream(fp,
true);
// Open `filename` as a C stream (no buffering change is made in the visible
// code) and return an internal::cfile_output_stream constructed with `true`
// as its close_on_destruction argument.
202inline auto binary_cfile_output_stream(std::string
const &filename,
203 arg::truncate<bool> truncate,
204 arg::append<bool> append) {
// The fopen mode string is produced by an immediately-invoked lambda whose
// body is not visible in this excerpt.
205 char const *mode = std::invoke([&] {
// Two alternative open calls follow (fopen_s and std::fopen); the
// preprocessor conditions selecting between them are presumably on lines not
// visible here.
214 (void)fopen_s(&fp, filename.c_str(), mode);
218 std::FILE *fp = std::fopen(filename.c_str(), mode);
// Open-failure reporting: a std::system_error built from errno, or an
// input_output_error naming the file. The conditions choosing between them
// are not visible in this excerpt.
222 throw std::system_error(errno, std::generic_category());
223 throw input_output_error(
"failed to open output file: " + filename);
225 return internal::cfile_output_stream(fp,
true);
260 arg::append<bool>
append = arg::append{
false}) {
261 return internal::unbuffered_binary_cfile_output_stream(filename, truncate,
280 static_assert(std::is_base_of_v<std::ostream, OStream>);
281 return internal::ostream_output_stream(std::move(stream));
303 return internal::cfile_output_stream(fp,
true);
329 return internal::cfile_output_stream(fp,
false);
// Provider of the buckets used as the staging buffer (bucket_of_size() is
// called on it in handle_span()).
336 std::shared_ptr<bucket_source<std::byte>> bsource;
// Block size, in bytes, that writes are rounded to in handle_span().
337 std::size_t write_granularity;
// Running total, incremented in handle_span() after each write to the stream.
339 std::uint64_t total_bytes_written = 0;
// Staging bucket for bytes not yet written, and the number of bytes of it
// currently in use.
342 bucket<std::byte> buffer;
343 std::size_t bytes_buffered = 0;
// Consume one span of bytes: write to `strm` in blocks sized by
// `write_granularity`, and stage any remainder in `buffer` for a later call.
// Throws input_output_error ("failed to write output"); the checks guarding
// those throws are on lines not visible in this excerpt.
345 void handle_span(std::span<std::byte const> event_span) {
346 auto first_block_size = write_granularity;
// Before anything has been written, consult the stream position. The
// expression below is the distance from *pos to the next multiple of
// write_granularity (a full granule when *pos is already a multiple). Its
// assignment target is on a line not visible here, presumably
// first_block_size.
347 if (total_bytes_written == 0) {
352 std::optional<std::uint64_t> pos = strm.tell();
353 if (pos.has_value()) {
355 write_granularity - *pos % write_granularity;
// Staged path: taken when bytes are already buffered, or when the first
// block is shorter than a full granule.
359 if (bytes_buffered > 0 || first_block_size < write_granularity) {
// Fill level of the staging buffer after this call's copy, capped at
// first_block_size.
360 auto const bytes_available =
361 std::min(bytes_buffered + event_span.size(), first_block_size);
// NOTE(review): the condition guarding this allocation is not visible in
// this excerpt; presumably it applies only when no bucket is held yet.
363 buffer = bsource->bucket_of_size(write_granularity);
// Destination: the part of the staging buffer between the current fill level
// and bytes_available. Source: as much of the input as fits there.
364 auto const dest_span =
365 buffer.first(bytes_available).subspan(bytes_buffered);
366 auto const src_span = event_span.first(
367 std::min(event_span.size(), dest_span.size()));
368 std::copy(src_span.begin(), src_span.end(), dest_span.begin());
// The staged block is complete: write it out and count it.
369 if (bytes_available == first_block_size) {
370 strm.write(buffer.first(bytes_available));
374 throw input_output_error(
"failed to write output");
375 total_bytes_written += bytes_available;
// Presumably the else branch: the block is still incomplete, so only the
// fill level is recorded.
377 bytes_buffered = bytes_available;
// Drop the input bytes that were copied into the staging buffer.
379 event_span = event_span.subspan(src_span.size());
// Direct path: the longest prefix of the remaining input that is a whole
// number of granules is written straight from the input span.
382 auto const direct_write_size =
383 event_span.size() / write_granularity * write_granularity;
384 if (direct_write_size > 0) {
385 strm.write(event_span.first(direct_write_size));
387 throw input_output_error(
"failed to write output");
388 total_bytes_written += direct_write_size;
389 event_span = event_span.subspan(direct_write_size);
// Leftover input (shorter than one granule after the division above) is
// copied to the start of a freshly obtained bucket.
392 if (not event_span.empty()) {
393 buffer = bsource->bucket_of_size(write_granularity);
394 bytes_buffered = event_span.size();
395 std::copy(event_span.begin(), event_span.end(), buffer.begin());
// Construct the sink from an output stream (that parameter is on a line not
// visible in this excerpt), a bucket source, and a write granularity.
// Throws std::invalid_argument for a null buffer_provider (the null check
// itself is not visible here) and for a granularity of zero.
400 explicit write_binary_stream(
402 std::shared_ptr<bucket_source<std::byte>> buffer_provider,
403 arg::granularity<std::size_t> granularity)
404 : strm(std::move(stream)), bsource(std::move(buffer_provider)),
405 write_granularity(granularity.value) {
407 throw std::invalid_argument(
408 "write_binary_stream buffer_provider must not be null");
// write_granularity is a std::size_t, so `<= 0` is true only for zero.
409 if (write_granularity <= 0)
410 throw std::invalid_argument(
411 "write_binary_stream granularity must be positive");
// Return a processor_info constructed from this object and the name string
// "write_binary_stream".
414 [[nodiscard]]
auto introspect_node() const -> processor_info {
415 return processor_info(
this,
"write_binary_stream");
// Return the result of calling push_entry_point(this) on a
// default-constructed processor_graph.
418 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
419 return processor_graph().push_entry_point(
this);
// Accept any `Span` type from which a std::span<std::byte const> can be
// constructed (enforced through the std::void_t/decltype default template
// argument), convert it, and forward it to handle_span().
422 template <
typename Span,
423 typename = std::void_t<
424 decltype(std::span<std::byte const>(std::declval<Span>()))>>
425 void handle(Span
const &event) {
426 handle_span(std::span<std::byte const>(event));
430 if (bytes_buffered > 0) {
431 strm.write(std::span(buffer).first(bytes_buffered));
434 throw input_output_error(
"failed to write output");
// Factory for the internal::write_binary_stream sink; the signature lines
// are not visible in this excerpt.
// When OutputStream derives from std::ostream, the sink is instantiated on
// the type of `wrapped` (defined on a line not visible here, presumably the
// stream wrapped in an ostream adapter). Otherwise `stream` is passed
// through unchanged and the sink is instantiated on OutputStream itself.
493template <
typename OutputStream>
499 if constexpr (std::is_base_of_v<std::ostream, OutputStream>) {
501 return internal::write_binary_stream<decltype(wrapped)>(
502 std::move(wrapped), std::move(buffer_provider), granularity);
504 return internal::write_binary_stream<OutputStream>(
505 std::move(stream), std::move(buffer_provider), granularity);
auto buffer(arg::threshold< std::size_t > threshold, access_tracker< buffer_access > &&tracker, Downstream downstream)
Create a processor that buffers events and emits them on a different thread.
Definition buffer.hpp:342
auto append(Event event, Downstream downstream)
Create a processor that inserts an event at the end of the stream.
Definition prepend_append.hpp:143
auto write_binary_stream(OutputStream stream, std::shared_ptr< bucket_source< std::byte > > buffer_provider, arg::granularity< std::size_t > granularity)
Create a sink that writes bytes to a binary stream, such as a file.
Definition write_binary_stream.hpp:494
auto binary_file_output_stream(std::string const &filename, arg::truncate< bool > truncate=arg::truncate{false}, arg::append< bool > append=arg::append{false})
Create a binary output stream for the given file.
Definition write_binary_stream.hpp:258
auto borrowed_cfile_output_stream(std::FILE *fp)
Create an output stream from a non-owned C file pointer.
Definition write_binary_stream.hpp:328
auto owning_cfile_output_stream(std::FILE *fp)
Create an output stream from a C file pointer, taking ownership.
Definition write_binary_stream.hpp:302
auto null_output_stream()
Create an output stream that discards all written bytes.
Definition write_binary_stream.hpp:237
auto ostream_output_stream(OStream stream)
Create an output stream from an std::ostream instance.
Definition write_binary_stream.hpp:279
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for granularity parameter.
Definition arg_wrappers.hpp:147
Function argument wrapper for truncate parameter.
Definition arg_wrappers.hpp:427
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505