9#include "arg_wrappers.hpp"
11#include "introspect.hpp"
12#include "move_only_any.hpp"
13#include "processor.hpp"
17#include <condition_variable>
111 struct owning_storage {
113 std::unique_ptr<T[]> p;
116 explicit owning_storage(std::unique_ptr<T[]> ptr)
117 : p(std::move(ptr)) {}
121 internal::move_only_any store;
136 template <typename S>
138 : s(span), store(std::forward<S>(
storage)) {}
144 : store(std::invoke([&s = other.s] {
145 using TMut = std::remove_cv_t<T>;
147 std::unique_ptr<TMut[]> r(s.empty() ?
nullptr
148 :
new TMut[s.size()]);
149 std::copy(s.begin(), s.end(), r.get());
150 return internal::move_only_any(
151 std::in_place_type<owning_storage>, std::move(r));
153 T *ptr = internal::move_only_any_cast<owning_storage>(&store)->p.get();
154 s = {ptr, other.s.size()};
183 using pointer =
typename std::span<T>::pointer;
198 typename std::span<T const>::reverse_iterator;
207 return std::span<T const>(s).begin();
216 [[nodiscard]]
constexpr auto end() noexcept ->
iterator {
return s.end(); }
220 return std::span<T const>(s).end();
234 [[nodiscard]]
constexpr auto crbegin() const noexcept
236 return std::span<T const>(s).rbegin();
240 [[nodiscard]]
constexpr auto rbegin() const noexcept
251 [[nodiscard]]
constexpr auto crend() const noexcept
253 return std::span<T const>(s).rend();
257 [[nodiscard]]
constexpr auto rend() const noexcept
267 return std::span<T const>(s).front();
275 return std::span<T const>(s).back();
292 throw std::out_of_range(
"bucket element index out of range");
299 throw std::out_of_range(
"bucket element index out of range");
320 return s.size_bytes();
324 [[nodiscard]]
constexpr auto empty() const noexcept ->
bool {
330 return s.first(
count);
335 -> std::span<T const> {
336 return std::span<T const>(s).first(
count);
341 return s.last(
count);
346 -> std::span<T const> {
347 return std::span<T const>(s).last(
count);
354 return s.subspan(offset,
count);
358 [[nodiscard]]
constexpr auto
360 -> std::span<T const> {
361 return std::span<T const>(s).subspan(offset,
count);
377 template <
typename S>
379 return store.type() ==
typeid(S);
392 template <
typename S> [[nodiscard]]
auto storage() const -> S const & {
393 return internal::move_only_any_cast<S const &>(store);
413 S ret = internal::move_only_any_cast<S>(std::move(store));
428 void shrink(std::size_t start, std::size_t
count = std::dynamic_extent) {
429 s = s.subspan(start,
count);
441 return lhs.s.size() == rhs.s.size() &&
442 std::equal(lhs.s.begin(), lhs.s.end(), rhs.s.begin());
448 static constexpr std::size_t num_to_print = 10;
449 auto const size = bkt.s.size();
450 stream <<
"bucket(size=" <<
size;
451 if constexpr (std::is_same_v<std::remove_cv_t<T>, std::byte>) {
452 for (std::size_t i = 0; i < std::min(
size, num_to_print - 1); ++i)
453 stream <<
", " << std::to_integer<int>(bkt.s[i]);
454 if (
size > num_to_print)
456 if (
size >= num_to_print)
457 stream <<
", " << std::to_integer<int>(bkt.s[
size - 1]);
459 for (std::size_t i = 0; i < std::min(
size, num_to_print - 1); ++i)
460 stream <<
", " << bkt.s[i];
461 if (
size > num_to_print)
463 if (
size >= num_to_print)
464 stream <<
", " << bkt.s[
size - 1];
466 return stream <<
')';
490 struct ad_hoc_storage {};
565 [[nodiscard]]
virtual auto
567 throw std::logic_error(
568 "this bucket source does not support shared views");
589 new_delete_bucket_source() =
default;
593 static auto create() -> std::shared_ptr<bucket_source<T>> {
594 static std::shared_ptr<bucket_source<T>> instance(
595 new new_delete_bucket_source());
602 std::unique_ptr<T[]> p(
new T[size]);
603 return bucket<T>{std::span(p.get(), size), std::move(p)};
625 sharable_new_delete_bucket_source() =
default;
629 static auto create() -> std::shared_ptr<bucket_source<T>> {
630 static std::shared_ptr<bucket_source<T>> instance(
631 new sharable_new_delete_bucket_source());
638 std::shared_ptr<T[]> p(
new T[size]);
639 return bucket<T>{std::span(p.get(), size), std::move(p)};
652 auto storage = bkt.template storage<std::shared_ptr<T[]>>();
680template <
typename T,
bool Blocking = false,
bool ClearRecycled = false>
681class recycling_bucket_source final
683 public std::enable_shared_from_this<
684 recycling_bucket_source<T, Blocking, ClearRecycled>> {
686 std::condition_variable not_empty_condition;
687 std::size_t max_buckets;
688 std::size_t max_recycled;
689 std::size_t bucket_count = 0;
690 std::vector<std::unique_ptr<std::vector<T>>> recyclable;
692 struct bucket_storage {
693 std::shared_ptr<recycling_bucket_source> source;
694 std::unique_ptr<std::vector<T>> storage;
705 if (source->max_recycled > 0 &&
706 storage->size() > source->max_recycled)
707 *storage = std::vector<T>();
709 if constexpr (ClearRecycled)
713 auto const lock = std::lock_guard(source->mutex);
714 source->recyclable.push_back(std::move(storage));
717 if constexpr (Blocking) {
721 source->not_empty_condition.notify_one();
725 explicit bucket_storage(
726 std::shared_ptr<recycling_bucket_source> source,
727 std::unique_ptr<std::vector<T>> &&storage)
728 : source(std::move(source)), storage(std::move(storage)) {}
730 bucket_storage(bucket_storage
const &) =
delete;
731 auto operator=(bucket_storage
const &) =
delete;
733 bucket_storage(bucket_storage &&)
noexcept =
default;
734 auto operator=(bucket_storage &&)
noexcept
735 -> bucket_storage & =
default;
738 explicit recycling_bucket_source(
741 : max_buckets(max_bucket_count.
value),
742 max_recycled(max_recycled_size.
value) {}
758 0}) -> std::shared_ptr<bucket_source<T>> {
759 return std::shared_ptr<recycling_bucket_source>(
760 new recycling_bucket_source(max_bucket_count, max_recycled_size));
774 std::unique_ptr<std::vector<T>> p;
776 auto lock = std::unique_lock(mutex);
777 if (recyclable.empty() && bucket_count < max_buckets) {
780 if constexpr (Blocking) {
781 not_empty_condition.wait(
782 lock, [&] {
return not recyclable.empty(); });
783 }
else if (recyclable.empty()) {
785 "recycling bucket source exhausted");
787 p = std::move(recyclable.back());
788 recyclable.pop_back();
792 p = std::make_unique<std::vector<T>>();
794 auto const spn = std::span(p->data(), p->size());
796 spn, bucket_storage(this->shared_from_this(), std::move(p))};
810template <
typename T,
bool Blocking = false,
bool ClearRecycled = false>
811class sharable_recycling_bucket_source final
813 public std::enable_shared_from_this<
814 sharable_recycling_bucket_source<T, Blocking, ClearRecycled>> {
816 std::condition_variable not_empty_condition;
817 std::size_t max_buckets;
818 std::size_t max_recycled;
819 std::size_t bucket_count = 0;
820 std::vector<std::unique_ptr<std::vector<T>>> recyclable;
823 struct bucket_storage {
824 std::shared_ptr<std::vector<T>> storage;
827 explicit sharable_recycling_bucket_source(
830 : max_buckets(max_bucket_count.
value),
831 max_recycled(max_recycled_size.
value) {}
843 0}) -> std::shared_ptr<bucket_source<T>> {
844 return std::shared_ptr<sharable_recycling_bucket_source>(
845 new sharable_recycling_bucket_source(max_bucket_count,
855 std::unique_ptr<std::vector<T>> p;
857 auto lock = std::unique_lock(mutex);
858 if (recyclable.empty() && bucket_count < max_buckets) {
861 if constexpr (Blocking) {
862 not_empty_condition.wait(
863 lock, [&] {
return not recyclable.empty(); });
864 }
else if (recyclable.empty()) {
866 "sharable recycling bucket source exhausted");
868 p = std::move(recyclable.back());
869 recyclable.pop_back();
873 p = std::make_unique<std::vector<T>>();
875 auto const spn = std::span(*p);
876 std::shared_ptr<std::vector<T>> shptr{
878 [self = this->shared_from_this()](std::vector<T> *pv) {
881 if (self->max_recycled > 0 && pv->size() > self->max_recycled)
882 *pv = std::vector<T>();
883 if constexpr (ClearRecycled)
886 auto const lock = std::lock_guard(self->mutex);
887 self->recyclable.emplace_back(pv);
889 if constexpr (Blocking)
890 self->not_empty_condition.notify_one();
894 return bucket<T>{spn, bucket_storage{shptr}};
906 auto storage = bkt.template storage<bucket_storage>();
913template <
typename Event,
typename Downstream>
class extract_bucket {
915 processor<Downstream, decltype(std::declval<Event>().data_bucket)>);
917 Downstream downstream;
920 explicit extract_bucket(Downstream downstream)
921 : downstream(std::move(downstream)) {}
923 [[nodiscard]]
auto introspect_node() const -> processor_info {
924 return processor_info(
this,
"extract_bucket");
927 [[nodiscard]]
auto introspect_graph() const -> processor_graph {
928 return downstream.introspect_graph().push_entry_point(
this);
931 void handle(Event
const &event) { downstream.handle(event.data_bucket); }
933 void handle(Event &&event) {
934 downstream.handle(std::move(event).data_bucket);
937 void flush() { downstream.flush(); }
960template <
typename Event,
typename Downstream>
962 return internal::extract_bucket<Event, Downstream>(std::move(downstream));
Value-semantic container for array data allowing use of custom storage.
Definition bucket.hpp:110
constexpr auto end() noexcept -> iterator
Return an iterator to the end.
Definition bucket.hpp:216
constexpr auto rend() noexcept -> reverse_iterator
Return a reverse iterator to the end.
Definition bucket.hpp:246
constexpr auto rbegin() noexcept -> reverse_iterator
Return a reverse iterator to the beginning.
Definition bucket.hpp:229
constexpr auto operator[](size_type idx) -> reference
Return an element without bounds checking.
Definition bucket.hpp:279
auto storage() const -> S const &
Observe the underlying storage.
Definition bucket.hpp:392
typename std::span< T >::iterator iterator
Iterator type.
Definition bucket.hpp:191
constexpr auto crbegin() const noexcept -> const_reverse_iterator
Return a reverse iterator to the beginning.
Definition bucket.hpp:234
typename std::span< T >::size_type size_type
Size type.
Definition bucket.hpp:179
constexpr auto empty() const noexcept -> bool
Return whether this bucket is empty.
Definition bucket.hpp:324
constexpr auto at(size_type pos) const -> const_reference
Return an element with bounds checking.
Definition bucket.hpp:297
constexpr auto rbegin() const noexcept -> const_reverse_iterator
Return a reverse iterator to the beginning.
Definition bucket.hpp:240
friend constexpr auto operator==(bucket const &lhs, bucket const &rhs) -> bool
Equality comparison operator.
Definition bucket.hpp:439
constexpr auto rend() const noexcept -> const_reverse_iterator
Return a reverse iterator to the end.
Definition bucket.hpp:257
constexpr auto operator[](size_type idx) const -> const_reference
Return an element without bounds checking.
Definition bucket.hpp:284
typename std::span< T >::element_type element_type
Element type.
Definition bucket.hpp:175
constexpr auto subspan(size_type offset, size_type count=std::dynamic_extent) const -> std::span< T const >
Return the span of the given range of elements.
Definition bucket.hpp:359
constexpr auto at(size_type pos) -> reference
Return an element with bounds checking.
Definition bucket.hpp:290
constexpr auto last(size_type count) -> std::span< T >
Return the span of the last count elements.
Definition bucket.hpp:340
bucket(bucket &&other) noexcept=default
Move constructor.
typename std::span< T >::const_reference const_reference
Element const reference type.
Definition bucket.hpp:189
typename std::span< T >::const_pointer const_pointer
Element const pointer type.
Definition bucket.hpp:185
constexpr auto back() const -> const_reference
Return the last element.
Definition bucket.hpp:274
constexpr auto front() const -> const_reference
Return the first element.
Definition bucket.hpp:266
auto operator=(bucket &&other) noexcept -> bucket &=default
Move assignment operator.
auto operator=(bucket const &other) -> bucket &
Copy assignment operator (allocates new private storage).
Definition bucket.hpp:161
constexpr auto end() const noexcept -> const_iterator
Return an iterator to the end.
Definition bucket.hpp:224
constexpr auto begin() const noexcept -> const_iterator
Return an iterator to the beginning.
Definition bucket.hpp:211
typename std::span< T >::difference_type difference_type
Difference type.
Definition bucket.hpp:181
typename std::span< T >::reverse_iterator reverse_iterator
Reverse iterator type.
Definition bucket.hpp:195
typename std::span< T const >::reverse_iterator const_reverse_iterator
Const reverse iterator type.
Definition bucket.hpp:197
constexpr auto cend() const noexcept -> const_iterator
Return an iterator to the end.
Definition bucket.hpp:219
bucket(bucket const &other)
Copy constructor (allocates new private storage).
Definition bucket.hpp:143
constexpr auto data() noexcept -> pointer
Return the address of the data.
Definition bucket.hpp:304
constexpr auto back() -> reference
Return the last element.
Definition bucket.hpp:271
auto extract_storage() -> S
Extract the underlying storage.
Definition bucket.hpp:411
void shrink(std::size_t start, std::size_t count=std::dynamic_extent)
Shrink the span of the bucket data.
Definition bucket.hpp:428
typename std::span< T >::pointer pointer
Element pointer type.
Definition bucket.hpp:183
bucket() noexcept=default
Construct an empty bucket.
constexpr auto size() const noexcept -> size_type
Return the number of data elements in this bucket.
Definition bucket.hpp:314
constexpr auto last(size_type count) const -> std::span< T const >
Return the span of the last count elements.
Definition bucket.hpp:345
constexpr auto begin() noexcept -> iterator
Return an iterator to the beginning.
Definition bucket.hpp:201
auto check_storage_type() const noexcept -> bool
Check if the underlying storage is of a given type.
Definition bucket.hpp:378
constexpr auto crend() const noexcept -> const_reverse_iterator
Return a reverse iterator to the end.
Definition bucket.hpp:251
constexpr auto front() -> reference
Return the first element.
Definition bucket.hpp:263
constexpr auto subspan(size_type offset, size_type count=std::dynamic_extent) -> std::span< T >
Return the span of the given range of elements.
Definition bucket.hpp:351
typename std::span< T const >::iterator const_iterator
Const iterator type.
Definition bucket.hpp:193
constexpr auto data() const noexcept -> const_pointer
Return the address of the data.
Definition bucket.hpp:309
constexpr auto cbegin() const noexcept -> const_iterator
Return an iterator to the beginning.
Definition bucket.hpp:206
friend auto operator<<(std::ostream &stream, bucket const &bkt) -> std::ostream &
Stream insertion operator.
Definition bucket.hpp:446
typename std::span< T >::value_type value_type
Value type.
Definition bucket.hpp:177
constexpr auto first(size_type count) const -> std::span< T const >
Return the span of the first count elements.
Definition bucket.hpp:334
constexpr auto first(size_type count) -> std::span< T >
Return the span of the first count elements.
Definition bucket.hpp:329
typename std::span< T >::reference reference
Element reference type.
Definition bucket.hpp:187
constexpr auto size_bytes() const noexcept -> size_type
Return the size of this bucket's data in bytes.
Definition bucket.hpp:319
Error thrown when buffer capacity has been exhausted.
Definition errors.hpp:84
static auto create() -> std::shared_ptr< bucket_source< T > >
Create an instance.
Definition bucket.hpp:593
auto bucket_of_size(std::size_t size) -> bucket< T > override
Implements bucket source requirement.
Definition bucket.hpp:600
auto bucket_of_size(std::size_t size) -> bucket< T > override
Implements bucket source requirement.
Definition bucket.hpp:773
static auto create(arg::max_bucket_count<> max_bucket_count=arg::max_bucket_count{std::numeric_limits< std::size_t >::max()}, arg::max_recycled_size<> max_recycled_size=arg::max_recycled_size<>{ 0}) -> std::shared_ptr< bucket_source< T > >
Create an instance.
Definition bucket.hpp:754
auto bucket_of_size(std::size_t size) -> bucket< T > override
Implements bucket source requirement.
Definition bucket.hpp:636
auto supports_shared_views() const noexcept -> bool override
Implements sharable bucket source requirement.
Definition bucket.hpp:643
static auto create() -> std::shared_ptr< bucket_source< T > >
Create an instance.
Definition bucket.hpp:629
auto shared_view_of(bucket< T > const &bkt) -> bucket< T const > override
Implements sharable bucket source requirement.
Definition bucket.hpp:649
auto supports_shared_views() const noexcept -> bool override
Implements sharable bucket source requirement.
Definition bucket.hpp:898
auto shared_view_of(bucket< T > const &bkt) -> bucket< T const > override
Implements sharable bucket source requirement.
Definition bucket.hpp:904
auto bucket_of_size(std::size_t size) -> bucket< T > override
Implements bucket source requirement.
Definition bucket.hpp:854
static auto create(arg::max_bucket_count<> max_bucket_count=arg::max_bucket_count{std::numeric_limits< std::size_t >::max()}, arg::max_recycled_size<> max_recycled_size=arg::max_recycled_size<>{ 0}) -> std::shared_ptr< bucket_source< T > >
Create an instance.
Definition bucket.hpp:839
auto ad_hoc_bucket(std::span< T > s) -> bucket< T >
Create a tcspc::bucket referencing a span.
Definition bucket.hpp:489
auto extract_bucket(Downstream downstream)
Create a processor that extracts the bucket carried by an event.
Definition bucket.hpp:961
auto count(access_tracker< count_access > &&tracker, Downstream downstream)
Create a processor that counts events of a given type.
Definition count.hpp:313
libtcspc namespace.
Definition acquire.hpp:29
Function argument wrapper for maximum bucket count.
Definition arg_wrappers.hpp:227
T value
The argument value.
Definition arg_wrappers.hpp:229
Function argument wrapper for maximum recycled size.
Definition arg_wrappers.hpp:297
T value
The argument value.
Definition arg_wrappers.hpp:299
Abstract base class for polymorphic bucket sources.
Definition bucket.hpp:505
virtual auto bucket_of_size(std::size_t size) -> bucket< T >=0
Create a bucket of size elements of type T.
virtual auto shared_view_of(bucket< T > const &bkt) -> bucket< T const >
Create a shared view bucket that is a read-only view of the given bucket but may outlive the original...
Definition bucket.hpp:566
virtual auto supports_shared_views() const noexcept -> bool
Return whether this bucket source is a sharable bucket source.
Definition bucket.hpp:528