chat_core/network/
handlers.rs1use async_broadcast::Sender as BroadcastSender;
2use base64::{engine::general_purpose::STANDARD, Engine as _};
3use yrs::updates::decoder::Decode;
4use yrs::updates::encoder::Encode;
5use yrs::{Doc, ReadTxn, Transact};
6
7use crate::coordinator::{CoordinatorEvent, CoordinatorState, broadcast_nonblocking};
8use crate::history::types::{HistoryRequest, HistoryResponse, PeerInfo};
9use crate::messages::processor::apply_remote_update;
10use crate::network::{HelloMessage, NetworkEvent, NetworkNode, PeerId};
11use crate::stats::build_connection_stats;
12
13pub async fn handle_network_event<N: NetworkNode>(
14 event: NetworkEvent,
15 node: &mut N,
16 doc: &Doc,
17 state: &mut CoordinatorState,
18 event_tx: &BroadcastSender<CoordinatorEvent>,
19) {
20 match event {
21 NetworkEvent::PeerConnected { peer_id } => {
22 state.connected_peers.insert(peer_id.clone());
23 tracing::debug!("Peer connected: {}", peer_id);
24 broadcast_nonblocking(event_tx, CoordinatorEvent::PeerConnected {
25 peer_id: peer_id.clone(),
26 });
27
28 if peer_id == state.local_peer_id {
29 return;
30 }
31
32 if state.is_online {
38 let sv = doc.transact().state_vector();
39 let sv_bytes = sv.encode_v2();
40 let sv_base64 = STANDARD.encode(&sv_bytes);
41 let sync_request = HistoryRequest::Sync {
42 state_vector_base64: sv_base64,
43 };
44 if let Ok(data) = serde_json::to_vec(&sync_request) {
45 let _ = node.send_message(&peer_id, data);
46 }
47 }
48 }
49
50 NetworkEvent::PeerDisconnected { peer_id } => {
51 state.connected_peers.remove(&peer_id);
52 state.room_peers.remove(&peer_id);
53 state.sync.remove_peer(&peer_id);
54 tracing::debug!("Peer disconnected: {}", peer_id);
55
56 broadcast_nonblocking(event_tx, CoordinatorEvent::PeerDisconnected {
57 peer_id: peer_id.clone(),
58 });
59 }
60
61 NetworkEvent::MessageReceived { peer_id, request_id, data } => {
62 match serde_json::from_slice::<HistoryRequest>(&data) {
64 Ok(HistoryRequest::Sync { state_vector_base64 }) => {
65 if let Ok(sv_bytes) = STANDARD.decode(&state_vector_base64)
66 && let Ok(sv) = yrs::StateVector::decode_v2(&sv_bytes)
67 {
68 let update = doc.transact().encode_state_as_update_v2(&sv);
69 let data_base64 = STANDARD.encode(&update);
70 const MAX_SYNC_BASE64_LEN: usize = 10 * 1024 * 1024;
73 if data_base64.len() > MAX_SYNC_BASE64_LEN {
74 tracing::warn!(
75 "Sync response for {} exceeds {} MiB ({} bytes), skipping",
76 peer_id,
77 MAX_SYNC_BASE64_LEN / (1024 * 1024),
78 data_base64.len()
79 );
80 return;
81 }
82 let response = HistoryResponse::Sync { data_base64 };
83 let resp_bytes = match serde_json::to_vec(&response) {
84 Ok(d) => d,
85 Err(e) => {
86 tracing::warn!("Failed to serialize sync response: {}", e);
87 return;
88 }
89 };
90 if let Some(req_id) = request_id {
91 let _ = node.send_response(&req_id, resp_bytes);
92 }
93 }
94 }
95 Ok(HistoryRequest::PeerList { peers }) => {
96 for info in &peers {
98 let info_peer_id = PeerId::from(info.peer_id.clone());
99 if info_peer_id == state.local_peer_id {
100 continue;
101 }
102 if !state.connected_peers.contains(&info_peer_id) {
103 for addr in &info.addrs {
104 if let Err(e) = node.dial(&addr.as_str().into()) {
105 tracing::debug!("Auto-dial peer {} via {} failed: {}", info.peer_id, addr, e);
106 }
107 }
108 }
109 }
110 let my_addrs: Vec<String> = node.dial_addrs().iter().map(|a| a.as_str().to_string()).collect();
112 let mut reply_peers = vec![PeerInfo {
113 peer_id: state.local_peer_id.as_str().to_string(),
114 addrs: my_addrs,
115 }];
116 for p in &state.connected_peers {
117 if p != &peer_id && p != &state.local_peer_id {
118 reply_peers.push(PeerInfo {
119 peer_id: p.as_str().to_string(),
120 addrs: vec![],
121 });
122 }
123 }
124 let response = HistoryResponse::PeerList { peers: reply_peers };
125 let resp_bytes = match serde_json::to_vec(&response) {
126 Ok(d) => d,
127 Err(e) => {
128 tracing::warn!("Failed to serialize peer list response: {}", e);
129 return;
130 }
131 };
132 if let Some(req_id) = request_id {
133 let _ = node.send_response(&req_id, resp_bytes);
134 }
135 }
136 Ok(HistoryRequest::Update { .. }) => {
137 tracing::debug!("Ignoring direct HistoryRequest::Update from {} (updates use gossipsub)", peer_id);
138 }
139 Err(_) => {
140 match serde_json::from_slice::<HistoryResponse>(&data) {
142 Ok(HistoryResponse::Sync { data_base64 }) => {
143 if state.connected_peers.contains(&peer_id) {
149 if let Ok(update_bytes) = STANDARD.decode(&data_base64) {
150 apply_remote_update(doc, &update_bytes);
151 }
152 } else {
153 tracing::debug!("Ignoring sync response from disconnected peer {}", peer_id);
154 }
155 }
156 Ok(HistoryResponse::PeerList { peers }) => {
157 for info in &peers {
158 let info_peer_id = PeerId::from(info.peer_id.clone());
159 if info_peer_id == state.local_peer_id {
160 continue;
161 }
162 if !state.connected_peers.contains(&info_peer_id) {
163 for addr in &info.addrs {
164 if let Err(e) = node.dial(&addr.as_str().into()) {
165 tracing::debug!("Auto-dial peer {} via {} failed: {}", info.peer_id, addr, e);
166 }
167 }
168 }
169 }
170 }
171 Err(e) => {
172 tracing::warn!("Failed to deserialize history message from {}: {}", peer_id, e);
173 }
174 }
175 }
176 }
177 }
178
179 NetworkEvent::BroadcastReceived { peer_id, data } => {
180 if peer_id == state.local_peer_id {
182 return;
183 }
184
185 match serde_json::from_slice::<HelloMessage>(&data) {
187 Ok(hello) => {
188 let hello_peer_id = PeerId::from(hello.peer_id.clone());
189 if hello_peer_id == state.local_peer_id {
190 return;
191 }
192 tracing::debug!("Received hello from {} with addresses", peer_id);
193 if !state.connected_peers.contains(&peer_id) && !node.is_dialing(&peer_id) {
198 let addr_to_dial = hello.circuit_address.as_ref().or(hello.web_rtc_address.as_ref());
199 if let Some(addr) = addr_to_dial
200 && let Err(e) = node.dial(&addr.as_str().into())
201 {
202 tracing::debug!("Auto-dial from hello failed for {}: {}", peer_id, e);
203 }
204 }
205 return;
206 }
207 Err(_) => {
208 }
210 }
211
212 match serde_json::from_slice::<HistoryRequest>(&data) {
213 Ok(HistoryRequest::Update { data_base64 }) => {
214 if let Ok(update_bytes) = STANDARD.decode(&data_base64) {
215 apply_remote_update(doc, &update_bytes);
216 }
217 }
220 Ok(other) => {
221 tracing::warn!("Unexpected broadcast message type from {}: {:?}", peer_id, other);
222 }
223 Err(e) => {
224 tracing::warn!("Failed to deserialize broadcast from {}: {}", peer_id, e);
225 }
226 }
227 }
228
229 NetworkEvent::NewListenAddr { addr } => {
230 tracing::debug!("New listen address: {}", addr);
231 let room_peer_strs: Vec<String> = state.room_peers.iter().map(|p| p.as_str().to_string()).collect();
232 let stats = build_connection_stats(node.raw_stats(), room_peer_strs);
233 broadcast_nonblocking(event_tx, CoordinatorEvent::StatsUpdated(Box::new(stats)));
234 }
235
236 NetworkEvent::ExternalAddrConfirmed { addr } => {
237 let addr_str = addr.as_str();
238 if addr_str.contains("p2p-circuit") || addr_str.contains("webrtc") {
239 tracing::debug!("New dialable external address: {}", addr_str);
240 let room_peer_strs: Vec<String> = state.room_peers.iter().map(|p| p.as_str().to_string()).collect();
241 let stats = build_connection_stats(node.raw_stats(), room_peer_strs);
242 broadcast_nonblocking(event_tx, CoordinatorEvent::StatsUpdated(Box::new(stats)));
243 } else {
244 tracing::debug!("Ignoring non-dialable external address: {}", addr_str);
245 }
246 }
247
248 NetworkEvent::RoomPeerJoined { peer_id } => {
249 if peer_id == state.local_peer_id {
250 return;
251 }
252 state.room_peers.insert(peer_id.clone());
253 tracing::debug!("Room peer joined: {}", peer_id);
254
255 let sv = doc.transact().state_vector();
257 let sv_bytes = sv.encode_v2();
258 let sv_base64 = STANDARD.encode(&sv_bytes);
259 let sync_request = HistoryRequest::Sync {
260 state_vector_base64: sv_base64,
261 };
262 let sync_data = match serde_json::to_vec(&sync_request) {
263 Ok(d) => d,
264 Err(e) => {
265 tracing::warn!("Failed to serialize sync request: {}", e);
266 return;
267 }
268 };
269 let _ = node.send_message(&peer_id, sync_data);
270
271 let my_addrs: Vec<String> = node.dial_addrs().iter().map(|a| a.as_str().to_string()).collect();
273 let mut peers = vec![PeerInfo {
274 peer_id: state.local_peer_id.as_str().to_string(),
275 addrs: my_addrs,
276 }];
277 for p in &state.room_peers {
278 if p != &peer_id && p != &state.local_peer_id {
279 peers.push(PeerInfo {
280 peer_id: p.as_str().to_string(),
281 addrs: vec![], });
283 }
284 }
285 let list_request = HistoryRequest::PeerList { peers };
286 let list_data = match serde_json::to_vec(&list_request) {
287 Ok(d) => d,
288 Err(e) => {
289 tracing::warn!("Failed to serialize peer list request: {}", e);
290 return;
291 }
292 };
293 let _ = node.send_message(&peer_id, list_data);
294
295 state.sync.add_peer(&peer_id);
297 }
298
299 NetworkEvent::RoomPeerLeft { peer_id } => {
300 state.room_peers.remove(&peer_id);
301 tracing::debug!("Room peer left: {}", peer_id);
302 }
303
304 NetworkEvent::StatsUpdated { .. } => {
305 }
308 }
309}