1use crate::error::ChatResult;
2use rusqlite::params;
3use std::sync::Mutex;
4use uuid::Uuid;
5
6pub struct SqliteStorage {
8 pub conn: Mutex<rusqlite::Connection>,
9}
10
11impl SqliteStorage {
12 pub fn new(conn: rusqlite::Connection) -> Self {
13 Self {
14 conn: Mutex::new(conn),
15 }
16 }
17
18 pub fn init_schema(&self) -> ChatResult<()> {
19 {
20 let conn = self.conn.lock().unwrap();
21 conn.execute_batch(
22 "CREATE TABLE IF NOT EXISTS settings (key TEXT PRIMARY KEY, value BLOB);
23 CREATE TABLE IF NOT EXISTS yjs_updates (room_name TEXT, update_blob BLOB, timestamp INTEGER);
24 CREATE TABLE IF NOT EXISTS room_bootstrap_peers (
25 room_name TEXT NOT NULL,
26 peer_ticket TEXT NOT NULL,
27 added_at INTEGER,
28 PRIMARY KEY (room_name, peer_ticket)
29 );"
30 )?;
31 }
32 let version = crate::config::SCHEMA_VERSION.to_string();
33 self.set_setting("__schemaVersion", version.as_bytes())?;
34 Ok(())
35 }
36
37 pub fn get_setting(&self, key: &str) -> ChatResult<Option<Vec<u8>>> {
38 let conn = self.conn.lock().unwrap();
39 let mut stmt = conn.prepare("SELECT value FROM settings WHERE key = ?1")?;
40 let result = stmt.query_row([key], |row| row.get::<_, Vec<u8>>(0));
41 match result {
42 Ok(v) => Ok(Some(v)),
43 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
44 Err(e) => Err(e.into()),
45 }
46 }
47
48 pub fn set_setting(&self, key: &str, value: &[u8]) -> ChatResult<()> {
49 let conn = self.conn.lock().unwrap();
50 conn.execute(
51 "INSERT OR REPLACE INTO settings (key, value) VALUES (?1, ?2)",
52 params![key, value],
53 )?;
54 Ok(())
55 }
56
57 pub fn get_stable_sender_id(&self) -> ChatResult<String> {
58 let id: Option<String> = self.get_setting("stableSenderId")?
59 .map(|bytes| String::from_utf8(bytes).unwrap_or_default())
60 .filter(|s| !s.is_empty());
61
62 match id {
63 Some(id) => Ok(id),
64 None => {
65 let new_id = Uuid::new_v4().to_string();
66 self.set_stable_sender_id(&new_id)?;
67 Ok(new_id)
68 }
69 }
70 }
71
72 pub fn set_stable_sender_id(&self, id: &str) -> ChatResult<()> {
73 self.set_setting("stableSenderId", id.as_bytes())
74 }
75
76 pub fn get_seq(&self, room_name: &str) -> ChatResult<i64> {
77 let key = format!("seq:{}", room_name);
78 match self.get_setting(&key)? {
79 Some(bytes) => {
80 let s = String::from_utf8(bytes)?;
81 Ok(s.parse::<i64>().unwrap_or(0))
82 }
83 None => Ok(0),
84 }
85 }
86
87 pub fn set_seq(&self, room_name: &str, seq: i64) -> ChatResult<()> {
88 let key = format!("seq:{}", room_name);
89 self.set_setting(&key, seq.to_string().as_bytes())
90 }
91
92 pub fn clear_room(&self, room_name: &str) -> ChatResult<()> {
93 let conn = self.conn.lock().unwrap();
94 conn.execute(
95 "DELETE FROM yjs_updates WHERE room_name = ?1",
96 params![room_name],
97 )?;
98 Ok(())
99 }
100
101 pub fn clear_all(&self) -> ChatResult<()> {
102 let schema_version = self.get_setting("__schemaVersion")?;
103
104 let conn = self.conn.lock().unwrap();
105 conn.execute("DELETE FROM settings", [])?;
106 conn.execute("DELETE FROM yjs_updates", [])?;
107 conn.execute("DELETE FROM room_bootstrap_peers", [])?;
108 drop(conn);
109
110 if let Some(value) = schema_version {
111 self.set_setting("__schemaVersion", &value)?;
112 }
113 Ok(())
114 }
115
116 pub fn insert_yjs_update(&self, room_name: &str, blob: &[u8], timestamp: i64) -> ChatResult<()> {
117 let conn = self.conn.lock().unwrap();
118 conn.execute(
119 "INSERT INTO yjs_updates (room_name, update_blob, timestamp) VALUES (?1, ?2, ?3)",
120 params![room_name, blob, timestamp],
121 )?;
122 Ok(())
123 }
124
125 pub fn get_yjs_updates(&self, room_name: &str) -> ChatResult<Vec<Vec<u8>>> {
126 let conn = self.conn.lock().unwrap();
127 let mut stmt = conn.prepare(
128 "SELECT update_blob FROM yjs_updates WHERE room_name = ?1 ORDER BY timestamp ASC"
129 )?;
130 let blobs = stmt
131 .query_map(params![room_name], |row| row.get::<_, Vec<u8>>(0))?
132 .collect::<Result<Vec<_>, _>>()?;
133 Ok(blobs)
134 }
135
136 pub fn save_bootstrap_peer(&self, room_name: &str, peer_ticket: &str) -> ChatResult<()> {
138 let conn = self.conn.lock().unwrap();
139 let now = web_time::SystemTime::now()
140 .duration_since(web_time::SystemTime::UNIX_EPOCH)
141 .unwrap_or_default()
142 .as_secs() as i64;
143 conn.execute(
144 "INSERT OR REPLACE INTO room_bootstrap_peers (room_name, peer_ticket, added_at) VALUES (?1, ?2, ?3)",
145 params![room_name, peer_ticket, now],
146 )?;
147 Ok(())
148 }
149
150 pub fn load_bootstrap_peers(&self, room_name: &str) -> ChatResult<Vec<String>> {
152 let conn = self.conn.lock().unwrap();
153 let mut stmt = conn.prepare(
154 "SELECT peer_ticket FROM room_bootstrap_peers WHERE room_name = ?1 ORDER BY added_at DESC"
155 )?;
156 let peers = stmt
157 .query_map(params![room_name], |row| row.get::<_, String>(0))?
158 .collect::<Result<Vec<_>, _>>()?;
159 Ok(peers)
160 }
161
162 pub fn remove_bootstrap_peer(&self, room_name: &str, peer_ticket: &str) -> ChatResult<()> {
164 let conn = self.conn.lock().unwrap();
165 conn.execute(
166 "DELETE FROM room_bootstrap_peers WHERE room_name = ?1 AND peer_ticket = ?2",
167 params![room_name, peer_ticket],
168 )?;
169 Ok(())
170 }
171
172 pub fn clear_bootstrap_peers(&self, room_name: &str) -> ChatResult<()> {
174 let conn = self.conn.lock().unwrap();
175 conn.execute(
176 "DELETE FROM room_bootstrap_peers WHERE room_name = ?1",
177 params![room_name],
178 )?;
179 Ok(())
180 }
181}
182
183#[cfg(test)]
184mod tests {
185 use super::*;
186
187 fn in_memory_storage() -> SqliteStorage {
188 let conn = rusqlite::Connection::open_in_memory().unwrap();
189 let storage = SqliteStorage::new(conn);
190 storage.init_schema().unwrap();
191 storage
192 }
193
194 #[test]
195 fn schema_version_is_1() {
196 let s = in_memory_storage();
197 let version = s.get_setting("__schemaVersion").unwrap();
198 assert_eq!(version, Some("1".as_bytes().to_vec()));
199 }
200
201 #[test]
202 fn get_set_setting_round_trip() {
203 let s = in_memory_storage();
204 s.set_setting("myKey", b"myValue").unwrap();
205 let value = s.get_setting("myKey").unwrap();
206 assert_eq!(value, Some(b"myValue".to_vec()));
207 }
208
209 #[test]
210 fn get_setting_returns_none_for_missing_key() {
211 let s = in_memory_storage();
212 let value = s.get_setting("missing").unwrap();
213 assert_eq!(value, None);
214 }
215
216 #[test]
217 fn get_stable_sender_id_generates_uuid_on_first_call() {
218 let s = in_memory_storage();
219 let id = s.get_stable_sender_id().unwrap();
220 assert!(!id.is_empty());
221 assert!(id.contains('-'));
222 }
223
224 #[test]
225 fn get_stable_sender_id_returns_same_uuid_on_subsequent_calls() {
226 let s = in_memory_storage();
227 let id1 = s.get_stable_sender_id().unwrap();
228 let id2 = s.get_stable_sender_id().unwrap();
229 assert_eq!(id1, id2);
230 }
231
232 #[test]
233 fn set_stable_sender_id_overwrites() {
234 let s = in_memory_storage();
235 s.set_stable_sender_id("custom-id").unwrap();
236 let id = s.get_stable_sender_id().unwrap();
237 assert_eq!(id, "custom-id");
238 }
239
240 #[test]
241 fn get_seq_returns_0_by_default() {
242 let s = in_memory_storage();
243 let seq = s.get_seq("room-a").unwrap();
244 assert_eq!(seq, 0);
245 }
246
247 #[test]
248 fn get_seq_set_seq_round_trip() {
249 let s = in_memory_storage();
250 s.set_seq("room-a", 42).unwrap();
251 let seq = s.get_seq("room-a").unwrap();
252 assert_eq!(seq, 42);
253 }
254
255 #[test]
256 fn seq_values_are_isolated_per_room() {
257 let s = in_memory_storage();
258 s.set_seq("room-a", 3).unwrap();
259 s.set_seq("room-b", 7).unwrap();
260 assert_eq!(s.get_seq("room-a").unwrap(), 3);
261 assert_eq!(s.get_seq("room-b").unwrap(), 7);
262 }
263
264 #[test]
265 fn clear_room_removes_only_target_room() {
266 let s = in_memory_storage();
267 s.insert_yjs_update("room-a", b"update1", 1000).unwrap();
268 s.insert_yjs_update("room-b", b"update2", 2000).unwrap();
269
270 s.clear_room("room-a").unwrap();
271
272 let conn = s.conn.lock().unwrap();
273 let mut stmt = conn.prepare("SELECT room_name FROM yjs_updates").unwrap();
274 let rooms: Vec<String> = stmt
275 .query_map([], |row| row.get(0))
276 .unwrap()
277 .collect::<Result<Vec<_>, _>>()
278 .unwrap();
279 assert_eq!(rooms, vec!["room-b"]);
280 }
281
282 #[test]
283 fn clear_room_is_no_op_for_missing_room() {
284 let s = in_memory_storage();
285 s.clear_room("nonexistent").unwrap();
286 let conn = s.conn.lock().unwrap();
287 let count: i64 = conn
288 .query_row("SELECT COUNT(*) FROM yjs_updates", [], |row| row.get(0))
289 .unwrap();
290 assert_eq!(count, 0);
291 }
292
293 #[test]
294 fn clear_all_clears_data_but_preserves_schema_version() {
295 let s = in_memory_storage();
296 s.set_setting("myKey", "myValue".as_bytes()).unwrap();
297 s.set_seq("some-room", 42).unwrap();
298 s.set_stable_sender_id("some-id").unwrap();
299
300 s.clear_all().unwrap();
301
302 assert_eq!(s.get_setting("myKey").unwrap(), None);
303 assert_eq!(s.get_seq("some-room").unwrap(), 0);
304
305 let version = s.get_setting("__schemaVersion").unwrap();
306 assert_eq!(version, Some("1".as_bytes().to_vec()));
307 }
308
309 #[test]
310 fn insert_yjs_update_and_get_yjs_updates_round_trip() {
311 let s = in_memory_storage();
312 s.insert_yjs_update("room-a", b"update1", 1000).unwrap();
313 s.insert_yjs_update("room-a", b"update2", 2000).unwrap();
314 s.insert_yjs_update("room-b", b"update3", 1500).unwrap();
315
316 let updates = s.get_yjs_updates("room-a").unwrap();
317 assert_eq!(updates.len(), 2);
318 assert_eq!(updates[0], b"update1".to_vec());
319 assert_eq!(updates[1], b"update2".to_vec());
320 }
321
322 #[test]
323 fn save_and_load_bootstrap_peers() {
324 let s = in_memory_storage();
325 s.save_bootstrap_peer("room-a", "ticket1").unwrap();
326 s.save_bootstrap_peer("room-a", "ticket2").unwrap();
327 s.save_bootstrap_peer("room-b", "ticket3").unwrap();
328
329 let peers_a = s.load_bootstrap_peers("room-a").unwrap();
330 assert_eq!(peers_a.len(), 2);
331 assert!(peers_a.contains(&"ticket1".to_string()));
332 assert!(peers_a.contains(&"ticket2".to_string()));
333
334 let peers_b = s.load_bootstrap_peers("room-b").unwrap();
335 assert_eq!(peers_b, vec!["ticket3"]);
336 }
337
338 #[test]
339 fn remove_bootstrap_peer() {
340 let s = in_memory_storage();
341 s.save_bootstrap_peer("room-a", "ticket1").unwrap();
342 s.save_bootstrap_peer("room-a", "ticket2").unwrap();
343 s.remove_bootstrap_peer("room-a", "ticket1").unwrap();
344
345 let peers = s.load_bootstrap_peers("room-a").unwrap();
346 assert_eq!(peers, vec!["ticket2"]);
347 }
348
349 #[test]
350 fn clear_bootstrap_peers() {
351 let s = in_memory_storage();
352 s.save_bootstrap_peer("room-a", "ticket1").unwrap();
353 s.save_bootstrap_peer("room-a", "ticket2").unwrap();
354 s.clear_bootstrap_peers("room-a").unwrap();
355
356 let peers = s.load_bootstrap_peers("room-a").unwrap();
357 assert!(peers.is_empty());
358 }
359
360 #[test]
361 fn clear_all_preserves_bootstrap_peers() {
362 let s = in_memory_storage();
363 s.save_bootstrap_peer("room-a", "ticket1").unwrap();
364
365 s.clear_all().unwrap();
366
367 let peers = s.load_bootstrap_peers("room-a").unwrap();
368 assert!(peers.is_empty());
369 }
370}