zoe_relay/services/
messages.rs1use crate::Service;
2use async_trait::async_trait;
3use futures::{SinkExt, StreamExt};
4use std::sync::Arc;
5use tarpc::server::{BaseChannel, Channel};
6use tracing::{debug, error, info};
7use zoe_message_store::{MessageStoreError, MessagesRpcService, RedisMessageStorage};
8use zoe_wire_protocol::{
9 MessageService as _, MessageServiceResponseWrap, MessagesServiceRequestWrap, StreamPair,
10};
11
12#[derive(Debug, thiserror::Error)]
13pub enum MessagesServiceError {
14 #[error("Message store error: {0}")]
15 MessageStoreError(#[from] MessageStoreError),
16
17 #[error("IO error: {0}")]
18 IoError(#[from] std::io::Error),
19
20 #[error("Serialization error: {0}")]
21 SerializationError(String),
22}
23
24pub struct MessagesService {
25 streams: StreamPair,
26 store: Arc<RedisMessageStorage>,
27}
28
29impl MessagesService {
30 pub fn new(streams: StreamPair, redis: RedisMessageStorage) -> Self {
31 let store = Arc::new(redis);
32 Self {
33 streams,
34 store: store.clone(),
35 }
36 }
37}
38
39#[async_trait]
44impl Service for MessagesService {
45 type Error = MessagesServiceError;
46
47 async fn run(self) -> Result<(), Self::Error> {
48 let service_id = format!("{:p}", &self.store);
49 info!("🚀 Starting MessagesService {}", service_id);
50 let Self { mut streams, store } = self;
51 streams.send_ack().await?;
52 let (mut incoming, mut sink) =
53 streams.unpack_transports::<MessagesServiceRequestWrap, MessageServiceResponseWrap>();
54
55 let (mut client_transport, server_transport) = tarpc::transport::channel::unbounded();
56 let (mut msg_recv, mut catchup_recv, rpc) = MessagesRpcService::new(store);
57
58 let server = BaseChannel::with_defaults(server_transport);
59 let rpc_spawn = tokio::spawn(
60 server
61 .execute(rpc.serve())
62 .for_each(|response| async move {
64 tokio::spawn(response);
65 }),
66 );
67
68 let mut request_stream_closed = false;
69
70 loop {
71 tokio::select! {
72 request_result = incoming.next(), if !request_stream_closed => {
74 match request_result {
75 Some(Ok(request)) => {
76 debug!("Received RPC request: {:?}", request);
77 if let Err(e) = client_transport.send(request).await {
79 error!("Failed to send request to RPC server: {}", e);
80 }
81 }
82 Some(Err(e)) => {
83 error!("Error reading request: {} (this may be due to client disconnecting)", e);
84 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
86 break;
87 }
88 None => {
89 info!("Request stream closed - continuing to process responses");
90 request_stream_closed = true;
91 }
93 }
94 }
95
96 rpc_message = client_transport.next() => {
98 match rpc_message {
99 Some(Ok(message)) => {
100 if let Err(e) = sink.send(MessageServiceResponseWrap::RpcResponse(Box::new(message))).await {
102 error!("Failed to send response to client: {}", e);
103 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
105 break;
106 }
107 }
108 Some(Err(e)) => {
109 error!("Error reading RPC message: {}", e);
110 break;
111 }
112 None => {
113 debug!("RPC channel closed - service shutting down");
114 break;
116 }
117 }
118 }
119
120 stream_message = msg_recv.recv() => {
122 match stream_message {
123 Some(message) => {
124 debug!("🔄 MessagesService {} received stream message from subscription task: {:?}", service_id, message);
125 if let Err(e) = sink.send(MessageServiceResponseWrap::StreamMessage(message)).await {
127 error!("Failed to send stream message to client: {}", e);
128 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
130 break;
131 } else {
132 debug!("✅ MessagesService {} successfully sent stream message to client", service_id);
133 }
134 }
135 None => {
136 debug!("Subscription channel closed");
137 }
139 }
140 }
141
142 response_message = catchup_recv.recv() => {
144 match response_message {
145 Some(response) => {
146 if let Err(e) = sink.send(MessageServiceResponseWrap::CatchUpResponse(response)).await {
148 error!("Failed to send catch-up response to client: {}", e);
149 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
151 break;
152 }
153 }
154 None => {
155 debug!("Response channel closed");
156 }
158 }
159 }
160 }
161 }
162
163 info!("MessagesService shutting down");
164 rpc_spawn.abort();
165 Ok(())
166 }
167}