chat_core/messages/
processor.rs1use async_broadcast::Sender as BroadcastSender;
2use base64::{engine::general_purpose::STANDARD, Engine as _};
3use yrs::updates::decoder::Decode;
4use yrs::{Doc, Map, MapRef, Transact, Update};
5
6use crate::coordinator::{CoordinatorEvent, CoordinatorState};
7use crate::error::ChatResult;
8use crate::history::append::message_from_any;
9use crate::history::types::{ChatMessage, HistoryRequest};
10use crate::network::NetworkNode;
11use crate::storage::SqliteStorage;
12
13pub struct YjsUpdateMeta {
14 pub update_bytes: Vec<u8>,
15 pub is_remote: bool,
16 pub is_clear: bool,
17 pub is_load: bool,
18}
19
20pub async fn handle_yjs_update<N: NetworkNode>(
21 meta: YjsUpdateMeta,
22 node: &mut N,
23 storage: &SqliteStorage,
24 doc: &Doc,
25 map: &MapRef,
26 state: &mut CoordinatorState,
27 event_tx: &BroadcastSender<CoordinatorEvent>,
28) {
29 if meta.is_clear {
32 state.messages.clear();
33 state.seen_message_ids.clear();
34 emit_messages_state(state, event_tx).await;
35 return;
36 }
37
38 if !meta.is_load {
41 if let Some(ref room_name) = state.room_name {
43 let now = web_time::SystemTime::now()
44 .duration_since(web_time::UNIX_EPOCH)
45 .unwrap_or_default()
46 .as_secs() as i64;
47 let _ = storage.insert_yjs_update(room_name, &meta.update_bytes, now);
48 }
49 }
50
51 if !meta.is_remote {
53 let data_base64 = STANDARD.encode(&meta.update_bytes);
54 let request = HistoryRequest::Update { data_base64 };
55 let data = match serde_json::to_vec(&request) {
56 Ok(d) => d,
57 Err(e) => {
58 tracing::warn!("Failed to serialize update request: {}", e);
59 return;
60 }
61 };
62 if let Err(e) = node.publish_message(data) {
63 tracing::warn!("Failed to publish message to gossipsub: {}", e);
64 }
65 }
66
67 {
69 let txn = doc.transact();
70 for (key, out) in map.iter(&txn) {
71 let id = key.to_string();
72 if !state.seen_message_ids.contains(&id)
73 && let Ok(msg) = message_from_any(&out)
74 {
75 state.seen_message_ids.insert(id);
76 state.messages.push(msg);
77 }
78 }
79 }
80
81 emit_messages_state(state, event_tx).await;
83}
84
85pub fn apply_remote_update(doc: &Doc, bytes: &[u8]) {
86 if let Ok(update) = Update::decode_v2(bytes) {
87 let mut txn = doc.transact_mut_with("remote");
88 let _ = txn.apply_update(update);
89 }
90}
91
92pub fn read_messages(doc: &Doc, map: &MapRef) -> Vec<ChatMessage> {
94 let txn = doc.transact();
95 let mut messages = Vec::new();
96 for (_key, out) in map.iter(&txn) {
97 if let Ok(msg) = message_from_any(&out) {
98 messages.push(msg);
99 }
100 }
101 messages
102}
103
104pub async fn emit_messages_state(
106 state: &mut CoordinatorState,
107 event_tx: &BroadcastSender<CoordinatorEvent>,
108) {
109 if let Err(e) = event_tx.try_broadcast(CoordinatorEvent::MessagesUpdated(state.messages.clone())) {
110 tracing::warn!("Failed to broadcast MessagesUpdated: {}", e);
111 }
112}
113
114pub fn load_room_updates(
115 doc: &Doc,
116 storage: &SqliteStorage,
117 room_name: &str,
118) -> ChatResult<()> {
119 let blobs = storage.get_yjs_updates(room_name)?;
120 let mut txn = doc.transact_mut_with("load");
121 for blob in blobs {
122 if let Ok(update) = Update::decode_v2(&blob) {
124 let _ = txn.apply_update(update);
125 } else if let Ok(update) = Update::decode_v1(&blob) {
126 let _ = txn.apply_update(update);
127 }
128 }
129 Ok(())
130}