// Callback type for a delivered message. It receives a peer id (`from`), the
// topic name, and a view of the payload bytes, and returns nothing.
// NOTE(review): whether `data` remains valid after the callback returns is not
// visible from this declaration — confirm before retaining the view.
60 using Handler = std::function<void(
const PeerId& from,
const std::string& topic, ByteView data)>;
// Publishes `data` on `topic`: along the mesh if we are subscribed to the
// topic, otherwise via fanout.
87 void publish(
const std::string& topic, ByteView data);
// Returns a vector of peer ids for `topic`. Declared const, so it can be
// called on a const instance.
// NOTE(review): presumably a snapshot of the topic's current mesh members —
// confirm against the definition.
92 std::vector<PeerId>
mesh_peers(
const std::string& topic)
const;
// Per-topic peer bookkeeping: three sets of peer ids hashed with PeerId::Hash.
// NOTE(review): presumably these are fields of the `Topic` record stored in
// `topics_`; the enclosing struct header is not visible here — confirm.
// NOTE(review): names suggest peers known to subscribe to the topic — confirm.
105 std::unordered_set<PeerId, PeerId::Hash> subscribers;
// NOTE(review): names suggest the peers grafted into our mesh — confirm.
106 std::unordered_set<PeerId, PeerId::Hash> mesh;
// NOTE(review): names suggest the peers used when publishing without a
// subscription — confirm.
107 std::unordered_set<PeerId, PeerId::Hash> fanout;
// Steady-clock timestamp, value-initialized by default.
// NOTE(review): presumably the last time fanout was used for this topic — confirm.
108 std::chrono::steady_clock::time_point last_fanout{};
111 struct CachedMessage {
// Peer-level event hooks.
// NOTE(review): presumably registered with the PeerNetwork held in `network_`;
// the registration site is not visible here — confirm.

// Called with a `Peer` reference; by name, when a peer becomes available.
117 void on_new_peer(
const Peer& peer);
// Called with a peer id; by name, when that peer is no longer available.
118 void on_peer_gone(
const PeerId&
id);
// Called with the sending `Peer` and a view of a raw payload.
// NOTE(review): presumably decodes the payload and dispatches to the recv_*
// handlers — confirm against the definition.
119 void on_gossip(
const Peer& peer, ByteView payload);
// Handlers for individual inbound message kinds. Each takes `from`, the id of
// the peer the message is attributed to.
// NOTE(review): locking expectations for these handlers are not visible from
// the declarations — confirm against the definitions.

// Handles a subscription change for `topic`; `subscribe` selects between the
// two directions (presumably true = subscribe, false = unsubscribe — confirm).
121 void recv_subscription(
const PeerId& from,
const std::string& topic,
bool subscribe);
// Handles a graft control message from `from` for `topic`.
122 void recv_graft(
const PeerId& from,
const std::string& topic);
// Handles a prune control message from `from` for `topic`.
123 void recv_prune(
const PeerId& from,
const std::string& topic);
// Handles a published message. Receives both a view of `frame` and the
// separate fields `origin`, `seqno`, `topic` and `data`.
// NOTE(review): presumably `frame` is the full encoded message kept for
// forwarding/caching and `origin`/`seqno` identify it — confirm.
124 void recv_publish(
const PeerId& from, ByteView frame,
const PeerId& origin, uint64_t seqno,
125 const std::string& topic, ByteView data);
// Handles an "ihave" announcement: a list of string message ids for `topic`.
126 void recv_ihave(
const PeerId& from,
const std::string& topic,
const std::vector<std::string>& ids);
// Handles an "iwant" request: a list of string message ids requested by `from`.
127 void recv_iwant(
const PeerId& from,
const std::vector<std::string>& ids);
// Heartbeat routine; takes no arguments and returns nothing.
// NOTE(review): presumably the body run by `heartbeat_thread_` while
// `running_` is true — confirm against the definition.
130 void heartbeat_loop();
// A list of (peer id, string) pairs.
// NOTE(review): presumably (destination peer, topic) for pending control
// messages — confirm.
136 using CtrlList = std::vector<std::pair<PeerId, std::string>>;
// Mesh maintenance for one topic. `grafts` and `prunes` are taken by non-const
// reference, so the function can append to them.
// NOTE(review): the `_locked` suffix suggests the caller must already hold
// `mutex_`, with the collected control messages sent afterwards — confirm.
137 void maintain_mesh_locked(
const std::string& topic, CtrlList& grafts, CtrlList& prunes);
// Sends a control message with one-byte opcode `op` for `topic` to peer `to`.
// NOTE(review): the opcode values are defined elsewhere — not visible here.
140 void send_ctrl(
const PeerId& to, uint8_t op,
const std::string& topic);
// Same opcode/topic pair as send_ctrl, but with no single destination peer.
// NOTE(review): the recipient set is determined by the definition — confirm.
141 void broadcast_ctrl(uint8_t op,
const std::string& topic);
// Takes the same (from, topic, data) triple as `Handler`.
// NOTE(review): presumably invokes the locally registered handler for
// `topic` — confirm.
142 void deliver_local(
const PeerId& from,
const std::string& topic, ByteView data);
// Returns a `Handler` by value for `topic`; const member.
// NOTE(review): presumably looks up `subscriptions_`; the result when no
// handler is registered is not visible here — confirm.
143 Handler handler_for(
const std::string& topic)
const;
// Produces a `ValidationResult` for a message given its sender, topic and data.
// NOTE(review): presumably consults `validators_` for the topic — confirm.
144 ValidationResult validate(
const PeerId& from,
const std::string& topic, ByteView data);
// Records a string message id and returns a bool.
// NOTE(review): the meaning of the return value (e.g. true when first seen)
// must be confirmed against the definition.
145 bool mark_seen(
const std::string&
id);
// Stores an encoded `frame` under message `id` for `topic`.
// NOTE(review): presumably writes `mcache_`/`history_` — confirm.
146 void cache_message(
const std::string&
id,
const std::string& topic,
const Bytes& frame);
// Takes a vector of peer ids by value plus a count `n`, and returns a vector
// of peer ids.
// NOTE(review): presumably up to `n` randomly chosen elements; behaviour for
// n <= 0 or n > in.size() is not visible here — confirm.
147 std::vector<PeerId> random_sample(std::vector<PeerId> in,
int n);
// Raw pointer to the peer network; defaults to null.
// NOTE(review): looks non-owning, but ownership is not visible here — confirm.
149 PeerNetwork* network_ =
nullptr;
// Mutex usable from const member functions (declared mutable).
// NOTE(review): presumably guards the three maps declared below it — confirm.
152 mutable std::mutex mutex_;
// Topic name -> message handler.
154 std::unordered_map<std::string, Handler> subscriptions_;
// Topic name -> per-topic state.
155 std::unordered_map<std::string, Topic> topics_;
// Topic name -> validator.
156 std::unordered_map<std::string, Validator> validators_;
// Second mutable mutex.
// NOTE(review): presumably guards the message cache and seen-id members
// below — confirm.
159 mutable std::mutex mcache_mutex_;
// Message id -> cached message.
160 std::unordered_map<std::string, CachedMessage> mcache_;
// Queue of message-id batches.
// NOTE(review): presumably one batch per heartbeat window — confirm.
161 std::deque<std::vector<std::string>> history_;
// Set of string ids, plus a queue of ids.
// NOTE(review): presumably the queue records insertion order so old entries
// can be evicted — confirm.
162 std::unordered_set<std::string> seen_;
163 std::deque<std::string> seen_order_;
// NOTE(review): presumably serializes access to a random generator declared
// outside this view — confirm.
165 std::mutex rng_mutex_;
// Atomic run flag, initially false.
168 std::atomic<bool> running_{
false};
// Thread handle, mutex and condition variable for the heartbeat.
// NOTE(review): presumably `hb_cv_` is used with `hb_mutex_` to wake the
// heartbeat thread early on shutdown — confirm.
169 std::thread heartbeat_thread_;
170 std::mutex hb_mutex_;
171 std::condition_variable hb_cv_;
void publish(const std::string &topic, ByteView data)
Publish `data` on `topic`: along the mesh if we are subscribed to the topic, otherwise via fanout.