libtcspc C++ API
Streaming TCSPC and time tag data processing
Loading...
Searching...
No Matches
route.hpp
1/*
2 * This file is part of libtcspc
3 * Copyright 2019-2026 Board of Regents of the University of Wisconsin System
4 * SPDX-License-Identifier: MIT
5 */
6
7#pragma once
8
9#include "common.hpp"
10#include "data_types.hpp"
11#include "errors.hpp"
12#include "int_arith.hpp"
13#include "introspect.hpp"
14#include "type_erased_processor.hpp"
15#include "type_list.hpp"
16
17#include <algorithm>
18#include <array>
19#include <cstddef>
20#include <exception>
21#include <functional>
22#include <limits>
23#include <numeric>
24#include <type_traits>
25#include <utility>
26
27namespace tcspc {
28
29namespace internal {
30
31// Design note: Currently the router produces a single downstream index per
32// event. We could generalize this so that the router produces a boolean mask
33// of the downstreams, such that a single event can be routed to multiple
34// downstreams. But let's keep it simple. If necessary, a "multiroute"
35// processor can be added.
36
37template <typename RoutedEventList, typename Router, std::size_t N,
38 typename Downstream>
39class route_homogeneous {
40 static_assert(type_list_like<RoutedEventList>);
41 // We do not require Downstream to handle all of RoutedEventList.
42 static_assert(processor<Downstream>);
43
44 Router router;
45 std::array<Downstream, N> downstreams;
46
47 LIBTCSPC_NOINLINE void flush_all_but(Downstream &excluded) {
48 for (auto &d : downstreams) {
49 if (&d != &excluded) {
50 try {
51 d.flush();
52 } catch (end_of_processing const &) {
53 ;
54 }
55 }
56 }
57 }
58
59 public:
60 explicit route_homogeneous(Router router,
61 std::array<Downstream, N> downstreams)
62 : router(std::move(router)), downstreams(std::move(downstreams)) {}
63
64 [[nodiscard]] auto introspect_node() const -> processor_info {
65 return processor_info(this, "route_homogeneous");
66 }
67
68 [[nodiscard]] auto introspect_graph() const -> processor_graph {
69 return std::transform_reduce(
70 downstreams.begin(), downstreams.end(), processor_graph(),
71 merge_processor_graphs, [this](auto const &d) {
72 return d.introspect_graph().push_entry_point(this);
73 });
74 }
75
76 template <typename Event>
77 requires(convertible_to_type_list_member<std::remove_cvref_t<Event>,
78 RoutedEventList> and
79 handler_for<Downstream, std::remove_cvref_t<Event>>)
80 void handle(Event &&event) {
81 std::size_t index = router(std::as_const(event));
82 if (index >= N)
83 return;
84 try {
85 downstreams[index].handle(std::forward<Event>(event));
86 } catch (end_of_processing const &) {
87 flush_all_but(downstreams[index]);
88 throw;
89 }
90 }
91
92 template <typename Event>
93 requires(not convertible_to_type_list_member<
94 std::remove_cvref_t<Event>, RoutedEventList> and
95 handler_for<Downstream, std::remove_cvref_t<Event>>)
96 void handle(Event &&event) {
97 for (auto &d : downstreams) {
98 try {
99 d.handle(std::as_const(event));
100 } catch (end_of_processing const &) {
101 flush_all_but(d);
102 throw;
103 }
104 }
105 }
106
107 void flush() {
108 std::exception_ptr end;
109 for (auto &d : downstreams) {
110 try {
111 d.flush();
112 } catch (end_of_processing const &) {
113 if (not end)
114 end = std::current_exception();
115 }
116 }
117 if (end)
118 std::rethrow_exception(end);
119 }
120};
121
122} // namespace internal
123
161template <typename RoutedEventList, typename Router, std::size_t N,
162 typename Downstream>
163auto route_homogeneous(Router router, std::array<Downstream, N> downstreams) {
164 return internal::route_homogeneous<RoutedEventList, Router, N, Downstream>(
165 std::move(router), std::move(downstreams));
166}
167
204template <typename RoutedEventList, typename Router, typename... Downstreams>
205auto route_homogeneous(Router router, Downstreams... downstreams) {
207 std::move(router), std::array{std::move(downstreams)...});
208}
209
248template <typename RoutedEventList, typename BroadcastEventList = type_list<>,
249 typename Router, typename... Downstreams>
250auto route(Router router, Downstreams... downstreams) {
251 static_assert(type_list_like<RoutedEventList>);
253 static_assert(
256 0,
257 "routed event list and broadcast event list must not overlap");
258 using type_erased_downstream = type_erased_processor<
260 return route_homogeneous<RoutedEventList, Router, sizeof...(Downstreams),
261 type_erased_downstream>(
262 std::move(router),
263 std::array<type_erased_downstream, sizeof...(Downstreams)>{
264 type_erased_downstream(std::move(downstreams))...});
265}
266
275 public:
280 template <typename Event>
281 auto operator()(Event const & /* event */) const -> std::size_t {
282 return std::size_t(-1);
283 }
284};
285
295template <std::size_t N, typename DataTypes = default_data_types>
297 std::array<typename DataTypes::channel_type, N> channels;
298 std::array<std::size_t, N> indices;
299
300 public:
310 template <typename ChannelIndexPair>
312 std::array<ChannelIndexPair, N> const &channel_indices)
313 : channels(std::invoke([&] {
314 std::array<typename DataTypes::channel_type, N> ret{};
315 std::transform(channel_indices.begin(), channel_indices.end(),
316 ret.begin(),
317 [](auto p) { return std::get<0>(p); });
318 return ret;
319 })),
320 indices(std::invoke([&] {
321 std::array<std::size_t, N> ret{};
322 std::transform(channel_indices.begin(), channel_indices.end(),
323 ret.begin(),
324 [](auto p) { return std::get<1>(p); });
325 return ret;
326 })) {
327
328 static_assert(
329 std::is_convertible_v<decltype(std::get<0>(channel_indices[0])),
330 typename DataTypes::channel_type> &&
331 std::is_convertible_v<
332 decltype(std::get<1>(channel_indices[0])), std::size_t>,
333 "channel_indices must be an array of pair-like convertible to (channel, std::size_t)");
334 }
335
337 template <typename Event>
338 auto operator()(Event const &event) const -> std::size_t {
339 static_assert(std::is_same_v<decltype(event.channel),
340 typename DataTypes::channel_type>);
341 auto it = std::find(channels.begin(), channels.end(), event.channel);
342 if (it == channels.end())
343 return std::numeric_limits<std::size_t>::max();
344 return indices[internal::as_unsigned(
345 std::distance(channels.begin(), it))];
346 }
347};
348
370template <std::size_t N, typename Downstream>
371auto broadcast_homogeneous(std::array<Downstream, N> downstreams) {
372 return route_homogeneous<type_list<>, null_router, N, Downstream>(
373 null_router(), std::move(downstreams));
374}
375
396template <typename... Downstreams>
397auto broadcast_homogeneous(Downstreams... downstreams) {
398 auto arr = std::array{std::move(downstreams)...};
399 return broadcast_homogeneous(std::move(arr));
400}
401
423template <typename BroadcastEventList, typename... Downstreams>
424auto broadcast(Downstreams... downstreams) {
425 return route<type_list<>, BroadcastEventList, null_router, Downstreams...>(
426 null_router(), std::move(downstreams)...);
427}
428
429} // namespace tcspc
auto operator()(Event const &event) const -> std::size_t
Implements router requirement.
Definition route.hpp:338
channel_router(std::array< ChannelIndexPair, N > const &channel_indices)
Construct with channels and corresponding downstream indices.
Definition route.hpp:311
Router that does not route.
Definition route.hpp:274
auto operator()(Event const &) const -> std::size_t
Implements router requirement; always returns std::numeric_limits<std::size_t>::max().
Definition route.hpp:281
Processor that type-erases the downstream processor.
Definition type_erased_processor.hpp:120
Concept that is satisfied when a type is a tcspc::type_list specialization.
Definition type_list.hpp:64
auto merge_processor_graphs(processor_graph const &a, processor_graph const &b) -> processor_graph
Create a new processor graph by merging two existing ones.
Definition introspect.hpp:357
auto broadcast(Downstreams... downstreams)
Create a processor that broadcasts events to multiple downstream processors.
Definition route.hpp:424
auto route_homogeneous(Router router, std::array< Downstream, N > downstreams)
Create a processor that routes events to multiple downstreams of the same type.
Definition route.hpp:163
auto broadcast_homogeneous(std::array< Downstream, N > downstreams)
Create a processor that broadcasts events to multiple downstream processors of the same type.
Definition route.hpp:371
auto route(Router router, Downstreams... downstreams)
Create a processor that routes events to different downstreams.
Definition route.hpp:250
typename type_list_intersection< TL0, TL1 >::type type_list_intersection_t
Helper type for tcspc::type_list_intersection.
Definition type_list.hpp:419
constexpr std::size_t type_list_size_v
Helper variable template for tcspc::type_list_size.
Definition type_list.hpp:97
typename type_list_union< TL0, TL1 >::type type_list_union_t
Helper type for tcspc::type_list_union.
Definition type_list.hpp:363
libtcspc namespace.
Definition acquire.hpp:29