1use crate::{Client, ClientError, util::resolve_to_socket_addr};
2use clap::{Parser, Subcommand};
3use std::{net::SocketAddr, path::PathBuf};
4use tempfile::TempDir;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio::net::{TcpListener, TcpStream};
7use tracing::{error, info, warn};
8use zoe_wire_protocol::VerifyingKey;
9
10#[derive(Parser, Debug)]
11pub struct RelayClientArgs {
12 #[arg(
14 short,
15 long,
16 env = "ZOE_RELAY_ADDRESS",
17 default_value = "127.0.0.1:13908"
18 )]
19 pub relay_address: String,
20
21 #[arg(short, long, value_parser = parse_verifying_key, conflicts_with = "server_key_file")]
23 pub server_key: Option<VerifyingKey>,
24
25 #[arg(long, env = "ZOE_SERVER_KEY_FILE", conflicts_with = "server_key")]
27 pub server_key_file: Option<PathBuf>,
28
29 #[arg(short, long, conflicts_with = "ephemeral")]
30 pub persist_path: Option<PathBuf>,
31
32 #[arg(short, long, env = "ZOE_EPHEMERAL", conflicts_with = "persist_path")]
33 pub ephemeral: bool,
34
35 #[arg(long, env = "ZOE_HEALTH_CHECK_PORT")]
37 pub health_check_port: Option<u16>,
38}
39
40#[derive(Subcommand, Debug)]
41pub enum RelayClientDefaultCommands {
42 HealthCheck {
44 #[arg(long, env = "ZOE_HEALTH_CHECK_PORT", default_value = "8080")]
46 port: u16,
47 },
48}
49
50fn parse_verifying_key(hex_str: &str) -> Result<VerifyingKey, String> {
52 let hex = hex::decode(hex_str).map_err(|e| format!("Invalid hex string: {e}"))?;
53 let key: VerifyingKey = postcard::from_bytes(&hex).map_err(|e| format!("Invalid key: {e}"))?;
54 Ok(key)
55}
56
57pub async fn main_setup() -> Result<(), Box<dyn std::error::Error>> {
59 rustls::crypto::ring::default_provider()
61 .install_default()
62 .expect("Failed to install crypto provider");
63
64 tracing_subscriber::fmt()
66 .with_env_filter(
67 tracing_subscriber::EnvFilter::try_from_default_env()
68 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("full=info")),
69 )
70 .init();
71
72 Ok(())
73}
74
75pub async fn run_default_command(
76 cmd: &RelayClientDefaultCommands,
77) -> Result<(), Box<dyn std::error::Error>> {
78 match cmd {
79 RelayClientDefaultCommands::HealthCheck { port } => run_health_check_command(*port).await,
80 }
81}
82
83pub async fn full_cli_client(args: RelayClientArgs) -> Result<Client, ClientError> {
84 info!("🚀 Starting Zoe Client Connection Test");
85 info!("📍 Target server: {}", args.relay_address);
86
87 let server_addr: SocketAddr = match resolve_to_socket_addr(&args.relay_address).await {
88 Ok(addr) => addr,
89 Err(e) => {
90 error!("Invalid server address or failed to resolve: {e}");
91 std::process::exit(1);
92 }
93 };
94
95 let server_public_key = if let Some(file_path) = args.server_key_file {
97 info!("📖 Reading server public key from: {}", file_path.display());
98 let content = std::fs::read_to_string(&file_path).map_err(|e| {
99 ClientError::BuildError(format!(
100 "Failed to read key file {}: {e}",
101 file_path.display()
102 ))
103 })?;
104 VerifyingKey::from_pem(&content).map_err(|e| {
105 ClientError::BuildError(format!(
106 "Failed to parse key file {}: {e}",
107 file_path.display()
108 ))
109 })?
110 } else if let Some(key) = args.server_key {
111 key
112 } else {
113 error!("Must specify either --server-key or --server-key-file");
114 std::process::exit(1);
115 };
116
117 let mut builder = Client::builder();
118 builder.autoconnect(false);
120
121 if let Some(persist_path) = args.persist_path {
122 info!("💾 Using persistent storage at: {}", persist_path.display());
123 error!("persistence not yet implemented");
124 } else if !args.ephemeral {
125 error!("💾 Must specify either --persist-path or --ephemeral");
126 std::process::exit(1);
127 } else {
128 let temp_dir = TempDir::new()?;
131 info!(
134 "💾 Using temporary storage at: {}",
135 temp_dir.path().display()
136 );
137 let media_storage_path = temp_dir.path().join("blobs");
138 let db_storage_path = temp_dir.path().join("db");
139
140 info!("🔧 Building client...");
141
142 builder.media_storage_dir_pathbuf(media_storage_path);
144 builder.db_storage_dir_pathbuf(db_storage_path);
145 }
146
147 let client = builder.build().await?;
148
149 info!("🔗 Establishing relay connection...");
151 use zoe_app_primitives::RelayAddress;
152 let relay_address = RelayAddress::new(server_public_key)
153 .with_address(server_addr.into())
154 .with_name("CLI Server".to_string());
155
156 client.add_relay(relay_address).await?;
157
158 info!("⏳ Waiting for relay connection to be ready...");
160 let mut attempts = 0;
161 const MAX_ATTEMPTS: u32 = 50; while attempts < MAX_ATTEMPTS {
164 if client.has_connected_relays().await {
165 info!("✅ Relay connection established successfully");
166 break;
167 }
168 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
169 attempts += 1;
170 }
171
172 if attempts >= MAX_ATTEMPTS {
173 return Err(ClientError::Generic(
174 "Failed to establish relay connection within timeout".to_string(),
175 ));
176 }
177
178 Ok(client)
179}
180
181pub struct HealthCheckServer {
183 listener: TcpListener,
184 port: u16,
185}
186
187impl HealthCheckServer {
188 pub async fn new(port: u16) -> Result<Self, std::io::Error> {
190 let addr = SocketAddr::from(([127, 0, 0, 1], port));
191 let listener = TcpListener::bind(addr).await?;
192 info!("🏥 Health check server listening on {}", addr);
193 Ok(Self { listener, port })
194 }
195
196 pub fn port(&self) -> u16 {
198 self.port
199 }
200
201 pub async fn run(&self) -> Result<(), std::io::Error> {
203 loop {
204 match self.listener.accept().await {
205 Ok((stream, addr)) => {
206 tokio::spawn(async move {
207 if let Err(e) = handle_health_check_connection(stream).await {
208 warn!("Health check connection error from {}: {}", addr, e);
209 }
210 });
211 }
212 Err(e) => {
213 error!("Failed to accept health check connection: {}", e);
214 return Err(e);
215 }
216 }
217 }
218 }
219}
220
221async fn handle_health_check_connection(mut stream: TcpStream) -> Result<(), std::io::Error> {
223 let mut buffer = [0; 1024];
224
225 let bytes_read = stream.read(&mut buffer).await?;
227 if bytes_read == 0 {
228 return Ok(()); }
230
231 let request = String::from_utf8_lossy(&buffer[..bytes_read]);
232 let request = request.trim();
233
234 let response = match request {
236 "ping" => "pong\n",
237 "health" => "ok\n",
238 "status" => "running\n",
239 _ => "unknown\n",
240 };
241
242 stream.write_all(response.as_bytes()).await?;
244 stream.flush().await?;
245
246 Ok(())
247}
248
249pub async fn run_with_health_check<F, Fut>(
252 health_check_port: Option<u16>,
253 main_task: F,
254) -> Result<(), Box<dyn std::error::Error>>
255where
256 F: FnOnce() -> Fut,
257 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error>>>,
258{
259 if let Some(port) = health_check_port {
260 let health_server = HealthCheckServer::new(port).await?;
262 info!("🏥 Health check enabled on port {}", port);
263
264 tokio::select! {
266 result = main_task() => {
267 info!("🛑 Main task completed");
268 result
269 }
270 result = health_server.run() => {
271 error!("🏥 Health check server stopped unexpectedly");
272 result.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
273 }
274 }
275 } else {
276 main_task().await
278 }
279}
280
281pub async fn health_check_ping(port: u16) -> Result<String, std::io::Error> {
283 let addr = SocketAddr::from(([127, 0, 0, 1], port));
284 let mut stream = TcpStream::connect(addr).await?;
285
286 stream.write_all(b"ping").await?;
288 stream.flush().await?;
289
290 let mut buffer = [0; 1024];
292 let bytes_read = stream.read(&mut buffer).await?;
293
294 Ok(String::from_utf8_lossy(&buffer[..bytes_read])
295 .trim()
296 .to_string())
297}
298
299pub async fn run_health_check_command(port: u16) -> Result<(), Box<dyn std::error::Error>> {
302 match health_check_ping(port).await {
303 Ok(response) => {
304 if response == "pong" {
305 println!("healthy");
306 std::process::exit(0);
307 } else {
308 eprintln!("unexpected response: {response}");
309 std::process::exit(1);
310 }
311 }
312 Err(e) => {
313 eprintln!("health check failed: {e}");
314 std::process::exit(1);
315 }
316 }
317}