Back to Site
Loading...
Searching...
No Matches
pubsub.h
Go to the documentation of this file.
1#pragma once
2
31#include "node/peer_network.h"
32#include "peer/peer.h"
33#include "core/bytes.h"
34#include "peer/peer_id.h"
35
36#include <atomic>
37#include <chrono>
38#include <condition_variable>
39#include <cstdint>
40#include <deque>
41#include <functional>
42#include <mutex>
43#include <random>
44#include <string>
45#include <thread>
46#include <unordered_map>
47#include <unordered_set>
48#include <utility>
49#include <vector>
50
51namespace librats {
52
57
58class PubSub final : public Subsystem {
59public:
60 using Handler = std::function<void(const PeerId& from, const std::string& topic, ByteView data)>;
61 using Validator = std::function<ValidationResult(const PeerId& from, const std::string& topic, ByteView data)>;
62
65 struct Config {
66 int mesh_target = 6;
67 int mesh_low = 4;
68 int mesh_high = 12;
69 int fanout_size = 6;
70 int gossip_factor = 6;
71 std::chrono::milliseconds fanout_ttl{60000};
72 std::chrono::milliseconds heartbeat_interval{1000};
75 size_t seen_limit = 8192;
76 };
77
79 explicit PubSub(Config config);
80 ~PubSub() override;
81
83 void subscribe(const std::string& topic, Handler handler);
84 void unsubscribe(const std::string& topic);
85
87 void publish(const std::string& topic, ByteView data);
88
89 bool is_subscribed(const std::string& topic) const;
90 std::vector<std::string> subscribed_topics() const;
91 std::vector<PeerId> peers_for_topic(const std::string& topic) const;
92 std::vector<PeerId> mesh_peers(const std::string& topic) const;
93
96 void set_validator(const std::string& topic, Validator validator);
97
98 // Subsystem.
99 void attach(NodeContext& ctx) override;
100 void start() override;
101 void stop() override;
102
103private:
104 struct Topic {
105 std::unordered_set<PeerId, PeerId::Hash> subscribers;
106 std::unordered_set<PeerId, PeerId::Hash> mesh;
107 std::unordered_set<PeerId, PeerId::Hash> fanout;
108 std::chrono::steady_clock::time_point last_fanout{};
109 };
110
111 struct CachedMessage {
112 std::string topic;
113 Bytes frame;
114 };
115
116 // Inbound dispatch (all run on a reactor thread).
117 void on_new_peer(const Peer& peer);
118 void on_peer_gone(const PeerId& id);
119 void on_gossip(const Peer& peer, ByteView payload);
120
121 void recv_subscription(const PeerId& from, const std::string& topic, bool subscribe);
122 void recv_graft(const PeerId& from, const std::string& topic);
123 void recv_prune(const PeerId& from, const std::string& topic);
124 void recv_publish(const PeerId& from, ByteView frame, const PeerId& origin, uint64_t seqno,
125 const std::string& topic, ByteView data);
126 void recv_ihave(const PeerId& from, const std::string& topic, const std::vector<std::string>& ids);
127 void recv_iwant(const PeerId& from, const std::vector<std::string>& ids);
128
129 // Heartbeat (background thread).
130 void heartbeat_loop();
131 void do_heartbeat();
132
136 using CtrlList = std::vector<std::pair<PeerId, std::string>>;
137 void maintain_mesh_locked(const std::string& topic, CtrlList& grafts, CtrlList& prunes);
138
139 // Helpers.
140 void send_ctrl(const PeerId& to, uint8_t op, const std::string& topic);
141 void broadcast_ctrl(uint8_t op, const std::string& topic);
142 void deliver_local(const PeerId& from, const std::string& topic, ByteView data);
143 Handler handler_for(const std::string& topic) const;
144 ValidationResult validate(const PeerId& from, const std::string& topic, ByteView data);
145 bool mark_seen(const std::string& id);
146 void cache_message(const std::string& id, const std::string& topic, const Bytes& frame);
147 std::vector<PeerId> random_sample(std::vector<PeerId> in, int n);
148
149 PeerNetwork* network_ = nullptr;
150 Config config_;
151
152 mutable std::mutex mutex_;
153 uint64_t seqno_ = 0;
154 std::unordered_map<std::string, Handler> subscriptions_;
155 std::unordered_map<std::string, Topic> topics_;
156 std::unordered_map<std::string, Validator> validators_;
157 Validator global_validator_;
158
159 mutable std::mutex mcache_mutex_;
160 std::unordered_map<std::string, CachedMessage> mcache_;
161 std::deque<std::vector<std::string>> history_;
162 std::unordered_set<std::string> seen_;
163 std::deque<std::string> seen_order_;
164
165 std::mutex rng_mutex_;
166 std::mt19937 rng_;
167
168 std::atomic<bool> running_{false};
169 std::thread heartbeat_thread_;
170 std::mutex hb_mutex_;
171 std::condition_variable hb_cv_;
172};
173
174} // namespace librats
void subscribe(const std::string &topic, Handler handler)
Subscribe to a topic and deliver matching messages to handler.
void publish(const std::string &topic, ByteView data)
Publish data on topic: along the mesh if we are subscribed, else via fanout.
std::function< void(const PeerId &from, const std::string &topic, ByteView data)> Handler
Definition pubsub.h:60
PubSub(Config config)
void attach(NodeContext &ctx) override
void unsubscribe(const std::string &topic)
~PubSub() override
std::function< ValidationResult(const PeerId &from, const std::string &topic, ByteView data)> Validator
Definition pubsub.h:61
std::vector< PeerId > peers_for_topic(const std::string &topic) const
known subscribers
void stop() override
stop and join it
void start() override
launch the heartbeat thread
std::vector< PeerId > mesh_peers(const std::string &topic) const
our mesh for topic
bool is_subscribed(const std::string &topic) const
void set_validator(const std::string &topic, Validator validator)
Gate inbound messages for a topic; topic == "" installs a global validator used when no per-topic validator is set.
std::vector< std::string > subscribed_topics() const
A pluggable network subsystem.
Definition node.h:65
ValidationResult
Outcome of validating an inbound published message before it is delivered or forwarded.
Definition pubsub.h:56
A lightweight handle to a connected peer.
Self-certifying peer identity.
The narrow contract a subsystem needs from the node — and nothing more.
GossipSub tuning.
Definition pubsub.h:65
int gossip_factor
D_lazy — peers we emit IHAVE to per heartbeat per topic.
Definition pubsub.h:70
std::chrono::milliseconds fanout_ttl
drop a fanout set idle this long
Definition pubsub.h:71
int history_gossip
of those, how many windows IHAVE advertises
Definition pubsub.h:74
int history_length
heartbeat windows the message cache keeps (for IWANT)
Definition pubsub.h:73
std::chrono::milliseconds heartbeat_interval
mesh maintenance + gossip cadence
Definition pubsub.h:72
int mesh_low
D_low — graft more peers below this.
Definition pubsub.h:67
size_t seen_limit
dedup ids remembered
Definition pubsub.h:75
int mesh_high
D_high — prune peers above this.
Definition pubsub.h:68
int fanout_size
peers used to publish to a topic we are not subscribed to
Definition pubsub.h:69
int mesh_target
D — desired mesh degree per topic.
Definition pubsub.h:66