39#include "transport/connection.h"
40#include "transport/reactor_pool.h"
42#include "peer/peer_table.h"
45#include "security/identity.h"
46#include "security/handshaker.h"
51#include "wire/message_router.h"
54#include <condition_variable>
89 static_assert(std::is_base_of<Subsystem, T>::value,
"T must derive from Subsystem");
91 subsystems_.push_back(std::move(
subsystem));
107 uint16_t
listen_port() const noexcept
override {
return listen_port_; }
/// Dial a peer at the given host and port.
// NOTE(review): only the declaration is visible here. Whether the dial is
// asynchronous, and how `host` is resolved, must be confirmed in the
// implementation file.
125 void connect(
const std::string& host, uint16_t port);
128 size_t peer_count() const noexcept {
return directory_.size(); }
130 std::vector<PeerInfo>
peers()
const override {
return directory_.snapshot(); }
140 size_t max_peers() const noexcept {
return max_peers_.load(std::memory_order_relaxed); }
141 void set_max_peers(
size_t n)
noexcept { max_peers_.store(n, std::memory_order_relaxed); }
143 const size_t cap = max_peers_.load(std::memory_order_relaxed);
144 return cap != 0 && directory_.size() >= cap;
/// Send raw bytes to one peer on a named channel.
// NOTE(review): behavior when `to` is not a connected peer is not visible
// from this declaration — confirm in the implementation.
153 void send(
const PeerId& to, std::string_view channel, ByteView payload);
/// Send raw bytes on a named channel to every connected peer.
155 void broadcast(std::string_view channel, ByteView payload);
167 void on(std::string_view channel, MessageRouter::Handler cb) { router_.on_channel(channel, std::move(cb)); }
174 for (
auto& s : subsystems_)
if (
auto* p =
dynamic_cast<T*
>(s.get()))
return p;
// Connection callbacks. All four are marked `override`, so they implement an
// interface declared outside this view.
// NOTE(review): the descriptions below are inferred from names and
// signatures only — confirm against the interface and the implementation.

// Presumably decides whether a new inbound connection may be accepted.
191 bool admit_inbound()
override;
// Presumably invoked once a connection is fully established.
192 void on_established(Connection& conn)
override;
// Presumably invoked for each frame received on `conn`.
193 void on_frame(Connection& conn,
const Frame& frame)
override;
// Presumably invoked when `conn` is closed; `reason` carries the cause.
194 void on_closed(Connection& conn, CloseReason reason)
override;
196 Peer make_peer(
const PeerId&
id, PeerRoute route) {
return Peer(
id, route, *
this); }
// Route-level operations addressed by `PeerRoute`.
// NOTE(review): inferred from names only — presumably used to transmit a
// frame (header + payload) over a route and to close a route; confirm in
// the implementation.
197 void route_send(PeerRoute route,
FrameHeader header, Bytes payload);
198 void route_close(PeerRoute route);
// Identify exchange and address bookkeeping.
// NOTE(review): semantics below are inferred from names and signatures;
// only the declarations are visible here.

// Presumably sends this node's identify message over `conn`.
201 void send_identify(Connection& conn);
// Presumably processes an identify frame received on `conn`.
202 void handle_identify(Connection& conn,
const Frame& frame);
// Returns a list of addresses by value; presumably a copy of
// `advertised_addresses_` — confirm whether it locks `advertised_mutex_`.
203 std::vector<Address> advertised_addresses()
const;
// Presumably recomputes the advertised list from the given local IPs.
204 void rebuild_advertised_addresses(
const std::vector<std::string>& local_ips);
// Presumably records an address that a remote peer reported observing us at.
205 void record_observed_address(
const Address& addr);
// Network monitor and maintenance worker control.
// NOTE(review): inferred from names — `maintenance_loop` is presumably the
// body run by `maintenance_thread_`; confirm in the implementation.
207 void start_network_monitor();
208 void stop_network_monitor();
209 void maintenance_loop();
// ---- Data members --------------------------------------------------------

// Owned security provider. NOTE(review): its role (handshake/identity) is
// not visible in this view — confirm.
213 std::unique_ptr<SecurityProvider> security_;
// Peer table backing peer_count() (size()) and peers() (snapshot()).
214 PeerTable directory_;
// Channel router; on() registers handlers through on_channel().
215 MessageRouter router_;
// Owned reactor pool.
218 std::unique_ptr<ReactorPool> reactors_;
// Attached subsystems, owned by the node; appended on attach and scanned
// with dynamic_cast when looking one up by type.
220 std::vector<std::unique_ptr<Subsystem>> subsystems_;
// Network monitor plus the state of the maintenance worker.
// NOTE(review): presumably `maintenance_mutex_` guards `pending_addresses_`
// and the two flags, with `maintenance_cv_` used to wake the worker — confirm.
225 std::unique_ptr<NetworkMonitor> monitor_;
226 std::thread maintenance_thread_;
227 std::mutex maintenance_mutex_;
228 std::condition_variable maintenance_cv_;
229 std::vector<std::string> pending_addresses_;
230 bool maintenance_pending_ =
false;
231 bool maintenance_stop_ =
false;
// Listener state; the socket starts as INVALID_SOCKET_VALUE and the port as 0.
233 socket_t listen_socket_ = INVALID_SOCKET_VALUE;
// Value returned by listen_port().
234 uint16_t listen_port_ = 0;
// Atomic flag, initially false. NOTE(review): presumably set while the
// node is started — confirm.
235 std::atomic<bool> running_{
false};
// Peer cap accessed with relaxed loads/stores; starts at 0, which the
// limit check (`cap != 0 && ...`) treats as never reached.
236 std::atomic<size_t> max_peers_{0};
// Registered event handlers, one list per event kind.
238 std::vector<PeerNetwork::PeerEventHandler> peer_connected_;
239 std::vector<PeerNetwork::PeerDisconnectHandler> peer_disconnected_;
240 std::vector<PeerNetwork::DialFailedHandler> dial_failed_;
// NOTE(review): presumably `observed_mutex_` guards `observed_addresses_`
// (mutable so const accessors can lock it) — confirm.
243 mutable std::mutex observed_mutex_;
244 std::vector<Address> observed_addresses_;
// NOTE(review): presumably `advertised_mutex_` guards
// `advertised_addresses_` — confirm.
250 mutable std::mutex advertised_mutex_;
251 std::vector<Address> advertised_addresses_;
A dialable transport address (ip + port).
MessageJson * json() noexcept
The JSON messaging module if one was attached (add_subsystem<MessageJson>), else nullptr.
Node(NodeConfig config)
Construct a node from its configuration (see NodeConfig).
std::vector< Address > observed_addresses() const
Our own addresses as remote peers reported observing us at — their observed IP paired with our listen...
Node & operator=(const Node &)=delete
ServiceRegistry & services() noexcept
void connect(const Address &address) override
Dial a peer.
std::vector< PeerId > connected_peers() const override
bool peer_limit_reached() const noexcept
T * add_subsystem(std::unique_ptr< T > subsystem)
Attach a subsystem (DHT, GossipSub, PingService…).
void on_peer_disconnected(PeerNetwork::PeerDisconnectHandler cb) override
Subscribe to peer-disconnected events. The handler runs on a reactor thread.
void on_peer_connected(PeerNetwork::PeerEventHandler cb) override
Subscribe to peer-connected events. The handler runs on a reactor thread.
void on_dial_failed(PeerNetwork::DialFailedHandler cb) override
Subscribe to failed-outbound-dial events. The handler runs on a reactor thread.
const PeerId & local_id() const noexcept override
Our self-certifying peer identity (the public key peers authenticate).
const std::string & protocol_name() const noexcept
Application protocol identity bound into the handshake (see NodeConfig).
~Node() override
Stops the node if still running, then releases all resources.
size_t peer_count() const noexcept
Number of currently-established peers.
uint16_t listen_port() const noexcept override
The bound listen port (the actual port when the config requested 0).
bool start()
Bring the node up: open the listener (if enabled), start the reactor pool, then start every attached ...
void broadcast(std::string_view channel, ByteView payload)
Send raw bytes on a named channel to every connected peer.
Node(const Node &)=delete
void connect(const std::string &host, uint16_t port)
Dial a peer.
EventBus & events() noexcept
std::vector< PeerInfo > peers() const override
Snapshot of all established peers (id, addresses, direction, timing).
void set_max_peers(size_t n) noexcept
std::optional< Peer > peer(const PeerId &id)
Handle to a connected peer by id, or std::nullopt if not connected.
void stop()
Tear the node down: stop subsystems (reverse order), close all connections, and join the reactor pool...
void send(const PeerId &to, std::string_view channel, ByteView payload)
Send raw bytes to one peer on a named channel.
void on(std::string_view channel, MessageRouter::Handler cb)
Register a handler for inbound messages on a named channel.
const std::string & protocol_version() const noexcept
size_t max_peers() const noexcept
std::function< void(const Address &)> DialFailedHandler
std::function< void(const Peer &, ByteView)> MessageHandler
std::function< void(const Peer &)> PeerEventHandler
std::function< void(const PeerId &)> PeerDisconnectHandler
Node construction options.
MessageType
Inner-message kind. Application traffic uses App, addressed by channel.
What a subsystem receives at attach() — the node's gift to its plugins.
A lightweight handle to a connected peer.
Self-certifying peer identity.
Addressing/metadata for a peer — the shareable, persistable identity.
The narrow contract a subsystem needs from the node — and nothing more.
A decoded inner message. payload is a non-owning view into the source bytes.
std::string protocol_name
Application protocol identity.
std::string protocol_version