Back to Site
Loading...
Searching...
No Matches
gossipsub.h
Go to the documentation of this file.
1#pragma once
2
3#include <string>
4#include <vector>
5#include <unordered_map>
6#include <unordered_set>
7#include <chrono>
8#include <memory>
9#include <functional>
10#include <mutex>
11#include <atomic>
12#include <thread>
13#include <condition_variable>
14#include <random>
15#include "json.hpp"
16
17namespace librats {
18
19// Forward declarations
20class RatsClient;
21
26 SUBSCRIBE, // Subscribe to a topic
27 UNSUBSCRIBE, // Unsubscribe from a topic
28 PUBLISH, // Publish a message to a topic
29 GOSSIP, // Gossip message about published content
30 GRAFT, // Join mesh for a topic
31 PRUNE, // Leave mesh for a topic
32 IHAVE, // Announce having certain messages
33 IWANT, // Request specific messages
34 HEARTBEAT // Periodic heartbeat with control information
35};
36
41 std::string message_id;
42 std::string topic;
43 std::string sender_peer_id;
44 std::chrono::steady_clock::time_point first_seen;
45 std::chrono::steady_clock::time_point last_seen;
46 std::vector<std::string> seen_from_peers;
49
50 MessageMetadata(const std::string& id, const std::string& t, const std::string& sender)
51 : message_id(id), topic(t), sender_peer_id(sender),
52 first_seen(std::chrono::steady_clock::now()),
53 last_seen(std::chrono::steady_clock::now()),
54 hop_count(0), validated(false) {}
55};
56
60struct PeerScore {
61 std::string peer_id;
62 double score;
63
64 // Score components
65 double topic_score; // Score based on topic participation
66 double delivery_score; // Score based on message delivery
67 double mesh_behavior_score; // Score based on mesh behavior
68 double invalid_message_score; // Penalty for invalid messages
69
70 // Timing metrics
71 std::chrono::steady_clock::time_point last_updated;
72 std::chrono::steady_clock::time_point connected_since;
73
74 // Behavioral metrics
79
80 PeerScore(const std::string& id)
81 : peer_id(id), score(0.0), topic_score(0.0), delivery_score(0.0),
83 last_updated(std::chrono::steady_clock::now()),
84 connected_since(std::chrono::steady_clock::now()),
87
89};
90
95 std::string topic;
96 std::unordered_set<std::string> subscribers; // All peers subscribed to this topic
97 std::unordered_set<std::string> mesh_peers; // Peers in our mesh for this topic
98 std::unordered_set<std::string> fanout_peers; // Peers in fanout (when we're not subscribed)
99 std::chrono::steady_clock::time_point last_fanout_prune;
100
101 TopicSubscription(const std::string& t) : topic(t), last_fanout_prune(std::chrono::steady_clock::now()) {}
102};
103
108 // Mesh parameters
109 int mesh_low; // Minimum number of peers in mesh
110 int mesh_high; // Maximum number of peers in mesh
111 int mesh_optimal; // Optimal number of peers in mesh
112
113 // Fanout parameters
114 int fanout_size; // Number of peers for fanout
115 std::chrono::milliseconds fanout_ttl; // Time to live for fanout
116
117 // Gossip parameters
118 int gossip_factor; // Number of peers to gossip to
119 int gossip_lazy; // Lazy pull parameter
120 std::chrono::milliseconds gossip_retransmit; // Gossip retransmission interval
121
122 // Heartbeat parameters
123 std::chrono::milliseconds heartbeat_interval; // Heartbeat interval
124
125 // Message parameters
126 std::chrono::milliseconds message_cache_ttl; // Message cache time to live
127 int max_ihave_messages; // Maximum messages in IHAVE
128 int max_iwant_messages; // Maximum messages in IWANT
129
130 // Scoring parameters
131 double score_threshold_accept; // Minimum score to accept messages
132 double score_threshold_gossip; // Minimum score to gossip to peer
133 double score_threshold_mesh; // Minimum score to keep in mesh
134 double score_threshold_publish; // Minimum score to accept published messages
135
137 : mesh_low(4), mesh_high(12), mesh_optimal(6),
138 fanout_size(6), fanout_ttl(std::chrono::seconds(60)),
139 gossip_factor(3), gossip_lazy(3), gossip_retransmit(std::chrono::seconds(3)),
140 heartbeat_interval(std::chrono::seconds(1)),
141 message_cache_ttl(std::chrono::minutes(5)),
145};
146
151 ACCEPT,
152 REJECT,
154};
155
159using MessageValidator = std::function<ValidationResult(const std::string& topic, const std::string& message, const std::string& sender_peer_id)>;
160using MessageHandler = std::function<void(const std::string& topic, const std::string& message, const std::string& sender_peer_id)>;
161using PeerJoinedHandler = std::function<void(const std::string& topic, const std::string& peer_id)>;
162using PeerLeftHandler = std::function<void(const std::string& topic, const std::string& peer_id)>;
163
168 friend class RatsClient;
169
170public:
176 explicit GossipSub(RatsClient& rats_client, const GossipSubConfig& config = GossipSubConfig());
177
182
187 bool start();
188
192 void stop();
193
198 bool is_running() const;
199
200 // Topic management
206 bool subscribe(const std::string& topic);
207
213 bool unsubscribe(const std::string& topic);
214
220 bool is_subscribed(const std::string& topic) const;
221
226 std::vector<std::string> get_subscribed_topics() const;
227
228 // Message publishing
235 bool publish(const std::string& topic, const std::string& message);
236
243 bool publish(const std::string& topic, const nlohmann::json& message);
244
245 // Callback registration
251 void set_message_validator(const std::string& topic, MessageValidator validator);
252
258 void set_message_handler(const std::string& topic, MessageHandler handler);
259
265 void set_peer_joined_handler(const std::string& topic, PeerJoinedHandler handler);
266
272 void set_peer_left_handler(const std::string& topic, PeerLeftHandler handler);
273
274 // Peer and mesh information
280 std::vector<std::string> get_topic_peers(const std::string& topic) const;
281
287 std::vector<std::string> get_mesh_peers(const std::string& topic) const;
288
294 double get_peer_score(const std::string& peer_id) const;
295
296 // Statistics and debugging
301 nlohmann::json get_statistics() const;
302
307 nlohmann::json get_cache_statistics() const;
308
309private:
310 RatsClient& rats_client_;
311 GossipSubConfig config_;
312 std::atomic<bool> running_;
313
314 // Thread management
315 std::thread heartbeat_thread_;
316 mutable std::mutex heartbeat_mutex_;
317 std::condition_variable heartbeat_cv_;
318
319 // Topic management
320 mutable std::mutex topics_mutex_;
321 std::unordered_map<std::string, std::unique_ptr<TopicSubscription>> topics_;
322 std::unordered_set<std::string> subscribed_topics_;
323
324 // Peer scoring
325 mutable std::mutex scores_mutex_;
326 std::unordered_map<std::string, std::unique_ptr<PeerScore>> peer_scores_;
327
328 // Message cache and deduplication
329 mutable std::mutex message_cache_mutex_;
330 std::unordered_map<std::string, std::unique_ptr<MessageMetadata>> message_cache_;
331 std::unordered_map<std::string, std::chrono::steady_clock::time_point> message_ids_seen_;
332
333 // Handlers and validators
334 mutable std::mutex handlers_mutex_;
335 std::unordered_map<std::string, MessageValidator> message_validators_; // topic -> validator
336 std::unordered_map<std::string, MessageHandler> message_handlers_; // topic -> handler
337 std::unordered_map<std::string, PeerJoinedHandler> peer_joined_handlers_; // topic -> handler
338 std::unordered_map<std::string, PeerLeftHandler> peer_left_handlers_; // topic -> handler
339 MessageValidator global_validator_; // Global validator for all topics
340
341 // Control message queues for heartbeat
342 mutable std::mutex control_queue_mutex_;
343 std::vector<nlohmann::json> pending_grafts_;
344 std::vector<nlohmann::json> pending_prunes_;
345 std::vector<nlohmann::json> pending_ihaves_;
346 std::vector<nlohmann::json> pending_iwants_;
347
348 // Random number generation
349 mutable std::mutex rng_mutex_;
350 std::mt19937 rng_;
351
352 // Internal methods
353 void heartbeat_loop();
354 void process_heartbeat();
355 void handle_gossipsub_message(const std::string& peer_id, const nlohmann::json& message);
356
357 // Message handling
358 void handle_subscribe(const std::string& peer_id, const nlohmann::json& payload);
359 void handle_unsubscribe(const std::string& peer_id, const nlohmann::json& payload);
360 void handle_publish(const std::string& peer_id, const nlohmann::json& payload);
361 void handle_gossip(const std::string& peer_id, const nlohmann::json& payload);
362 void handle_graft(const std::string& peer_id, const nlohmann::json& payload);
363 void handle_prune(const std::string& peer_id, const nlohmann::json& payload);
364 void handle_ihave(const std::string& peer_id, const nlohmann::json& payload);
365 void handle_iwant(const std::string& peer_id, const nlohmann::json& payload);
366 void handle_heartbeat(const std::string& peer_id, const nlohmann::json& payload);
367
368 // Mesh management
369 void maintain_mesh(const std::string& topic);
370 void add_peer_to_mesh(const std::string& topic, const std::string& peer_id);
371 void remove_peer_from_mesh(const std::string& topic, const std::string& peer_id);
372 std::vector<std::string> select_peers_for_mesh(const std::string& topic, int count);
373 std::vector<std::string> select_peers_for_gossip(const std::string& topic, int count, const std::unordered_set<std::string>& exclude = {});
374
375 // Message utilities
376 std::string generate_message_id(const std::string& topic, const std::string& message, const std::string& sender_peer_id);
377 bool is_message_seen(const std::string& message_id);
378 void cache_message(const std::string& message_id, const std::string& topic, const std::string& message, const std::string& sender_peer_id);
379 void cleanup_message_cache();
380
381 // Peer management
382 void update_peer_score(const std::string& peer_id);
383 void handle_peer_connected(const std::string& peer_id);
384 void handle_peer_disconnected(const std::string& peer_id);
385
386 // Message sending utilities
387 bool send_gossipsub_message(const std::string& peer_id, GossipSubMessageType type, const nlohmann::json& payload);
388 bool broadcast_gossipsub_message(GossipSubMessageType type, const nlohmann::json& payload, const std::unordered_set<std::string>& exclude = {});
389
390 // Validation
391 ValidationResult validate_message(const std::string& topic, const std::string& message, const std::string& sender_peer_id);
392 bool is_peer_score_acceptable(const std::string& peer_id, double threshold);
393
394 // Topic utilities
395 TopicSubscription* get_or_create_topic(const std::string& topic);
396 void cleanup_topic(const std::string& topic);
397
398 // Random selection utilities
399 std::vector<std::string> random_sample(const std::vector<std::string>& peers, int count);
400 std::vector<std::string> random_sample(const std::unordered_set<std::string>& peers, int count);
401};
402
403} // namespace librats
Main GossipSub implementation class.
Definition gossipsub.h:167
bool start()
Start the GossipSub service.
void stop()
Stop the GossipSub service.
bool subscribe(const std::string &topic)
Subscribe to a topic.
std::vector< std::string > get_subscribed_topics() const
Get list of subscribed topics.
void set_message_handler(const std::string &topic, MessageHandler handler)
Set message handler for a topic.
std::vector< std::string > get_mesh_peers(const std::string &topic) const
Get mesh peers for a topic.
void set_peer_left_handler(const std::string &topic, PeerLeftHandler handler)
Set peer left handler for a topic.
~GossipSub()
Destructor.
double get_peer_score(const std::string &peer_id) const
Get peer score.
bool publish(const std::string &topic, const nlohmann::json &message)
Publish a JSON message to a topic.
bool unsubscribe(const std::string &topic)
Unsubscribe from a topic.
void set_peer_joined_handler(const std::string &topic, PeerJoinedHandler handler)
Set peer joined handler for a topic.
nlohmann::json get_statistics() const
Get GossipSub statistics.
nlohmann::json get_cache_statistics() const
Get message cache statistics.
bool publish(const std::string &topic, const std::string &message)
Publish a message to a topic.
bool is_subscribed(const std::string &topic) const
Check if subscribed to a topic.
std::vector< std::string > get_topic_peers(const std::string &topic) const
Get peers subscribed to a topic.
bool is_running() const
Check if GossipSub is running.
void set_message_validator(const std::string &topic, MessageValidator validator)
Set message validator for a topic.
GossipSub(RatsClient &rats_client, const GossipSubConfig &config=GossipSubConfig())
Constructor.
RatsClient - Core peer-to-peer networking client.
Definition librats.h:208
GossipSubMessageType
GossipSub message types.
Definition gossipsub.h:25
std::function< void(const std::string &topic, const std::string &message, const std::string &sender_peer_id)> MessageHandler
Definition gossipsub.h:160
std::function< void(const std::string &topic, const std::string &peer_id)> PeerLeftHandler
Definition gossipsub.h:162
std::function< void(const std::string &topic, const std::string &peer_id)> PeerJoinedHandler
Definition gossipsub.h:161
ValidationResult
Message validation result.
Definition gossipsub.h:150
std::function< ValidationResult(const std::string &topic, const std::string &message, const std::string &sender_peer_id)> MessageValidator
Callback types for GossipSub.
Definition gossipsub.h:159
std — STL namespace.
GossipSub configuration parameters.
Definition gossipsub.h:107
std::chrono::milliseconds gossip_retransmit
Definition gossipsub.h:120
std::chrono::milliseconds heartbeat_interval
Definition gossipsub.h:123
std::chrono::milliseconds fanout_ttl
Definition gossipsub.h:115
std::chrono::milliseconds message_cache_ttl
Definition gossipsub.h:126
Message metadata for tracking and deduplication.
Definition gossipsub.h:40
MessageMetadata(const std::string &id, const std::string &t, const std::string &sender)
Definition gossipsub.h:50
std::chrono::steady_clock::time_point last_seen
Definition gossipsub.h:45
std::string sender_peer_id
Definition gossipsub.h:43
std::vector< std::string > seen_from_peers
Definition gossipsub.h:46
std::chrono::steady_clock::time_point first_seen
Definition gossipsub.h:44
Peer scoring metrics for mesh management.
Definition gossipsub.h:60
double invalid_message_score
Definition gossipsub.h:68
std::chrono::steady_clock::time_point connected_since
Definition gossipsub.h:72
std::string peer_id
Definition gossipsub.h:61
double mesh_behavior_score
Definition gossipsub.h:67
PeerScore(const std::string &id)
Definition gossipsub.h:80
double delivery_score
Definition gossipsub.h:66
std::chrono::steady_clock::time_point last_updated
Definition gossipsub.h:71
Topic subscription information.
Definition gossipsub.h:94
std::unordered_set< std::string > fanout_peers
Definition gossipsub.h:98
std::chrono::steady_clock::time_point last_fanout_prune
Definition gossipsub.h:99
TopicSubscription(const std::string &t)
Definition gossipsub.h:101
std::unordered_set< std::string > mesh_peers
Definition gossipsub.h:97
std::unordered_set< std::string > subscribers
Definition gossipsub.h:96