chat_core/storage/
mod.rs

1use crate::error::ChatResult;
2use rusqlite::params;
3use std::sync::Mutex;
4use uuid::Uuid;
5
6/// SQLite-backed storage for chat data.
7pub struct SqliteStorage {
8    pub conn: Mutex<rusqlite::Connection>,
9}
10
11impl SqliteStorage {
12    pub fn new(conn: rusqlite::Connection) -> Self {
13        Self {
14            conn: Mutex::new(conn),
15        }
16    }
17
18    pub fn init_schema(&self) -> ChatResult<()> {
19        {
20            let conn = self.conn.lock().unwrap();
21            conn.execute_batch(
22                "CREATE TABLE IF NOT EXISTS settings (key TEXT PRIMARY KEY, value BLOB);
23                 CREATE TABLE IF NOT EXISTS yjs_updates (room_name TEXT, update_blob BLOB, timestamp INTEGER);
24                 CREATE TABLE IF NOT EXISTS room_bootstrap_peers (
25                     room_name TEXT NOT NULL,
26                     peer_ticket TEXT NOT NULL,
27                     added_at INTEGER,
28                     PRIMARY KEY (room_name, peer_ticket)
29                 );"
30            )?;
31        }
32        let version = crate::config::SCHEMA_VERSION.to_string();
33        self.set_setting("__schemaVersion", version.as_bytes())?;
34        Ok(())
35    }
36
37    pub fn get_setting(&self, key: &str) -> ChatResult<Option<Vec<u8>>> {
38        let conn = self.conn.lock().unwrap();
39        let mut stmt = conn.prepare("SELECT value FROM settings WHERE key = ?1")?;
40        let result = stmt.query_row([key], |row| row.get::<_, Vec<u8>>(0));
41        match result {
42            Ok(v) => Ok(Some(v)),
43            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
44            Err(e) => Err(e.into()),
45        }
46    }
47
48    pub fn set_setting(&self, key: &str, value: &[u8]) -> ChatResult<()> {
49        let conn = self.conn.lock().unwrap();
50        conn.execute(
51            "INSERT OR REPLACE INTO settings (key, value) VALUES (?1, ?2)",
52            params![key, value],
53        )?;
54        Ok(())
55    }
56
57    pub fn get_stable_sender_id(&self) -> ChatResult<String> {
58        let id: Option<String> = self.get_setting("stableSenderId")?
59            .map(|bytes| String::from_utf8(bytes).unwrap_or_default())
60            .filter(|s| !s.is_empty());
61
62        match id {
63            Some(id) => Ok(id),
64            None => {
65                let new_id = Uuid::new_v4().to_string();
66                self.set_stable_sender_id(&new_id)?;
67                Ok(new_id)
68            }
69        }
70    }
71
72    pub fn set_stable_sender_id(&self, id: &str) -> ChatResult<()> {
73        self.set_setting("stableSenderId", id.as_bytes())
74    }
75
76    pub fn get_seq(&self, room_name: &str) -> ChatResult<i64> {
77        let key = format!("seq:{}", room_name);
78        match self.get_setting(&key)? {
79            Some(bytes) => {
80                let s = String::from_utf8(bytes)?;
81                Ok(s.parse::<i64>().unwrap_or(0))
82            }
83            None => Ok(0),
84        }
85    }
86
87    pub fn set_seq(&self, room_name: &str, seq: i64) -> ChatResult<()> {
88        let key = format!("seq:{}", room_name);
89        self.set_setting(&key, seq.to_string().as_bytes())
90    }
91
92    pub fn clear_room(&self, room_name: &str) -> ChatResult<()> {
93        let conn = self.conn.lock().unwrap();
94        conn.execute(
95            "DELETE FROM yjs_updates WHERE room_name = ?1",
96            params![room_name],
97        )?;
98        Ok(())
99    }
100
101    pub fn clear_all(&self) -> ChatResult<()> {
102        let schema_version = self.get_setting("__schemaVersion")?;
103
104        let conn = self.conn.lock().unwrap();
105        conn.execute("DELETE FROM settings", [])?;
106        conn.execute("DELETE FROM yjs_updates", [])?;
107        conn.execute("DELETE FROM room_bootstrap_peers", [])?;
108        drop(conn);
109
110        if let Some(value) = schema_version {
111            self.set_setting("__schemaVersion", &value)?;
112        }
113        Ok(())
114    }
115
116    pub fn insert_yjs_update(&self, room_name: &str, blob: &[u8], timestamp: i64) -> ChatResult<()> {
117        let conn = self.conn.lock().unwrap();
118        conn.execute(
119            "INSERT INTO yjs_updates (room_name, update_blob, timestamp) VALUES (?1, ?2, ?3)",
120            params![room_name, blob, timestamp],
121        )?;
122        Ok(())
123    }
124
125    pub fn get_yjs_updates(&self, room_name: &str) -> ChatResult<Vec<Vec<u8>>> {
126        let conn = self.conn.lock().unwrap();
127        let mut stmt = conn.prepare(
128            "SELECT update_blob FROM yjs_updates WHERE room_name = ?1 ORDER BY timestamp ASC"
129        )?;
130        let blobs = stmt
131            .query_map(params![room_name], |row| row.get::<_, Vec<u8>>(0))?
132            .collect::<Result<Vec<_>, _>>()?;
133        Ok(blobs)
134    }
135
136    /// Save a bootstrap peer ticket for a room.
137    pub fn save_bootstrap_peer(&self, room_name: &str, peer_ticket: &str) -> ChatResult<()> {
138        let conn = self.conn.lock().unwrap();
139        let now = web_time::SystemTime::now()
140            .duration_since(web_time::SystemTime::UNIX_EPOCH)
141            .unwrap_or_default()
142            .as_secs() as i64;
143        conn.execute(
144            "INSERT OR REPLACE INTO room_bootstrap_peers (room_name, peer_ticket, added_at) VALUES (?1, ?2, ?3)",
145            params![room_name, peer_ticket, now],
146        )?;
147        Ok(())
148    }
149
150    /// Load all bootstrap peer tickets for a room.
151    pub fn load_bootstrap_peers(&self, room_name: &str) -> ChatResult<Vec<String>> {
152        let conn = self.conn.lock().unwrap();
153        let mut stmt = conn.prepare(
154            "SELECT peer_ticket FROM room_bootstrap_peers WHERE room_name = ?1 ORDER BY added_at DESC"
155        )?;
156        let peers = stmt
157            .query_map(params![room_name], |row| row.get::<_, String>(0))?
158            .collect::<Result<Vec<_>, _>>()?;
159        Ok(peers)
160    }
161
162    /// Remove a specific bootstrap peer from a room.
163    pub fn remove_bootstrap_peer(&self, room_name: &str, peer_ticket: &str) -> ChatResult<()> {
164        let conn = self.conn.lock().unwrap();
165        conn.execute(
166            "DELETE FROM room_bootstrap_peers WHERE room_name = ?1 AND peer_ticket = ?2",
167            params![room_name, peer_ticket],
168        )?;
169        Ok(())
170    }
171
172    /// Clear all bootstrap peers for a room.
173    pub fn clear_bootstrap_peers(&self, room_name: &str) -> ChatResult<()> {
174        let conn = self.conn.lock().unwrap();
175        conn.execute(
176            "DELETE FROM room_bootstrap_peers WHERE room_name = ?1",
177            params![room_name],
178        )?;
179        Ok(())
180    }
181}
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186
187    fn in_memory_storage() -> SqliteStorage {
188        let conn = rusqlite::Connection::open_in_memory().unwrap();
189        let storage = SqliteStorage::new(conn);
190        storage.init_schema().unwrap();
191        storage
192    }
193
194    #[test]
195    fn schema_version_is_1() {
196        let s = in_memory_storage();
197        let version = s.get_setting("__schemaVersion").unwrap();
198        assert_eq!(version, Some("1".as_bytes().to_vec()));
199    }
200
201    #[test]
202    fn get_set_setting_round_trip() {
203        let s = in_memory_storage();
204        s.set_setting("myKey", b"myValue").unwrap();
205        let value = s.get_setting("myKey").unwrap();
206        assert_eq!(value, Some(b"myValue".to_vec()));
207    }
208
209    #[test]
210    fn get_setting_returns_none_for_missing_key() {
211        let s = in_memory_storage();
212        let value = s.get_setting("missing").unwrap();
213        assert_eq!(value, None);
214    }
215
216    #[test]
217    fn get_stable_sender_id_generates_uuid_on_first_call() {
218        let s = in_memory_storage();
219        let id = s.get_stable_sender_id().unwrap();
220        assert!(!id.is_empty());
221        assert!(id.contains('-'));
222    }
223
224    #[test]
225    fn get_stable_sender_id_returns_same_uuid_on_subsequent_calls() {
226        let s = in_memory_storage();
227        let id1 = s.get_stable_sender_id().unwrap();
228        let id2 = s.get_stable_sender_id().unwrap();
229        assert_eq!(id1, id2);
230    }
231
232    #[test]
233    fn set_stable_sender_id_overwrites() {
234        let s = in_memory_storage();
235        s.set_stable_sender_id("custom-id").unwrap();
236        let id = s.get_stable_sender_id().unwrap();
237        assert_eq!(id, "custom-id");
238    }
239
240    #[test]
241    fn get_seq_returns_0_by_default() {
242        let s = in_memory_storage();
243        let seq = s.get_seq("room-a").unwrap();
244        assert_eq!(seq, 0);
245    }
246
247    #[test]
248    fn get_seq_set_seq_round_trip() {
249        let s = in_memory_storage();
250        s.set_seq("room-a", 42).unwrap();
251        let seq = s.get_seq("room-a").unwrap();
252        assert_eq!(seq, 42);
253    }
254
255    #[test]
256    fn seq_values_are_isolated_per_room() {
257        let s = in_memory_storage();
258        s.set_seq("room-a", 3).unwrap();
259        s.set_seq("room-b", 7).unwrap();
260        assert_eq!(s.get_seq("room-a").unwrap(), 3);
261        assert_eq!(s.get_seq("room-b").unwrap(), 7);
262    }
263
264    #[test]
265    fn clear_room_removes_only_target_room() {
266        let s = in_memory_storage();
267        s.insert_yjs_update("room-a", b"update1", 1000).unwrap();
268        s.insert_yjs_update("room-b", b"update2", 2000).unwrap();
269
270        s.clear_room("room-a").unwrap();
271
272        let conn = s.conn.lock().unwrap();
273        let mut stmt = conn.prepare("SELECT room_name FROM yjs_updates").unwrap();
274        let rooms: Vec<String> = stmt
275            .query_map([], |row| row.get(0))
276            .unwrap()
277            .collect::<Result<Vec<_>, _>>()
278            .unwrap();
279        assert_eq!(rooms, vec!["room-b"]);
280    }
281
282    #[test]
283    fn clear_room_is_no_op_for_missing_room() {
284        let s = in_memory_storage();
285        s.clear_room("nonexistent").unwrap();
286        let conn = s.conn.lock().unwrap();
287        let count: i64 = conn
288            .query_row("SELECT COUNT(*) FROM yjs_updates", [], |row| row.get(0))
289            .unwrap();
290        assert_eq!(count, 0);
291    }
292
293    #[test]
294    fn clear_all_clears_data_but_preserves_schema_version() {
295        let s = in_memory_storage();
296        s.set_setting("myKey", "myValue".as_bytes()).unwrap();
297        s.set_seq("some-room", 42).unwrap();
298        s.set_stable_sender_id("some-id").unwrap();
299
300        s.clear_all().unwrap();
301
302        assert_eq!(s.get_setting("myKey").unwrap(), None);
303        assert_eq!(s.get_seq("some-room").unwrap(), 0);
304
305        let version = s.get_setting("__schemaVersion").unwrap();
306        assert_eq!(version, Some("1".as_bytes().to_vec()));
307    }
308
309    #[test]
310    fn insert_yjs_update_and_get_yjs_updates_round_trip() {
311        let s = in_memory_storage();
312        s.insert_yjs_update("room-a", b"update1", 1000).unwrap();
313        s.insert_yjs_update("room-a", b"update2", 2000).unwrap();
314        s.insert_yjs_update("room-b", b"update3", 1500).unwrap();
315
316        let updates = s.get_yjs_updates("room-a").unwrap();
317        assert_eq!(updates.len(), 2);
318        assert_eq!(updates[0], b"update1".to_vec());
319        assert_eq!(updates[1], b"update2".to_vec());
320    }
321
322    #[test]
323    fn save_and_load_bootstrap_peers() {
324        let s = in_memory_storage();
325        s.save_bootstrap_peer("room-a", "ticket1").unwrap();
326        s.save_bootstrap_peer("room-a", "ticket2").unwrap();
327        s.save_bootstrap_peer("room-b", "ticket3").unwrap();
328
329        let peers_a = s.load_bootstrap_peers("room-a").unwrap();
330        assert_eq!(peers_a.len(), 2);
331        assert!(peers_a.contains(&"ticket1".to_string()));
332        assert!(peers_a.contains(&"ticket2".to_string()));
333
334        let peers_b = s.load_bootstrap_peers("room-b").unwrap();
335        assert_eq!(peers_b, vec!["ticket3"]);
336    }
337
338    #[test]
339    fn remove_bootstrap_peer() {
340        let s = in_memory_storage();
341        s.save_bootstrap_peer("room-a", "ticket1").unwrap();
342        s.save_bootstrap_peer("room-a", "ticket2").unwrap();
343        s.remove_bootstrap_peer("room-a", "ticket1").unwrap();
344
345        let peers = s.load_bootstrap_peers("room-a").unwrap();
346        assert_eq!(peers, vec!["ticket2"]);
347    }
348
349    #[test]
350    fn clear_bootstrap_peers() {
351        let s = in_memory_storage();
352        s.save_bootstrap_peer("room-a", "ticket1").unwrap();
353        s.save_bootstrap_peer("room-a", "ticket2").unwrap();
354        s.clear_bootstrap_peers("room-a").unwrap();
355
356        let peers = s.load_bootstrap_peers("room-a").unwrap();
357        assert!(peers.is_empty());
358    }
359
360    #[test]
361    fn clear_all_preserves_bootstrap_peers() {
362        let s = in_memory_storage();
363        s.save_bootstrap_peer("room-a", "ticket1").unwrap();
364
365        s.clear_all().unwrap();
366
367        let peers = s.load_bootstrap_peers("room-a").unwrap();
368        assert!(peers.is_empty());
369    }
370}