chat_core/network/
handlers.rs

1use async_broadcast::Sender as BroadcastSender;
2use base64::{engine::general_purpose::STANDARD, Engine as _};
3use yrs::updates::decoder::Decode;
4use yrs::updates::encoder::Encode;
5use yrs::{Doc, ReadTxn, Transact};
6
7use crate::coordinator::{CoordinatorEvent, CoordinatorState, broadcast_nonblocking};
8use crate::history::types::{HistoryRequest, HistoryResponse, PeerInfo};
9use crate::messages::processor::apply_remote_update;
10use crate::network::{HelloMessage, NetworkEvent, NetworkNode, PeerId};
11use crate::stats::build_connection_stats;
12
13pub async fn handle_network_event<N: NetworkNode>(
14    event: NetworkEvent,
15    node: &mut N,
16    doc: &Doc,
17    state: &mut CoordinatorState,
18    event_tx: &BroadcastSender<CoordinatorEvent>,
19) {
20    match event {
21        NetworkEvent::PeerConnected { peer_id } => {
22            state.connected_peers.insert(peer_id.clone());
23            tracing::debug!("Peer connected: {}", peer_id);
24            broadcast_nonblocking(event_tx, CoordinatorEvent::PeerConnected {
25                peer_id: peer_id.clone(),
26            });
27
28            if peer_id == state.local_peer_id {
29                return;
30            }
31
32            // Proactive sync: don't wait for RoomPeerJoined. Gossipsub
33            // subscription propagation can be lost or delayed, especially
34            // with only two peers. Send a sync request immediately on
35            // connection so history exchange works even if Subscribed
36            // never fires.
37            if state.is_online {
38                let sv = doc.transact().state_vector();
39                let sv_bytes = sv.encode_v2();
40                let sv_base64 = STANDARD.encode(&sv_bytes);
41                let sync_request = HistoryRequest::Sync {
42                    state_vector_base64: sv_base64,
43                };
44                if let Ok(data) = serde_json::to_vec(&sync_request) {
45                    let _ = node.send_message(&peer_id, data);
46                }
47            }
48        }
49
50        NetworkEvent::PeerDisconnected { peer_id } => {
51            state.connected_peers.remove(&peer_id);
52            state.room_peers.remove(&peer_id);
53            state.sync.remove_peer(&peer_id);
54            tracing::debug!("Peer disconnected: {}", peer_id);
55
56            broadcast_nonblocking(event_tx, CoordinatorEvent::PeerDisconnected {
57                peer_id: peer_id.clone(),
58            });
59        }
60
61        NetworkEvent::MessageReceived { peer_id, request_id, data } => {
62            // Try to deserialize as a request first
63            match serde_json::from_slice::<HistoryRequest>(&data) {
64                Ok(HistoryRequest::Sync { state_vector_base64 }) => {
65                    if let Ok(sv_bytes) = STANDARD.decode(&state_vector_base64)
66                        && let Ok(sv) = yrs::StateVector::decode_v2(&sv_bytes)
67                    {
68                        let update = doc.transact().encode_state_as_update_v2(&sv);
69                        let data_base64 = STANDARD.encode(&update);
70                        // Defensive cap: base64 of a 10 MiB update is ~13.3 MiB of JSON.
71                        // Anything larger indicates an unusually large room state.
72                        const MAX_SYNC_BASE64_LEN: usize = 10 * 1024 * 1024;
73                        if data_base64.len() > MAX_SYNC_BASE64_LEN {
74                            tracing::warn!(
75                                "Sync response for {} exceeds {} MiB ({} bytes), skipping",
76                                peer_id,
77                                MAX_SYNC_BASE64_LEN / (1024 * 1024),
78                                data_base64.len()
79                            );
80                            return;
81                        }
82                        let response = HistoryResponse::Sync { data_base64 };
83                        let resp_bytes = match serde_json::to_vec(&response) {
84                            Ok(d) => d,
85                            Err(e) => {
86                                tracing::warn!("Failed to serialize sync response: {}", e);
87                                return;
88                            }
89                        };
90                        if let Some(req_id) = request_id {
91                            let _ = node.send_response(&req_id, resp_bytes);
92                        }
93                    }
94                }
95                Ok(HistoryRequest::PeerList { peers }) => {
96                    // Merge received peers and dial unknown ones
97                    for info in &peers {
98                        let info_peer_id = PeerId::from(info.peer_id.clone());
99                        if info_peer_id == state.local_peer_id {
100                            continue;
101                        }
102                        if !state.connected_peers.contains(&info_peer_id) {
103                            for addr in &info.addrs {
104                                if let Err(e) = node.dial(&addr.as_str().into()) {
105                                    tracing::debug!("Auto-dial peer {} via {} failed: {}", info.peer_id, addr, e);
106                                }
107                            }
108                        }
109                    }
110                    // Reply with our known peers
111                    let my_addrs: Vec<String> = node.dial_addrs().iter().map(|a| a.as_str().to_string()).collect();
112                    let mut reply_peers = vec![PeerInfo {
113                        peer_id: state.local_peer_id.as_str().to_string(),
114                        addrs: my_addrs,
115                    }];
116                    for p in &state.connected_peers {
117                        if p != &peer_id && p != &state.local_peer_id {
118                            reply_peers.push(PeerInfo {
119                                peer_id: p.as_str().to_string(),
120                                addrs: vec![],
121                            });
122                        }
123                    }
124                    let response = HistoryResponse::PeerList { peers: reply_peers };
125                    let resp_bytes = match serde_json::to_vec(&response) {
126                        Ok(d) => d,
127                        Err(e) => {
128                            tracing::warn!("Failed to serialize peer list response: {}", e);
129                            return;
130                        }
131                    };
132                    if let Some(req_id) = request_id {
133                        let _ = node.send_response(&req_id, resp_bytes);
134                    }
135                }
136                Ok(HistoryRequest::Update { .. }) => {
137                    tracing::debug!("Ignoring direct HistoryRequest::Update from {} (updates use gossipsub)", peer_id);
138                }
139                Err(_) => {
140                    // Not a request — try as a response
141                    match serde_json::from_slice::<HistoryResponse>(&data) {
142                        Ok(HistoryResponse::Sync { data_base64 }) => {
143                            // Accept sync responses from any connected peer.
144                            // The proactive sync sent on PeerConnected targets peers that may
145                            // not yet be in the gossipsub mesh (Subscribed event can be
146                            // delayed), so rejecting responses based on room_peers would
147                            // defeat the purpose of that early sync.
148                            if state.connected_peers.contains(&peer_id) {
149                                if let Ok(update_bytes) = STANDARD.decode(&data_base64) {
150                                    apply_remote_update(doc, &update_bytes);
151                                }
152                            } else {
153                                tracing::debug!("Ignoring sync response from disconnected peer {}", peer_id);
154                            }
155                        }
156                        Ok(HistoryResponse::PeerList { peers }) => {
157                            for info in &peers {
158                                let info_peer_id = PeerId::from(info.peer_id.clone());
159                                if info_peer_id == state.local_peer_id {
160                                    continue;
161                                }
162                                if !state.connected_peers.contains(&info_peer_id) {
163                                    for addr in &info.addrs {
164                                        if let Err(e) = node.dial(&addr.as_str().into()) {
165                                            tracing::debug!("Auto-dial peer {} via {} failed: {}", info.peer_id, addr, e);
166                                        }
167                                    }
168                                }
169                            }
170                        }
171                        Err(e) => {
172                            tracing::warn!("Failed to deserialize history message from {}: {}", peer_id, e);
173                        }
174                    }
175                }
176            }
177        }
178
179        NetworkEvent::BroadcastReceived { peer_id, data } => {
180            // Ignore our own gossipsub loopback messages.
181            if peer_id == state.local_peer_id {
182                return;
183            }
184
185            // Try to deserialize as HelloMessage first (ephemeral signaling).
186            match serde_json::from_slice::<HelloMessage>(&data) {
187                Ok(hello) => {
188                    let hello_peer_id = PeerId::from(hello.peer_id.clone());
189                    if hello_peer_id == state.local_peer_id {
190                        return;
191                    }
192                    tracing::debug!("Received hello from {} with addresses", peer_id);
193                    // Auto-dial the first advertised address if we are neither connected
194                    // nor already dialing this peer.  The circuit and webrtc addresses
195                    // resolve to the same relay path; dialing both would double the
196                    // outbound relay load for no benefit.
197                    if !state.connected_peers.contains(&peer_id) && !node.is_dialing(&peer_id) {
198                        let addr_to_dial = hello.circuit_address.as_ref().or(hello.web_rtc_address.as_ref());
199                        if let Some(addr) = addr_to_dial
200                            && let Err(e) = node.dial(&addr.as_str().into())
201                        {
202                            tracing::debug!("Auto-dial from hello failed for {}: {}", peer_id, e);
203                        }
204                    }
205                    return;
206                }
207                Err(_) => {
208                    // Not a hello message — fall through to history request handling.
209                }
210            }
211
212            match serde_json::from_slice::<HistoryRequest>(&data) {
213                Ok(HistoryRequest::Update { data_base64 }) => {
214                    if let Ok(update_bytes) = STANDARD.decode(&data_base64) {
215                        apply_remote_update(doc, &update_bytes);
216                    }
217                    // Updates received via pubsub are already broadcast to all room
218                    // members. No ack or forwarding needed.
219                }
220                Ok(other) => {
221                    tracing::warn!("Unexpected broadcast message type from {}: {:?}", peer_id, other);
222                }
223                Err(e) => {
224                    tracing::warn!("Failed to deserialize broadcast from {}: {}", peer_id, e);
225                }
226            }
227        }
228
229        NetworkEvent::NewListenAddr { addr } => {
230            tracing::debug!("New listen address: {}", addr);
231            let room_peer_strs: Vec<String> = state.room_peers.iter().map(|p| p.as_str().to_string()).collect();
232            let stats = build_connection_stats(node.raw_stats(), room_peer_strs);
233            broadcast_nonblocking(event_tx, CoordinatorEvent::StatsUpdated(Box::new(stats)));
234        }
235
236        NetworkEvent::ExternalAddrConfirmed { addr } => {
237            let addr_str = addr.as_str();
238            if addr_str.contains("p2p-circuit") || addr_str.contains("webrtc") {
239                tracing::debug!("New dialable external address: {}", addr_str);
240                let room_peer_strs: Vec<String> = state.room_peers.iter().map(|p| p.as_str().to_string()).collect();
241                let stats = build_connection_stats(node.raw_stats(), room_peer_strs);
242                broadcast_nonblocking(event_tx, CoordinatorEvent::StatsUpdated(Box::new(stats)));
243            } else {
244                tracing::debug!("Ignoring non-dialable external address: {}", addr_str);
245            }
246        }
247
248        NetworkEvent::RoomPeerJoined { peer_id } => {
249            if peer_id == state.local_peer_id {
250                return;
251            }
252            state.room_peers.insert(peer_id.clone());
253            tracing::debug!("Room peer joined: {}", peer_id);
254
255            // Send history sync request now that we know they're in our room
256            let sv = doc.transact().state_vector();
257            let sv_bytes = sv.encode_v2();
258            let sv_base64 = STANDARD.encode(&sv_bytes);
259            let sync_request = HistoryRequest::Sync {
260                state_vector_base64: sv_base64,
261            };
262            let sync_data = match serde_json::to_vec(&sync_request) {
263                Ok(d) => d,
264                Err(e) => {
265                    tracing::warn!("Failed to serialize sync request: {}", e);
266                    return;
267                }
268            };
269            let _ = node.send_message(&peer_id, sync_data);
270
271            // Send our peer list so they know who else is in the room
272            let my_addrs: Vec<String> = node.dial_addrs().iter().map(|a| a.as_str().to_string()).collect();
273            let mut peers = vec![PeerInfo {
274                peer_id: state.local_peer_id.as_str().to_string(),
275                addrs: my_addrs,
276            }];
277            for p in &state.room_peers {
278                if p != &peer_id && p != &state.local_peer_id {
279                    peers.push(PeerInfo {
280                        peer_id: p.as_str().to_string(),
281                        addrs: vec![], // we don't know their addrs yet
282                    });
283                }
284            }
285            let list_request = HistoryRequest::PeerList { peers };
286            let list_data = match serde_json::to_vec(&list_request) {
287                Ok(d) => d,
288                Err(e) => {
289                    tracing::warn!("Failed to serialize peer list request: {}", e);
290                    return;
291                }
292            };
293            let _ = node.send_message(&peer_id, list_data);
294
295            // Add to sync neighbors if we have room
296            state.sync.add_peer(&peer_id);
297        }
298
299        NetworkEvent::RoomPeerLeft { peer_id } => {
300            state.room_peers.remove(&peer_id);
301            tracing::debug!("Room peer left: {}", peer_id);
302        }
303
304        NetworkEvent::StatsUpdated { .. } => {
305            // The backend may emit StatsUpdated events on its own.
306            // For now we rely on the periodic stats_interval calling node.stats().
307        }
308    }
309}