1use super::super::Client;
2use crate::error::Result;
3use crate::util::DEFAULT_PORT;
4use crate::{
5 ClientError, OverallConnectionStatus, RelayClient, RelayClientBuilder, RelayConnectionInfo,
6 RelayConnectionStatus, RelayInfo, RelayStatusUpdate,
7};
8use async_broadcast;
9use std::net::SocketAddr;
10use std::sync::Arc;
11use tokio::sync::oneshot;
12use tokio::task::JoinHandle;
13use tokio::time::Duration;
14use zoe_app_primitives::RelayAddress;
15use zoe_wire_protocol::{KeyId, VerifyingKey};
16
17#[cfg(feature = "frb-api")]
18use flutter_rust_bridge::frb;
19
20#[derive(Debug)]
26#[cfg_attr(feature = "frb-api", frb(ignore))]
27pub struct RelayConnectionHandle {
28 pub relay_id: KeyId,
30 pub relay_address: RelayAddress,
32 result_receiver: Option<oneshot::Receiver<Result<()>>>,
35 _task_handle: JoinHandle<()>,
37}
38
39#[cfg_attr(feature = "frb-api", frb(ignore))]
40impl RelayConnectionHandle {
41 pub fn try_result(&mut self) -> Result<Option<Result<()>>> {
48 if let Some(receiver) = &mut self.result_receiver {
49 match receiver.try_recv() {
50 Ok(result) => {
51 self.result_receiver = None; Ok(Some(result))
53 }
54 Err(oneshot::error::TryRecvError::Empty) => Ok(None),
55 Err(oneshot::error::TryRecvError::Closed) => {
56 self.result_receiver = None;
57 Ok(Some(Err(ClientError::Generic(
58 "Connection task was cancelled".to_string(),
59 ))))
60 }
61 }
62 } else {
63 Ok(None)
65 }
66 }
67
68 pub async fn await_result(mut self) -> Result<()> {
73 if let Some(receiver) = self.result_receiver.take() {
74 match receiver.await {
75 Ok(result) => result,
76 Err(_) => Err(ClientError::Generic(
77 "Connection task was cancelled".to_string(),
78 )),
79 }
80 } else {
81 Err(ClientError::Generic(
82 "Result was already consumed".to_string(),
83 ))
84 }
85 }
86
87 pub fn detach(mut self) {
92 self.result_receiver = None;
93 }
95}
96
97#[cfg_attr(feature = "frb-api", frb)]
99impl Client {
100 pub async fn add_relay(&self, address: RelayAddress) -> Result<()> {
106 let relay_id = address.id();
107
108 self.notify_relay_status_change(
110 relay_id,
111 address.clone(),
112 RelayConnectionStatus::Connecting,
113 )
114 .await;
115
116 match self.try_connect_to_relay_addresses(&address).await {
118 Ok((successful_addr, relay_client)) => {
119 let relay_info = RelayInfo {
120 relay_id,
121 relay_address: address.clone(),
122 };
123
124 {
126 let mut info_map = self.relay_info.write().await;
127 info_map.insert(
128 relay_id,
129 RelayConnectionInfo {
130 info: relay_info.clone(),
131 status: RelayConnectionStatus::Connected {
132 connected_address: successful_addr,
133 },
134 },
135 );
136 }
137
138 {
140 let mut connections = self.relay_connections.write().await;
141 connections.insert(relay_id, relay_client.clone());
142 }
143
144 let messages_manager = relay_client
146 .persistence_manager()
147 .await
148 .messages_manager()
149 .clone();
150 let blob_service = Arc::clone(relay_client.blob_service().await?);
151
152 self.message_manager
153 .add_relay(relay_id, messages_manager, true)
154 .await?;
155 self.blob_service.add_relay(relay_id, blob_service).await;
156
157 tracing::info!(
158 "Successfully connected to relay {} at address: {}",
159 hex::encode(relay_id.as_bytes()),
160 successful_addr
161 );
162
163 self.start_connection_monitoring(relay_id, relay_client.clone());
165
166 self.update_client_secret_state().await;
168 self.notify_relay_status_change(
169 relay_id,
170 address,
171 RelayConnectionStatus::Connected {
172 connected_address: successful_addr,
173 },
174 )
175 .await;
176
177 Ok(())
178 }
179 Err(connection_errors) => {
180 tracing::warn!(
181 "Failed to connect to relay {} at any address. Errors: {:?}",
182 hex::encode(relay_id.as_bytes()),
183 connection_errors
184 );
185
186 let error_summary = connection_errors
188 .iter()
189 .map(|(addr, err)| format!("{}: {}", addr, err))
190 .collect::<Vec<_>>()
191 .join("; ");
192
193 let relay_info = RelayInfo {
194 relay_id,
195 relay_address: address.clone(),
196 };
197
198 let failed_status = RelayConnectionStatus::Failed {
199 error: format!("All connection attempts failed: {}", error_summary),
200 };
201
202 {
204 let mut info_map = self.relay_info.write().await;
205 info_map.insert(
206 relay_id,
207 RelayConnectionInfo {
208 info: relay_info,
209 status: failed_status.clone(),
210 },
211 );
212 }
213
214 self.notify_relay_status_change(relay_id, address, failed_status)
218 .await;
219
220 Err(ClientError::Generic(format!(
221 "Failed to connect to relay at any address: {}",
222 error_summary
223 )))
224 }
225 }
226 }
227
228 #[cfg_attr(feature = "frb-api", frb(ignore))]
260 pub fn add_relay_background(&self, address: RelayAddress) -> RelayConnectionHandle {
261 let relay_id = address.id();
262 let client = self.clone();
263 let address_clone = address.clone();
264
265 let (result_sender, result_receiver) = oneshot::channel();
266
267 let task_handle = tokio::spawn(async move {
268 let result = client.add_relay(address_clone).await;
269
270 let _ = result_sender.send(result);
272 });
273
274 RelayConnectionHandle {
275 relay_id,
276 relay_address: address,
277 result_receiver: Some(result_receiver),
278 _task_handle: task_handle,
279 }
280 }
281
282 async fn try_connect_to_relay_addresses(
286 &self,
287 address: &RelayAddress,
288 ) -> std::result::Result<(SocketAddr, RelayClient), Vec<(String, ClientError)>> {
289 use rand::seq::SliceRandom;
290
291 let network_addresses: Vec<_> = address.all_addresses().iter().cloned().collect();
292 if network_addresses.is_empty() {
293 return Err(vec![(
294 "no addresses".to_string(),
295 ClientError::Generic("No addresses provided".to_string()),
296 )]);
297 }
298
299 let mut shuffled_addresses = network_addresses;
301 shuffled_addresses.shuffle(&mut rand::thread_rng());
302
303 let mut connection_errors = Vec::new();
304
305 for network_addr in shuffled_addresses {
306 tracing::debug!("Attempting to connect to relay at: {}", network_addr);
307
308 let socket_addr = match tokio::time::timeout(
310 Duration::from_secs(5), network_addr.resolve_to_socket_addr(DEFAULT_PORT),
312 )
313 .await
314 {
315 Ok(Ok(addr)) => addr,
316 Ok(Err(e)) => {
317 connection_errors.push((
318 network_addr.to_string(),
319 ClientError::Generic(format!("DNS resolution failed: {}", e)),
320 ));
321 continue;
322 }
323 Err(_) => {
324 connection_errors.push((
325 network_addr.to_string(),
326 ClientError::Generic("DNS resolution timeout".to_string()),
327 ));
328 continue;
329 }
330 };
331
332 match tokio::time::timeout(
334 Duration::from_secs(10), self.connect_to_relay(address.public_key.clone(), socket_addr),
336 )
337 .await
338 {
339 Ok(Ok(relay_client)) => {
340 tracing::info!("Successfully connected to relay at: {}", socket_addr);
341 return Ok((socket_addr, relay_client));
342 }
343 Ok(Err(e)) => {
344 connection_errors.push((network_addr.to_string(), e));
345 }
346 Err(_) => {
347 connection_errors.push((
348 network_addr.to_string(),
349 ClientError::Generic("Connection timeout".to_string()),
350 ));
351 }
352 }
353 }
354
355 Err(connection_errors)
356 }
357
358 pub async fn remove_relay(&self, server_public_key: VerifyingKey) -> Result<bool> {
360 let relay_id = server_public_key.id();
361
362 let relay_info = {
364 let info_map = self.relay_info.read().await;
365 info_map.get(&relay_id).map(|info| info.info.clone())
366 };
367
368 self.stop_connection_monitoring(relay_id).await;
370
371 self.message_manager.remove_relay(&relay_id).await;
373 self.blob_service.remove_relay(&relay_id).await;
374
375 let removed = {
377 let mut connections = self.relay_connections.write().await;
378 connections.remove(&relay_id)
379 };
380
381 let had_active_connection = removed.is_some();
382 if let Some(relay_client) = removed {
383 relay_client.close().await;
384 }
385
386 let had_relay_info = {
388 let mut info_map = self.relay_info.write().await;
389 if let Some(info) = info_map.get_mut(&relay_id) {
390 info.status = RelayConnectionStatus::Disconnected {
391 error: None, };
393 true
394 } else {
395 false
396 }
397 };
398
399 let was_removed = had_active_connection || had_relay_info;
400
401 tracing::info!(
402 "Removed relay connection: {}",
403 hex::encode(relay_id.as_bytes())
404 );
405
406 self.update_client_secret_state().await;
408
409 if let Some(info) = relay_info {
411 self.notify_relay_status_change(
412 relay_id,
413 info.relay_address,
414 RelayConnectionStatus::Disconnected {
415 error: None, },
417 )
418 .await;
419 }
420
421 Ok(was_removed)
422 }
423
424 pub async fn get_relay_status(&self) -> Result<Vec<RelayConnectionInfo>> {
426 let info_map = self.relay_info.read().await;
427 Ok(info_map.values().cloned().collect())
428 }
429
430 pub async fn has_connected_relays(&self) -> bool {
432 self.overall_status().await.is_connected
433 }
434
435 pub async fn reconnect_failed_relays(&self) -> Result<usize> {
437 let failed_relays: Vec<RelayInfo> = {
438 let info_map = self.relay_info.read().await;
439 info_map
440 .values()
441 .filter(|info| matches!(info.status, RelayConnectionStatus::Failed { .. }))
442 .map(|info| info.info.clone())
443 .collect()
444 };
445
446 let mut reconnected = 0;
447 for relay_info in failed_relays {
448 if self.add_relay(relay_info.relay_address).await.is_ok() {
450 reconnected += 1;
451 }
452 }
453
454 Ok(reconnected)
455 }
456
457 async fn connect_to_relay(
459 &self,
460 server_public_key: VerifyingKey,
461 server_addr: SocketAddr,
462 ) -> Result<RelayClient> {
463 let relay_client = RelayClientBuilder::new()
464 .server_public_key(server_public_key)
465 .server_address(server_addr)
466 .storage(Arc::clone(&self.storage))
467 .client_keypair(Arc::clone(&self.client_secret.inner_keypair))
468 .autosubscribe(true)
469 .build()
470 .await?;
471
472 Ok(relay_client)
473 }
474
475 pub async fn close(&self) {
476 {
478 let mut monitors = self.connection_monitors.write().await;
479 for (relay_id, monitor_task) in monitors.iter() {
480 tracing::debug!(
481 "Stopping connection monitor for relay: {}",
482 hex::encode(relay_id.as_bytes())
483 );
484 monitor_task.abort();
485 }
486 monitors.clear();
487 }
488
489 let relay_clients = {
491 let mut connections = self.relay_connections.write().await;
492 let clients: Vec<_> = connections.values().cloned().collect();
493 connections.clear();
494 clients
495 };
496
497 for relay_client in relay_clients {
498 relay_client.close().await;
499 }
500 }
501
502 pub fn overall_status_stream(&self) -> impl futures::Stream<Item = OverallConnectionStatus> {
508 let client = self.clone();
509 let relay_receiver = client.subscribe_to_relay_status();
510
511 async_stream::stream! {
512 let mut relay_receiver = relay_receiver;
513
514 let mut current_status = client.overall_status().await;
516 yield current_status.clone();
517
518 let mut relay_states = std::collections::BTreeMap::new();
520
521 while let Ok(update) = relay_receiver.recv().await {
523 let was_connected = relay_states.get(&update.relay_id)
525 .map(|status| matches!(status, RelayConnectionStatus::Connected { .. }))
526 .unwrap_or(false);
527
528 let is_now_connected = matches!(update.status, RelayConnectionStatus::Connected { .. });
529
530 relay_states.insert(update.relay_id, update.status);
532
533 if was_connected && !is_now_connected {
535 current_status.connected_count = current_status.connected_count.saturating_sub(1);
537 } else if !was_connected && is_now_connected {
538 current_status.connected_count += 1;
540 }
541
542 current_status.total_count = relay_states.len();
544
545 current_status.is_connected = current_status.connected_count > 0;
547
548 yield current_status.clone();
549 }
550 }
551 }
552 pub async fn overall_status(&self) -> OverallConnectionStatus {
558 let (connected_count, total_count) = {
559 let info_map = self.relay_info.read().await;
560 let connected = info_map
561 .values()
562 .filter(|info| matches!(info.status, RelayConnectionStatus::Connected { .. }))
563 .count();
564 let total = info_map
566 .values()
567 .filter(|info| !matches!(info.status, RelayConnectionStatus::Failed { .. }))
568 .count();
569 (connected, total)
570 };
571
572 OverallConnectionStatus {
573 is_connected: connected_count > 0,
574 connected_count,
575 total_count,
576 }
577 }
578
579 fn start_connection_monitoring(&self, relay_id: KeyId, relay_client: RelayClient) {
581 let client = self.clone();
582 let connection = relay_client.connection().clone();
583 let monitors = Arc::clone(&self.connection_monitors);
584
585 let monitor_task = tokio::spawn(async move {
586 let closed_future = connection.closed();
588 let connection_error = closed_future.await;
589
590 let error_msg = connection_error.to_string();
591 tracing::warn!(
592 "Relay connection lost for relay {}: {}",
593 hex::encode(relay_id.as_bytes()),
594 error_msg
595 );
596
597 let relay_address = {
599 let info_map = client.relay_info.read().await;
600 info_map
601 .get(&relay_id)
602 .map(|info| info.info.relay_address.clone())
603 };
604
605 if let Some(relay_address) = relay_address {
606 {
608 let mut info_map = client.relay_info.write().await;
609 if let Some(info) = info_map.get_mut(&relay_id) {
610 info.status = RelayConnectionStatus::Disconnected {
611 error: Some(error_msg.clone()),
612 };
613 }
614 }
615
616 {
618 let mut connections = client.relay_connections.write().await;
619 connections.remove(&relay_id);
620 }
621
622 client.message_manager.remove_relay(&relay_id).await;
624 client.blob_service.remove_relay(&relay_id).await;
625
626 client.update_client_secret_state().await;
628
629 client
631 .notify_relay_status_change(
632 relay_id,
633 relay_address.clone(),
634 RelayConnectionStatus::Disconnected {
635 error: Some(error_msg),
636 },
637 )
638 .await;
639
640 let reconnect_client = client.clone();
642 tokio::spawn(async move {
643 tokio::time::sleep(Duration::from_secs(5)).await;
644
645 tracing::info!(
646 "Attempting automatic reconnection to relay: {}",
647 hex::encode(relay_id.as_bytes())
648 );
649
650 if let Err(e) = reconnect_client.add_relay(relay_address).await {
651 tracing::warn!(
652 "Automatic reconnection failed for relay {}: {}",
653 hex::encode(relay_id.as_bytes()),
654 e
655 );
656 }
657 });
658 }
659 });
660
661 tokio::spawn(async move {
663 let mut monitor_map = monitors.write().await;
664 monitor_map.insert(relay_id, monitor_task);
665 });
666 }
667
668 async fn stop_connection_monitoring(&self, relay_id: KeyId) {
670 let mut monitors = self.connection_monitors.write().await;
671 if let Some(monitor_task) = monitors.remove(&relay_id) {
672 monitor_task.abort();
673 }
674 }
675
676 async fn notify_relay_status_change(
678 &self,
679 relay_id: KeyId,
680 relay_address: RelayAddress,
681 status: RelayConnectionStatus,
682 ) {
683 let status_update = RelayStatusUpdate {
684 relay_id,
685 relay_address,
686 status,
687 };
688
689 let _ = self.relay_status_sender.try_broadcast(status_update);
691 }
692}
693
694#[cfg_attr(feature = "frb-api", frb(ignore))]
695impl Client {
696 pub fn subscribe_to_relay_status(&self) -> async_broadcast::Receiver<RelayStatusUpdate> {
701 self.relay_status_sender.new_receiver()
702 }
703}