1mod api;
2mod config;
3
4use std::collections::HashSet;
5use std::net::SocketAddr;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use network_libp2p::NodeOptions;
10use clap::Parser;
11use tokio::sync::{mpsc, Mutex};
12use api::{AppState, NodeCommand};
13use config::HeadlessConfig;
14
15#[derive(Parser, Debug)]
16#[command(name = "chat-headless")]
17#[command(about = "Headless P2Pandemonium daemon")]
18struct Cli {
19 #[arg(short, long)]
21 config: Option<PathBuf>,
22
23 #[arg(long)]
25 api_bind: Option<String>,
26
27 #[arg(long)]
29 data_dir: Option<PathBuf>,
30
31 #[arg(short, long)]
33 room: Vec<String>,
34}
35
36#[tokio::main]
37async fn main() -> anyhow::Result<()> {
38 tracing_subscriber::fmt()
39 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
40 .init();
41
42 let cli = Cli::parse();
43
44 let mut config = if let Some(path) = &cli.config {
45 config::load_config(path)?
46 } else {
47 HeadlessConfig::default()
48 };
49
50 if let Some(bind) = cli.api_bind {
51 config.api_bind = bind;
52 }
53 if let Some(dir) = cli.data_dir {
54 config.data_dir = dir;
55 }
56 for room in cli.room {
57 if !config.rooms.contains(&room) {
58 config.rooms.push(room);
59 }
60 }
61
62 let node_options = NodeOptions {
63 listen_addresses: config.listen_addresses.clone(),
64 bootstrap_peers: config.bootstrap_peers.clone(),
65 max_connections: config.max_connections,
66 dht_client_mode: !config.relay_server,
67 dht_discovery_enabled: true,
68 stun_servers: None,
69 };
70
71 let node = network_libp2p::build_node(node_options).await.map_err(|e| anyhow::anyhow!(e))?;
72 let local_peer_id = node.local_peer_id;
73
74 tracing::info!("Node started with peer id: {}", local_peer_id);
75
76 let data_dir = config.data_dir.clone();
78 let conn = chat_platform_native::storage::open_storage(
79 "chat",
80 Some(data_dir.to_str().unwrap_or(".")),
81 ).await?;
82 let storage = chat_core::storage::SqliteStorage::new(conn);
83 storage.init_schema()?;
84 tracing::info!("Storage initialized at {:?}", data_dir);
85
86 let (handle, mut events, loop_fut) = chat_core::coordinator::build(node, storage);
88
89 tokio::spawn(loop_fut.run());
91
92 tokio::spawn(async move {
94 while let Ok(event) = events.recv().await {
95 match event {
96 chat_core::coordinator::CoordinatorEvent::SystemMessage(msg) => {
97 tracing::info!("{}", msg);
98 }
99 chat_core::coordinator::CoordinatorEvent::ErrorMessage(msg) => {
100 tracing::error!("{}", msg);
101 }
102 chat_core::coordinator::CoordinatorEvent::PeerConnected { peer_id } => {
103 tracing::info!("Peer connected: {}", peer_id);
104 }
105 chat_core::coordinator::CoordinatorEvent::PeerDisconnected { peer_id } => {
106 tracing::info!("Peer disconnected: {}", peer_id);
107 }
108 _ => {
109 tracing::debug!("Coordinator event: {:?}", event);
110 }
111 }
112 }
113 });
114
115 let (cmd_tx, mut cmd_rx) = mpsc::channel::<NodeCommand>(32);
117 let joined_rooms = Arc::new(Mutex::new(HashSet::<String>::new()));
118
119 let coordinator_handle = handle.clone();
121 let rooms_clone = joined_rooms.clone();
122 tokio::spawn(async move {
123 while let Some(cmd) = cmd_rx.recv().await {
124 match cmd {
125 NodeCommand::JoinRoom { room } => {
126 coordinator_handle.join_room(room.clone()).await;
127 rooms_clone.lock().await.insert(room);
128 }
129 NodeCommand::LeaveRoom { room } => {
130 coordinator_handle.leave_room().await;
131 rooms_clone.lock().await.remove(&room);
132 }
133 NodeCommand::SendMessage { content } => {
134 coordinator_handle.send_message(content).await;
135 }
136 NodeCommand::Shutdown => {
137 coordinator_handle.shutdown().await;
138 break;
139 }
140 }
141 }
142 });
143
144 for room in config.rooms {
146 handle.join_room(room.clone()).await;
147 joined_rooms.lock().await.insert(room);
148 }
149
150 let state = AppState {
151 cmd_tx: cmd_tx.clone(),
152 joined_rooms: joined_rooms.clone(),
153 };
154
155 let app = api::router(state);
157 let addr: SocketAddr = config.api_bind.parse()?;
158 tracing::info!("API server listening on {}", addr);
159
160 let listener = tokio::net::TcpListener::bind(addr).await?;
161 axum::serve(listener, app).await?;
162
163 let _ = cmd_tx.send(NodeCommand::Shutdown).await;
165
166 Ok(())
167}