network_libp2p/
event_handlers.rs

1use chat_core::network::{NetworkEvent, RequestId};
2use libp2p::PeerId;
3
4use crate::common::Libp2pNetwork;
5
6impl Libp2pNetwork {
7    // ------------------------------------------------------------------
8    // Swarm event handlers (shared between native and WASM)
9    // ------------------------------------------------------------------
10
11    pub fn on_new_listen_addr(&self, address: libp2p::Multiaddr) -> Option<NetworkEvent> {
12        Some(NetworkEvent::NewListenAddr {
13            addr: chat_core::network::Multiaddr::from(address.to_string()),
14        })
15    }
16
17    pub fn on_external_addr_confirmed(
18        &self,
19        address: libp2p::Multiaddr,
20    ) -> Option<NetworkEvent> {
21        Some(NetworkEvent::ExternalAddrConfirmed {
22            addr: chat_core::network::Multiaddr::from(address.to_string()),
23        })
24    }
25
26    pub fn on_connection_established(
27        &mut self,
28        peer_id: PeerId,
29        endpoint: libp2p::core::connection::ConnectedPoint,
30        num_established: u32,
31    ) -> Option<NetworkEvent> {
32        let (addr_str, addr) = match endpoint {
33            libp2p::core::connection::ConnectedPoint::Dialer { address, .. } => {
34                (address.to_string(), address)
35            }
36            libp2p::core::connection::ConnectedPoint::Listener { send_back_addr, .. } => {
37                (send_back_addr.to_string(), send_back_addr)
38            }
39        };
40        tracing::debug!(
41            "ConnectionEstablished peer={} addr={} num_established={}",
42            peer_id,
43            addr_str,
44            num_established
45        );
46        self.peer_addrs.entry(peer_id).or_default().push(addr);
47        self.peer_conn_addrs
48            .entry(peer_id)
49            .or_default()
50            .push(addr_str);
51        self.pending_dials.remove(&peer_id);
52        if num_established == 1 {
53            Some(NetworkEvent::PeerConnected {
54                peer_id: chat_core::network::PeerId::from(peer_id.to_string()),
55            })
56        } else {
57            None
58        }
59    }
60
61    pub fn on_connection_closed(
62        &mut self,
63        peer_id: PeerId,
64        endpoint: libp2p::core::connection::ConnectedPoint,
65        num_established: u32,
66    ) -> Option<NetworkEvent> {
67        let addr_str = match endpoint {
68            libp2p::core::connection::ConnectedPoint::Dialer { address, .. } => {
69                address.to_string()
70            }
71            libp2p::core::connection::ConnectedPoint::Listener { send_back_addr, .. } => {
72                send_back_addr.to_string()
73            }
74        };
75        if num_established == 0 {
76            tracing::debug!(
77                "ConnectionClosed peer={} addr={} num_established=0 -> PeerDisconnected",
78                peer_id,
79                addr_str
80            );
81            self.peer_addrs.remove(&peer_id);
82            self.peer_conn_addrs.remove(&peer_id);
83            self.pending_dials.remove(&peer_id);
84            self.swarm.behaviour_mut().limits.remove_peer_id(&peer_id);
85            Some(NetworkEvent::PeerDisconnected {
86                peer_id: chat_core::network::PeerId::from(peer_id.to_string()),
87            })
88        } else {
89            tracing::debug!(
90                "ConnectionClosed peer={} addr={} num_established={}",
91                peer_id,
92                addr_str,
93                num_established
94            );
95            if let Some(addrs) = self.peer_conn_addrs.get_mut(&peer_id) {
96                addrs.retain(|a| a != &addr_str);
97            }
98            None
99        }
100    }
101
102    pub fn on_history_message(
103        &mut self,
104        peer: PeerId,
105        message: libp2p::request_response::Message<Vec<u8>, Vec<u8>>,
106    ) -> Option<NetworkEvent> {
107        match message {
108            libp2p::request_response::Message::Request { request, channel, .. } => {
109                let req_id = format!("req-{}", self.next_response_id);
110                self.next_response_id += 1;
111                self.pending_responses.insert(req_id.clone(), channel);
112                Some(NetworkEvent::MessageReceived {
113                    peer_id: chat_core::network::PeerId::from(peer.to_string()),
114                    request_id: Some(RequestId::from(req_id)),
115                    data: request,
116                })
117            }
118            libp2p::request_response::Message::Response {
119                request_id,
120                response,
121            } => Some(NetworkEvent::MessageReceived {
122                peer_id: chat_core::network::PeerId::from(peer.to_string()),
123                request_id: Some(RequestId::from(request_id.to_string())),
124                data: response,
125            }),
126        }
127    }
128
129    pub fn on_identify_received(
130        &mut self,
131        peer_id: PeerId,
132        info: libp2p::identify::Info,
133    ) {
134        // Add the peer's advertised addresses to Kademlia so the
135        // swarm can route to them later (e.g. DHT provider lookups).
136        let kad = &mut self.swarm.behaviour_mut().kad;
137        for addr in &info.listen_addrs {
138            kad.add_address(&peer_id, addr.clone());
139        }
140        let entry = self.peer_addrs.entry(peer_id).or_default();
141        for addr in &info.listen_addrs {
142            if !entry.contains(addr) {
143                entry.push(addr.clone());
144            }
145        }
146    }
147
148    pub fn on_gossipsub_message(
149        &self,
150        propagation_source: PeerId,
151        message: libp2p::gossipsub::Message,
152    ) -> Option<NetworkEvent> {
153        Some(NetworkEvent::BroadcastReceived {
154            peer_id: chat_core::network::PeerId::from(propagation_source.to_string()),
155            data: message.data,
156        })
157    }
158
159    /// Shared swarm event handler. Native and WASM modules call this for
160    /// all common events, handling only platform-specific arms themselves.
161    pub fn handle_swarm_event(
162        &mut self,
163        event: libp2p::swarm::SwarmEvent<crate::behaviour::ChatBehaviourEvent>,
164    ) -> Option<NetworkEvent> {
165        use libp2p::swarm::SwarmEvent;
166        match event {
167            SwarmEvent::NewListenAddr { address, .. } => self.on_new_listen_addr(address),
168            SwarmEvent::ExternalAddrConfirmed { address } => {
169                self.on_external_addr_confirmed(address)
170            }
171            SwarmEvent::ConnectionEstablished {
172                peer_id,
173                endpoint,
174                num_established,
175                ..
176            } => self.on_connection_established(peer_id, endpoint, num_established.get()),
177            SwarmEvent::ConnectionClosed {
178                peer_id,
179                endpoint,
180                num_established,
181                ..
182            } => self.on_connection_closed(peer_id, endpoint, num_established),
183            SwarmEvent::OutgoingConnectionError {
184                peer_id, error, ..
185            } => {
186                if let Some(peer_id) = peer_id {
187                    self.pending_dials.remove(&peer_id);
188                    tracing::debug!(
189                        "OutgoingConnectionError peer={} error={}",
190                        peer_id,
191                        error
192                    );
193                }
194                None
195            }
196            SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Kad(ref event)) => {
197                self.swarm.behaviour_mut().gossipsub_manager.on_kad_event(event);
198                None
199            }
200            SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::History(
201                libp2p::request_response::Event::Message {
202                    peer,
203                    connection_id: _,
204                    message,
205                },
206            )) => self.on_history_message(peer, message),
207            SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Autorelay(ev)) => {
208                match ev {
209                    libp2p::relay::autorelay::Event::StatusChanged { status } => {
210                        tracing::info!("Autorelay status changed: {:?}", status);
211                    }
212                    libp2p::relay::autorelay::Event::NoRelaysAvailable => {
213                        tracing::warn!("Autorelay: no relays available");
214                    }
215                    libp2p::relay::autorelay::Event::RelaysAvailable => {
216                        tracing::info!("Autorelay: relays available");
217                    }
218                    _ => {}
219                }
220                None
221            }
222            SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Identify(
223                libp2p::identify::Event::Received { peer_id, info, .. },
224            )) => {
225                self.on_identify_received(peer_id, info);
226                None
227            }
228            SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Gossipsub(
229                ref event @ libp2p::gossipsub::Event::Subscribed { .. }
230                | ref event @ libp2p::gossipsub::Event::Unsubscribed { .. },
231            )) => {
232                tracing::info!("Gossipsub event: {:?}", event);
233                self.swarm
234                    .behaviour_mut()
235                    .gossipsub_manager
236                    .on_gossipsub_event(event);
237                None
238            }
239            SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Gossipsub(
240                libp2p::gossipsub::Event::Message {
241                    propagation_source,
242                    message,
243                    ..
244                },
245            )) => self.on_gossipsub_message(propagation_source, message),
246            SwarmEvent::Behaviour(crate::behaviour::ChatBehaviourEvent::Gossipsub(ev)) => {
247                tracing::info!("Gossipsub other event: {:?}", ev);
248                None
249            }
250            SwarmEvent::Behaviour(
251                crate::behaviour::ChatBehaviourEvent::GossipsubManager(
252                    libp2p_gossipsub_manager::Event::PeerDiscovered { peer_id, .. },
253                ),
254            ) => {
255                self.swarm
256                    .behaviour_mut()
257                    .limits
258                    .bypass_peer_id(&peer_id);
259                None
260            }
261            SwarmEvent::Behaviour(
262                crate::behaviour::ChatBehaviourEvent::GossipsubManager(
263                    libp2p_gossipsub_manager::Event::TopicPeerJoined { peer_id, .. },
264                ),
265            ) => {
266                self.swarm
267                    .behaviour_mut()
268                    .limits
269                    .bypass_peer_id(&peer_id);
270                Some(NetworkEvent::RoomPeerJoined {
271                    peer_id: chat_core::network::PeerId::from(peer_id.to_string()),
272                })
273            }
274            SwarmEvent::Behaviour(
275                crate::behaviour::ChatBehaviourEvent::GossipsubManager(
276                    libp2p_gossipsub_manager::Event::TopicPeerLeft { peer_id, .. },
277                ),
278            ) => {
279                self.swarm
280                    .behaviour_mut()
281                    .limits
282                    .remove_peer_id(&peer_id);
283                Some(NetworkEvent::RoomPeerLeft {
284                    peer_id: chat_core::network::PeerId::from(peer_id.to_string()),
285                })
286            }
287            SwarmEvent::Behaviour(
288                crate::behaviour::ChatBehaviourEvent::GossipsubManager(
289                    libp2p_gossipsub_manager::Event::DiscoveryNeeded { topic, .. },
290                ),
291            ) => {
292                if self.dht_discovery_enabled {
293                    if let Ok(cid) = self.swarm.behaviour().gossipsub_manager.topic_to_cid(&topic) {
294                        let key = libp2p::kad::RecordKey::new(&cid.to_bytes());
295                        tracing::info!("DHT get_providers triggered by DiscoveryNeeded for topic {}", topic);
296                        self.swarm.behaviour_mut().kad.get_providers(key);
297                    }
298                }
299                None
300            }
301            _ => None,
302        }
303    }
304}