network_libp2p/
network_node.rs1use std::collections::HashMap;
2
3use chat_core::network::{Multiaddr, NetworkEvent, NetworkNode, NetworkResult, RequestId, RoomId};
4use chat_core::stats::RawStats;
5use libp2p::PeerId;
6
7use crate::common::Libp2pNetwork;
8
9impl NetworkNode for Libp2pNetwork {
10 fn local_id(&self) -> chat_core::network::PeerId {
11 chat_core::network::PeerId::from(self.local_peer_id.to_string())
12 }
13
14 fn dial(&mut self, addr: &Multiaddr) -> NetworkResult<()> {
15 let addr: libp2p::Multiaddr = addr
16 .as_str()
17 .parse()
18 .map_err(|e: libp2p::multiaddr::Error| e.to_string())?;
19 if let Some(peer_id) = addr.iter().find_map(|p| {
20 if let libp2p::multiaddr::Protocol::P2p(id) = p {
21 Some(id)
22 } else {
23 None
24 }
25 }) {
26 if self.pending_dials.contains(&peer_id)
27 || self.swarm.behaviour().gossipsub_manager.is_dialing(&peer_id)
28 {
29 return Err("Already dialing peer".to_string().into());
30 }
31 self.pending_dials.insert(peer_id);
32 if let Err(e) = self.swarm.dial(addr) {
33 self.pending_dials.remove(&peer_id);
34 return Err(e.to_string().into());
35 }
36 Ok(())
37 } else {
38 self.swarm.dial(addr).map_err(|e| e.to_string().into())
39 }
40 }
41
42 fn join_room(&mut self, room: &RoomId) -> NetworkResult<()> {
43 let topic_name = room.as_str();
44 let topic = self
45 .swarm
46 .behaviour_mut()
47 .gossipsub_manager
48 .subscribe(topic_name)
49 .map_err(|e| e.to_string())?;
50 let cid = self
51 .swarm
52 .behaviour()
53 .gossipsub_manager
54 .topic_to_cid(topic_name)
55 .map_err(|e| e.to_string())?;
56 let key = libp2p::kad::RecordKey::new(&cid.to_bytes());
57 tracing::info!("DHT start_providing for room {}", topic_name);
58 self.swarm
59 .behaviour_mut()
60 .kad
61 .start_providing(key.clone())
62 .map_err(|e| format!("start_providing error: {:?}", e))?;
63 if self.dht_discovery_enabled {
64 tracing::info!("DHT get_providers for room {}", topic_name);
65 self.swarm.behaviour_mut().kad.get_providers(key);
66 }
67 self.swarm
68 .behaviour_mut()
69 .gossipsub
70 .subscribe(&topic)
71 .map_err(|e| format!("Failed to subscribe to gossipsub topic: {}", e))?;
72 Ok(())
73 }
74
75 fn publish_message(&mut self, data: Vec<u8>) -> NetworkResult<()> {
76 let topics: Vec<_> = self.swarm.behaviour().gossipsub.topics().cloned().collect();
78 let mut any_success = false;
79 for topic_hash in &topics {
80 match self
81 .swarm
82 .behaviour_mut()
83 .gossipsub
84 .publish(topic_hash.clone(), data.clone())
85 {
86 Ok(_) => any_success = true,
87 Err(e) => {
88 tracing::warn!("Gossipsub publish failed for topic {}: {:?}", topic_hash, e);
89 }
90 }
91 }
92 if !any_success && !topics.is_empty() {
93 return Err("No gossipsub topics available for publish".to_string().into());
94 }
95 Ok(())
96 }
97
98 fn send_message(&mut self, peer_id: &chat_core::network::PeerId, data: Vec<u8>) -> NetworkResult<RequestId> {
99 let peer_id: PeerId = match peer_id.as_str().parse() {
100 Ok(p) => p,
101 Err(e) => return Err(e.to_string().into()),
102 };
103 let req_id = self.swarm.behaviour_mut().history.send_request(&peer_id, data);
104 Ok(RequestId::from(req_id.to_string()))
105 }
106
107 fn send_response(&mut self, request_id: &RequestId, data: Vec<u8>) -> NetworkResult<()> {
108 let channel = self
109 .pending_responses
110 .remove(request_id.as_str())
111 .ok_or_else(|| "Unknown request id".to_string())?;
112 if self
113 .swarm
114 .behaviour_mut()
115 .history
116 .send_response(channel, data)
117 .is_err()
118 {
119 return Err("Failed to send response (channel closed)".to_string().into());
120 }
121 Ok(())
122 }
123
124 fn connected_peers(&self) -> Vec<chat_core::network::PeerId> {
125 self.swarm
126 .connected_peers()
127 .filter(|p| *p != &self.local_peer_id)
128 .map(|p| chat_core::network::PeerId::from(p.to_string()))
129 .collect()
130 }
131
132 fn is_dialing(&self, peer_id: &chat_core::network::PeerId) -> bool {
133 match peer_id.as_str().parse::<PeerId>() {
134 Ok(p) => {
135 self.pending_dials.contains(&p)
136 || self.swarm.behaviour().gossipsub_manager.is_dialing(&p)
137 }
138 Err(_) => false,
139 }
140 }
141
142 fn raw_stats(&self) -> RawStats {
143 let num_peers = self.swarm.network_info().num_peers() as u32;
144 let relay_count = self
145 .swarm
146 .external_addresses()
147 .filter(|a| a.to_string().contains("p2p-circuit"))
148 .count() as u32;
149
150 let addrs = self.dial_addrs();
151 let circuit_address = addrs.iter().find(|a| a.as_str().contains("p2p-circuit")).map(|a| a.as_str().to_string());
152 let web_rtc_address = addrs.iter().find(|a| a.as_str().contains("webrtc")).map(|a| a.as_str().to_string());
153
154 let listeners: Vec<String> = self.swarm.listeners().map(|a| a.to_string()).collect();
155 let external_addresses: Vec<String> =
156 self.swarm.external_addresses().map(|a| a.to_string()).collect();
157
158 let connected_peers: Vec<String> = self
159 .swarm
160 .connected_peers()
161 .filter(|p| *p != &self.local_peer_id)
162 .map(|p| p.to_string())
163 .collect();
164
165 let mut peer_conn_addrs = HashMap::new();
166 let mut peer_advertised_addrs = HashMap::new();
167 for peer_id in &connected_peers {
168 let pid: PeerId = match peer_id.parse() {
169 Ok(p) => p,
170 Err(_) => continue,
171 };
172 let conn_addrs = self.peer_conn_addrs.get(&pid).cloned().unwrap_or_default();
173 peer_conn_addrs.insert(peer_id.clone(), conn_addrs);
174 let advertised: Vec<String> = self.peer_addrs.get(&pid).cloned().unwrap_or_default().iter().map(|a| a.to_string()).collect();
175 peer_advertised_addrs.insert(peer_id.clone(), advertised);
176 }
177
178 RawStats {
179 peer_id: self.local_peer_id.to_string(),
180 num_peers,
181 relay_count,
182 listeners,
183 external_addresses,
184 connected_peers,
185 peer_conn_addrs,
186 peer_advertised_addrs,
187 circuit_address,
188 web_rtc_address,
189 }
190 }
191
192 fn bootstrap(&mut self) -> NetworkResult<()> {
193 self.swarm
194 .behaviour_mut()
195 .kad
196 .bootstrap()
197 .map(|_| ())
198 .map_err(|e| e.to_string().into())
199 }
200
201 fn dial_addrs(&self) -> Vec<Multiaddr> {
202 self.platform_dial_addrs()
203 }
204
205 async fn next_event(&mut self) -> Option<NetworkEvent> {
206 self.platform_next_event().await
207 }
208}