zoe_client/cli/
mod.rs

1use crate::{Client, ClientError, util::resolve_to_socket_addr};
2use clap::{Parser, Subcommand};
3use std::{net::SocketAddr, path::PathBuf};
4use tempfile::TempDir;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio::net::{TcpListener, TcpStream};
7use tracing::{error, info, warn};
8use zoe_wire_protocol::VerifyingKey;
9
10#[derive(Parser, Debug)]
11pub struct RelayClientArgs {
12    /// Relay server address (e.g., "127.0.0.1:8080")
13    #[arg(
14        short,
15        long,
16        env = "ZOE_RELAY_ADDRESS",
17        default_value = "127.0.0.1:13908"
18    )]
19    pub relay_address: String,
20
21    /// Server public key in hex format
22    #[arg(short, long, value_parser = parse_verifying_key, conflicts_with = "server_key_file")]
23    pub server_key: Option<VerifyingKey>,
24
25    /// Path to file containing server public key in hex format
26    #[arg(long, env = "ZOE_SERVER_KEY_FILE", conflicts_with = "server_key")]
27    pub server_key_file: Option<PathBuf>,
28
29    #[arg(short, long, conflicts_with = "ephemeral")]
30    pub persist_path: Option<PathBuf>,
31
32    #[arg(short, long, env = "ZOE_EPHEMERAL", conflicts_with = "persist_path")]
33    pub ephemeral: bool,
34
35    /// Enable health check server on specified port
36    #[arg(long, env = "ZOE_HEALTH_CHECK_PORT")]
37    pub health_check_port: Option<u16>,
38}
39
// Subcommands handled by `run_default_command`.
//
// The `///` comments below are picked up by clap as help text, so they are left
// untouched; explanatory notes are kept as plain `//` comments.
#[derive(Subcommand, Debug)]
pub enum RelayClientDefaultCommands {
    /// Perform health check (for Docker health checks)
    HealthCheck {
        /// Health check port (defaults to ZOE_HEALTH_CHECK_PORT env var or 8080)
        // Dispatched to `run_health_check_command`, which pings this port on 127.0.0.1.
        #[arg(long, env = "ZOE_HEALTH_CHECK_PORT", default_value = "8080")]
        port: u16,
    },
}
49
50/// Helper function to parse hex string to VerifyingKey (simplified for demo)
51fn parse_verifying_key(hex_str: &str) -> Result<VerifyingKey, String> {
52    let hex = hex::decode(hex_str).map_err(|e| format!("Invalid hex string: {e}"))?;
53    let key: VerifyingKey = postcard::from_bytes(&hex).map_err(|e| format!("Invalid key: {e}"))?;
54    Ok(key)
55}
56
57/// Common setup to be done in a client cli
58pub async fn main_setup() -> Result<(), Box<dyn std::error::Error>> {
59    // Initialize Rustls crypto provider before any TLS operations
60    rustls::crypto::ring::default_provider()
61        .install_default()
62        .expect("Failed to install crypto provider");
63
64    // Initialize logging
65    tracing_subscriber::fmt()
66        .with_env_filter(
67            tracing_subscriber::EnvFilter::try_from_default_env()
68                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("full=info")),
69        )
70        .init();
71
72    Ok(())
73}
74
75pub async fn run_default_command(
76    cmd: &RelayClientDefaultCommands,
77) -> Result<(), Box<dyn std::error::Error>> {
78    match cmd {
79        RelayClientDefaultCommands::HealthCheck { port } => run_health_check_command(*port).await,
80    }
81}
82
83pub async fn full_cli_client(args: RelayClientArgs) -> Result<Client, ClientError> {
84    info!("🚀 Starting Zoe Client Connection Test");
85    info!("📍 Target server: {}", args.relay_address);
86
87    let server_addr: SocketAddr = match resolve_to_socket_addr(&args.relay_address).await {
88        Ok(addr) => addr,
89        Err(e) => {
90            error!("Invalid server address or failed to resolve: {e}");
91            std::process::exit(1);
92        }
93    };
94
95    // Get server public key from either direct argument or file
96    let server_public_key = if let Some(file_path) = args.server_key_file {
97        info!("📖 Reading server public key from: {}", file_path.display());
98        let content = std::fs::read_to_string(&file_path).map_err(|e| {
99            ClientError::BuildError(format!(
100                "Failed to read key file {}: {e}",
101                file_path.display()
102            ))
103        })?;
104        VerifyingKey::from_pem(&content).map_err(|e| {
105            ClientError::BuildError(format!(
106                "Failed to parse key file {}: {e}",
107                file_path.display()
108            ))
109        })?
110    } else if let Some(key) = args.server_key {
111        key
112    } else {
113        error!("Must specify either --server-key or --server-key-file");
114        std::process::exit(1);
115    };
116
117    let mut builder = Client::builder();
118    // Don't use autoconnect - we'll manually establish the connection to ensure it's ready
119    builder.autoconnect(false);
120
121    if let Some(persist_path) = args.persist_path {
122        info!("💾 Using persistent storage at: {}", persist_path.display());
123        error!("persistence not yet implemented");
124    } else if !args.ephemeral {
125        error!("💾 Must specify either --persist-path or --ephemeral");
126        std::process::exit(1);
127    } else {
128        // ephemeral mode
129
130        let temp_dir = TempDir::new()?;
131        // Create temporary directories for storage
132
133        info!(
134            "💾 Using temporary storage at: {}",
135            temp_dir.path().display()
136        );
137        let media_storage_path = temp_dir.path().join("blobs");
138        let db_storage_path = temp_dir.path().join("db");
139
140        info!("🔧 Building client...");
141
142        // Build the client
143        builder.media_storage_dir_pathbuf(media_storage_path);
144        builder.db_storage_dir_pathbuf(db_storage_path);
145    }
146
147    let client = builder.build().await?;
148
149    // Now manually establish the relay connection and wait for it to be ready
150    info!("🔗 Establishing relay connection...");
151    use zoe_app_primitives::RelayAddress;
152    let relay_address = RelayAddress::new(server_public_key)
153        .with_address(server_addr.into())
154        .with_name("CLI Server".to_string());
155
156    client.add_relay(relay_address).await?;
157
158    // Wait for the connection to be established
159    info!("⏳ Waiting for relay connection to be ready...");
160    let mut attempts = 0;
161    const MAX_ATTEMPTS: u32 = 50; // 5 seconds total (50 * 100ms)
162
163    while attempts < MAX_ATTEMPTS {
164        if client.has_connected_relays().await {
165            info!("✅ Relay connection established successfully");
166            break;
167        }
168        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
169        attempts += 1;
170    }
171
172    if attempts >= MAX_ATTEMPTS {
173        return Err(ClientError::Generic(
174            "Failed to establish relay connection within timeout".to_string(),
175        ));
176    }
177
178    Ok(client)
179}
180
181/// Health check server that responds to ping requests
182pub struct HealthCheckServer {
183    listener: TcpListener,
184    port: u16,
185}
186
187impl HealthCheckServer {
188    /// Create a new health check server on the specified port
189    pub async fn new(port: u16) -> Result<Self, std::io::Error> {
190        let addr = SocketAddr::from(([127, 0, 0, 1], port));
191        let listener = TcpListener::bind(addr).await?;
192        info!("🏥 Health check server listening on {}", addr);
193        Ok(Self { listener, port })
194    }
195
196    /// Get the port the server is listening on
197    pub fn port(&self) -> u16 {
198        self.port
199    }
200
201    /// Run the health check server
202    pub async fn run(&self) -> Result<(), std::io::Error> {
203        loop {
204            match self.listener.accept().await {
205                Ok((stream, addr)) => {
206                    tokio::spawn(async move {
207                        if let Err(e) = handle_health_check_connection(stream).await {
208                            warn!("Health check connection error from {}: {}", addr, e);
209                        }
210                    });
211                }
212                Err(e) => {
213                    error!("Failed to accept health check connection: {}", e);
214                    return Err(e);
215                }
216            }
217        }
218    }
219}
220
221/// Handle a single health check connection
222async fn handle_health_check_connection(mut stream: TcpStream) -> Result<(), std::io::Error> {
223    let mut buffer = [0; 1024];
224
225    // Read the request
226    let bytes_read = stream.read(&mut buffer).await?;
227    if bytes_read == 0 {
228        return Ok(()); // Connection closed
229    }
230
231    let request = String::from_utf8_lossy(&buffer[..bytes_read]);
232    let request = request.trim();
233
234    // Simple protocol: respond to "ping" with "pong"
235    let response = match request {
236        "ping" => "pong\n",
237        "health" => "ok\n",
238        "status" => "running\n",
239        _ => "unknown\n",
240    };
241
242    // Send response
243    stream.write_all(response.as_bytes()).await?;
244    stream.flush().await?;
245
246    Ok(())
247}
248
249/// Run a bot with health check support
250/// This function combines the main bot logic with a health check server
251pub async fn run_with_health_check<F, Fut>(
252    health_check_port: Option<u16>,
253    main_task: F,
254) -> Result<(), Box<dyn std::error::Error>>
255where
256    F: FnOnce() -> Fut,
257    Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error>>>,
258{
259    if let Some(port) = health_check_port {
260        // Create health check server
261        let health_server = HealthCheckServer::new(port).await?;
262        info!("🏥 Health check enabled on port {}", port);
263
264        // Run both the main task and health check server concurrently
265        tokio::select! {
266            result = main_task() => {
267                info!("🛑 Main task completed");
268                result
269            }
270            result = health_server.run() => {
271                error!("🏥 Health check server stopped unexpectedly");
272                result.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
273            }
274        }
275    } else {
276        // Run only the main task
277        main_task().await
278    }
279}
280
281/// Simple health check client for testing
282pub async fn health_check_ping(port: u16) -> Result<String, std::io::Error> {
283    let addr = SocketAddr::from(([127, 0, 0, 1], port));
284    let mut stream = TcpStream::connect(addr).await?;
285
286    // Send ping
287    stream.write_all(b"ping").await?;
288    stream.flush().await?;
289
290    // Read response
291    let mut buffer = [0; 1024];
292    let bytes_read = stream.read(&mut buffer).await?;
293
294    Ok(String::from_utf8_lossy(&buffer[..bytes_read])
295        .trim()
296        .to_string())
297}
298
299/// Perform a health check and exit with appropriate code
300/// This is designed to be used as a Docker health check command
301pub async fn run_health_check_command(port: u16) -> Result<(), Box<dyn std::error::Error>> {
302    match health_check_ping(port).await {
303        Ok(response) => {
304            if response == "pong" {
305                println!("healthy");
306                std::process::exit(0);
307            } else {
308                eprintln!("unexpected response: {response}");
309                std::process::exit(1);
310            }
311        }
312        Err(e) => {
313            eprintln!("health check failed: {e}");
314            std::process::exit(1);
315        }
316    }
317}