chat_core/sync/
scheduler.rs1use std::collections::{BinaryHeap, HashSet};
2use std::cmp::Ordering;
3use std::time::Duration;
4
5use crate::network::PeerId;
6
/// Cap on active sync neighbors: `SyncScheduler::add_peer` only promotes a
/// newly added peer to neighbor while fewer than this many are held.
pub const TARGET_NEIGHBORS: usize = 6;
/// Refill threshold: `SyncScheduler::replenish` promotes standby peers only
/// while the neighbor count is below this value.
pub const MIN_NEIGHBORS: usize = 3;
11
/// Nominal number of seconds between two syncs with the same peer.
pub const SYNC_INTERVAL_BASE_SECS: u64 = 30;
/// Magnitude, in seconds, of the random jitter `SyncScheduler::schedule`
/// applies on top of the base interval.
pub const SYNC_INTERVAL_JITTER_SECS: u64 = 5;
16
17#[derive(Debug, Clone, Eq, PartialEq)]
19pub struct SyncEntry {
20 pub next_time: web_time::Instant,
21 pub peer_id: PeerId,
22}
23
24impl Ord for SyncEntry {
25 fn cmp(&self, other: &Self) -> Ordering {
26 other.next_time.cmp(&self.next_time)
28 .then_with(|| self.peer_id.as_str().cmp(other.peer_id.as_str()))
29 }
30}
31
32impl PartialOrd for SyncEntry {
33 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
34 Some(self.cmp(other))
35 }
36}
37
38pub struct SyncScheduler {
39 pub sync_neighbors: Vec<PeerId>,
40 pub available_peers: Vec<PeerId>,
41 pub sync_heap: BinaryHeap<SyncEntry>,
42}
43
44impl Default for SyncScheduler {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50impl SyncScheduler {
51 pub fn new() -> Self {
52 Self {
53 sync_neighbors: Vec::new(),
54 available_peers: Vec::new(),
55 sync_heap: BinaryHeap::new(),
56 }
57 }
58
59 pub fn schedule(&mut self, peer_id: &PeerId) {
60 let jitter_secs = rand::random::<u64>() % (2 * SYNC_INTERVAL_JITTER_SECS + 1);
61 let delay = Duration::from_secs(
62 SYNC_INTERVAL_BASE_SECS + jitter_secs.saturating_sub(SYNC_INTERVAL_JITTER_SECS)
63 );
64 self.sync_heap.push(SyncEntry {
65 next_time: web_time::Instant::now() + delay,
66 peer_id: peer_id.clone(),
67 });
68 }
69
70 pub fn add_peer(&mut self, peer_id: &PeerId) {
71 if self.sync_neighbors.len() < TARGET_NEIGHBORS && !self.sync_neighbors.iter().any(|p| p == peer_id) {
72 self.sync_neighbors.push(peer_id.clone());
73 self.schedule(peer_id);
74 tracing::debug!("Added {} to sync neighbors (now {})", peer_id, self.sync_neighbors.len());
75 } else {
76 self.available_peers.push(peer_id.clone());
77 }
78 }
79
80 pub fn remove_peer(&mut self, peer_id: &PeerId) {
81 self.sync_neighbors.retain(|p| p != peer_id);
82 self.available_peers.retain(|p| p != peer_id);
83 self.replenish();
85 }
86
87 pub fn replenish(&mut self) {
88 while self.sync_neighbors.len() < MIN_NEIGHBORS && !self.available_peers.is_empty() {
89 if let Some(p) = self.available_peers.pop() {
90 self.sync_neighbors.push(p.clone());
91 self.schedule(&p);
92 }
93 }
94 }
95
96 pub fn next_due_delay(&self, now: web_time::Instant) -> Option<Duration> {
97 self.sync_heap.peek().and_then(|e| {
98 if e.next_time > now {
99 e.next_time.checked_duration_since(now)
100 } else {
101 Some(Duration::from_secs(0))
102 }
103 })
104 }
105
106 pub fn pop_due(&mut self, now: web_time::Instant, room_peers: &HashSet<PeerId>) -> Vec<PeerId> {
109 let mut due = Vec::new();
110 while let Some(entry) = self.sync_heap.pop() {
111 if entry.next_time > now {
112 self.sync_heap.push(entry);
113 break;
114 }
115 if self.sync_neighbors.contains(&entry.peer_id) && room_peers.contains(&entry.peer_id) {
116 due.push(entry.peer_id.clone());
117 self.schedule(&entry.peer_id);
118 }
119 }
121 due
122 }
123}