1use chat_core::network::{NetworkEvent, RequestId};
2use libp2p::PeerId;
3
4use crate::common::Libp2pNetwork;
5
6impl Libp2pNetwork {
7 pub fn on_new_listen_addr(&self, address: libp2p::Multiaddr) -> Option<NetworkEvent> {
12 Some(NetworkEvent::NewListenAddr {
13 addr: chat_core::network::Multiaddr::from(address.to_string()),
14 })
15 }
16
17 pub fn on_external_addr_confirmed(
18 &self,
19 address: libp2p::Multiaddr,
20 ) -> Option<NetworkEvent> {
21 Some(NetworkEvent::ExternalAddrConfirmed {
22 addr: chat_core::network::Multiaddr::from(address.to_string()),
23 })
24 }
25
26 pub fn on_connection_established(
27 &mut self,
28 peer_id: PeerId,
29 endpoint: libp2p::core::connection::ConnectedPoint,
30 num_established: u32,
31 ) -> Option<NetworkEvent> {
32 let (addr_str, addr) = match endpoint {
33 libp2p::core::connection::ConnectedPoint::Dialer { address, .. } => {
34 (address.to_string(), address)
35 }
36 libp2p::core::connection::ConnectedPoint::Listener { send_back_addr, .. } => {
37 (send_back_addr.to_string(), send_back_addr)
38 }
39 };
40 tracing::debug!(
41 "ConnectionEstablished peer={} addr={} num_established={}",
42 peer_id,
43 addr_str,
44 num_established
45 );
46 self.peer_addrs.entry(peer_id).or_default().push(addr);
47 self.peer_conn_addrs
48 .entry(peer_id)
49 .or_default()
50 .push(addr_str);
51 self.pending_dials.remove(&peer_id);
52 if num_established == 1 {
53 Some(NetworkEvent::PeerConnected {
54 peer_id: chat_core::network::PeerId::from(peer_id.to_string()),
55 })
56 } else {
57 None
58 }
59 }
60
61 pub fn on_connection_closed(
62 &mut self,
63 peer_id: PeerId,
64 endpoint: libp2p::core::connection::ConnectedPoint,
65 num_established: u32,
66 ) -> Option<NetworkEvent> {
67 let addr_str = match endpoint {
68 libp2p::core::connection::ConnectedPoint::Dialer { address, .. } => {
69 address.to_string()
70 }
71 libp2p::core::connection::ConnectedPoint::Listener { send_back_addr, .. } => {
72 send_back_addr.to_string()
73 }
74 };
75 if num_established == 0 {
76 tracing::debug!(
77 "ConnectionClosed peer={} addr={} num_established=0 -> PeerDisconnected",
78 peer_id,
79 addr_str
80 );
81 self.peer_addrs.remove(&peer_id);
82 self.peer_conn_addrs.remove(&peer_id);
83 self.pending_dials.remove(&peer_id);
84 self.swarm.behaviour_mut().limits.remove_peer_id(&peer_id);
85 Some(NetworkEvent::PeerDisconnected {
86 peer_id: chat_core::network::PeerId::from(peer_id.to_string()),
87 })
88 } else {
89 tracing::debug!(
90 "ConnectionClosed peer={} addr={} num_established={}",
91 peer_id,
92 addr_str,
93 num_established
94 );
95 if let Some(addrs) = self.peer_conn_addrs.get_mut(&peer_id) {
96 addrs.retain(|a| a != &addr_str);
97 }
98 None
99 }
100 }
101
102 pub fn on_history_message(
103 &mut self,
104 peer: PeerId,
105 message: libp2p::request_response::Message<Vec<u8>, Vec<u8>>,
106 ) -> Option<NetworkEvent> {
107 match message {
108 libp2p::request_response::Message::Request { request, channel, .. } => {
109 let req_id = format!("req-{}", self.next_response_id);
110 self.next_response_id += 1;
111 self.pending_responses.insert(req_id.clone(), channel);
112 Some(NetworkEvent::MessageReceived {
113 peer_id: chat_core::network::PeerId::from(peer.to_string()),
114 request_id: Some(RequestId::from(req_id)),
115 data: request,
116 })
117 }
118 libp2p::request_response::Message::Response {
119 request_id,
120 response,
121 } => Some(NetworkEvent::MessageReceived {
122 peer_id: chat_core::network::PeerId::from(peer.to_string()),
123 request_id: Some(RequestId::from(request_id.to_string())),
124 data: response,
125 }),
126 }
127 }
128
129 pub fn on_identify_received(
130 &mut self,
131 peer_id: PeerId,
132 info: libp2p::identify::Info,
133 ) {
134 let kad = &mut self.swarm.behaviour_mut().kad;
137 for addr in &info.listen_addrs {
138 kad.add_address(&peer_id, addr.clone());
139 }
140 let entry = self.peer_addrs.entry(peer_id).or_default();
141 for addr in &info.listen_addrs {
142 if !entry.contains(addr) {
143 entry.push(addr.clone());
144 }
145 }
146 }
147
148 pub fn on_gossipsub_message(
149 &self,
150 propagation_source: PeerId,
151 message: libp2p::gossipsub::Message,
152 ) -> Option<NetworkEvent> {
153 Some(NetworkEvent::BroadcastReceived {
154 peer_id: chat_core::network::PeerId::from(propagation_source.to_string()),
155 data: message.data,
156 })
157 }
158
159 pub fn handle_swarm_event(
162 &mut self,
163 event: libp2p::swarm::SwarmEvent<crate::behaviour::ChatBehaviourEvent>,
164 ) -> Option<NetworkEvent> {
165 use libp2p::swarm::SwarmEvent;
166 match event {
167 SwarmEvent::NewListenAddr { address, .. } => self.on_new_listen_addr(address),
168 SwarmEvent::ExternalAddrConfirmed { address } => {
169 self.on_external_addr_confirmed(address)
170 }
171 SwarmEvent::ConnectionEstablished {
172 peer_id,
173 endpoint,
174 num_established,
175 ..
176 } => self.on_connection_established(peer_id, endpoint, num_established.get()),
177 SwarmEvent::ConnectionClosed {
178 peer_id,
179 endpoint,
180 num_established,
181 ..
182 } => self.on_connection_closed(peer_id, endpoint, num_established),
183 SwarmEvent::OutgoingConnectionError {
184 peer_id, error, ..
185 } => {
186 if let Some(peer_id) = peer_id {
187 self.pending_dials.remove(&peer_id);
188 tracing::debug!(
189 "OutgoingConnectionError peer={} error={}",
190 peer_id,
191 error
192 );
193 }
194 None
195 }
196 SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Kad(ref event)) => {
197 self.swarm.behaviour_mut().gossipsub_manager.on_kad_event(event);
198 None
199 }
200 SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::History(
201 libp2p::request_response::Event::Message {
202 peer,
203 connection_id: _,
204 message,
205 },
206 )) => self.on_history_message(peer, message),
207 SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Autorelay(ev)) => {
208 match ev {
209 libp2p::relay::autorelay::Event::StatusChanged { status } => {
210 tracing::info!("Autorelay status changed: {:?}", status);
211 }
212 libp2p::relay::autorelay::Event::NoRelaysAvailable => {
213 tracing::warn!("Autorelay: no relays available");
214 }
215 libp2p::relay::autorelay::Event::RelaysAvailable => {
216 tracing::info!("Autorelay: relays available");
217 }
218 _ => {}
219 }
220 None
221 }
222 SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Identify(
223 libp2p::identify::Event::Received { peer_id, info, .. },
224 )) => {
225 self.on_identify_received(peer_id, info);
226 None
227 }
228 SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Gossipsub(
229 ref event @ libp2p::gossipsub::Event::Subscribed { .. }
230 | ref event @ libp2p::gossipsub::Event::Unsubscribed { .. },
231 )) => {
232 tracing::info!("Gossipsub event: {:?}", event);
233 self.swarm
234 .behaviour_mut()
235 .gossipsub_manager
236 .on_gossipsub_event(event);
237 None
238 }
239 SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Gossipsub(
240 libp2p::gossipsub::Event::Message {
241 propagation_source,
242 message,
243 ..
244 },
245 )) => self.on_gossipsub_message(propagation_source, message),
246 SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Gossipsub(ev)) => {
247 tracing::info!("Gossipsub other event: {:?}", ev);
248 None
249 }
250 SwarmEvent::Behaviour(
251 crate::behaviour::ChatBehaviourEvent::GossipsubManager(
252 libp2p_gossipsub_manager::Event::PeerDiscovered { peer_id, .. },
253 ),
254 ) => {
255 self.swarm
256 .behaviour_mut()
257 .limits
258 .bypass_peer_id(&peer_id);
259 None
260 }
261 SwarmEvent::Behaviour(
262 crate::behaviour::ChatBehaviourEvent::GossipsubManager(
263 libp2p_gossipsub_manager::Event::TopicPeerJoined { peer_id, .. },
264 ),
265 ) => {
266 self.swarm
267 .behaviour_mut()
268 .limits
269 .bypass_peer_id(&peer_id);
270 Some(NetworkEvent::RoomPeerJoined {
271 peer_id: chat_core::network::PeerId::from(peer_id.to_string()),
272 })
273 }
274 SwarmEvent::Behaviour(
275 crate::behaviour::ChatBehaviourEvent::GossipsubManager(
276 libp2p_gossipsub_manager::Event::TopicPeerLeft { peer_id, .. },
277 ),
278 ) => {
279 self.swarm
280 .behaviour_mut()
281 .limits
282 .remove_peer_id(&peer_id);
283 Some(NetworkEvent::RoomPeerLeft {
284 peer_id: chat_core::network::PeerId::from(peer_id.to_string()),
285 })
286 }
287 SwarmEvent::Behaviour(
288 crate::behaviour::ChatBehaviourEvent::GossipsubManager(
289 libp2p_gossipsub_manager::Event::DiscoveryNeeded { topic, .. },
290 ),
291 ) => {
292 if self.dht_discovery_enabled {
293 if let Ok(cid) = self.swarm.behaviour().gossipsub_manager.topic_to_cid(&topic) {
294 let key = libp2p::kad::RecordKey::new(&cid.to_bytes());
295 tracing::info!("DHT get_providers triggered by DiscoveryNeeded for topic {}", topic);
296 self.swarm.behaviour_mut().kad.get_providers(key);
297 }
298 }
299 None
300 }
301 _ => None,
302 }
303 }
304}