zoe_client/client/api/
relay.rs

1use super::super::Client;
2use crate::error::Result;
3use crate::util::DEFAULT_PORT;
4use crate::{
5    ClientError, OverallConnectionStatus, RelayClient, RelayClientBuilder, RelayConnectionInfo,
6    RelayConnectionStatus, RelayInfo, RelayStatusUpdate,
7};
8use async_broadcast;
9use std::net::SocketAddr;
10use std::sync::Arc;
11use tokio::sync::oneshot;
12use tokio::task::JoinHandle;
13use tokio::time::Duration;
14use zoe_app_primitives::RelayAddress;
15use zoe_wire_protocol::{KeyId, VerifyingKey};
16
17#[cfg(feature = "frb-api")]
18use flutter_rust_bridge::frb;
19
20/// Handle for a background relay connection attempt
21///
22/// This handle allows you to optionally poll the connection result, but the connection
23/// attempt continues in the background regardless of whether you poll it or not.
24/// The relay status will be updated via the normal status broadcast channels.
25#[derive(Debug)]
26#[cfg_attr(feature = "frb-api", frb(ignore))]
27pub struct RelayConnectionHandle {
28    /// The relay ID being connected to
29    pub relay_id: KeyId,
30    /// The relay address being connected to  
31    pub relay_address: RelayAddress,
32    /// Optional receiver for the final connection result
33    /// If you don't need the result, you can ignore this
34    result_receiver: Option<oneshot::Receiver<Result<()>>>,
35    /// Background task handle - kept to prevent the task from being dropped
36    _task_handle: JoinHandle<()>,
37}
38
39#[cfg_attr(feature = "frb-api", frb(ignore))]
40impl RelayConnectionHandle {
41    /// Poll for the connection result (non-blocking)
42    ///
43    /// Returns:
44    /// - `Ok(Some(result))` if the connection attempt has completed
45    /// - `Ok(None)` if the connection is still in progress
46    /// - `Err(_)` if there was an error polling (shouldn't happen normally)
47    pub fn try_result(&mut self) -> Result<Option<Result<()>>> {
48        if let Some(receiver) = &mut self.result_receiver {
49            match receiver.try_recv() {
50                Ok(result) => {
51                    self.result_receiver = None; // Consume the receiver
52                    Ok(Some(result))
53                }
54                Err(oneshot::error::TryRecvError::Empty) => Ok(None),
55                Err(oneshot::error::TryRecvError::Closed) => {
56                    self.result_receiver = None;
57                    Ok(Some(Err(ClientError::Generic(
58                        "Connection task was cancelled".to_string(),
59                    ))))
60                }
61            }
62        } else {
63            // Result was already consumed
64            Ok(None)
65        }
66    }
67
68    /// Wait for the connection result (blocking)
69    ///
70    /// This consumes the handle and waits for the background task to complete.
71    /// If you don't call this, the connection will still complete in the background.
72    pub async fn await_result(mut self) -> Result<()> {
73        if let Some(receiver) = self.result_receiver.take() {
74            match receiver.await {
75                Ok(result) => result,
76                Err(_) => Err(ClientError::Generic(
77                    "Connection task was cancelled".to_string(),
78                )),
79            }
80        } else {
81            Err(ClientError::Generic(
82                "Result was already consumed".to_string(),
83            ))
84        }
85    }
86
87    /// Detach from the result, allowing the connection to complete purely in the background
88    ///
89    /// After calling this, you can only track the connection status via the relay status
90    /// broadcast channels. The connection attempt will continue regardless.
91    pub fn detach(mut self) {
92        self.result_receiver = None;
93        // The task handle is kept alive until the handle is dropped
94    }
95}
96
97// Relay management methods (only available in offline mode)
98#[cfg_attr(feature = "frb-api", frb)]
99impl Client {
100    /// Add a relay server to the client
101    ///
102    /// This will attempt to connect to all addresses in the RelayAddress in random order
103    /// with a 10-second timeout per attempt. Only adds the relay to local state if a
104    /// connection succeeds.
105    pub async fn add_relay(&self, address: RelayAddress) -> Result<()> {
106        let relay_id = address.id();
107
108        // Notify about connecting status
109        self.notify_relay_status_change(
110            relay_id,
111            address.clone(),
112            RelayConnectionStatus::Connecting,
113        )
114        .await;
115
116        // Try to connect to any of the addresses
117        match self.try_connect_to_relay_addresses(&address).await {
118            Ok((successful_addr, relay_client)) => {
119                let relay_info = RelayInfo {
120                    relay_id,
121                    relay_address: address.clone(),
122                };
123
124                // Update relay info with successful connection
125                {
126                    let mut info_map = self.relay_info.write().await;
127                    info_map.insert(
128                        relay_id,
129                        RelayConnectionInfo {
130                            info: relay_info.clone(),
131                            status: RelayConnectionStatus::Connected {
132                                connected_address: successful_addr,
133                            },
134                        },
135                    );
136                }
137
138                // Add to connections
139                {
140                    let mut connections = self.relay_connections.write().await;
141                    connections.insert(relay_id, relay_client.clone());
142                }
143
144                // Add services to multi-relay managers
145                let messages_manager = relay_client
146                    .persistence_manager()
147                    .await
148                    .messages_manager()
149                    .clone();
150                let blob_service = Arc::clone(relay_client.blob_service().await?);
151
152                self.message_manager
153                    .add_relay(relay_id, messages_manager, true)
154                    .await?;
155                self.blob_service.add_relay(relay_id, blob_service).await;
156
157                tracing::info!(
158                    "Successfully connected to relay {} at address: {}",
159                    hex::encode(relay_id.as_bytes()),
160                    successful_addr
161                );
162
163                // Start connection monitoring for this relay
164                self.start_connection_monitoring(relay_id, relay_client.clone());
165
166                // Update observable states
167                self.update_client_secret_state().await;
168                self.notify_relay_status_change(
169                    relay_id,
170                    address,
171                    RelayConnectionStatus::Connected {
172                        connected_address: successful_addr,
173                    },
174                )
175                .await;
176
177                Ok(())
178            }
179            Err(connection_errors) => {
180                tracing::warn!(
181                    "Failed to connect to relay {} at any address. Errors: {:?}",
182                    hex::encode(relay_id.as_bytes()),
183                    connection_errors
184                );
185
186                // Add to relay info with failed status so we can track and retry later
187                let error_summary = connection_errors
188                    .iter()
189                    .map(|(addr, err)| format!("{}: {}", addr, err))
190                    .collect::<Vec<_>>()
191                    .join("; ");
192
193                let relay_info = RelayInfo {
194                    relay_id,
195                    relay_address: address.clone(),
196                };
197
198                let failed_status = RelayConnectionStatus::Failed {
199                    error: format!("All connection attempts failed: {}", error_summary),
200                };
201
202                // Store the failed relay info for future reconnection attempts
203                {
204                    let mut info_map = self.relay_info.write().await;
205                    info_map.insert(
206                        relay_id,
207                        RelayConnectionInfo {
208                            info: relay_info,
209                            status: failed_status.clone(),
210                        },
211                    );
212                }
213
214                // Don't update client secret for failed connections
215                // (only successful connections should be persisted)
216
217                self.notify_relay_status_change(relay_id, address, failed_status)
218                    .await;
219
220                Err(ClientError::Generic(format!(
221                    "Failed to connect to relay at any address: {}",
222                    error_summary
223                )))
224            }
225        }
226    }
227
228    /// Add a relay server to the client in the background
229    ///
230    /// This returns immediately with a handle that you can optionally poll for the result.
231    /// The connection attempt continues in the background regardless of whether you poll the handle.
232    /// Relay status updates will be sent via the normal broadcast channels.
233    ///
234    /// # Usage Examples
235    ///
236    /// ```rust
237    /// // Fire and forget - just start the connection in background
238    /// let handle = client.add_relay_background(relay_address);
239    /// handle.detach(); // Connection continues in background
240    ///
241    /// // Poll occasionally for result
242    /// let mut handle = client.add_relay_background(relay_address);
243    /// loop {
244    ///     match handle.try_result()? {
245    ///         Some(result) => {
246    ///             // Connection completed
247    ///             break result;
248    ///         }
249    ///         None => {
250    ///             // Still connecting, do other work
251    ///             tokio::time::sleep(Duration::from_millis(100)).await;
252    ///         }
253    ///     }
254    /// }
255    ///
256    /// // Wait for completion
257    /// let result = client.add_relay_background(relay_address).await_result().await?;
258    /// ```
259    #[cfg_attr(feature = "frb-api", frb(ignore))]
260    pub fn add_relay_background(&self, address: RelayAddress) -> RelayConnectionHandle {
261        let relay_id = address.id();
262        let client = self.clone();
263        let address_clone = address.clone();
264
265        let (result_sender, result_receiver) = oneshot::channel();
266
267        let task_handle = tokio::spawn(async move {
268            let result = client.add_relay(address_clone).await;
269
270            // Try to send the result, but don't panic if receiver was dropped
271            let _ = result_sender.send(result);
272        });
273
274        RelayConnectionHandle {
275            relay_id,
276            relay_address: address,
277            result_receiver: Some(result_receiver),
278            _task_handle: task_handle,
279        }
280    }
281
282    /// Try to connect to a relay using all its addresses in random order
283    ///
284    /// Returns the successful address and relay client, or all connection errors
285    async fn try_connect_to_relay_addresses(
286        &self,
287        address: &RelayAddress,
288    ) -> std::result::Result<(SocketAddr, RelayClient), Vec<(String, ClientError)>> {
289        use rand::seq::SliceRandom;
290
291        let network_addresses: Vec<_> = address.all_addresses().iter().cloned().collect();
292        if network_addresses.is_empty() {
293            return Err(vec![(
294                "no addresses".to_string(),
295                ClientError::Generic("No addresses provided".to_string()),
296            )]);
297        }
298
299        // Randomize the order of connection attempts
300        let mut shuffled_addresses = network_addresses;
301        shuffled_addresses.shuffle(&mut rand::thread_rng());
302
303        let mut connection_errors = Vec::new();
304
305        for network_addr in shuffled_addresses {
306            tracing::debug!("Attempting to connect to relay at: {}", network_addr);
307
308            // Resolve address with timeout
309            let socket_addr = match tokio::time::timeout(
310                Duration::from_secs(5), // 5s for DNS resolution
311                network_addr.resolve_to_socket_addr(DEFAULT_PORT),
312            )
313            .await
314            {
315                Ok(Ok(addr)) => addr,
316                Ok(Err(e)) => {
317                    connection_errors.push((
318                        network_addr.to_string(),
319                        ClientError::Generic(format!("DNS resolution failed: {}", e)),
320                    ));
321                    continue;
322                }
323                Err(_) => {
324                    connection_errors.push((
325                        network_addr.to_string(),
326                        ClientError::Generic("DNS resolution timeout".to_string()),
327                    ));
328                    continue;
329                }
330            };
331
332            // Attempt connection with timeout
333            match tokio::time::timeout(
334                Duration::from_secs(10), // 10s for connection attempt
335                self.connect_to_relay(address.public_key.clone(), socket_addr),
336            )
337            .await
338            {
339                Ok(Ok(relay_client)) => {
340                    tracing::info!("Successfully connected to relay at: {}", socket_addr);
341                    return Ok((socket_addr, relay_client));
342                }
343                Ok(Err(e)) => {
344                    connection_errors.push((network_addr.to_string(), e));
345                }
346                Err(_) => {
347                    connection_errors.push((
348                        network_addr.to_string(),
349                        ClientError::Generic("Connection timeout".to_string()),
350                    ));
351                }
352            }
353        }
354
355        Err(connection_errors)
356    }
357
358    /// Remove a relay connection (offline mode only)
359    pub async fn remove_relay(&self, server_public_key: VerifyingKey) -> Result<bool> {
360        let relay_id = server_public_key.id();
361
362        // Get relay info before removing
363        let relay_info = {
364            let info_map = self.relay_info.read().await;
365            info_map.get(&relay_id).map(|info| info.info.clone())
366        };
367
368        // Stop connection monitoring
369        self.stop_connection_monitoring(relay_id).await;
370
371        // Remove from multi-relay managers
372        self.message_manager.remove_relay(&relay_id).await;
373        self.blob_service.remove_relay(&relay_id).await;
374
375        // Close and remove connection
376        let removed = {
377            let mut connections = self.relay_connections.write().await;
378            connections.remove(&relay_id)
379        };
380
381        let had_active_connection = removed.is_some();
382        if let Some(relay_client) = removed {
383            relay_client.close().await;
384        }
385
386        // Update relay info (or remove if it exists)
387        let had_relay_info = {
388            let mut info_map = self.relay_info.write().await;
389            if let Some(info) = info_map.get_mut(&relay_id) {
390                info.status = RelayConnectionStatus::Disconnected {
391                    error: None, // Manual removal, no error
392                };
393                true
394            } else {
395                false
396            }
397        };
398
399        let was_removed = had_active_connection || had_relay_info;
400
401        tracing::info!(
402            "Removed relay connection: {}",
403            hex::encode(relay_id.as_bytes())
404        );
405
406        // Update observable states
407        self.update_client_secret_state().await;
408
409        // Notify about disconnection if we had the relay info
410        if let Some(info) = relay_info {
411            self.notify_relay_status_change(
412                relay_id,
413                info.relay_address,
414                RelayConnectionStatus::Disconnected {
415                    error: None, // Manual removal, no error
416                },
417            )
418            .await;
419        }
420
421        Ok(was_removed)
422    }
423
424    /// Get list of all configured relays with their connection status
425    pub async fn get_relay_status(&self) -> Result<Vec<RelayConnectionInfo>> {
426        let info_map = self.relay_info.read().await;
427        Ok(info_map.values().cloned().collect())
428    }
429
430    /// Check if any relays are currently connected
431    pub async fn has_connected_relays(&self) -> bool {
432        self.overall_status().await.is_connected
433    }
434
435    /// Attempt to reconnect to all failed relays
436    pub async fn reconnect_failed_relays(&self) -> Result<usize> {
437        let failed_relays: Vec<RelayInfo> = {
438            let info_map = self.relay_info.read().await;
439            info_map
440                .values()
441                .filter(|info| matches!(info.status, RelayConnectionStatus::Failed { .. }))
442                .map(|info| info.info.clone())
443                .collect()
444        };
445
446        let mut reconnected = 0;
447        for relay_info in failed_relays {
448            // Use the full RelayAddress which contains all configured addresses
449            if self.add_relay(relay_info.relay_address).await.is_ok() {
450                reconnected += 1;
451            }
452        }
453
454        Ok(reconnected)
455    }
456
457    /// Connect to a specific relay (internal method)
458    async fn connect_to_relay(
459        &self,
460        server_public_key: VerifyingKey,
461        server_addr: SocketAddr,
462    ) -> Result<RelayClient> {
463        let relay_client = RelayClientBuilder::new()
464            .server_public_key(server_public_key)
465            .server_address(server_addr)
466            .storage(Arc::clone(&self.storage))
467            .client_keypair(Arc::clone(&self.client_secret.inner_keypair))
468            .autosubscribe(true)
469            .build()
470            .await?;
471
472        Ok(relay_client)
473    }
474
475    pub async fn close(&self) {
476        // Stop all connection monitors
477        {
478            let mut monitors = self.connection_monitors.write().await;
479            for (relay_id, monitor_task) in monitors.iter() {
480                tracing::debug!(
481                    "Stopping connection monitor for relay: {}",
482                    hex::encode(relay_id.as_bytes())
483                );
484                monitor_task.abort();
485            }
486            monitors.clear();
487        }
488
489        // Close all relay connections
490        let relay_clients = {
491            let mut connections = self.relay_connections.write().await;
492            let clients: Vec<_> = connections.values().cloned().collect();
493            connections.clear();
494            clients
495        };
496
497        for relay_client in relay_clients {
498            relay_client.close().await;
499        }
500    }
501
502    /// Create a stream of overall connection status computed from relay status updates
503    ///
504    /// This is a computed stream that automatically updates when any relay status changes.
505    /// It maintains local state and only locks once for initial state, then updates based on
506    /// incoming relay status changes without additional locking.
507    pub fn overall_status_stream(&self) -> impl futures::Stream<Item = OverallConnectionStatus> {
508        let client = self.clone();
509        let relay_receiver = client.subscribe_to_relay_status();
510
511        async_stream::stream! {
512            let mut relay_receiver = relay_receiver;
513
514            // Get initial status using existing function (only lock once)
515            let mut current_status = client.overall_status().await;
516            yield current_status.clone();
517
518            // Keep track of relay states locally to avoid locking
519            let mut relay_states = std::collections::BTreeMap::new();
520
521            // Update local state based on relay status changes
522            while let Ok(update) = relay_receiver.recv().await {
523                // Update our local tracking of this relay's status
524                let was_connected = relay_states.get(&update.relay_id)
525                    .map(|status| matches!(status, RelayConnectionStatus::Connected { .. }))
526                    .unwrap_or(false);
527
528                let is_now_connected = matches!(update.status, RelayConnectionStatus::Connected { .. });
529
530                // Update local relay state
531                relay_states.insert(update.relay_id, update.status);
532
533                // Update overall status based on the change
534                if was_connected && !is_now_connected {
535                    // A relay disconnected
536                    current_status.connected_count = current_status.connected_count.saturating_sub(1);
537                } else if !was_connected && is_now_connected {
538                    // A relay connected
539                    current_status.connected_count += 1;
540                }
541
542                // Update total count (relay was added to our tracking)
543                current_status.total_count = relay_states.len();
544
545                // Update is_connected flag
546                current_status.is_connected = current_status.connected_count > 0;
547
548                yield current_status.clone();
549            }
550        }
551    }
552    /// Calculate the current overall connection status
553    ///
554    /// This is computed from the current relay states, ensuring it's always accurate but makes it
555    /// a bit more expensive to compute. For live updates it is recommended to use `overall_status_stream`
556    /// instead.
557    pub async fn overall_status(&self) -> OverallConnectionStatus {
558        let (connected_count, total_count) = {
559            let info_map = self.relay_info.read().await;
560            let connected = info_map
561                .values()
562                .filter(|info| matches!(info.status, RelayConnectionStatus::Connected { .. }))
563                .count();
564            // Only count successfully connected or disconnected relays, not failed ones
565            let total = info_map
566                .values()
567                .filter(|info| !matches!(info.status, RelayConnectionStatus::Failed { .. }))
568                .count();
569            (connected, total)
570        };
571
572        OverallConnectionStatus {
573            is_connected: connected_count > 0,
574            connected_count,
575            total_count,
576        }
577    }
578
579    /// Start monitoring a relay connection for disconnections
580    fn start_connection_monitoring(&self, relay_id: KeyId, relay_client: RelayClient) {
581        let client = self.clone();
582        let connection = relay_client.connection().clone();
583        let monitors = Arc::clone(&self.connection_monitors);
584
585        let monitor_task = tokio::spawn(async move {
586            // Monitor the connection for closure
587            let closed_future = connection.closed();
588            let connection_error = closed_future.await;
589
590            let error_msg = connection_error.to_string();
591            tracing::warn!(
592                "Relay connection lost for relay {}: {}",
593                hex::encode(relay_id.as_bytes()),
594                error_msg
595            );
596
597            // Get relay info for status update
598            let relay_address = {
599                let info_map = client.relay_info.read().await;
600                info_map
601                    .get(&relay_id)
602                    .map(|info| info.info.relay_address.clone())
603            };
604
605            if let Some(relay_address) = relay_address {
606                // Update relay status to disconnected with error details
607                {
608                    let mut info_map = client.relay_info.write().await;
609                    if let Some(info) = info_map.get_mut(&relay_id) {
610                        info.status = RelayConnectionStatus::Disconnected {
611                            error: Some(error_msg.clone()),
612                        };
613                    }
614                }
615
616                // Remove from active connections
617                {
618                    let mut connections = client.relay_connections.write().await;
619                    connections.remove(&relay_id);
620                }
621
622                // Remove from multi-relay managers
623                client.message_manager.remove_relay(&relay_id).await;
624                client.blob_service.remove_relay(&relay_id).await;
625
626                // Update observable states
627                client.update_client_secret_state().await;
628
629                // Notify about disconnection
630                client
631                    .notify_relay_status_change(
632                        relay_id,
633                        relay_address.clone(),
634                        RelayConnectionStatus::Disconnected {
635                            error: Some(error_msg),
636                        },
637                    )
638                    .await;
639
640                // Attempt automatic reconnection after a delay
641                let reconnect_client = client.clone();
642                tokio::spawn(async move {
643                    tokio::time::sleep(Duration::from_secs(5)).await;
644
645                    tracing::info!(
646                        "Attempting automatic reconnection to relay: {}",
647                        hex::encode(relay_id.as_bytes())
648                    );
649
650                    if let Err(e) = reconnect_client.add_relay(relay_address).await {
651                        tracing::warn!(
652                            "Automatic reconnection failed for relay {}: {}",
653                            hex::encode(relay_id.as_bytes()),
654                            e
655                        );
656                    }
657                });
658            }
659        });
660
661        // Store the monitor task
662        tokio::spawn(async move {
663            let mut monitor_map = monitors.write().await;
664            monitor_map.insert(relay_id, monitor_task);
665        });
666    }
667
668    /// Stop monitoring a relay connection
669    async fn stop_connection_monitoring(&self, relay_id: KeyId) {
670        let mut monitors = self.connection_monitors.write().await;
671        if let Some(monitor_task) = monitors.remove(&relay_id) {
672            monitor_task.abort();
673        }
674    }
675
676    /// Notify about relay status change
677    async fn notify_relay_status_change(
678        &self,
679        relay_id: KeyId,
680        relay_address: RelayAddress,
681        status: RelayConnectionStatus,
682    ) {
683        let status_update = RelayStatusUpdate {
684            relay_id,
685            relay_address,
686            status,
687        };
688
689        // Send to broadcast channel - ignore if no receivers
690        let _ = self.relay_status_sender.try_broadcast(status_update);
691    }
692}
693
694#[cfg_attr(feature = "frb-api", frb(ignore))]
695impl Client {
696    /// Subscribe to per-relay connection status updates
697    ///
698    /// This provides real-time updates about individual relay connection status changes.
699    /// Each relay reports its status independently via this broadcast channel.
700    pub fn subscribe_to_relay_status(&self) -> async_broadcast::Receiver<RelayStatusUpdate> {
701        self.relay_status_sender.new_receiver()
702    }
703}