zoe_relay/services/
messages.rs

1use crate::Service;
2use async_trait::async_trait;
3use futures::{SinkExt, StreamExt};
4use std::sync::Arc;
5use tarpc::server::{BaseChannel, Channel};
6use tracing::{debug, error, info};
7use zoe_message_store::{MessageStoreError, MessagesRpcService, RedisMessageStorage};
8use zoe_wire_protocol::{
9    MessageService as _, MessageServiceResponseWrap, MessagesServiceRequestWrap, StreamPair,
10};
11
12#[derive(Debug, thiserror::Error)]
13pub enum MessagesServiceError {
14    #[error("Message store error: {0}")]
15    MessageStoreError(#[from] MessageStoreError),
16
17    #[error("IO error: {0}")]
18    IoError(#[from] std::io::Error),
19
20    #[error("Serialization error: {0}")]
21    SerializationError(String),
22}
23
24pub struct MessagesService {
25    streams: StreamPair,
26    store: Arc<RedisMessageStorage>,
27}
28
29impl MessagesService {
30    pub fn new(streams: StreamPair, redis: RedisMessageStorage) -> Self {
31        let store = Arc::new(redis);
32        Self {
33            streams,
34            store: store.clone(),
35        }
36    }
37}
38
39// Background task functions have been moved to the message-store service
40// All subscription, filter updates, and catch-up requests are now handled as RPC calls
41// that spawn background tasks managed by the RPC service
42
43#[async_trait]
44impl Service for MessagesService {
45    type Error = MessagesServiceError;
46
47    async fn run(self) -> Result<(), Self::Error> {
48        let service_id = format!("{:p}", &self.store);
49        info!("🚀 Starting MessagesService {}", service_id);
50        let Self { mut streams, store } = self;
51        streams.send_ack().await?;
52        let (mut incoming, mut sink) =
53            streams.unpack_transports::<MessagesServiceRequestWrap, MessageServiceResponseWrap>();
54
55        let (mut client_transport, server_transport) = tarpc::transport::channel::unbounded();
56        let (mut msg_recv, mut catchup_recv, rpc) = MessagesRpcService::new(store);
57
58        let server = BaseChannel::with_defaults(server_transport);
59        let rpc_spawn = tokio::spawn(
60            server
61                .execute(rpc.serve())
62                // Handle all requests concurrently.
63                .for_each(|response| async move {
64                    tokio::spawn(response);
65                }),
66        );
67
68        let mut request_stream_closed = false;
69
70        loop {
71            tokio::select! {
72                // Poll for incoming RPC requests from client
73                request_result = incoming.next(), if !request_stream_closed => {
74                    match request_result {
75                        Some(Ok(request)) => {
76                            debug!("Received RPC request: {:?}", request);
77                            // All requests are now RPC requests - forward directly to RPC handler
78                            if let Err(e) = client_transport.send(request).await {
79                                error!("Failed to send request to RPC server: {}", e);
80                            }
81                        }
82                        Some(Err(e)) => {
83                            error!("Error reading request: {} (this may be due to client disconnecting)", e);
84                            // Give a brief moment for potential recovery before breaking
85                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
86                            break;
87                        }
88                        None => {
89                            info!("Request stream closed - continuing to process responses");
90                            request_stream_closed = true;
91                            // Don't break - continue processing responses and background tasks
92                        }
93                    }
94                }
95
96                // Poll from rpc responses
97                rpc_message = client_transport.next() => {
98                    match rpc_message {
99                        Some(Ok(message)) => {
100                            // Forward message to client
101                            if let Err(e) = sink.send(MessageServiceResponseWrap::RpcResponse(Box::new(message))).await {
102                                error!("Failed to send response to client: {}", e);
103                                // Allow a brief moment for recovery before breaking
104                                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
105                                break;
106                            }
107                        }
108                        Some(Err(e)) => {
109                            error!("Error reading RPC message: {}", e);
110                            break;
111                        }
112                        None => {
113                            debug!("RPC channel closed - service shutting down");
114                            // RPC channel closed means no more responses can be processed
115                            break;
116                        }
117                    }
118                }
119
120                // Poll for messages from subscription task
121                stream_message = msg_recv.recv() => {
122                    match stream_message {
123                        Some(message) => {
124                            debug!("🔄 MessagesService {} received stream message from subscription task: {:?}", service_id, message);
125                            // Forward message to client
126                            if let Err(e) = sink.send(MessageServiceResponseWrap::StreamMessage(message)).await {
127                                error!("Failed to send stream message to client: {}", e);
128                                // Allow a brief moment for recovery before breaking
129                                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
130                                break;
131                            } else {
132                                debug!("✅ MessagesService {} successfully sent stream message to client", service_id);
133                            }
134                        }
135                        None => {
136                            debug!("Subscription channel closed");
137                            // Continue running - this can happen when subscription task ends
138                        }
139                    }
140                }
141
142                // Poll for responses from catch-up tasks
143                response_message = catchup_recv.recv() => {
144                    match response_message {
145                        Some(response) => {
146                            // Forward response to client
147                            if let Err(e) = sink.send(MessageServiceResponseWrap::CatchUpResponse(response)).await {
148                                error!("Failed to send catch-up response to client: {}", e);
149                                // Allow a brief moment for recovery before breaking
150                                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
151                                break;
152                            }
153                        }
154                        None => {
155                            debug!("Response channel closed");
156                            // Continue running
157                        }
158                    }
159                }
160            }
161        }
162
163        info!("MessagesService shutting down");
164        rpc_spawn.abort();
165        Ok(())
166    }
167}