chat_core/sync/
scheduler.rs

1use std::collections::{BinaryHeap, HashSet};
2use std::cmp::Ordering;
3use std::time::Duration;
4
5use crate::network::PeerId;
6
7/// Target number of sync neighbors for epidemic propagation.
8pub const TARGET_NEIGHBORS: usize = 6;
9/// Minimum number of sync neighbors before we actively seek more.
10pub const MIN_NEIGHBORS: usize = 3;
11
12/// Base interval for periodic state-vector sync with each neighbor.
13pub const SYNC_INTERVAL_BASE_SECS: u64 = 30;
14/// Jitter range for sync timing (± this many seconds).
15pub const SYNC_INTERVAL_JITTER_SECS: u64 = 5;
16
17/// Entry in the sync-heap: min-heap by next_time.
18#[derive(Debug, Clone, Eq, PartialEq)]
19pub struct SyncEntry {
20    pub next_time: web_time::Instant,
21    pub peer_id: PeerId,
22}
23
24impl Ord for SyncEntry {
25    fn cmp(&self, other: &Self) -> Ordering {
26        // Reverse for min-heap: earliest time first, then peer_id for determinism
27        other.next_time.cmp(&self.next_time)
28            .then_with(|| self.peer_id.as_str().cmp(other.peer_id.as_str()))
29    }
30}
31
32impl PartialOrd for SyncEntry {
33    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
34        Some(self.cmp(other))
35    }
36}
37
38pub struct SyncScheduler {
39    pub sync_neighbors: Vec<PeerId>,
40    pub available_peers: Vec<PeerId>,
41    pub sync_heap: BinaryHeap<SyncEntry>,
42}
43
44impl Default for SyncScheduler {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl SyncScheduler {
51    pub fn new() -> Self {
52        Self {
53            sync_neighbors: Vec::new(),
54            available_peers: Vec::new(),
55            sync_heap: BinaryHeap::new(),
56        }
57    }
58
59    pub fn schedule(&mut self, peer_id: &PeerId) {
60        let jitter_secs = rand::random::<u64>() % (2 * SYNC_INTERVAL_JITTER_SECS + 1);
61        let delay = Duration::from_secs(
62            SYNC_INTERVAL_BASE_SECS + jitter_secs.saturating_sub(SYNC_INTERVAL_JITTER_SECS)
63        );
64        self.sync_heap.push(SyncEntry {
65            next_time: web_time::Instant::now() + delay,
66            peer_id: peer_id.clone(),
67        });
68    }
69
70    pub fn add_peer(&mut self, peer_id: &PeerId) {
71        if self.sync_neighbors.len() < TARGET_NEIGHBORS && !self.sync_neighbors.iter().any(|p| p == peer_id) {
72            self.sync_neighbors.push(peer_id.clone());
73            self.schedule(peer_id);
74            tracing::debug!("Added {} to sync neighbors (now {})", peer_id, self.sync_neighbors.len());
75        } else {
76            self.available_peers.push(peer_id.clone());
77        }
78    }
79
80    pub fn remove_peer(&mut self, peer_id: &PeerId) {
81        self.sync_neighbors.retain(|p| p != peer_id);
82        self.available_peers.retain(|p| p != peer_id);
83        // Stale heap entries are skipped lazily when popped
84        self.replenish();
85    }
86
87    pub fn replenish(&mut self) {
88        while self.sync_neighbors.len() < MIN_NEIGHBORS && !self.available_peers.is_empty() {
89            if let Some(p) = self.available_peers.pop() {
90                self.sync_neighbors.push(p.clone());
91                self.schedule(&p);
92            }
93        }
94    }
95
96    pub fn next_due_delay(&self, now: web_time::Instant) -> Option<Duration> {
97        self.sync_heap.peek().and_then(|e| {
98            if e.next_time > now {
99                e.next_time.checked_duration_since(now)
100            } else {
101                Some(Duration::from_secs(0))
102            }
103        })
104    }
105
106    /// Pop all due sync entries, validate they are still neighbors and in the room,
107    /// reschedule valid ones, and return the list of peers to sync with.
108    pub fn pop_due(&mut self, now: web_time::Instant, room_peers: &HashSet<PeerId>) -> Vec<PeerId> {
109        let mut due = Vec::new();
110        while let Some(entry) = self.sync_heap.pop() {
111            if entry.next_time > now {
112                self.sync_heap.push(entry);
113                break;
114            }
115            if self.sync_neighbors.contains(&entry.peer_id) && room_peers.contains(&entry.peer_id) {
116                due.push(entry.peer_id.clone());
117                self.schedule(&entry.peer_id);
118            }
119            // If peer is not a valid sync neighbor or not in room, discard the entry.
120        }
121        due
122    }
123}