9#include "arg_wrappers.hpp"
14#include "int_types.hpp"
15#include "introspect.hpp"
16#include "processor.hpp"
32#include <system_error>
// Input stream that holds no data. Every visible query returns a constant:
// it is never in an error state, is always at end-of-file, is never "good",
// and reports position 0.
// NOTE(review): this listing is incomplete -- the bodies of skip() and read()
// and the closing brace of the struct are not visible here.
44struct null_input_stream {
// Never reports an error.
45 static auto is_error() noexcept ->
bool {
return false; }
// Always reports end-of-file.
46 static auto is_eof() noexcept ->
bool {
return true; }
// Never reports "good".
47 static auto is_good() noexcept ->
bool {
return false; }
// The reported position is always 0.
48 static auto tell() noexcept -> std::optional<std::uint64_t> {
return 0; }
// Skip `bytes` bytes. Body not visible; presumably the bool result indicates
// whether the skip succeeded -- TODO confirm.
49 static auto skip(std::uint64_t bytes)
noexcept ->
bool {
// Read into the given (unnamed) span. Return type and body not visible.
52 static auto read(std::span<std::byte> )
noexcept
// Adapter that exposes the input-stream operations used in this file
// (is_error, is_eof, is_good, tell, skip, read) on top of a stream type
// derived from std::istream.
// NOTE(review): several lines of this class (closing braces, the member
// declaration, parts of tell() and skip()) are missing from this listing.
60template <
typename IStream>
class istream_input_stream {
// Only std::istream and types derived from it are accepted.
61 static_assert(std::is_base_of_v<std::istream, IStream>);
// Takes `stream` by value and moves it into the member, then sets the
// stream's exception mask to goodbit (i.e., no state flag causes a throw), so
// failures are observed through the state flags instead.
65 explicit istream_input_stream(IStream stream) : stream(std::move(stream)) {
66 this->stream.exceptions(std::ios::goodbit);
// True when failbit or badbit is set while eofbit is NOT set; a failure that
// coincides with end-of-file is not reported as an error.
69 auto is_error() noexcept ->
bool {
70 auto const flags = stream.rdstate();
71 return ((flags & std::ios::failbit) || (flags & std::ios::badbit)) &&
72 not(flags & std::ios::eofbit);
// True when the stream's eofbit is set.
75 auto is_eof() noexcept ->
bool {
return stream.eof(); }
// True when no state flag (eof, fail, bad) is set.
77 auto is_good() noexcept ->
bool {
return stream.good(); }
// Returns the current get position as an unsigned 64-bit value.
// NOTE(review): the lines around the tellg() call are not visible; presumably
// they handle a failed/negative position by returning an empty optional --
// TODO confirm.
79 auto tell() noexcept -> std::optional<std::uint64_t> {
82 std::int64_t
const pos = stream.tellg();
84 return std::uint64_t(pos);
// Seeks forward by `bytes` relative to the current position and captures
// stream.good() afterwards. The visible condition compares `bytes` against the
// largest value representable as std::streamoff; the rest of that guard and
// what is done with `ret` are not visible in this listing.
89 auto skip(std::uint64_t bytes)
noexcept ->
bool {
91 bytes > std::uint64_t(std::numeric_limits<std::streamoff>::max()))
93 stream.seekg(std::streamoff(bytes), std::ios::cur);
94 auto const ret = stream.good();
// Reads up to buffer.size() bytes into `buffer` and returns the number of
// bytes actually extracted, as reported by gcount().
99 auto read(std::span<std::byte>
buffer)
noexcept -> std::uint64_t {
101 stream.read(
reinterpret_cast<char *
>(
buffer.data()),
102 static_cast<std::streamsize
>(
buffer.size()));
103 return static_cast<std::uint64_t
>(stream.gcount());
// Input stream backed by a C `std::FILE *`. Holds the file pointer (`fp`) and
// a flag (`should_close`) recording whether this object is responsible for
// closing it. Copying is disabled; moving transfers both members.
// NOTE(review): many lines of this class (member declarations, conditions,
// closing braces, the bodies of tell() and read()) are missing from this
// listing.
108class cfile_input_stream {
// `should_close` is only set when `close_on_destruction` is requested AND the
// pointer is non-null, so a null pointer is never scheduled for closing.
113 explicit cfile_input_stream(std::FILE *stream,
bool close_on_destruction)
114 : fp(stream), should_close(close_on_destruction && fp != nullptr) {}
// Not copyable.
116 cfile_input_stream(cfile_input_stream
const &) =
delete;
117 auto operator=(cfile_input_stream
const &) =
delete;
// Move construction takes over `fp` and `should_close`, leaving `other` with
// a null pointer and should_close == false.
119 cfile_input_stream(cfile_input_stream &&other) noexcept
120 : fp(std::exchange(other.fp,
nullptr)),
121 should_close(std::exchange(other.should_close,
false)) {}
// Move assignment: closes the currently held file, then takes over the
// members of `rhs` (which is left with a null pointer and should_close ==
// false).
// NOTE(review): the condition guarding the fclose() is not visible here;
// presumably it is `should_close` -- confirm, and confirm that
// self-move-assignment is acceptable.
123 auto operator=(cfile_input_stream &&rhs)
noexcept -> cfile_input_stream & {
125 (void)std::fclose(fp);
126 fp = std::exchange(rhs.fp,
nullptr);
127 should_close = std::exchange(rhs.should_close,
false);
// Destructor closes the file; the return value of fclose() is deliberately
// ignored. NOTE(review): the guarding condition is not visible in this
// listing (presumably `should_close`).
131 ~cfile_input_stream() {
133 (void)std::fclose(fp);
// A null file pointer counts as an error, as does a set error indicator.
136 auto is_error() noexcept ->
bool {
137 return fp ==
nullptr || std::ferror(fp) != 0;
// End-of-file is only reported for a non-null file pointer.
140 auto is_eof() noexcept ->
bool {
141 return fp !=
nullptr && std::feof(fp) != 0;
// Good means: non-null pointer, no error indicator, and not at end-of-file.
144 auto is_good() noexcept ->
bool {
145 return fp !=
nullptr && std::ferror(fp) == 0 && std::feof(fp) == 0;
// Returns the current position as an unsigned 64-bit value. How `pos` is
// obtained and how failure is handled is not visible in this listing.
148 auto tell() noexcept -> std::optional<std::uint64_t> {
158 return std::uint64_t(pos);
// Seeks forward by `bytes` from the current position; the result is true when
// the seek call returns 0. Two variants are visible: one using _fseeki64 with
// an __int64 offset and one using std::fseek with a long offset. Each is only
// attempted when `bytes` fits the offset type.
// NOTE(review): the two variants are presumably selected by preprocessor
// conditionals that are not visible here; the result when `bytes` does not fit
// is also not visible (presumably false).
162 auto skip(std::uint64_t bytes)
noexcept ->
bool {
166 if (bytes <= std::uint64_t(std::numeric_limits<__int64>::max()))
167 return ::_fseeki64(fp, __int64(bytes), SEEK_CUR) == 0;
169 if (bytes <= std::numeric_limits<long>::max())
170 return std::fseek(fp,
long(bytes), SEEK_CUR) == 0;
// Reads into `buffer` and returns a byte count (body not visible in this
// listing).
175 auto read(std::span<std::byte>
buffer)
noexcept -> std::uint64_t {
// Advance `stream` by `bytes` bytes. stream.skip() is tried first; when it
// reports failure, the bytes are instead consumed by reading them into a
// scratch buffer and discarding them.
// NOTE(review): some lines (the declaration of `read_size`, the statement
// controlled by the final `if`, closing braces) are missing from this listing.
182template <
typename InputStream>
183inline void skip_stream_bytes(InputStream &stream, std::uint64_t bytes) {
184 if (not stream.skip(bytes)) {
// Running total of bytes consumed by the fallback reads.
187 std::uint64_t bytes_discarded = 0;
// Scratch buffer of 32768 bytes, reused for every read.
190 static constexpr std::streamsize bufsize = 32768;
191 std::vector<std::byte> buf(bufsize);
192 std::span<std::byte>
const bufspan(buf);
193 while (bytes_discarded < bytes) {
// Each read is limited to the buffer size and to the number of bytes that
// still need to be discarded.
195 std::min<std::uint64_t>(bufsize, bytes - bytes_discarded);
196 bytes_discarded += stream.read(bufspan.first(read_size));
// The stream state is checked after each read; the action taken is not
// visible here (presumably the loop is left) -- TODO confirm.
197 if (not stream.is_good())
// Opens `filename` as a std::ifstream in binary mode, wraps it in an
// istream_input_stream, and skips `start_offset.value` bytes.
// NOTE(review): the condition guarding the throw and the return statement are
// not visible in this listing.
205unbuffered_binary_ifstream_input_stream(std::string
const &filename,
206 arg::start_offset<u64> start_offset) {
207 std::ifstream stream;
// pubsetbuf(nullptr, 0) is called before the file is opened; as the function
// name indicates, this is to request unbuffered I/O from the file buffer.
212 stream.rdbuf()->pubsetbuf(
nullptr, 0);
214 stream.open(filename, std::ios::binary);
// Reports a failed open with the file name appended to the message.
216 throw input_output_error(
"failed to open input file: " + filename);
217 auto ret = internal::istream_input_stream(std::move(stream));
// Position the stream at the requested start offset.
218 skip_stream_bytes(ret, start_offset.value);
// Opens `filename` as a std::ifstream in binary mode, wraps it in an
// istream_input_stream, and skips `start_offset.value` bytes. Unlike the
// "unbuffered" variant, no pubsetbuf() call is made on the stream.
// NOTE(review): the condition guarding the throw and the return statement are
// not visible in this listing.
223inline auto binary_ifstream_input_stream(std::string
const &filename,
224 arg::start_offset<u64> start_offset) {
225 std::ifstream stream;
226 stream.open(filename, std::ios::binary);
// Reports a failed open with the file name appended to the message.
228 throw input_output_error(
"failed to open input file: " + filename);
229 auto ret = internal::istream_input_stream(std::move(stream));
// Position the stream at the requested start offset.
230 skip_stream_bytes(ret, start_offset.value);
// Opens `filename` with the C stdio API in "rb" mode, disables buffering on
// the resulting FILE, wraps it in a cfile_input_stream that closes the file
// (second constructor argument is `true`), and skips `start_offset.value`
// bytes.
// NOTE(review): two alternative open calls (fopen_s and std::fopen) and two
// alternative throws are visible; they are presumably selected by
// preprocessor conditionals / conditions that are missing from this listing.
// The return statement is also not visible.
234inline auto unbuffered_binary_cfile_input_stream(
235 std::string
const &filename,
236 arg::start_offset<std::uint64_t> start_offset) {
// Variant using fopen_s; its return value is deliberately ignored.
239 (void)fopen_s(&fp, filename.c_str(),
"rb");
// Variant using std::fopen.
243 std::FILE *fp = std::fopen(filename.c_str(),
"rb");
// Failure to open: either the errno value is reported as a std::system_error,
// or an input_output_error naming the file is thrown.
247 throw std::system_error(errno, std::generic_category());
248 throw input_output_error(
"failed to open input file: " + filename);
// _IONBF with a null buffer requests unbuffered I/O; a non-zero result from
// setvbuf is treated as failure.
250 if (std::setvbuf(fp,
nullptr, _IONBF, 0) != 0)
251 throw input_output_error(
252 "failed to disable buffering for input file: " + filename);
253 auto ret = internal::cfile_input_stream(fp,
true);
// Position the stream at the requested start offset.
254 skip_stream_bytes(ret, start_offset.value);
// Opens `filename` with the C stdio API in "rb" mode, wraps the FILE in a
// cfile_input_stream that closes the file (second constructor argument is
// `true`), and skips `start_offset.value` bytes. No setvbuf() call is made
// here, in contrast to the "unbuffered" variant.
// NOTE(review): two alternative open calls (fopen_s and std::fopen) and two
// alternative throws are visible; they are presumably selected by
// preprocessor conditionals / conditions that are missing from this listing.
// The return statement is also not visible.
259inline auto binary_cfile_input_stream(std::string
const &filename,
260 arg::start_offset<u64> start_offset) {
// Variant using fopen_s; its return value is deliberately ignored.
263 (void)fopen_s(&fp, filename.c_str(),
"rb");
// Variant using std::fopen.
267 std::FILE *fp = std::fopen(filename.c_str(),
"rb");
// Failure to open: either the errno value is reported as a std::system_error,
// or an input_output_error naming the file is thrown.
271 throw std::system_error(errno, std::generic_category());
272 throw input_output_error(
"failed to open input file: " + filename);
274 auto ret = internal::cfile_input_stream(fp,
true);
// Position the stream at the requested start offset.
275 skip_stream_bytes(ret, start_offset.value);
305 std::string
const &filename,
309 return internal::unbuffered_binary_cfile_input_stream(filename,
331 static_assert(std::is_base_of_v<std::istream, IStream>);
332 return internal::istream_input_stream(std::move(stream));
354 return internal::cfile_input_stream(fp,
true);
380 return internal::cfile_input_stream(fp,
false);
// Source processor that reads raw bytes from an InputStream directly into
// buckets of `Event` obtained from a bucket_source, and passes each filled
// bucket to `downstream`.
// NOTE(review): this listing is missing many lines of the class (the class
// header, the `stream` member declaration, several declarations and
// conditions, the header of the function containing the read loop, and
// closing braces). Comments below describe only what is visible.
385template <
typename InputStream,
typename Event,
typename Downstream>
// Event must be a trivial type: bytes are read straight into the storage of
// Event objects (see the as_writable_bytes() calls below).
388 std::is_trivial_v<Event>,
389 "Event type must be trivial to work with read_binary_stream");
// Downstream must be able to handle bucket<Event> and warning_event.
390 static_assert(processor<Downstream, bucket<Event>, warning_event>);
// Upper limit on the total number of bytes read from the stream.
393 std::uint64_t length;
// Unit, in bytes, in whose multiples reads are sized.
395 std::size_t read_granularity;
// Provider of the buckets that are filled and sent downstream.
396 std::shared_ptr<bucket_source<Event>> bsource;
398 Downstream downstream;
// Size of the first read. Starts from one granularity unit and, when the
// stream is good and its position is known, subtracts the position's offset
// within a unit -- so that the read ends on a multiple of read_granularity
// relative to the stream position.
// NOTE(review): the handling of an unknown position and the return statement
// are not visible here.
400 LIBTCSPC_NOINLINE
auto first_read_size() -> std::uint64_t {
401 auto ret = read_granularity;
402 if (stream.is_good()) {
407 std::optional<std::uint64_t> pos = stream.tell();
409 ret -= *pos % read_granularity;
// Performs one read into `dest` and adds the number of bytes read to
// `total_bytes_read`. The read size is limited by the size of `dest` and by
// the bytes remaining before `length` is reached; the very first read is
// additionally limited to first_read_size(). If the size exceeds one
// granularity unit it is rounded down to a whole number of units, at most
// `max_units` of them.
// NOTE(review): the declaration of `bytes_to_read` and the return statement
// are not visible here (presumably returns `bytes_read`).
416 auto read_units(std::span<std::byte> dest, std::size_t max_units,
417 std::uint64_t &total_bytes_read) -> std::uint64_t {
419 std::min<std::uint64_t>(dest.size(), length - total_bytes_read);
420 if (total_bytes_read == 0) {
422 std::min<std::uint64_t>(bytes_to_read, first_read_size());
424 if (bytes_to_read > read_granularity) {
425 bytes_to_read = read_granularity *
426 std::min<std::uint64_t>(
427 max_units, bytes_to_read / read_granularity);
429 auto const bytes_read = stream.read(dest.first(bytes_to_read));
430 total_bytes_read += bytes_read;
// Stores the stream, the maximum length, the granularity, the bucket source
// and the downstream processor. Throws std::invalid_argument when the
// granularity is not positive, and (per the message; the condition itself is
// not visible) when `buffer_provider` is null.
435 explicit read_binary_stream(
436 InputStream stream, arg::max_length<std::uint64_t> max_length,
437 std::shared_ptr<bucket_source<Event>> buffer_provider,
438 arg::granularity<std::size_t> granularity, Downstream downstream)
439 : stream(std::move(stream)), length(max_length.value),
440 read_granularity(granularity.value),
441 bsource(std::move(buffer_provider)),
442 downstream(std::move(downstream)) {
444 throw std::invalid_argument(
445 "read_binary_stream buffer_provider must not be null");
446 if (read_granularity <= 0)
447 throw std::invalid_argument(
448 "read_binary_stream granularity must be positive");
// Introspection: describes this node by address and name.
451 [[nodiscard]]
auto introspect_node() const -> processor_info {
452 return processor_info(
this,
"read_binary_stream");
// Introspection: the downstream graph with this node added as entry point.
455 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
456 return downstream.introspect_graph().push_entry_point(
this);
// --- Read loop (the header of the enclosing member function is not visible
// in this listing) ---
// Bucket size in events. When an Event is smaller than the granularity, the
// bucket holds the smallest number of events covering one granularity unit
// (ceiling division); the value used when sizeof(Event) >= read_granularity
// is on a line that is not visible here.
460 auto const bucket_size =
461 sizeof(Event) >= read_granularity
463 : (read_granularity - 1) / sizeof(Event) + 1;
464 auto const bucket_size_bytes = bucket_size *
sizeof(Event);
466 std::uint64_t total_bytes_read = 0;
// Number of bytes of a partially read event currently held at the start of
// the working bucket.
468 std::size_t remainder_nbytes = 0;
// Continue until `length` bytes have been read or the stream stops being good.
470 while (total_bytes_read < length && stream.is_good()) {
471 auto const bytes_left_in_bucket =
472 bucket_size_bytes - remainder_nbytes;
// Case 1: the bucket still has room for at least one granularity unit, so
// read directly into it, after any leftover partial-event bytes.
473 if (bytes_left_in_bucket >= read_granularity) {
475 bkt = bsource->bucket_of_size(bucket_size);
476 auto const bytes_read = read_units(
477 std::as_writable_bytes(std::span(bkt))
478 .subspan(remainder_nbytes),
479 std::numeric_limits<std::size_t>::max(), total_bytes_read);
// Split what is now in the bucket into whole events and a trailing partial
// event.
480 auto const available_nbytes = remainder_nbytes + bytes_read;
481 auto const this_batch_size = available_nbytes / sizeof(Event);
482 remainder_nbytes = available_nbytes % sizeof(Event);
// No complete event yet; the action taken is not visible here.
483 if (this_batch_size == 0)
// Carry the trailing partial event over to the start of a fresh bucket.
486 if (remainder_nbytes > 0) {
487 bkt2 = bsource->bucket_of_size(bucket_size);
488 auto const remainder_span =
489 std::as_bytes(std::span(bkt))
490 .subspan(available_nbytes - remainder_nbytes);
491 std::copy(remainder_span.begin(), remainder_span.end(),
492 std::as_writable_bytes(std::span(bkt2)).begin());
// Emit only the complete events; the fresh bucket becomes the working bucket.
494 bkt.shrink(0, this_batch_size);
495 downstream.handle(std::move(bkt));
496 bkt = std::move(bkt2);
// Case 2 (presumably the `else` branch; the keyword is not visible): less
// than one granularity unit of room remains. Read a single unit into a fresh
// bucket, use its leading bytes to fill the tail of the working bucket, and
// keep the rest as the new remainder.
499 bkt2 = bsource->bucket_of_size(bucket_size);
500 auto const bytes_read =
501 read_units(std::as_writable_bytes(std::span(bkt2)), 1,
// A read too short to fill the working bucket; the action taken is not
// visible here.
503 if (bytes_read < bytes_left_in_bucket)
505 auto const top_off_span =
506 std::as_bytes(std::span(bkt2)).first(bytes_left_in_bucket);
507 auto const remainder_span = std::as_bytes(std::span(bkt2))
509 .subspan(bytes_left_in_bucket);
// Complete the working bucket with the leading bytes of the new read.
510 std::copy(top_off_span.begin(), top_off_span.end(),
511 std::as_writable_bytes(std::span(bkt))
512 .last(bytes_left_in_bucket)
// Move the remaining bytes to the front of the new bucket.
514 std::copy(remainder_span.begin(), remainder_span.end(),
515 std::as_writable_bytes(std::span(bkt2)).begin());
516 downstream.handle(std::move(bkt));
517 bkt = std::move(bkt2);
518 remainder_nbytes = remainder_span.size();
// After the loop: a stream error is reported as an exception.
522 if (stream.is_error())
523 throw input_output_error(
"failed to read input");
// Leftover bytes that do not form a complete event are reported downstream as
// a warning.
524 if (remainder_nbytes > 0) {
525 downstream.handle(warning_event{
526 "bytes fewer than record size remain at end of input"});
// Factory function creating an internal::read_binary_stream processor.
// NOTE(review): the function name and most of its parameter list, the
// creation of `wrapped`, and the `else` of the `if constexpr` are on lines
// that are missing from this listing.
593template <
typename Event,
typename InputStream,
typename Downstream>
598 Downstream downstream) {
// When InputStream derives from std::istream, the processor is instantiated
// on the type of `wrapped` (presumably an adapter around `stream` -- TODO
// confirm) rather than on InputStream itself.
600 if constexpr (std::is_base_of_v<std::istream, InputStream>) {
602 return internal::read_binary_stream<
decltype(wrapped), Event,
604 std::move(wrapped), max_length, std::move(buffer_provider),
605 granularity, std::move(downstream));
// Otherwise the given stream object is used directly.
607 return internal::read_binary_stream<InputStream, Event, Downstream>(
608 std::move(stream), max_length, std::move(buffer_provider),
609 granularity, std::move(downstream));
auto buffer(arg::threshold< std::size_t > threshold, access_tracker< buffer_access > &&tracker, Downstream downstream)
Create a processor that buffers events and emits them on a different thread.
Definition buffer.hpp:342
auto read_binary_stream(InputStream stream, arg::max_length< std::uint64_t > max_length, std::shared_ptr< bucket_source< Event > > buffer_provider, arg::granularity< std::size_t > granularity, Downstream downstream)
Create a source that reads batches of events from a binary stream, such as a file.
Definition read_binary_stream.hpp:594
std::uint64_t u64
Short name for uint64_t.
Definition int_types.hpp:33
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for granularity parameter.
Definition arg_wrappers.hpp:147
Function argument wrapper for maximum length parameter.
Definition arg_wrappers.hpp:267
Function argument wrapper for start offset parameter.
Definition arg_wrappers.hpp:377
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505