Back to Site
Loading...
Searching...
No Matches
node.h
Go to the documentation of this file.
1#pragma once
2
39#include "transport/connection.h" // ConnectionDelegate
40#include "transport/reactor_pool.h"
41#include "core/address.h"
42#include "peer/peer_table.h"
43#include "peer/peer_id.h"
44#include "peer/peer_info.h"
45#include "security/identity.h"
46#include "security/handshaker.h" // SecurityProvider
47#include "node/config.h"
48#include "node/node_context.h" // NodeContext, EventBus, ServiceRegistry
49#include "peer/peer.h"
50#include "node/peer_network.h"
51#include "wire/message_router.h"
52
53#include <atomic>
54#include <condition_variable>
55#include <functional>
56#include <memory>
57#include <mutex>
58#include <optional>
59#include <string>
60#include <string_view>
61#include <thread>
62#include <type_traits>
63#include <vector>
64
65namespace librats {
66
67class NetworkMonitor; // util/network_monitor.h — owned via unique_ptr, included in node.cpp
68class MessageJson; // subsystems/message_json.h — reached via json() (json.h stays out of node.h)
69
70class Node final : public ConnectionDelegate, public PeerNetwork {
71public:
74 explicit Node(NodeConfig config);
76 ~Node() override;
77
78 Node(const Node&) = delete;
79 Node& operator=(const Node&) = delete;
80
87 template <class T>
88 T* add_subsystem(std::unique_ptr<T> subsystem) {
89 static_assert(std::is_base_of<Subsystem, T>::value, "T must derive from Subsystem");
90 T* raw = subsystem.get();
91 subsystems_.push_back(std::move(subsystem)); // upcast to unique_ptr<Subsystem>
92 return raw;
93 }
94
99 bool start();
102 void stop();
103
105 const PeerId& local_id() const noexcept override { return identity_.id; }
107 uint16_t listen_port() const noexcept override { return listen_port_; }
108
110 const std::string& protocol_name() const noexcept { return config_.protocol_name; }
111 const std::string& protocol_version() const noexcept { return config_.protocol_version; }
112
113 // — node-scoped coordination, shared by subsystems and the app (see NodeContext) —
114 // events() : fire-and-forget notifications, one→many (host events, …)
115 // services() : targeted synchronous calls by capability interface, one→one
116 EventBus& events() noexcept { return events_; }
117 ServiceRegistry& services() noexcept { return services_; }
118
119 // — connections —
123 void connect(const Address& address) override;
125 void connect(const std::string& host, uint16_t port);
126
128 size_t peer_count() const noexcept { return directory_.size(); }
130 std::vector<PeerInfo> peers() const override { return directory_.snapshot(); }
132 std::optional<Peer> peer(const PeerId& id);
133
137 std::vector<Address> observed_addresses() const;
138
139 // — peer admission limit (0 = unlimited; guards inbound, not our own dials) —
140 size_t max_peers() const noexcept { return max_peers_.load(std::memory_order_relaxed); }
141 void set_max_peers(size_t n) noexcept { max_peers_.store(n, std::memory_order_relaxed); }
142 bool peer_limit_reached() const noexcept {
143 const size_t cap = max_peers_.load(std::memory_order_relaxed);
144 return cap != 0 && directory_.size() >= cap;
145 }
146
147 // — application messaging (raw bytes on a named channel) —
153 void send(const PeerId& to, std::string_view channel, ByteView payload);
155 void broadcast(std::string_view channel, ByteView payload);
156
157 // — events (register before start(); invoked on a reactor thread). Multiple
158 // listeners are supported, so subsystems and the app can both subscribe. —
160 void on_peer_connected(PeerNetwork::PeerEventHandler cb) override { peer_connected_.push_back(std::move(cb)); }
162 void on_peer_disconnected(PeerNetwork::PeerDisconnectHandler cb) override { peer_disconnected_.push_back(std::move(cb)); }
164 void on_dial_failed(PeerNetwork::DialFailedHandler cb) override { dial_failed_.push_back(std::move(cb)); }
167 void on(std::string_view channel, MessageRouter::Handler cb) { router_.on_channel(channel, std::move(cb)); }
168
169 // — typed lookup of an attached subsystem (nullptr if none of that type) —
170 // reaches a module's own API without threading a pointer from add_subsystem:
171 // if (auto* j = node.json()) j->on("chat", …);
172 template <class T>
173 T* subsystem() noexcept {
174 for (auto& s : subsystems_) if (auto* p = dynamic_cast<T*>(s.get())) return p;
175 return nullptr;
176 }
179 MessageJson* json() noexcept;
180
181 // — PeerNetwork (for subsystems) —
182 void send(const PeerId& to, MessageType type, ByteView payload) override;
183 void broadcast(MessageType type, ByteView payload) override;
184 std::vector<PeerId> connected_peers() const override;
185 void on(MessageType type, PeerNetwork::MessageHandler cb) override { router_.on_type(type, std::move(cb)); }
186
187private:
188 friend class Peer;
189
190 // ConnectionDelegate (reactor thread)
191 bool admit_inbound() override;
192 void on_established(Connection& conn) override;
193 void on_frame(Connection& conn, const Frame& frame) override;
194 void on_closed(Connection& conn, CloseReason reason) override;
195
196 Peer make_peer(const PeerId& id, PeerRoute route) { return Peer(id, route, *this); }
197 void route_send(PeerRoute route, FrameHeader header, Bytes payload);
198 void route_close(PeerRoute route);
199
200 // — identify: how peers learn each other's dialable addresses (reactor thread) —
201 void send_identify(Connection& conn);
202 void handle_identify(Connection& conn, const Frame& frame);
203 std::vector<Address> advertised_addresses() const;
204 void rebuild_advertised_addresses(const std::vector<std::string>& local_ips);
205 void record_observed_address(const Address& addr);
206
207 void start_network_monitor();
208 void stop_network_monitor();
209 void maintenance_loop();
210
211 NodeConfig config_;
212 Identity identity_;
213 std::unique_ptr<SecurityProvider> security_;
214 PeerTable directory_;
215 MessageRouter router_;
216 EventBus events_;
217 ServiceRegistry services_;
218 std::unique_ptr<ReactorPool> reactors_;
219
220 std::vector<std::unique_ptr<Subsystem>> subsystems_;
221
222 // Host network-change watch. The monitor signals on its own thread; the
223 // maintenance thread does the (possibly blocking) EventBus emit off it, so
224 // subscribers may run slow recovery without stalling change detection.
225 std::unique_ptr<NetworkMonitor> monitor_;
226 std::thread maintenance_thread_;
227 std::mutex maintenance_mutex_;
228 std::condition_variable maintenance_cv_;
229 std::vector<std::string> pending_addresses_;
230 bool maintenance_pending_ = false;
231 bool maintenance_stop_ = false;
232
233 socket_t listen_socket_ = INVALID_SOCKET_VALUE;
234 uint16_t listen_port_ = 0;
235 std::atomic<bool> running_{false};
236 std::atomic<size_t> max_peers_{0};
237
238 std::vector<PeerNetwork::PeerEventHandler> peer_connected_;
239 std::vector<PeerNetwork::PeerDisconnectHandler> peer_disconnected_;
240 std::vector<PeerNetwork::DialFailedHandler> dial_failed_;
241
242 // Our own addresses as peers observe us (their reported IP + our listen port).
243 mutable std::mutex observed_mutex_;
244 std::vector<Address> observed_addresses_;
245
246 // The dialable addresses we advertise to peers in identify. Derived from local
247 // interfaces (and, in future, promoted observed addresses). Rebuilt once at
248 // start() and on NetworkMonitor changes — never re-enumerated per connection,
249 // since interface enumeration is a syscall and the send path is hot.
250 mutable std::mutex advertised_mutex_;
251 std::vector<Address> advertised_addresses_;
252};
253
254} // namespace librats
A dialable transport address (ip + port).
MessageJson * json() noexcept
The JSON messaging module if one was attached (add_subsystem<MessageJson>), else nullptr.
Node(NodeConfig config)
Construct a node from its configuration (see NodeConfig).
std::vector< Address > observed_addresses() const
Our own addresses as remote peers reported observing us at — their observed IP paired with our listen...
Node & operator=(const Node &)=delete
ServiceRegistry & services() noexcept
Definition node.h:117
void connect(const Address &address) override
Dial a peer.
std::vector< PeerId > connected_peers() const override
bool peer_limit_reached() const noexcept
Definition node.h:142
T * add_subsystem(std::unique_ptr< T > subsystem)
Attach a subsystem (DHT, GossipSub, PingService…).
Definition node.h:88
void on_peer_disconnected(PeerNetwork::PeerDisconnectHandler cb) override
Subscribe to peer-disconnected events. The handler runs on a reactor thread.
Definition node.h:162
void on_peer_connected(PeerNetwork::PeerEventHandler cb) override
Subscribe to peer-connected events. The handler runs on a reactor thread.
Definition node.h:160
void on_dial_failed(PeerNetwork::DialFailedHandler cb) override
Subscribe to failed-outbound-dial events. The handler runs on a reactor thread.
Definition node.h:164
const PeerId & local_id() const noexcept override
Our self-certifying peer identity (the public key peers authenticate).
Definition node.h:105
const std::string & protocol_name() const noexcept
Application protocol identity bound into the handshake (see NodeConfig).
Definition node.h:110
~Node() override
Stops the node if still running, then releases all resources.
size_t peer_count() const noexcept
Number of currently-established peers.
Definition node.h:128
friend class Peer
Definition node.h:188
uint16_t listen_port() const noexcept override
The bound listen port (the actual port when the config requested 0).
Definition node.h:107
bool start()
Bring the node up: open the listener (if enabled), start the reactor pool, then start every attached ...
void broadcast(std::string_view channel, ByteView payload)
Send raw bytes on a named channel to every connected peer.
Node(const Node &)=delete
void connect(const std::string &host, uint16_t port)
Dial a peer.
EventBus & events() noexcept
Definition node.h:116
std::vector< PeerInfo > peers() const override
Snapshot of all established peers (id, addresses, direction, timing).
Definition node.h:130
T * subsystem() noexcept
Definition node.h:173
void set_max_peers(size_t n) noexcept
Definition node.h:141
std::optional< Peer > peer(const PeerId &id)
Handle to a connected peer by id, or std::nullopt if not connected.
void stop()
Tear the node down: stop subsystems (reverse order), close all connections, and join the reactor pool...
void send(const PeerId &to, std::string_view channel, ByteView payload)
Send raw bytes to one peer on a named channel.
void on(std::string_view channel, MessageRouter::Handler cb)
Register a handler for inbound messages on a named channel.
Definition node.h:167
const std::string & protocol_version() const noexcept
Definition node.h:111
size_t max_peers() const noexcept
Definition node.h:140
std::function< void(const Address &)> DialFailedHandler
std::function< void(const Peer &, ByteView)> MessageHandler
std::function< void(const Peer &)> PeerEventHandler
std::function< void(const PeerId &)> PeerDisconnectHandler
Node construction options.
Definition node.h:65
MessageType
Inner-message kind. Application traffic uses App, addressed by channel.
Definition frame.h:38
STL namespace.
What a subsystem receives at attach() — the node's gift to its plugins.
A lightweight handle to a connected peer.
Self-certifying peer identity.
Addressing/metadata for a peer — the shareable, persistable identity.
The narrow contract a subsystem needs from the node — and nothing more.
Fixed header of an inner message.
Definition frame.h:50
A decoded inner message. payload is a non-owning view into the source bytes.
Definition frame.h:57
std::string protocol_name
Application protocol identity.
Definition config.h:47
std::string protocol_version
Definition config.h:48