chat_core/commands/
handlers.rs1use async_broadcast::Sender as BroadcastSender;
2use std::collections::HashSet;
3use web_time::{SystemTime, UNIX_EPOCH};
4use yrs::{Doc, Map, MapRef, Transact};
5
6use crate::coordinator::{CoordinatorCommand, CoordinatorEvent, CoordinatorState, broadcast_nonblocking};
7use crate::error::ChatResult;
8use crate::history::append::{append_message, message_from_any};
9use crate::history::types::ChatMessage;
10use crate::messages::processor::{emit_messages_state, load_room_updates, read_messages};
11use crate::network::{Multiaddr, NetworkNode, RoomId};
12use crate::room::{hash_topic, generate_random_nickname};
13use crate::storage::SqliteStorage;
14
15pub async fn handle_command<N: NetworkNode>(
16 cmd: CoordinatorCommand,
17 node: &mut N,
18 storage: &SqliteStorage,
19 doc: &Doc,
20 map: &MapRef,
21 state: &mut CoordinatorState,
22 event_tx: &BroadcastSender<CoordinatorEvent>,
23) -> ChatResult<()> {
24 match cmd {
25 CoordinatorCommand::JoinRoom { room_name } => {
26 let topic = hash_topic(&room_name);
27 let room_id = RoomId::from(room_name.clone());
28
29 {
33 let mut txn = doc.transact_mut_with("clear");
34 let keys: Vec<String> = map.iter(&txn).map(|(k, _)| k.to_string()).collect();
35 for key in keys {
36 map.remove(&mut txn, &key);
37 }
38 }
39
40 load_room_updates(doc, storage, &room_name)?;
42
43 {
45 let txn = doc.transact();
46 for (key, out) in map.iter(&txn) {
47 let id = key.to_string();
48 if let Ok(msg) = message_from_any(&out) {
49 state.seen_message_ids.insert(id);
50 state.messages.push(msg);
51 }
52 }
53 }
54
55 if let Err(e) = node.join_room(&room_id) {
57 tracing::warn!("join_room failed for {}: {}", room_name, e);
58 }
59
60 state.seq = storage.get_seq(&room_name).unwrap_or(0);
62
63 let nick_key = format!("nickname:{}", room_name);
65 if let Ok(Some(bytes)) = storage.get_setting(&nick_key)
66 && let Ok(s) = String::from_utf8(bytes)
67 && !s.is_empty()
68 {
69 state.nickname = s;
70 broadcast_nonblocking(event_tx, CoordinatorEvent::NicknameChanged {
71 nickname: state.nickname.clone(),
72 });
73 } else {
74 state.nickname = generate_random_nickname();
76 let _ = storage.set_setting(&nick_key, state.nickname.as_bytes());
77 broadcast_nonblocking(event_tx, CoordinatorEvent::NicknameChanged {
78 nickname: state.nickname.clone(),
79 });
80 }
81
82 match storage.load_bootstrap_peers(&room_name) {
84 Ok(peers) => {
85 for ticket in peers {
86 tracing::info!("Re-dialing bootstrap peer for room {}: {}", room_name, ticket);
87 if let Err(e) = node.dial(&Multiaddr::from(ticket)) {
88 tracing::warn!("Failed to re-dial bootstrap peer: {}", e);
89 }
90 }
91 }
92 Err(e) => {
93 tracing::warn!("Failed to load bootstrap peers: {}", e);
94 }
95 }
96
97 state.room_name = Some(room_name.clone());
98 state.room_topic = Some(topic);
99 state.room_id = Some(room_id);
100 state.is_online = true;
101
102 broadcast_nonblocking(event_tx, CoordinatorEvent::IsOnline(true));
103 broadcast_nonblocking(event_tx, CoordinatorEvent::RoomJoined { room_name: room_name.clone() });
104 broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage(format!(
105 "Connected as {} in room \"{}\"",
106 state.nickname, room_name
107 )));
108
109 emit_messages_state(state, event_tx).await;
111 }
112
113 CoordinatorCommand::LeaveRoom => {
114 if let Some(room_name) = state.room_name.take() {
115 let _ = storage.clear_bootstrap_peers(&room_name);
118 }
119 state.room_topic = None;
120 state.room_id = None;
121 state.is_online = false;
122 state.connected_peers.clear();
123 state.room_peers.clear();
124 state.sync.sync_neighbors.clear();
125 state.sync.available_peers.clear();
126 state.sync.sync_heap.clear();
127 state.seen_peers.clear();
128 state.messages.clear();
129 state.seen_message_ids.clear();
130 broadcast_nonblocking(event_tx, CoordinatorEvent::IsOnline(false));
131 broadcast_nonblocking(event_tx, CoordinatorEvent::MessagesUpdated(vec![]));
132 broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage("Left room".into()));
133 }
134
135 CoordinatorCommand::SendMessage { content } => {
136 let trimmed = content.trim();
137 if trimmed.is_empty() {
138 return Ok(());
139 }
140 if !state.is_online {
141 broadcast_nonblocking(event_tx, CoordinatorEvent::ErrorMessage(
142 "Not connected to network".into(),
143 ));
144 return Ok(());
145 }
146
147 let topic = state.room_topic.clone().unwrap_or_default();
148 if topic.is_empty() {
149 broadcast_nonblocking(event_tx, CoordinatorEvent::ErrorMessage(
150 "Not in a room".into(),
151 ));
152 return Ok(());
153 }
154
155 let messages = read_messages(doc, map);
156 let heads: HashSet<String> = {
157 let all_parents: HashSet<String> =
158 messages.iter().flat_map(|m| m.parent_ids.clone()).collect();
159 messages
160 .iter()
161 .filter(|m| !all_parents.contains(&m.id))
162 .map(|m| m.id.clone())
163 .collect()
164 };
165
166 let id = format!("{}:{}", state.stable_sender_id, state.seq);
167 let dial_addrs = node.dial_addrs();
168 let circuit_address = dial_addrs.iter().find(|a| a.as_str().contains("p2p-circuit")).map(|a| a.as_str().to_string());
169 let web_rtc_address = dial_addrs.iter().find(|a| a.as_str().contains("webrtc")).map(|a| a.as_str().to_string());
170
171 let msg = ChatMessage {
172 id: id.clone(),
173 peer_id: state.local_peer_id.as_str().to_string(),
174 stable_sender_id: state.stable_sender_id.clone(),
175 nickname: state.nickname.clone(),
176 content: trimmed.into(),
177 timestamp: SystemTime::now()
178 .duration_since(UNIX_EPOCH)
179 .unwrap_or_default()
180 .as_secs_f64()
181 * 1000.0,
182 seq: state.seq as f64,
183 parent_ids: heads.into_iter().collect(),
184 circuit_address,
185 web_rtc_address,
186 };
187
188 {
190 let mut txn = doc.transact_mut();
191 append_message(&mut txn, map, &msg)?;
192 }
193
194 state.seq += 1;
196 let _ = storage.set_seq(state.room_name.as_ref().unwrap_or(&"".into()), state.seq);
197 }
198
199 CoordinatorCommand::DialPeer { multiaddr } => {
200 tracing::info!("DialPeer command received: {}", &multiaddr[..multiaddr.len().min(60)]);
201 broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage(format!(
202 "Dialing peer: {}...",
203 &multiaddr[..multiaddr.len().min(40)]
204 )));
205 if let Err(e) = node.dial(&Multiaddr::from(multiaddr.clone())) {
206 tracing::warn!("Dial failed for {}: {}", multiaddr, e);
207 broadcast_nonblocking(event_tx, CoordinatorEvent::ErrorMessage(format!(
208 "Dial failed: {}", e
209 )));
210 } else {
211 tracing::info!("Dial queued for {}", multiaddr);
212 broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage(
213 "Dial queued".into(),
214 ));
215 if let Some(room_name) = &state.room_name
217 && let Err(e) = storage.save_bootstrap_peer(room_name, &multiaddr)
218 {
219 tracing::warn!("Failed to save bootstrap peer: {}", e);
220 }
221 }
222 }
223
224 CoordinatorCommand::SetNickname { nickname } => {
225 let nick = nickname.trim();
226 if !nick.is_empty() && nick != state.nickname {
227 state.nickname = nick[..nick.len().min(20)].to_string();
228 if let Some(room) = &state.room_name {
229 let key = format!("nickname:{}", room);
230 let _ = storage.set_setting(&key, state.nickname.as_bytes());
231 }
232 broadcast_nonblocking(event_tx, CoordinatorEvent::NicknameChanged {
233 nickname: state.nickname.clone(),
234 });
235 broadcast_nonblocking(event_tx, CoordinatorEvent::SystemMessage(format!(
236 "Nickname changed to: {}",
237 state.nickname
238 )));
239 }
240 }
241
242 CoordinatorCommand::Shutdown => {}
243 }
244
245 Ok(())
246}