core/
mailbox.rs

1//! Mailbox is an actor to handle message routing and persistence.
2
3use chrono::Utc;
4use sqlx::{PgPool, postgres::PgPoolOptions};
5use std::cmp::Ordering;
6use std::collections::BinaryHeap;
7use std::env;
8use std::fs::File;
9use std::io::{BufRead, BufReader, Seek, SeekFrom};
10use std::path::PathBuf;
11use tokio::io::AsyncWriteExt;
12use tokio::net::{TcpListener, TcpStream};
13use tokio::sync::mpsc;
14use tracing::{debug, error, info};
15use types::{
16    cv::Gps,
17    mailbox::{GNCTelemetry, MailboxMessage, MailboxMessageType},
18};
19
20// Wrapper struct for items in the priority queue
21#[derive(Debug, Clone, Eq, PartialEq)]
22struct QueueItem {
23    priority: u8,
24    message: MailboxMessage,
25}
26
27impl Ord for QueueItem {
28    fn cmp(&self, other: &Self) -> Ordering {
29        other.priority.cmp(&self.priority)
30    }
31}
32
33impl PartialOrd for QueueItem {
34    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
35        Some(self.cmp(other))
36    }
37}
38
39// Messages that can be sent to the Mailbox actor
40#[allow(dead_code)]
41enum MailboxActorMessage {
42    QueueMessage {
43        message: MailboxMessage,
44    },
45    ProcessQueue,
46    SaveGpsToDb {
47        gps: Gps,
48    },
49    GetNextMessage {
50        respond_to: tokio::sync::oneshot::Sender<Option<MailboxMessage>>,
51    },
52}
53
54// The Mailbox actor
55struct Mailbox {
56    receiver: mpsc::Receiver<MailboxActorMessage>,
57    queue: BinaryHeap<QueueItem>,
58    pool: PgPool,
59    telemetry_sender: mpsc::Sender<String>,
60    log_sender: mpsc::Sender<String>,
61}
62
63impl Mailbox {
64    async fn new(receiver: mpsc::Receiver<MailboxActorMessage>) -> Self {
65        info!("Mailbox Actor Started");
66
67        let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
68        debug!("Mailbox connecting to database: {}", database_url);
69
70        let pool = PgPoolOptions::new()
71            .max_connections(constants::CONNECTIONS_PER_DATABASE_POOL)
72            .connect(&database_url)
73            .await
74            .expect("Failed to create pool for Mailbox");
75
76        if let Err(e) = Self::create_mailbox_schema(&pool).await {
77            error!("Failed to create mailbox schema: {}", e);
78        }
79
80        // Create a channel for general-purpose telemetry broadcasting
81        let (telemetry_sender, telemetry_receiver) = mpsc::channel::<String>(100);
82
83        // Create a channel for log broadcasting
84        let (log_sender, log_receiver) = mpsc::channel::<String>(100);
85
86        // Spawn telemetry broadcaster in a separate task
87        tokio::task::spawn_blocking(move || {
88            Self::run_telemetry_broadcaster(telemetry_receiver);
89        });
90
91        // Spawn log broadcaster in a separate task
92        tokio::task::spawn_blocking(move || {
93            Self::run_log_broadcaster(log_receiver);
94        });
95
96        Mailbox {
97            receiver,
98            queue: BinaryHeap::new(),
99            pool,
100            telemetry_sender,
101            log_sender,
102        }
103    }
104
105    fn run_telemetry_broadcaster(mut receiver: mpsc::Receiver<String>) {
106        let context = zmq::Context::new();
107        let publisher = context
108            .socket(zmq::PUB)
109            .expect("Failed to create ZMQ PUB socket for telemetry");
110
111        let telemetry_port = env::var("TELEMETRY_PORT").unwrap_or("5557".to_string());
112        let telemetry_addr = format!("tcp://0.0.0.0:{}", telemetry_port);
113
114        publisher
115            .bind(&telemetry_addr)
116            .expect("Failed to bind telemetry publisher");
117        info!(
118            "General-Purpose Telemetry port opened on {}",
119            telemetry_addr
120        );
121
122        // Block and wait for messages to broadcast
123        while let Some(msg) = receiver.blocking_recv() {
124            if let Err(e) = publisher.send(&msg, 0) {
125                error!("Failed to broadcast telemetry: {}", e);
126            }
127        }
128    }
129
130    fn run_log_broadcaster(mut receiver: mpsc::Receiver<String>) {
131        let context = zmq::Context::new();
132        let publisher = context
133            .socket(zmq::PUB)
134            .expect("Failed to create ZMQ PUB socket for logs");
135
136        let logs_port = env::var("LOGS_PORT").unwrap_or("5558".to_string());
137        let logs_addr = format!("tcp://0.0.0.0:{}", logs_port);
138
139        publisher
140            .bind(&logs_addr)
141            .expect("Failed to bind log publisher");
142        info!("Log broadcasting port opened on {}", logs_addr);
143
144        // Block and wait for messages to broadcast
145        while let Some(msg) = receiver.blocking_recv() {
146            if let Err(e) = publisher.send(&msg, 0) {
147                error!("Failed to broadcast log: {}", e);
148            }
149        }
150    }
151
152    async fn create_mailbox_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
153        sqlx::query(
154            r#"
155            CREATE TABLE IF NOT EXISTS mailbox_messages (
156                id SERIAL PRIMARY KEY,
157                timestamp BIGINT NOT NULL,
158                msg_type TEXT NOT NULL,
159                data JSONB NOT NULL,
160                processed BOOLEAN DEFAULT FALSE,
161                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
162            );
163            "#,
164        )
165        .execute(pool)
166        .await?;
167
168        sqlx::query(
169            r#"
170            CREATE INDEX IF NOT EXISTS idx_mailbox_timestamp ON mailbox_messages(timestamp);
171            "#,
172        )
173        .execute(pool)
174        .await?;
175
176        sqlx::query(
177            r#"
178            CREATE INDEX IF NOT EXISTS idx_mailbox_processed ON mailbox_messages(processed);
179            "#,
180        )
181        .execute(pool)
182        .await?;
183
184        info!("Mailbox schema created successfully");
185        Ok(())
186    }
187
188    async fn handle_message(&mut self, msg: MailboxActorMessage) {
189        match msg {
190            MailboxActorMessage::QueueMessage { message } => {
191                self.add_message(message).await;
192            }
193            MailboxActorMessage::ProcessQueue => {
194                self.process_queue().await;
195            }
196            MailboxActorMessage::SaveGpsToDb { gps } => {
197                if let Err(e) = self.save_gps_to_db(gps).await {
198                    error!("Failed to save GPS to database: {}", e);
199                }
200            }
201            MailboxActorMessage::GetNextMessage { respond_to } => {
202                let next_msg = self.get_next_message();
203                let _ = respond_to.send(next_msg);
204            }
205        }
206    }
207
208    async fn add_message(&mut self, message: MailboxMessage) {
209        let priority = Self::get_priority(&message.msg_type);
210        debug!(
211            "Queuing message: {:?} with priority {}",
212            message.msg_type, priority
213        );
214
215        // Broadcast GPTelemetry messages via ZMQ (bypass database)
216        if matches!(message.msg_type, MailboxMessageType::GPTelemetry)
217            && let Ok(msg_json) = serde_json::to_string(&message)
218        {
219            let _ = self.telemetry_sender.send(msg_json).await;
220        }
221
222        // Broadcast Log messages via ZMQ (bypass database)
223        if matches!(message.msg_type, MailboxMessageType::Log)
224            && let Ok(msg_json) = serde_json::to_string(&message)
225        {
226            let _ = self.log_sender.send(msg_json).await;
227        }
228
229        // Save to database (all messages)
230        if let Err(e) = self.save_message_to_db(&message).await {
231            error!("Failed to save message to database: {}", e);
232        }
233
234        self.queue.push(QueueItem { priority, message });
235    }
236
237    fn get_next_message(&mut self) -> Option<MailboxMessage> {
238        self.queue.pop().map(|item| item.message)
239    }
240
241    async fn save_message_to_db(&self, message: &MailboxMessage) -> Result<(), sqlx::Error> {
242        let timestamp = Utc::now().timestamp_millis();
243        let msg_type = format!("{:?}", message.msg_type);
244
245        sqlx::query(
246            r#"
247            INSERT INTO mailbox_messages (timestamp, msg_type, data)
248            VALUES ($1, $2, $3)
249            "#,
250        )
251        .bind(timestamp)
252        .bind(msg_type)
253        .bind(&message.data)
254        .execute(&self.pool)
255        .await?;
256
257        Ok(())
258    }
259
260    async fn save_gps_to_db(&self, gps: Gps) -> Result<(), sqlx::Error> {
261        let timestamp = gps.time.timestamp_millis();
262
263        sqlx::query(
264            r#"
265            INSERT INTO gps (timestamp, lat, long, alt, heading)
266            VALUES ($1, $2, $3, $4, $5)
267            ON CONFLICT (timestamp) DO UPDATE
268            SET lat = EXCLUDED.lat,
269                long = EXCLUDED.long,
270                alt = EXCLUDED.alt,
271                heading = EXCLUDED.heading
272            "#,
273        )
274        .bind(timestamp)
275        .bind(gps.lat)
276        .bind(gps.long)
277        .bind(gps.alt)
278        .bind(gps.heading)
279        .execute(&self.pool)
280        .await?;
281
282        debug!("GPS data saved to database: timestamp={}", timestamp);
283        Ok(())
284    }
285
286    async fn process_queue(&mut self) {
287        while let Some(message) = self.get_next_message() {
288            debug!("Processing message: {:?}", message.msg_type);
289        }
290    }
291
292    fn get_priority(msg_type: &MailboxMessageType) -> u8 {
293        match msg_type {
294            MailboxMessageType::Error => 1,
295            MailboxMessageType::TargetFound => 2,
296            MailboxMessageType::Image => 3,
297            MailboxMessageType::GNCTelemetry => 2,
298            MailboxMessageType::GPTelemetry => 3,
299            MailboxMessageType::Log => 5,
300        }
301    }
302}
303
304#[derive(Clone)]
305pub struct MailboxHandle {
306    sender: mpsc::Sender<MailboxActorMessage>,
307}
308
309#[allow(dead_code)]
310impl MailboxHandle {
311    pub async fn new() -> Self {
312        let (sender, receiver) = mpsc::channel(100);
313        let actor = Mailbox::new(receiver).await;
314
315        tokio::spawn(run_mailbox(actor));
316
317        Self { sender }
318    }
319
320    pub async fn queue_message(&self, message: MailboxMessage) {
321        let msg = MailboxActorMessage::QueueMessage { message };
322        let _ = self.sender.send(msg).await;
323    }
324
325    pub async fn save_gps(&self, gps: Gps) {
326        let msg = MailboxActorMessage::SaveGpsToDb { gps };
327        let _ = self.sender.send(msg).await;
328    }
329
330    pub async fn process_queue(&self) {
331        let msg = MailboxActorMessage::ProcessQueue;
332        let _ = self.sender.send(msg).await;
333    }
334
335    pub async fn get_next_message(&self) -> Option<MailboxMessage> {
336        let (send, recv) = tokio::sync::oneshot::channel();
337        let msg = MailboxActorMessage::GetNextMessage { respond_to: send };
338
339        if self.sender.send(msg).await.is_ok() {
340            recv.await.ok().flatten()
341        } else {
342            None
343        }
344    }
345}
346
347async fn run_mailbox(mut actor: Mailbox) {
348    while let Some(msg) = actor.receiver.recv().await {
349        actor.handle_message(msg).await;
350    }
351}
352
353pub async fn start_gnc_listener(
354    mailbox: MailboxHandle,
355    shutdown_token: tokio_util::sync::CancellationToken,
356) {
357    info!("Starting GNC listener");
358
359    tokio::task::spawn_blocking(move || {
360        if let Err(e) = run_gnc_listener_blocking(mailbox, shutdown_token) {
361            error!("GNC listener error: {}", e);
362        }
363    });
364}
365
366fn run_gnc_listener_blocking(
367    mailbox: MailboxHandle,
368    shutdown_token: tokio_util::sync::CancellationToken,
369) -> Result<(), Box<dyn std::error::Error>> {
370    let context = zmq::Context::new();
371    let subscriber = context.socket(zmq::SUB)?;
372
373    let gnc_port = env::var("GNC_PORT").unwrap_or("5556".to_string());
374    let gnc_addr = format!("tcp://0.0.0.0:{}", gnc_port);
375
376    subscriber.connect(&gnc_addr)?;
377    subscriber.set_subscribe(b"")?; // Subscribe to all messages
378
379    info!("GNC listener connected to {}", gnc_addr);
380
381    subscriber.set_rcvtimeo(1000)?;
382
383    loop {
384        if shutdown_token.is_cancelled() {
385            info!("GNC listener shutting down");
386            break;
387        }
388
389        match subscriber.recv_bytes(0) {
390            Ok(msg_bytes) => {
391                match serde_json::from_slice::<MailboxMessage>(&msg_bytes) {
392                    Ok(message) => {
393                        debug!("[GNC] Received message: {:?}", message.msg_type);
394
395                        // Special handling for GNC Telemetry data
396                        if let MailboxMessageType::GNCTelemetry = message.msg_type
397                            && let Ok(gnc_telem) =
398                                serde_json::from_value::<GNCTelemetry>(message.data.clone())
399                        {
400                            // Convert to GPS for database storage
401                            let gps = Gps {
402                                lat: gnc_telem.lat,
403                                long: gnc_telem.long,
404                                alt: gnc_telem.alt,
405                                heading: gnc_telem.heading,
406                                time: chrono::DateTime::from_timestamp_millis(
407                                    (gnc_telem.time * 1000.0) as i64,
408                                )
409                                .unwrap_or_else(chrono::Utc::now),
410                            };
411
412                            // Save GPS data to database
413                            let mailbox_clone = mailbox.clone();
414                            tokio::runtime::Handle::current().block_on(async {
415                                mailbox_clone.save_gps(gps).await;
416                            });
417                        }
418
419                        // Queue the message
420                        let mailbox_clone = mailbox.clone();
421                        tokio::runtime::Handle::current().block_on(async {
422                            mailbox_clone.queue_message(message).await;
423                        });
424                    }
425                    Err(e) => {
426                        error!("Failed to parse GNC message: {}", e);
427                    }
428                }
429            }
430            Err(e) => {
431                if e == zmq::Error::EAGAIN {
432                    continue;
433                } else {
434                    error!("Failed to receive from GNC: {}", e);
435                    std::thread::sleep(std::time::Duration::from_millis(100));
436                }
437            }
438        }
439    }
440
441    Ok(())
442}
443
444// Logfile streaming server
445pub async fn start_logfile_broadcaster(
446    _mailbox: MailboxHandle,
447    shutdown_token: tokio_util::sync::CancellationToken,
448) {
449    let log_addr = format!(
450        "0.0.0.0:{}",
451        env::var("LOGFILE_STREAM_PORT").unwrap_or("5559".to_string())
452    );
453
454    info!("Starting logfile broadcaster on {}", log_addr);
455
456    tokio::spawn(async move {
457        if let Err(e) = run_logfile_broadcaster(&log_addr, shutdown_token).await {
458            error!("Logfile broadcaster error: {}", e);
459        }
460    });
461}
462
463async fn run_logfile_broadcaster(
464    addr: &str,
465    shutdown_token: tokio_util::sync::CancellationToken,
466) -> Result<(), Box<dyn std::error::Error>> {
467    let listener = TcpListener::bind(addr).await?;
468    info!("Logfile broadcaster listening on {}", addr);
469
470    let logfile_path =
471        PathBuf::from(env::var("LOG_FILE_PATH").unwrap_or("./logs/feonix.log".to_string()));
472
473    loop {
474        tokio::select! {
475            result = listener.accept() => {
476                match result {
477                    Ok((socket, addr)) => {
478                        info!("New logfile subscriber connected: {}", addr);
479                        let path = logfile_path.clone();
480                        let shutdown_token = shutdown_token.clone();
481                        tokio::spawn(async move {
482                            if let Err(e) = stream_logfile(socket, path, shutdown_token).await {
483                                error!("Error streaming logfile to {}: {}", addr, e);
484                            }
485                        });
486                    }
487                    Err(e) => {
488                        error!("Failed to accept connection: {}", e);
489                        break;
490                    }
491                }
492            }
493            _ = shutdown_token.cancelled() => {
494                info!("Logfile broadcaster shutting down");
495                break;
496            }
497        }
498    }
499
500    Ok(())
501}
502
503async fn stream_logfile(
504    mut socket: TcpStream,
505    logfile_path: PathBuf,
506    shutdown_token: tokio_util::sync::CancellationToken,
507) -> Result<(), Box<dyn std::error::Error>> {
508    // Try to open file, create if doesn't exist
509    let file = match File::open(&logfile_path) {
510        Ok(f) => f,
511        Err(_) => {
512            // If file doesn't exist yet, wait and retry
513            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
514            File::open(&logfile_path)?
515        }
516    };
517
518    let mut reader = BufReader::new(file);
519    let mut position = 0u64;
520
521    // Send existing log contents
522    loop {
523        let mut line = String::new();
524        let bytes_read = reader.read_line(&mut line)?;
525
526        if bytes_read == 0 {
527            break;
528        }
529
530        socket.write_all(line.as_bytes()).await?;
531        position += bytes_read as u64;
532    }
533
534    loop {
535        tokio::select! {
536            _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
537                let file = File::open(&logfile_path)?;
538                let mut reader = BufReader::new(file);
539                reader.seek(SeekFrom::Start(position))?;
540
541                loop {
542                    let mut line = String::new();
543                    let bytes_read = reader.read_line(&mut line)?;
544
545                    if bytes_read == 0 {
546                        break;
547                    }
548
549                    socket.write_all(line.as_bytes()).await?;
550                    position += bytes_read as u64;
551                }
552            }
553            _ = shutdown_token.cancelled() => {
554                info!("Logfile streaming cancelled, closing connection");
555                break;
556            }
557        }
558    }
559
560    Ok(())
561}
562
563pub async fn start_telemetry_server(
564    mailbox: MailboxHandle,
565    shutdown_token: tokio_util::sync::CancellationToken,
566) {
567    info!("Telemetry server integrated with mailbox actor");
568    start_gnc_listener(mailbox, shutdown_token).await;
569}