zoe_relay/
relay.rs

1//! # Zoe Relay
2//!
3//! A clean, minimal QUIC relay server with ed25519 bi-directional authentication for service routing.
4//!
5//! ## Features
6//!
7//! - **QUIC Transport**: High-performance transport with TLS 1.3 and ed25519 identity verification
8//! - **Service Routing**: Routes connections to different services based on a u8 service identifier
9//! - **Bi-directional Streams**: Full duplex communication between client and server
10//! - **Ed25519 Authentication**: Client identity verification via embedded public keys in certificates
11//! - **Trait-based Design**: Clean abstraction for implementing service handlers
12//!
13//! ## Architecture
14//!
15//! The relay accepts QUIC connections, authenticates clients via ed25519 keys, reads the first byte
16//! of the stream to determine the service type, and routes the connection to the appropriate service handler:
17//!
18//! ```text
19//! Client → QUIC Connection → ed25519 Auth → Read Service ID (u8) → Route to Service
20//!    ↓           ↓                ↓              ↓                    ↓
21//! Certificate  TLS 1.3        Extract Key    First Byte        ServiceRouter::create_service
22//! ```
23//!
24//! ## Usage
25//!
26//! ### Implementing a Service Router
27//!
28//! ```rust
29//! use zoe_relay::{RelayServer, ServiceRouter};
30//! use ed25519_dalek::SigningKey;
31//! use std::net::SocketAddr;
32//! use zoe_wire_protocol::KeyPair;
33//!
34//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
35//! let addr: SocketAddr = "127.0.0.1:4433".parse()?;
36//! let server_keypair = KeyPair::generate_ml_dsa44(&mut rand::rngs::OsRng);
37//! let router = MyServiceRouter; // Your ServiceRouter implementation
38//!
39//! let server = RelayServer::new(addr, server_keypair, router)?;
40//! println!("🚀 Relay server running on {}", addr);
41//! server.run().await?;
42//! # Ok(())
43//! # }
44//! # use async_trait::async_trait;
45//! # struct MyServiceRouter;
46//! # struct MyService;
47//! # #[derive(Debug, thiserror::Error)]
48//! # enum MyError {
49//! #     #[error("Service error")]
50//! #     ServiceError,
51//! # }
52//! # #[async_trait]
53//! # impl ServiceRouter for MyServiceRouter {
54//! #     type Error = MyError;
55//! #     type ServiceId = u8;
56//! #     type Service = MyService;
57//! #     async fn parse_service_id(&self, service_id: u8) -> Result<Self::ServiceId, Self::Error> { Ok(service_id) }
58//! #     async fn create_service(&self, _: &Self::ServiceId, _: &zoe_relay::ConnectionInfo, _: zoe_wire_protocol::StreamPair) -> Result<Self::Service, Self::Error> { Ok(MyService) }
59//! # }
60//! # #[async_trait]
61//! # impl zoe_relay::Service for MyService {
62//! #     type Error = MyError;
63//! #     async fn run(self) -> Result<(), Self::Error> { Ok(()) }
64//! # }
65//! ```
66//!
67//! For detailed service routing examples, see the [`router`](crate::router) module documentation.
68//!
69//! ## Transport Details
70//!
71//! ### QUIC with Ed25519 Authentication
72//!
73//! - **QUIC Protocol**: Multiplexed, encrypted transport with connection-level authentication
74//! - **TLS 1.3**: Latest TLS with ed25519-derived certificates
75//! - **Client Authentication**: Client identity verification via ed25519 keys embedded in certificates
76//! - **Certificate Embedding**: Public keys embedded in X.509 certificate extensions
77//!
78//! ### Server Protocol Flow
79//!
80//! 1. **Connection Establishment**: Client connects via QUIC with ed25519 certificate
81//! 2. **Mutual Authentication**: Server and client verify each other's ed25519 certificates
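//! 3. **Connection Handling**: Server records the keys verified during the handshake together with connection metadata (extracting the client's public key from its TLS certificate is still a TODO)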
83//! 4. **Service Delegation**: Hands over streams and client info to the [`ServiceRouter`]
84//!
85//! ## Security Model
86//!
87//! ### Authentication Flow
88//!
89//! 1. **Certificate Generation**: Ed25519 keys embedded in deterministic self-signed certificates
90//! 2. **QUIC Handshake**: TLS authentication with client certificate verification
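//! 3. **Key Extraction**: Server is meant to extract the client's ed25519 public key from its certificate (not yet implemented: the connection handler currently records a placeholder transport key)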
92//! 4. **Service Routing**: Authenticated client streams are routed to appropriate services
93//!
94//! ### Identity and Trust
95//!
96//! - **Certificate-based**: Client identity is embedded in the certificate
97//! - **Key-based identity**: Identity is the ed25519 public key itself
98//! - **Connection-scoped**: Authentication valid for entire QUIC connection lifetime
99//! - **Service-agnostic**: Authentication happens once, all services trust the identity
100
101use anyhow::Result;
102use quinn::{Connection, Endpoint};
103use std::sync::Arc;
104use std::{net::SocketAddr, path::PathBuf};
105use zoe_blob_store::{BlobServiceImpl, BlobStoreError};
106use zoe_message_store::RedisMessageStorage;
107use zoe_wire_protocol::{ConnectionInfo, CryptoError, VerifyingKey};
108
109use tokio::io::{AsyncReadExt, AsyncWriteExt};
110use tracing::{debug, error, info};
111use zoe_wire_protocol::{connection::server::create_server_endpoint, KeyPair, StreamPair};
112
113use crate::{RelayServiceRouter, Service, ServiceError, ServiceRouter};
114
115#[derive(Debug, thiserror::Error)]
116pub enum RelayServerBuilderError {
117    #[error("Blob directory not set")]
118    BlobDirNotSet,
119
120    #[error("Blob service error: {0}")]
121    BlobServiceError(BlobStoreError),
122
123    #[error("Redis URL not set")]
124    RedisUrlNotSet,
125
126    #[error("Redis message storage error: {0}")]
127    RedisMessageStorageError(zoe_message_store::MessageStoreError),
128
129    #[error("Relay server error: {0}")]
130    RelayServerError(RelayServerError),
131}
132
133#[derive(Default)]
134pub struct RelayServerBuilder {
135    server_keypair: Option<KeyPair>,
136    address: Option<SocketAddr>,
137    redis_url: Option<String>,
138    blob_dir: Option<PathBuf>,
139}
140
141impl RelayServerBuilder {
142    pub fn server_keypair(mut self, server_keypair: KeyPair) -> Self {
143        self.server_keypair = Some(server_keypair);
144        self
145    }
146
147    pub fn address(mut self, address: SocketAddr) -> Self {
148        self.address = Some(address);
149        self
150    }
151
152    pub fn redis_url(mut self, redis_url: String) -> Self {
153        self.redis_url = Some(redis_url);
154        self
155    }
156
157    pub fn blob_dir(mut self, blob_dir: PathBuf) -> Self {
158        self.blob_dir = Some(blob_dir);
159        self
160    }
161
162    async fn create_default_router(&self) -> Result<RelayServiceRouter, RelayServerBuilderError> {
163        let Some(blob_dir) = &self.blob_dir else {
164            return Err(RelayServerBuilderError::BlobDirNotSet);
165        };
166        let Some(redis_url) = &self.redis_url else {
167            return Err(RelayServerBuilderError::RedisUrlNotSet);
168        };
169
170        let blob_service = BlobServiceImpl::new(blob_dir.clone())
171            .await
172            .map_err(RelayServerBuilderError::BlobServiceError)?;
173        let message_service = RedisMessageStorage::new(redis_url.clone())
174            .await
175            .map_err(RelayServerBuilderError::RedisMessageStorageError)?;
176        Ok(RelayServiceRouter::new(blob_service, message_service))
177    }
178
179    pub async fn build(self) -> Result<RelayServer<RelayServiceRouter>, RelayServerBuilderError> {
180        let router = self.create_default_router().await?;
181        self.build_with_router(router).await
182    }
183
184    pub async fn build_with_router<R: ServiceRouter + Send + Sync + 'static>(
185        self,
186        router: R,
187    ) -> Result<RelayServer<R>, RelayServerBuilderError> {
188        let address = self.address.unwrap_or(SocketAddr::from(([0, 0, 0, 0], 0)));
189        let server_keypair = self
190            .server_keypair
191            .unwrap_or(KeyPair::generate_ed25519(&mut rand::rngs::OsRng));
192
193        RelayServer::new(address, server_keypair, router)
194            .map_err(RelayServerBuilderError::RelayServerError)
195    }
196}
197
198#[derive(Debug, thiserror::Error)]
199pub enum RelayServerError {
200    #[error("Crypto error: {0}")]
201    CryptoError(CryptoError),
202}
203
204/// Main relay server that accepts QUIC connections with transport authentication
205pub struct RelayServer<R: ServiceRouter> {
206    pub endpoint: Endpoint,
207    server_keypair: Arc<KeyPair>,
208    router: Arc<R>,
209}
210
211impl<R: ServiceRouter + 'static> RelayServer<R> {
212    pub fn builder() -> RelayServerBuilder {
213        RelayServerBuilder::default()
214    }
215
216    pub fn public_key(&self) -> VerifyingKey {
217        self.server_keypair.public_key()
218    }
219
220    /// Create a new relay server
221    ///
222    /// # Arguments
223    /// * `addr` - The address to bind the server to
224    /// * `server_keypair` - The server keypair for transport security (Ed25519 or ML-DSA-44)
225    /// * `router` - The service router implementation
226    pub fn new(
227        addr: SocketAddr,
228        server_keypair: KeyPair,
229        router: R,
230    ) -> Result<Self, RelayServerError> {
231        let server_keypair = Arc::new(server_keypair);
232        let endpoint =
233            create_server_endpoint(addr, &server_keypair).map_err(RelayServerError::CryptoError)?;
234
235        Ok(Self {
236            endpoint,
237            server_keypair,
238            router: Arc::new(router),
239        })
240    }
241
242    pub fn local_addr(&self) -> Result<SocketAddr> {
243        Ok(self.endpoint.local_addr()?)
244    }
245
246    /// Run the relay server, accepting and handling connections
247    pub async fn run(self) -> Result<()> {
248        debug!(address = ?self.endpoint.local_addr()?,
249            "Relay server starting",
250        );
251        let server_identity = self.server_keypair.public_key();
252        debug!(
253            server_identity = ?server_identity,
254            "Server identity:",
255        );
256
257        while let Some(conn) = self.endpoint.accept().await {
258            let router = Arc::clone(&self.router);
259            let server_keypair = Arc::clone(&self.server_keypair);
260
261            tokio::spawn(async move {
262                match conn.await {
263                    Ok(connection) => {
264                        if let Err(e) =
265                            Self::handle_connection(connection, router, server_keypair).await
266                        {
267                            error!("Connection handling failed: {}", e);
268                        }
269                    }
270                    Err(e) => {
271                        error!("Connection failed: {}", e);
272                    }
273                }
274            });
275        }
276
277        Ok(())
278    }
279
280    /// Handle a single QUIC connection
281    async fn handle_connection(
282        connection: Connection,
283        router: Arc<R>,
284        server_keypair: Arc<KeyPair>,
285    ) -> Result<()> {
286        let remote_addr = connection.remote_address();
287        info!("🔗 New connection from {}", remote_addr);
288
289        // Perform ML-DSA challenge-response handshake
290        let (send, recv) = match connection.open_bi().await {
291            Ok((send, recv)) => (send, recv),
292            Err(e) => {
293                error!("Failed to accept handshake stream: {}", e);
294                connection.close(0u32.into(), b"Failed to accept handshake stream");
295                return Ok(());
296            }
297        };
298
299        tracing::trace!("🔗 Handshake streams accepted");
300
301        // Perform the actual challenge handshake
302        let verified_keys = match crate::challenge::perform_multi_challenge_handshake(
303            send,
304            recv,
305            server_keypair.as_ref(), // Use the server's keypair for challenge
306        )
307        .await
308        {
309            Ok(keys) => {
310                info!(
311                    "✅ ML-DSA handshake successful, verified {} keys",
312                    keys.len()
313                );
314                keys
315            }
316            Err(e) => {
317                error!("❌ ML-DSA handshake failed: {}", e);
318                connection.close(0u32.into(), b"ML-DSA handshake failed");
319                return Ok(());
320            }
321        };
322
323        // TODO: Extract client public key from TLS certificate
324        // For now, using a placeholder key - this should be extracted from the client's TLS certificate
325        let placeholder_transport_key = server_keypair.public_key(); // Temporary placeholder
326
327        // Create connection info with verified keys
328        let connection_info = ConnectionInfo::with_verified_keys(
329            placeholder_transport_key,
330            verified_keys,
331            remote_addr,
332        );
333
334        info!(
335            "🔐 Connection established with {} verified keys",
336            connection_info.verified_key_count()
337        );
338
339        // Accept the bi-directional streams for services
340        while let Ok((mut send, mut recv)) = connection.accept_bi().await {
341            let service_id = recv.read_u8().await?;
342            info!(?remote_addr, "📋 Service ID: {}", service_id);
343            let service_id = match router.parse_service_id(service_id).await {
344                Ok(service_id) => service_id,
345                Err(error) => {
346                    error!(?error, "Invalid service ID: {}", service_id);
347                    send.write_u8(ServiceError::UnknownService.as_u8()).await?; // we immediately close this stream with unknown service id
348                    continue;
349                }
350            };
351
352            let streams = StreamPair { recv, send };
353            let service = match router
354                .create_service(&service_id, &connection_info, streams)
355                .await
356            {
357                Ok(service) => service,
358                Err(error) => {
359                    error!(?error, "Failed to create service: {:?}", service_id);
360                    // server error
361                    // send.write_u8(ServiceError::ServiceCreationError as u8).await?; // we immediately close this stream with unknown service id
362                    continue;
363                }
364            };
365            tokio::spawn(async move {
366                if let Err(e) = service.run().await {
367                    error!("Service failed: {}", e);
368                } else {
369                    info!("Service ended");
370                }
371            });
372        }
373
374        Ok(())
375    }
376}
377
378pub type FullRelayServer = RelayServer<RelayServiceRouter>;