chat_headless/
main.rs

1mod api;
2mod config;
3
4use std::collections::HashSet;
5use std::net::SocketAddr;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use network_libp2p::NodeOptions;
10use clap::Parser;
11use tokio::sync::{mpsc, Mutex};
12use api::{AppState, NodeCommand};
13use config::HeadlessConfig;
14
15#[derive(Parser, Debug)]
16#[command(name = "chat-headless")]
17#[command(about = "Headless P2Pandemonium daemon")]
18struct Cli {
19    /// Path to config file
20    #[arg(short, long)]
21    config: Option<PathBuf>,
22
23    /// API bind address override
24    #[arg(long)]
25    api_bind: Option<String>,
26
27    /// Data directory override
28    #[arg(long)]
29    data_dir: Option<PathBuf>,
30
31    /// Room to join on startup (can be given multiple times)
32    #[arg(short, long)]
33    room: Vec<String>,
34}
35
36#[tokio::main]
37async fn main() -> anyhow::Result<()> {
38    tracing_subscriber::fmt()
39        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
40        .init();
41
42    let cli = Cli::parse();
43
44    let mut config = if let Some(path) = &cli.config {
45        config::load_config(path)?
46    } else {
47        HeadlessConfig::default()
48    };
49
50    if let Some(bind) = cli.api_bind {
51        config.api_bind = bind;
52    }
53    if let Some(dir) = cli.data_dir {
54        config.data_dir = dir;
55    }
56    for room in cli.room {
57        if !config.rooms.contains(&room) {
58            config.rooms.push(room);
59        }
60    }
61
62    let node_options = NodeOptions {
63        listen_addresses: config.listen_addresses.clone(),
64        bootstrap_peers: config.bootstrap_peers.clone(),
65        max_connections: config.max_connections,
66        dht_client_mode: !config.relay_server,
67        dht_discovery_enabled: true,
68        stun_servers: None,
69    };
70
71    let node = network_libp2p::build_node(node_options).await.map_err(|e| anyhow::anyhow!(e))?;
72    let local_peer_id = node.local_peer_id;
73
74    tracing::info!("Node started with peer id: {}", local_peer_id);
75
76    // Create storage and initialize schema
77    let data_dir = config.data_dir.clone();
78    let conn = chat_platform_native::storage::open_storage(
79        "chat",
80        Some(data_dir.to_str().unwrap_or(".")),
81    ).await?;
82    let storage = chat_core::storage::SqliteStorage::new(conn);
83    storage.init_schema()?;
84    tracing::info!("Storage initialized at {:?}", data_dir);
85
86    // Build coordinator
87    let (handle, mut events, loop_fut) = chat_core::coordinator::build(node, storage);
88
89    // Spawn the coordinator loop
90    tokio::spawn(loop_fut.run());
91
92    // Spawn event listener for logging / state updates
93    tokio::spawn(async move {
94        while let Ok(event) = events.recv().await {
95            match event {
96                chat_core::coordinator::CoordinatorEvent::SystemMessage(msg) => {
97                    tracing::info!("{}", msg);
98                }
99                chat_core::coordinator::CoordinatorEvent::ErrorMessage(msg) => {
100                    tracing::error!("{}", msg);
101                }
102                chat_core::coordinator::CoordinatorEvent::PeerConnected { peer_id } => {
103                    tracing::info!("Peer connected: {}", peer_id);
104                }
105                chat_core::coordinator::CoordinatorEvent::PeerDisconnected { peer_id } => {
106                    tracing::info!("Peer disconnected: {}", peer_id);
107                }
108                _ => {
109                    tracing::debug!("Coordinator event: {:?}", event);
110                }
111            }
112        }
113    });
114
115    // Command channel for REST API -> coordinator bridge
116    let (cmd_tx, mut cmd_rx) = mpsc::channel::<NodeCommand>(32);
117    let joined_rooms = Arc::new(Mutex::new(HashSet::<String>::new()));
118
119    // Spawn command translator: NodeCommand -> coordinator handle calls
120    let coordinator_handle = handle.clone();
121    let rooms_clone = joined_rooms.clone();
122    tokio::spawn(async move {
123        while let Some(cmd) = cmd_rx.recv().await {
124            match cmd {
125                NodeCommand::JoinRoom { room } => {
126                    coordinator_handle.join_room(room.clone()).await;
127                    rooms_clone.lock().await.insert(room);
128                }
129                NodeCommand::LeaveRoom { room } => {
130                    coordinator_handle.leave_room().await;
131                    rooms_clone.lock().await.remove(&room);
132                }
133                NodeCommand::SendMessage { content } => {
134                    coordinator_handle.send_message(content).await;
135                }
136                NodeCommand::Shutdown => {
137                    coordinator_handle.shutdown().await;
138                    break;
139                }
140            }
141        }
142    });
143
144    // Join initial rooms from config
145    for room in config.rooms {
146        handle.join_room(room.clone()).await;
147        joined_rooms.lock().await.insert(room);
148    }
149
150    let state = AppState {
151        cmd_tx: cmd_tx.clone(),
152        joined_rooms: joined_rooms.clone(),
153    };
154
155    // Start API server
156    let app = api::router(state);
157    let addr: SocketAddr = config.api_bind.parse()?;
158    tracing::info!("API server listening on {}", addr);
159
160    let listener = tokio::net::TcpListener::bind(addr).await?;
161    axum::serve(listener, app).await?;
162
163    // If API server exits, shut down coordinator
164    let _ = cmd_tx.send(NodeCommand::Shutdown).await;
165
166    Ok(())
167}