chat_core/messages/
processor.rs

1use async_broadcast::Sender as BroadcastSender;
2use base64::{engine::general_purpose::STANDARD, Engine as _};
3use yrs::updates::decoder::Decode;
4use yrs::{Doc, Map, MapRef, Transact, Update};
5
6use crate::coordinator::{CoordinatorEvent, CoordinatorState};
7use crate::error::ChatResult;
8use crate::history::append::message_from_any;
9use crate::history::types::{ChatMessage, HistoryRequest};
10use crate::network::NetworkNode;
11use crate::storage::SqliteStorage;
12
13pub struct YjsUpdateMeta {
14    pub update_bytes: Vec<u8>,
15    pub is_remote: bool,
16    pub is_clear: bool,
17    pub is_load: bool,
18}
19
20pub async fn handle_yjs_update<N: NetworkNode>(
21    meta: YjsUpdateMeta,
22    node: &mut N,
23    storage: &SqliteStorage,
24    doc: &Doc,
25    map: &MapRef,
26    state: &mut CoordinatorState,
27    event_tx: &BroadcastSender<CoordinatorEvent>,
28) {
29    // A "clear" origin is used when switching rooms to wipe the doc map.
30    // We must not persist it or broadcast it.
31    if meta.is_clear {
32        state.messages.clear();
33        state.seen_message_ids.clear();
34        emit_messages_state(state, event_tx).await;
35        return;
36    }
37
38    // A "load" origin is used when loading persisted updates from SQLite.
39    // They are already stored, so we must not re-persist them.
40    if !meta.is_load {
41        // Persist update to storage
42        if let Some(ref room_name) = state.room_name {
43            let now = web_time::SystemTime::now()
44                .duration_since(web_time::UNIX_EPOCH)
45                .unwrap_or_default()
46                .as_secs() as i64;
47            let _ = storage.insert_yjs_update(room_name, &meta.update_bytes, now);
48        }
49    }
50
51    // If local update, publish to the room topic (pubsub / gossip).
52    if !meta.is_remote {
53        let data_base64 = STANDARD.encode(&meta.update_bytes);
54        let request = HistoryRequest::Update { data_base64 };
55        let data = match serde_json::to_vec(&request) {
56            Ok(d) => d,
57            Err(e) => {
58                tracing::warn!("Failed to serialize update request: {}", e);
59                return;
60            }
61        };
62        if let Err(e) = node.publish_message(data) {
63            tracing::warn!("Failed to publish message to gossipsub: {}", e);
64        }
65    }
66
67    // Scan for newly arrived messages and append them in insertion order.
68    {
69        let txn = doc.transact();
70        for (key, out) in map.iter(&txn) {
71            let id = key.to_string();
72            if !state.seen_message_ids.contains(&id)
73                && let Ok(msg) = message_from_any(&out)
74            {
75                state.seen_message_ids.insert(id);
76                state.messages.push(msg);
77            }
78        }
79    }
80
81    // Emit the updated message list
82    emit_messages_state(state, event_tx).await;
83}
84
85pub fn apply_remote_update(doc: &Doc, bytes: &[u8]) {
86    if let Ok(update) = Update::decode_v2(bytes) {
87        let mut txn = doc.transact_mut_with("remote");
88        let _ = txn.apply_update(update);
89    }
90}
91
92/// Read all messages from the Yjs map (used only for initial room load and send).
93pub fn read_messages(doc: &Doc, map: &MapRef) -> Vec<ChatMessage> {
94    let txn = doc.transact();
95    let mut messages = Vec::new();
96    for (_key, out) in map.iter(&txn) {
97        if let Ok(msg) = message_from_any(&out) {
98            messages.push(msg);
99        }
100    }
101    messages
102}
103
104/// Emit the current message list without blocking on a full broadcast channel.
105pub async fn emit_messages_state(
106    state: &mut CoordinatorState,
107    event_tx: &BroadcastSender<CoordinatorEvent>,
108) {
109    if let Err(e) = event_tx.try_broadcast(CoordinatorEvent::MessagesUpdated(state.messages.clone())) {
110        tracing::warn!("Failed to broadcast MessagesUpdated: {}", e);
111    }
112}
113
114pub fn load_room_updates(
115    doc: &Doc,
116    storage: &SqliteStorage,
117    room_name: &str,
118) -> ChatResult<()> {
119    let blobs = storage.get_yjs_updates(room_name)?;
120    let mut txn = doc.transact_mut_with("load");
121    for blob in blobs {
122        // Try v2 first (current format), fall back to v1 for backward compatibility.
123        if let Ok(update) = Update::decode_v2(&blob) {
124            let _ = txn.apply_update(update);
125        } else if let Ok(update) = Update::decode_v1(&blob) {
126            let _ = txn.apply_update(update);
127        }
128    }
129    Ok(())
130}