chat_core/commands/
handlers.rs

1use async_broadcast::Sender as BroadcastSender;
2use std::collections::HashSet;
3use web_time::{SystemTime, UNIX_EPOCH};
4use yrs::{Doc, Map, MapRef, Transact};
5
6use crate::coordinator::{CoordinatorCommand, CoordinatorEvent, CoordinatorState, broadcast_nonblocking};
7use crate::error::ChatResult;
8use crate::history::append::{append_message, message_from_any};
9use crate::history::types::ChatMessage;
10use crate::messages::processor::{emit_messages_state, load_room_updates, read_messages};
11use crate::network::{Multiaddr, NetworkNode, RoomId};
12use crate::room::{hash_topic, generate_random_nickname};
13use crate::storage::SqliteStorage;
14
15pub async fn handle_command<N: NetworkNode>(
16    cmd: CoordinatorCommand,
17    node: &mut N,
18    storage: &SqliteStorage,
19    doc: &Doc,
20    map: &MapRef,
21    state: &mut CoordinatorState,
22    event_tx: &BroadcastSender<CoordinatorEvent>,
23) -> ChatResult<()> {
24    match cmd {
25        CoordinatorCommand::JoinRoom { room_name } => {
26            let topic = hash_topic(&room_name);
27            let room_id = RoomId::from(room_name.clone());
28
29            // Wipe any leftover data from previous rooms so the doc only
30            // contains the current room's state.  Use origin "clear" so the
31            // observer skips persist / publish.
32            {
33                let mut txn = doc.transact_mut_with("clear");
34                let keys: Vec<String> = map.iter(&txn).map(|(k, _)| k.to_string()).collect();
35                for key in keys {
36                    map.remove(&mut txn, &key);
37                }
38            }
39
40            // Load persisted updates for this room
41            load_room_updates(doc, storage, &room_name)?;
42
43            // Populate initial message list from the loaded doc.
44            {
45                let txn = doc.transact();
46                for (key, out) in map.iter(&txn) {
47                    let id = key.to_string();
48                    if let Ok(msg) = message_from_any(&out) {
49                        state.seen_message_ids.insert(id);
50                        state.messages.push(msg);
51                    }
52                }
53            }
54
55            // Join the room on the network backend.
56            if let Err(e) = node.join_room(&room_id) {
57                tracing::warn!("join_room failed for {}: {}", room_name, e);
58            }
59
60            // Restore seq for this room
61            state.seq = storage.get_seq(&room_name).unwrap_or(0);
62
63            // Restore nickname
64            let nick_key = format!("nickname:{}", room_name);
65            if let Ok(Some(bytes)) = storage.get_setting(&nick_key)
66                && let Ok(s) = String::from_utf8(bytes)
67                && !s.is_empty()
68            {
69                state.nickname = s;
70                broadcast_nonblocking(event_tx, CoordinatorEvent::NicknameChanged {
71                    nickname: state.nickname.clone(),
72                });
73            } else {
74                // Generate a random nickname
75                state.nickname = generate_random_nickname();
76                let _ = storage.set_setting(&nick_key, state.nickname.as_bytes());
77                broadcast_nonblocking(event_tx, CoordinatorEvent::NicknameChanged {
78                    nickname: state.nickname.clone(),
79                });
80            }
81
82            // Re-dial any persisted bootstrap peers for this room
83            match storage.load_bootstrap_peers(&room_name) {
84                Ok(peers) => {
85                    for ticket in peers {
86                        tracing::info!("Re-dialing bootstrap peer for room {}: {}", room_name, ticket);
87                        if let Err(e) = node.dial(&Multiaddr::from(ticket)) {
88                            tracing::warn!("Failed to re-dial bootstrap peer: {}", e);
89                        }
90                    }
91                }
92                Err(e) => {
93                    tracing::warn!("Failed to load bootstrap peers: {}", e);
94                }
95            }
96
97            state.room_name = Some(room_name.clone());
98            state.room_topic = Some(topic);
99            state.room_id = Some(room_id);
100            state.is_online = true;
101
102            broadcast_nonblocking(event_tx, CoordinatorEvent::IsOnline(true));
103            broadcast_nonblocking(event_tx, CoordinatorEvent::RoomJoined { room_name: room_name.clone() });
104            broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage(format!(
105                "Connected as {} in room \"{}\"",
106                state.nickname, room_name
107            )));
108
109            // Emit initial messages (from persisted data)
110            emit_messages_state(state, event_tx).await;
111        }
112
113        CoordinatorCommand::LeaveRoom => {
114            if let Some(room_name) = state.room_name.take() {
115                // Note: we intentionally do NOT delete persisted Yjs updates
116                // so history is preserved when re-joining later.
117                let _ = storage.clear_bootstrap_peers(&room_name);
118            }
119            state.room_topic = None;
120            state.room_id = None;
121            state.is_online = false;
122            state.connected_peers.clear();
123            state.room_peers.clear();
124            state.sync.sync_neighbors.clear();
125            state.sync.available_peers.clear();
126            state.sync.sync_heap.clear();
127            state.seen_peers.clear();
128            state.messages.clear();
129            state.seen_message_ids.clear();
130            broadcast_nonblocking(event_tx, CoordinatorEvent::IsOnline(false));
131            broadcast_nonblocking(event_tx, CoordinatorEvent::MessagesUpdated(vec![]));
132            broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage("Left room".into()));
133        }
134
135        CoordinatorCommand::SendMessage { content } => {
136            let trimmed = content.trim();
137            if trimmed.is_empty() {
138                return Ok(());
139            }
140            if !state.is_online {
141                broadcast_nonblocking(event_tx, CoordinatorEvent::ErrorMessage(
142                    "Not connected to network".into(),
143                ));
144                return Ok(());
145            }
146
147            let topic = state.room_topic.clone().unwrap_or_default();
148            if topic.is_empty() {
149                broadcast_nonblocking(event_tx, CoordinatorEvent::ErrorMessage(
150                    "Not in a room".into(),
151                ));
152                return Ok(());
153            }
154
155            let messages = read_messages(doc, map);
156            let heads: HashSet<String> = {
157                let all_parents: HashSet<String> =
158                    messages.iter().flat_map(|m| m.parent_ids.clone()).collect();
159                messages
160                    .iter()
161                    .filter(|m| !all_parents.contains(&m.id))
162                    .map(|m| m.id.clone())
163                    .collect()
164            };
165
166            let id = format!("{}:{}", state.stable_sender_id, state.seq);
167            let dial_addrs = node.dial_addrs();
168            let circuit_address = dial_addrs.iter().find(|a| a.as_str().contains("p2p-circuit")).map(|a| a.as_str().to_string());
169            let web_rtc_address = dial_addrs.iter().find(|a| a.as_str().contains("webrtc")).map(|a| a.as_str().to_string());
170
171            let msg = ChatMessage {
172                id: id.clone(),
173                peer_id: state.local_peer_id.as_str().to_string(),
174                stable_sender_id: state.stable_sender_id.clone(),
175                nickname: state.nickname.clone(),
176                content: trimmed.into(),
177                timestamp: SystemTime::now()
178                    .duration_since(UNIX_EPOCH)
179                    .unwrap_or_default()
180                    .as_secs_f64()
181                    * 1000.0,
182                seq: state.seq as f64,
183                parent_ids: heads.into_iter().collect(),
184                circuit_address,
185                web_rtc_address,
186            };
187
188            // Append to Yjs map (triggers observer)
189            {
190                let mut txn = doc.transact_mut();
191                append_message(&mut txn, map, &msg)?;
192            }
193
194            // Persist seq
195            state.seq += 1;
196            let _ = storage.set_seq(state.room_name.as_ref().unwrap_or(&"".into()), state.seq);
197        }
198
199        CoordinatorCommand::DialPeer { multiaddr } => {
200            tracing::info!("DialPeer command received: {}", &multiaddr[..multiaddr.len().min(60)]);
201            broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage(format!(
202                "Dialing peer: {}...",
203                &multiaddr[..multiaddr.len().min(40)]
204            )));
205            if let Err(e) = node.dial(&Multiaddr::from(multiaddr.clone())) {
206                tracing::warn!("Dial failed for {}: {}", multiaddr, e);
207                broadcast_nonblocking(event_tx, CoordinatorEvent::ErrorMessage(format!(
208                    "Dial failed: {}", e
209                )));
210            } else {
211                tracing::info!("Dial queued for {}", multiaddr);
212                broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage(
213                    "Dial queued".into(),
214                ));
215                // Persist this bootstrap peer for the current room
216                if let Some(room_name) = &state.room_name
217                    && let Err(e) = storage.save_bootstrap_peer(room_name, &multiaddr)
218                {
219                    tracing::warn!("Failed to save bootstrap peer: {}", e);
220                }
221            }
222        }
223
224        CoordinatorCommand::SetNickname { nickname } => {
225            let nick = nickname.trim();
226            if !nick.is_empty() && nick != state.nickname {
227                state.nickname = nick[..nick.len().min(20)].to_string();
228                if let Some(room) = &state.room_name {
229                    let key = format!("nickname:{}", room);
230                    let _ = storage.set_setting(&key, state.nickname.as_bytes());
231                }
232                broadcast_nonblocking(event_tx, CoordinatorEvent::NicknameChanged {
233                    nickname: state.nickname.clone(),
234                });
235                broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage(format!(
236                    "Nickname changed to: {}",
237                    state.nickname
238                )));
239            }
240        }
241
242        CoordinatorCommand::Shutdown => {}
243    }
244
245    Ok(())
246}