network_libp2p/
network_node.rs

1use std::collections::HashMap;
2
3use chat_core::network::{Multiaddr, NetworkEvent, NetworkNode, NetworkResult, RequestId, RoomId};
4use chat_core::stats::RawStats;
5use libp2p::PeerId;
6
7use crate::common::Libp2pNetwork;
8
9impl NetworkNode for Libp2pNetwork {
10    fn local_id(&self) -> chat_core::network::PeerId {
11        chat_core::network::PeerId::from(self.local_peer_id.to_string())
12    }
13
14    fn dial(&mut self, addr: &Multiaddr) -> NetworkResult<()> {
15        let addr: libp2p::Multiaddr = addr
16            .as_str()
17            .parse()
18            .map_err(|e: libp2p::multiaddr::Error| e.to_string())?;
19        if let Some(peer_id) = addr.iter().find_map(|p| {
20            if let libp2p::multiaddr::Protocol::P2p(id) = p {
21                Some(id)
22            } else {
23                None
24            }
25        }) {
26            if self.pending_dials.contains(&peer_id)
27                || self.swarm.behaviour().gossipsub_manager.is_dialing(&peer_id)
28            {
29                return Err("Already dialing peer".to_string().into());
30            }
31            self.pending_dials.insert(peer_id);
32            if let Err(e) = self.swarm.dial(addr) {
33                self.pending_dials.remove(&peer_id);
34                return Err(e.to_string().into());
35            }
36            Ok(())
37        } else {
38            self.swarm.dial(addr).map_err(|e| e.to_string().into())
39        }
40    }
41
42    fn join_room(&mut self, room: &RoomId) -> NetworkResult<()> {
43        let topic_name = room.as_str();
44        let topic = self
45            .swarm
46            .behaviour_mut()
47            .gossipsub_manager
48            .subscribe(topic_name)
49            .map_err(|e| e.to_string())?;
50        let cid = self
51            .swarm
52            .behaviour()
53            .gossipsub_manager
54            .topic_to_cid(topic_name)
55            .map_err(|e| e.to_string())?;
56        let key = libp2p::kad::RecordKey::new(&cid.to_bytes());
57        tracing::info!("DHT start_providing for room {}", topic_name);
58        self.swarm
59            .behaviour_mut()
60            .kad
61            .start_providing(key.clone())
62            .map_err(|e| format!("start_providing error: {:?}", e))?;
63        if self.dht_discovery_enabled {
64            tracing::info!("DHT get_providers for room {}", topic_name);
65            self.swarm.behaviour_mut().kad.get_providers(key);
66        }
67        self.swarm
68            .behaviour_mut()
69            .gossipsub
70            .subscribe(&topic)
71            .map_err(|e| format!("Failed to subscribe to gossipsub topic: {}", e))?;
72        Ok(())
73    }
74
75    fn publish_message(&mut self, data: Vec<u8>) -> NetworkResult<()> {
76        // Publish to all subscribed topics. In practice there is only one active room.
77        let topics: Vec<_> = self.swarm.behaviour().gossipsub.topics().cloned().collect();
78        let mut any_success = false;
79        for topic_hash in &topics {
80            match self
81                .swarm
82                .behaviour_mut()
83                .gossipsub
84                .publish(topic_hash.clone(), data.clone())
85            {
86                Ok(_) => any_success = true,
87                Err(e) => {
88                    tracing::warn!("Gossipsub publish failed for topic {}: {:?}", topic_hash, e);
89                }
90            }
91        }
92        if !any_success && !topics.is_empty() {
93            return Err("No gossipsub topics available for publish".to_string().into());
94        }
95        Ok(())
96    }
97
98    fn send_message(&mut self, peer_id: &chat_core::network::PeerId, data: Vec<u8>) -> NetworkResult<RequestId> {
99        let peer_id: PeerId = match peer_id.as_str().parse() {
100            Ok(p) => p,
101            Err(e) => return Err(e.to_string().into()),
102        };
103        let req_id = self.swarm.behaviour_mut().history.send_request(&peer_id, data);
104        Ok(RequestId::from(req_id.to_string()))
105    }
106
107    fn send_response(&mut self, request_id: &RequestId, data: Vec<u8>) -> NetworkResult<()> {
108        let channel = self
109            .pending_responses
110            .remove(request_id.as_str())
111            .ok_or_else(|| "Unknown request id".to_string())?;
112        if self
113            .swarm
114            .behaviour_mut()
115            .history
116            .send_response(channel, data)
117            .is_err()
118        {
119            return Err("Failed to send response (channel closed)".to_string().into());
120        }
121        Ok(())
122    }
123
124    fn connected_peers(&self) -> Vec<chat_core::network::PeerId> {
125        self.swarm
126            .connected_peers()
127            .filter(|p| *p != &self.local_peer_id)
128            .map(|p| chat_core::network::PeerId::from(p.to_string()))
129            .collect()
130    }
131
132    fn is_dialing(&self, peer_id: &chat_core::network::PeerId) -> bool {
133        match peer_id.as_str().parse::<PeerId>() {
134            Ok(p) => {
135                self.pending_dials.contains(&p)
136                    || self.swarm.behaviour().gossipsub_manager.is_dialing(&p)
137            }
138            Err(_) => false,
139        }
140    }
141
142    fn raw_stats(&self) -> RawStats {
143        let num_peers = self.swarm.network_info().num_peers() as u32;
144        let relay_count = self
145            .swarm
146            .external_addresses()
147            .filter(|a| a.to_string().contains("p2p-circuit"))
148            .count() as u32;
149
150        let addrs = self.dial_addrs();
151        let circuit_address = addrs.iter().find(|a| a.as_str().contains("p2p-circuit")).map(|a| a.as_str().to_string());
152        let web_rtc_address = addrs.iter().find(|a| a.as_str().contains("webrtc")).map(|a| a.as_str().to_string());
153
154        let listeners: Vec<String> = self.swarm.listeners().map(|a| a.to_string()).collect();
155        let external_addresses: Vec<String> =
156            self.swarm.external_addresses().map(|a| a.to_string()).collect();
157
158        let connected_peers: Vec<String> = self
159            .swarm
160            .connected_peers()
161            .filter(|p| *p != &self.local_peer_id)
162            .map(|p| p.to_string())
163            .collect();
164
165        let mut peer_conn_addrs = HashMap::new();
166        let mut peer_advertised_addrs = HashMap::new();
167        for peer_id in &connected_peers {
168            let pid: PeerId = match peer_id.parse() {
169                Ok(p) => p,
170                Err(_) => continue,
171            };
172            let conn_addrs = self.peer_conn_addrs.get(&pid).cloned().unwrap_or_default();
173            peer_conn_addrs.insert(peer_id.clone(), conn_addrs);
174            let advertised: Vec<String> = self.peer_addrs.get(&pid).cloned().unwrap_or_default().iter().map(|a| a.to_string()).collect();
175            peer_advertised_addrs.insert(peer_id.clone(), advertised);
176        }
177
178        RawStats {
179            peer_id: self.local_peer_id.to_string(),
180            num_peers,
181            relay_count,
182            listeners,
183            external_addresses,
184            connected_peers,
185            peer_conn_addrs,
186            peer_advertised_addrs,
187            circuit_address,
188            web_rtc_address,
189        }
190    }
191
192    fn bootstrap(&mut self) -> NetworkResult<()> {
193        self.swarm
194            .behaviour_mut()
195            .kad
196            .bootstrap()
197            .map(|_| ())
198            .map_err(|e| e.to_string().into())
199    }
200
201    fn dial_addrs(&self) -> Vec<Multiaddr> {
202        self.platform_dial_addrs()
203    }
204
205    async fn next_event(&mut self) -> Option<NetworkEvent> {
206        self.platform_next_event().await
207    }
208}