zoe_relay/relay.rs
1//! # Zoe Relay
2//!
3//! A clean, minimal QUIC relay server with ed25519 bi-directional authentication for service routing.
4//!
5//! ## Features
6//!
7//! - **QUIC Transport**: High-performance transport with TLS 1.3 and ed25519 identity verification
8//! - **Service Routing**: Routes connections to different services based on a u8 service identifier
9//! - **Bi-directional Streams**: Full duplex communication between client and server
10//! - **Ed25519 Authentication**: Client identity verification via embedded public keys in certificates
11//! - **Trait-based Design**: Clean abstraction for implementing service handlers
12//!
13//! ## Architecture
14//!
15//! The relay accepts QUIC connections, authenticates clients via ed25519 keys, reads the first byte
16//! of the stream to determine the service type, and routes the connection to the appropriate service handler:
17//!
18//! ```text
//! Client → QUIC Connection → ed25519 Auth → Read Service ID (u8) → Route to Service
//!   ↓             ↓               ↓                  ↓                    ↓
//! Certificate  TLS 1.3       Extract Key        First Byte  ServiceRouter::create_service
22//! ```
23//!
24//! ## Usage
25//!
26//! ### Implementing a Service Router
27//!
28//! ```rust
29//! use zoe_relay::{RelayServer, ServiceRouter};
30//! use ed25519_dalek::SigningKey;
31//! use std::net::SocketAddr;
32//! use zoe_wire_protocol::KeyPair;
33//!
34//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
35//! let addr: SocketAddr = "127.0.0.1:4433".parse()?;
36//! let server_keypair = KeyPair::generate_ml_dsa44(&mut rand::rngs::OsRng);
37//! let router = MyServiceRouter; // Your ServiceRouter implementation
38//!
39//! let server = RelayServer::new(addr, server_keypair, router)?;
40//! println!("🚀 Relay server running on {}", addr);
41//! server.run().await?;
42//! # Ok(())
43//! # }
44//! # use async_trait::async_trait;
45//! # struct MyServiceRouter;
46//! # struct MyService;
47//! # #[derive(Debug, thiserror::Error)]
48//! # enum MyError {
49//! # #[error("Service error")]
50//! # ServiceError,
51//! # }
52//! # #[async_trait]
53//! # impl ServiceRouter for MyServiceRouter {
54//! # type Error = MyError;
55//! # type ServiceId = u8;
56//! # type Service = MyService;
57//! # async fn parse_service_id(&self, service_id: u8) -> Result<Self::ServiceId, Self::Error> { Ok(service_id) }
58//! # async fn create_service(&self, _: &Self::ServiceId, _: &zoe_relay::ConnectionInfo, _: zoe_wire_protocol::StreamPair) -> Result<Self::Service, Self::Error> { Ok(MyService) }
59//! # }
60//! # #[async_trait]
61//! # impl zoe_relay::Service for MyService {
62//! # type Error = MyError;
63//! # async fn run(self) -> Result<(), Self::Error> { Ok(()) }
64//! # }
65//! ```
66//!
67//! For detailed service routing examples, see the [`router`](crate::router) module documentation.
68//!
69//! ## Transport Details
70//!
71//! ### QUIC with Ed25519 Authentication
72//!
73//! - **QUIC Protocol**: Multiplexed, encrypted transport with connection-level authentication
74//! - **TLS 1.3**: Latest TLS with ed25519-derived certificates
75//! - **Client Authentication**: Client identity verification via ed25519 keys embedded in certificates
76//! - **Certificate Embedding**: Public keys embedded in X.509 certificate extensions
77//!
78//! ### Server Protocol Flow
79//!
80//! 1. **Connection Establishment**: Client connects via QUIC with ed25519 certificate
81//! 2. **Mutual Authentication**: Server and client verify each other's ed25519 certificates
82//! 3. **Connection Handling**: Server extracts client public key and connection metadata
83//! 4. **Service Delegation**: Hands over streams and client info to the [`ServiceRouter`]
84//!
85//! ## Security Model
86//!
87//! ### Authentication Flow
88//!
89//! 1. **Certificate Generation**: Ed25519 keys embedded in deterministic self-signed certificates
90//! 2. **QUIC Handshake**: TLS authentication with client certificate verification
91//! 3. **Key Extraction**: Server extracts client's ed25519 public key from certificate
92//! 4. **Service Routing**: Authenticated client streams are routed to appropriate services
93//!
94//! ### Identity and Trust
95//!
96//! - **Certificate-based**: Client identity is embedded in the certificate
97//! - **Key-based identity**: Identity is the ed25519 public key itself
98//! - **Connection-scoped**: Authentication valid for entire QUIC connection lifetime
99//! - **Service-agnostic**: Authentication happens once, all services trust the identity
100
101use anyhow::Result;
102use quinn::{Connection, Endpoint};
103use std::sync::Arc;
104use std::{net::SocketAddr, path::PathBuf};
105use zoe_blob_store::{BlobServiceImpl, BlobStoreError};
106use zoe_message_store::RedisMessageStorage;
107use zoe_wire_protocol::{ConnectionInfo, CryptoError, VerifyingKey};
108
109use tokio::io::{AsyncReadExt, AsyncWriteExt};
110use tracing::{debug, error, info};
111use zoe_wire_protocol::{connection::server::create_server_endpoint, KeyPair, StreamPair};
112
113use crate::{RelayServiceRouter, Service, ServiceError, ServiceRouter};
114
115#[derive(Debug, thiserror::Error)]
116pub enum RelayServerBuilderError {
117 #[error("Blob directory not set")]
118 BlobDirNotSet,
119
120 #[error("Blob service error: {0}")]
121 BlobServiceError(BlobStoreError),
122
123 #[error("Redis URL not set")]
124 RedisUrlNotSet,
125
126 #[error("Redis message storage error: {0}")]
127 RedisMessageStorageError(zoe_message_store::MessageStoreError),
128
129 #[error("Relay server error: {0}")]
130 RelayServerError(RelayServerError),
131}
132
133#[derive(Default)]
134pub struct RelayServerBuilder {
135 server_keypair: Option<KeyPair>,
136 address: Option<SocketAddr>,
137 redis_url: Option<String>,
138 blob_dir: Option<PathBuf>,
139}
140
141impl RelayServerBuilder {
142 pub fn server_keypair(mut self, server_keypair: KeyPair) -> Self {
143 self.server_keypair = Some(server_keypair);
144 self
145 }
146
147 pub fn address(mut self, address: SocketAddr) -> Self {
148 self.address = Some(address);
149 self
150 }
151
152 pub fn redis_url(mut self, redis_url: String) -> Self {
153 self.redis_url = Some(redis_url);
154 self
155 }
156
157 pub fn blob_dir(mut self, blob_dir: PathBuf) -> Self {
158 self.blob_dir = Some(blob_dir);
159 self
160 }
161
162 async fn create_default_router(&self) -> Result<RelayServiceRouter, RelayServerBuilderError> {
163 let Some(blob_dir) = &self.blob_dir else {
164 return Err(RelayServerBuilderError::BlobDirNotSet);
165 };
166 let Some(redis_url) = &self.redis_url else {
167 return Err(RelayServerBuilderError::RedisUrlNotSet);
168 };
169
170 let blob_service = BlobServiceImpl::new(blob_dir.clone())
171 .await
172 .map_err(RelayServerBuilderError::BlobServiceError)?;
173 let message_service = RedisMessageStorage::new(redis_url.clone())
174 .await
175 .map_err(RelayServerBuilderError::RedisMessageStorageError)?;
176 Ok(RelayServiceRouter::new(blob_service, message_service))
177 }
178
179 pub async fn build(self) -> Result<RelayServer<RelayServiceRouter>, RelayServerBuilderError> {
180 let router = self.create_default_router().await?;
181 self.build_with_router(router).await
182 }
183
184 pub async fn build_with_router<R: ServiceRouter + Send + Sync + 'static>(
185 self,
186 router: R,
187 ) -> Result<RelayServer<R>, RelayServerBuilderError> {
188 let address = self.address.unwrap_or(SocketAddr::from(([0, 0, 0, 0], 0)));
189 let server_keypair = self
190 .server_keypair
191 .unwrap_or(KeyPair::generate_ed25519(&mut rand::rngs::OsRng));
192
193 RelayServer::new(address, server_keypair, router)
194 .map_err(RelayServerBuilderError::RelayServerError)
195 }
196}
197
198#[derive(Debug, thiserror::Error)]
199pub enum RelayServerError {
200 #[error("Crypto error: {0}")]
201 CryptoError(CryptoError),
202}
203
204/// Main relay server that accepts QUIC connections with transport authentication
205pub struct RelayServer<R: ServiceRouter> {
206 pub endpoint: Endpoint,
207 server_keypair: Arc<KeyPair>,
208 router: Arc<R>,
209}
210
211impl<R: ServiceRouter + 'static> RelayServer<R> {
212 pub fn builder() -> RelayServerBuilder {
213 RelayServerBuilder::default()
214 }
215
216 pub fn public_key(&self) -> VerifyingKey {
217 self.server_keypair.public_key()
218 }
219
220 /// Create a new relay server
221 ///
222 /// # Arguments
223 /// * `addr` - The address to bind the server to
224 /// * `server_keypair` - The server keypair for transport security (Ed25519 or ML-DSA-44)
225 /// * `router` - The service router implementation
226 pub fn new(
227 addr: SocketAddr,
228 server_keypair: KeyPair,
229 router: R,
230 ) -> Result<Self, RelayServerError> {
231 let server_keypair = Arc::new(server_keypair);
232 let endpoint =
233 create_server_endpoint(addr, &server_keypair).map_err(RelayServerError::CryptoError)?;
234
235 Ok(Self {
236 endpoint,
237 server_keypair,
238 router: Arc::new(router),
239 })
240 }
241
242 pub fn local_addr(&self) -> Result<SocketAddr> {
243 Ok(self.endpoint.local_addr()?)
244 }
245
246 /// Run the relay server, accepting and handling connections
247 pub async fn run(self) -> Result<()> {
248 debug!(address = ?self.endpoint.local_addr()?,
249 "Relay server starting",
250 );
251 let server_identity = self.server_keypair.public_key();
252 debug!(
253 server_identity = ?server_identity,
254 "Server identity:",
255 );
256
257 while let Some(conn) = self.endpoint.accept().await {
258 let router = Arc::clone(&self.router);
259 let server_keypair = Arc::clone(&self.server_keypair);
260
261 tokio::spawn(async move {
262 match conn.await {
263 Ok(connection) => {
264 if let Err(e) =
265 Self::handle_connection(connection, router, server_keypair).await
266 {
267 error!("Connection handling failed: {}", e);
268 }
269 }
270 Err(e) => {
271 error!("Connection failed: {}", e);
272 }
273 }
274 });
275 }
276
277 Ok(())
278 }
279
280 /// Handle a single QUIC connection
281 async fn handle_connection(
282 connection: Connection,
283 router: Arc<R>,
284 server_keypair: Arc<KeyPair>,
285 ) -> Result<()> {
286 let remote_addr = connection.remote_address();
287 info!("🔗 New connection from {}", remote_addr);
288
289 // Perform ML-DSA challenge-response handshake
290 let (send, recv) = match connection.open_bi().await {
291 Ok((send, recv)) => (send, recv),
292 Err(e) => {
293 error!("Failed to accept handshake stream: {}", e);
294 connection.close(0u32.into(), b"Failed to accept handshake stream");
295 return Ok(());
296 }
297 };
298
299 tracing::trace!("🔗 Handshake streams accepted");
300
301 // Perform the actual challenge handshake
302 let verified_keys = match crate::challenge::perform_multi_challenge_handshake(
303 send,
304 recv,
305 server_keypair.as_ref(), // Use the server's keypair for challenge
306 )
307 .await
308 {
309 Ok(keys) => {
310 info!(
311 "✅ ML-DSA handshake successful, verified {} keys",
312 keys.len()
313 );
314 keys
315 }
316 Err(e) => {
317 error!("❌ ML-DSA handshake failed: {}", e);
318 connection.close(0u32.into(), b"ML-DSA handshake failed");
319 return Ok(());
320 }
321 };
322
323 // TODO: Extract client public key from TLS certificate
324 // For now, using a placeholder key - this should be extracted from the client's TLS certificate
325 let placeholder_transport_key = server_keypair.public_key(); // Temporary placeholder
326
327 // Create connection info with verified keys
328 let connection_info = ConnectionInfo::with_verified_keys(
329 placeholder_transport_key,
330 verified_keys,
331 remote_addr,
332 );
333
334 info!(
335 "🔐 Connection established with {} verified keys",
336 connection_info.verified_key_count()
337 );
338
339 // Accept the bi-directional streams for services
340 while let Ok((mut send, mut recv)) = connection.accept_bi().await {
341 let service_id = recv.read_u8().await?;
342 info!(?remote_addr, "📋 Service ID: {}", service_id);
343 let service_id = match router.parse_service_id(service_id).await {
344 Ok(service_id) => service_id,
345 Err(error) => {
346 error!(?error, "Invalid service ID: {}", service_id);
347 send.write_u8(ServiceError::UnknownService.as_u8()).await?; // we immediately close this stream with unknown service id
348 continue;
349 }
350 };
351
352 let streams = StreamPair { recv, send };
353 let service = match router
354 .create_service(&service_id, &connection_info, streams)
355 .await
356 {
357 Ok(service) => service,
358 Err(error) => {
359 error!(?error, "Failed to create service: {:?}", service_id);
360 // server error
361 // send.write_u8(ServiceError::ServiceCreationError as u8).await?; // we immediately close this stream with unknown service id
362 continue;
363 }
364 };
365 tokio::spawn(async move {
366 if let Err(e) = service.run().await {
367 error!("Service failed: {}", e);
368 } else {
369 info!("Service ended");
370 }
371 });
372 }
373
374 Ok(())
375 }
376}
377
378pub type FullRelayServer = RelayServer<RelayServiceRouter>;