core/
mailbox.rs

1//! Mailbox is an actor to handle message routing and persistence.
2
3// TODO: Split into many files, cleanup dead code, and maybe finish implementing?
4
5use chrono::Utc;
6use sqlx::{PgPool, postgres::PgPoolOptions};
7use std::cmp::Ordering;
8use std::collections::BinaryHeap;
9use std::env;
10use std::fs::File;
11use std::io::{BufRead, BufReader, Seek, SeekFrom};
12use std::path::PathBuf;
13use tokio::io::AsyncWriteExt;
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::mpsc;
16use tracing::{debug, error, info};
17use types::{
18    cv::Gps,
19    mailbox::{GNCTelemetry, MailboxMessage, MailboxMessageType},
20};
21
22// Wrapper struct for items in the priority queue
23#[derive(Debug, Clone, Eq, PartialEq)]
24struct QueueItem {
25    priority: u8,
26    message: MailboxMessage,
27}
28
29impl Ord for QueueItem {
30    fn cmp(&self, other: &Self) -> Ordering {
31        other.priority.cmp(&self.priority)
32    }
33}
34
35impl PartialOrd for QueueItem {
36    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
37        Some(self.cmp(other))
38    }
39}
40
/// Messages that can be sent to the Mailbox actor.
///
/// Each variant is dispatched by `Mailbox::handle_message`.
// NOTE(review): every variant is constructed by `MailboxHandle`, so this
// `allow(dead_code)` may no longer be needed — confirm before removing.
#[allow(dead_code)]
enum MailboxActorMessage {
    /// Persist, optionally broadcast, and enqueue `message`.
    QueueMessage {
        message: MailboxMessage,
    },
    /// Drain the priority queue, logging each message as it is popped.
    ProcessQueue,
    /// Upsert a GPS fix into the `gps` table.
    SaveGpsToDb {
        gps: Gps,
    },
    /// Pop the next queued message and send it back through `respond_to`
    /// (`None` when the queue is empty).
    GetNextMessage {
        respond_to: tokio::sync::oneshot::Sender<Option<MailboxMessage>>,
    },
}
55
/// The Mailbox actor.
///
/// Owns the prioritized message queue, the Postgres pool used for
/// persistence, and the channels that feed the two ZMQ broadcaster tasks.
struct Mailbox {
    /// Inbox of commands sent through `MailboxHandle`.
    receiver: mpsc::Receiver<MailboxActorMessage>,
    /// Pending messages; the lowest `priority` number is popped first.
    queue: BinaryHeap<QueueItem>,
    /// Connection pool used for all database writes.
    pool: PgPool,
    /// Carries serialized `GPTelemetry` messages to the telemetry broadcaster.
    telemetry_sender: mpsc::Sender<String>,
    /// Carries serialized `Log` messages to the log broadcaster.
    log_sender: mpsc::Sender<String>,
}
64
65impl Mailbox {
66    async fn new(receiver: mpsc::Receiver<MailboxActorMessage>) -> Self {
67        info!("Mailbox Actor Started");
68
69        let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
70        debug!("Mailbox connecting to database: {}", database_url);
71
72        let pool = PgPoolOptions::new()
73            .max_connections(5)
74            .connect(&database_url)
75            .await
76            .expect("Failed to create pool for Mailbox");
77
78        if let Err(e) = Self::create_mailbox_schema(&pool).await {
79            error!("Failed to create mailbox schema: {}", e);
80        }
81
82        // Create a channel for general-purpose telemetry broadcasting
83        let (telemetry_sender, telemetry_receiver) = mpsc::channel::<String>(100);
84
85        // Create a channel for log broadcasting
86        let (log_sender, log_receiver) = mpsc::channel::<String>(100);
87
88        // Spawn telemetry broadcaster in a separate task
89        tokio::task::spawn_blocking(move || {
90            Self::run_telemetry_broadcaster(telemetry_receiver);
91        });
92
93        // Spawn log broadcaster in a separate task
94        tokio::task::spawn_blocking(move || {
95            Self::run_log_broadcaster(log_receiver);
96        });
97
98        Mailbox {
99            receiver,
100            queue: BinaryHeap::new(),
101            pool,
102            telemetry_sender,
103            log_sender,
104        }
105    }
106
107    fn run_telemetry_broadcaster(mut receiver: mpsc::Receiver<String>) {
108        let context = zmq::Context::new();
109        let publisher = context
110            .socket(zmq::PUB)
111            .expect("Failed to create ZMQ PUB socket for telemetry");
112
113        let telemetry_port = env::var("TELEMETRY_PORT").unwrap_or("5557".to_string());
114        let telemetry_addr = format!("tcp://0.0.0.0:{}", telemetry_port);
115
116        publisher
117            .bind(&telemetry_addr)
118            .expect("Failed to bind telemetry publisher");
119        info!(
120            "General-Purpose Telemetry port opened on {}",
121            telemetry_addr
122        );
123
124        // Block and wait for messages to broadcast
125        while let Some(msg) = receiver.blocking_recv() {
126            if let Err(e) = publisher.send(&msg, 0) {
127                error!("Failed to broadcast telemetry: {}", e);
128            }
129        }
130    }
131
132    fn run_log_broadcaster(mut receiver: mpsc::Receiver<String>) {
133        let context = zmq::Context::new();
134        let publisher = context
135            .socket(zmq::PUB)
136            .expect("Failed to create ZMQ PUB socket for logs");
137
138        let logs_port = env::var("LOGS_PORT").unwrap_or("5558".to_string());
139        let logs_addr = format!("tcp://0.0.0.0:{}", logs_port);
140
141        publisher
142            .bind(&logs_addr)
143            .expect("Failed to bind log publisher");
144        info!("Log broadcasting port opened on {}", logs_addr);
145
146        // Block and wait for messages to broadcast
147        while let Some(msg) = receiver.blocking_recv() {
148            if let Err(e) = publisher.send(&msg, 0) {
149                error!("Failed to broadcast log: {}", e);
150            }
151        }
152    }
153
154    async fn create_mailbox_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
155        sqlx::query(
156            r#"
157            CREATE TABLE IF NOT EXISTS mailbox_messages (
158                id SERIAL PRIMARY KEY,
159                timestamp BIGINT NOT NULL,
160                msg_type TEXT NOT NULL,
161                data JSONB NOT NULL,
162                processed BOOLEAN DEFAULT FALSE,
163                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
164            );
165            "#,
166        )
167        .execute(pool)
168        .await?;
169
170        sqlx::query(
171            r#"
172            CREATE INDEX IF NOT EXISTS idx_mailbox_timestamp ON mailbox_messages(timestamp);
173            "#,
174        )
175        .execute(pool)
176        .await?;
177
178        sqlx::query(
179            r#"
180            CREATE INDEX IF NOT EXISTS idx_mailbox_processed ON mailbox_messages(processed);
181            "#,
182        )
183        .execute(pool)
184        .await?;
185
186        info!("Mailbox schema created successfully");
187        Ok(())
188    }
189
190    async fn handle_message(&mut self, msg: MailboxActorMessage) {
191        match msg {
192            MailboxActorMessage::QueueMessage { message } => {
193                self.add_message(message).await;
194            }
195            MailboxActorMessage::ProcessQueue => {
196                self.process_queue().await;
197            }
198            MailboxActorMessage::SaveGpsToDb { gps } => {
199                if let Err(e) = self.save_gps_to_db(gps).await {
200                    error!("Failed to save GPS to database: {}", e);
201                }
202            }
203            MailboxActorMessage::GetNextMessage { respond_to } => {
204                let next_msg = self.get_next_message();
205                let _ = respond_to.send(next_msg);
206            }
207        }
208    }
209
210    async fn add_message(&mut self, message: MailboxMessage) {
211        let priority = Self::get_priority(&message.msg_type);
212        debug!(
213            "Queuing message: {:?} with priority {}",
214            message.msg_type, priority
215        );
216
217        // Broadcast GPTelemetry messages via ZMQ (bypass database)
218        if matches!(message.msg_type, MailboxMessageType::GPTelemetry)
219            && let Ok(msg_json) = serde_json::to_string(&message)
220        {
221            let _ = self.telemetry_sender.send(msg_json).await;
222        }
223
224        // Broadcast Log messages via ZMQ (bypass database)
225        if matches!(message.msg_type, MailboxMessageType::Log)
226            && let Ok(msg_json) = serde_json::to_string(&message)
227        {
228            let _ = self.log_sender.send(msg_json).await;
229        }
230
231        // Save to database (all messages)
232        if let Err(e) = self.save_message_to_db(&message).await {
233            error!("Failed to save message to database: {}", e);
234        }
235
236        self.queue.push(QueueItem { priority, message });
237    }
238
239    fn get_next_message(&mut self) -> Option<MailboxMessage> {
240        self.queue.pop().map(|item| item.message)
241    }
242
243    async fn save_message_to_db(&self, message: &MailboxMessage) -> Result<(), sqlx::Error> {
244        let timestamp = Utc::now().timestamp_millis();
245        let msg_type = format!("{:?}", message.msg_type);
246
247        sqlx::query(
248            r#"
249            INSERT INTO mailbox_messages (timestamp, msg_type, data)
250            VALUES ($1, $2, $3)
251            "#,
252        )
253        .bind(timestamp)
254        .bind(msg_type)
255        .bind(&message.data)
256        .execute(&self.pool)
257        .await?;
258
259        Ok(())
260    }
261
262    async fn save_gps_to_db(&self, gps: Gps) -> Result<(), sqlx::Error> {
263        let timestamp = gps.time.timestamp_millis();
264
265        sqlx::query(
266            r#"
267            INSERT INTO gps (timestamp, lat, long, alt, heading)
268            VALUES ($1, $2, $3, $4, $5)
269            ON CONFLICT (timestamp) DO UPDATE
270            SET lat = EXCLUDED.lat,
271                long = EXCLUDED.long,
272                alt = EXCLUDED.alt,
273                heading = EXCLUDED.heading
274            "#,
275        )
276        .bind(timestamp)
277        .bind(gps.lat)
278        .bind(gps.long)
279        .bind(gps.alt)
280        .bind(gps.heading)
281        .execute(&self.pool)
282        .await?;
283
284        debug!("GPS data saved to database: timestamp={}", timestamp);
285        Ok(())
286    }
287
288    async fn process_queue(&mut self) {
289        while let Some(message) = self.get_next_message() {
290            debug!("Processing message: {:?}", message.msg_type);
291        }
292    }
293
294    fn get_priority(msg_type: &MailboxMessageType) -> u8 {
295        match msg_type {
296            MailboxMessageType::Error => 1,
297            MailboxMessageType::TargetFound => 2,
298            MailboxMessageType::Image => 3,
299            MailboxMessageType::GNCTelemetry => 2,
300            MailboxMessageType::GPTelemetry => 3,
301            MailboxMessageType::Log => 5,
302        }
303    }
304}
305
306#[derive(Clone)]
307pub struct MailboxHandle {
308    sender: mpsc::Sender<MailboxActorMessage>,
309}
310
311#[allow(dead_code)]
312impl MailboxHandle {
313    pub async fn new() -> Self {
314        let (sender, receiver) = mpsc::channel(100);
315        let actor = Mailbox::new(receiver).await;
316
317        tokio::spawn(run_mailbox(actor));
318
319        Self { sender }
320    }
321
322    pub async fn queue_message(&self, message: MailboxMessage) {
323        let msg = MailboxActorMessage::QueueMessage { message };
324        let _ = self.sender.send(msg).await;
325    }
326
327    pub async fn save_gps(&self, gps: Gps) {
328        let msg = MailboxActorMessage::SaveGpsToDb { gps };
329        let _ = self.sender.send(msg).await;
330    }
331
332    pub async fn process_queue(&self) {
333        let msg = MailboxActorMessage::ProcessQueue;
334        let _ = self.sender.send(msg).await;
335    }
336
337    pub async fn get_next_message(&self) -> Option<MailboxMessage> {
338        let (send, recv) = tokio::sync::oneshot::channel();
339        let msg = MailboxActorMessage::GetNextMessage { respond_to: send };
340
341        if self.sender.send(msg).await.is_ok() {
342            recv.await.ok().flatten()
343        } else {
344            None
345        }
346    }
347}
348
349async fn run_mailbox(mut actor: Mailbox) {
350    while let Some(msg) = actor.receiver.recv().await {
351        actor.handle_message(msg).await;
352    }
353}
354
355pub async fn start_gnc_listener(
356    mailbox: MailboxHandle,
357    shutdown_token: tokio_util::sync::CancellationToken,
358) {
359    info!("Starting GNC listener");
360
361    tokio::task::spawn_blocking(move || {
362        if let Err(e) = run_gnc_listener_blocking(mailbox, shutdown_token) {
363            error!("GNC listener error: {}", e);
364        }
365    });
366}
367
368fn run_gnc_listener_blocking(
369    mailbox: MailboxHandle,
370    shutdown_token: tokio_util::sync::CancellationToken,
371) -> Result<(), Box<dyn std::error::Error>> {
372    let context = zmq::Context::new();
373    let subscriber = context.socket(zmq::SUB)?;
374
375    let gnc_port = env::var("GNC_PORT").unwrap_or("5556".to_string());
376    let gnc_addr = format!("tcp://0.0.0.0:{}", gnc_port);
377
378    subscriber.connect(&gnc_addr)?;
379    subscriber.set_subscribe(b"")?; // Subscribe to all messages
380
381    info!("GNC listener connected to {}", gnc_addr);
382
383    subscriber.set_rcvtimeo(1000)?;
384
385    loop {
386        if shutdown_token.is_cancelled() {
387            info!("GNC listener shutting down");
388            break;
389        }
390
391        match subscriber.recv_bytes(0) {
392            Ok(msg_bytes) => {
393                match serde_json::from_slice::<MailboxMessage>(&msg_bytes) {
394                    Ok(message) => {
395                        debug!("[GNC] Received message: {:?}", message.msg_type);
396
397                        // Special handling for GNC Telemetry data
398                        if let MailboxMessageType::GNCTelemetry = message.msg_type
399                            && let Ok(gnc_telem) =
400                                serde_json::from_value::<GNCTelemetry>(message.data.clone())
401                        {
402                            // Convert to GPS for database storage
403                            let gps = Gps {
404                                lat: gnc_telem.lat,
405                                long: gnc_telem.long,
406                                alt: gnc_telem.alt,
407                                heading: gnc_telem.heading,
408                                time: chrono::DateTime::from_timestamp_millis(
409                                    (gnc_telem.time * 1000.0) as i64,
410                                )
411                                .unwrap_or_else(chrono::Utc::now),
412                            };
413
414                            // Save GPS data to database
415                            let mailbox_clone = mailbox.clone();
416                            tokio::runtime::Handle::current().block_on(async {
417                                mailbox_clone.save_gps(gps).await;
418                            });
419                        }
420
421                        // Queue the message
422                        let mailbox_clone = mailbox.clone();
423                        tokio::runtime::Handle::current().block_on(async {
424                            mailbox_clone.queue_message(message).await;
425                        });
426                    }
427                    Err(e) => {
428                        error!("Failed to parse GNC message: {}", e);
429                    }
430                }
431            }
432            Err(e) => {
433                if e == zmq::Error::EAGAIN {
434                    continue;
435                } else {
436                    error!("Failed to receive from GNC: {}", e);
437                    std::thread::sleep(std::time::Duration::from_millis(100));
438                }
439            }
440        }
441    }
442
443    Ok(())
444}
445
446// Logfile streaming server
447pub async fn start_logfile_broadcaster(
448    _mailbox: MailboxHandle,
449    shutdown_token: tokio_util::sync::CancellationToken,
450) {
451    let log_addr = format!(
452        "0.0.0.0:{}",
453        env::var("LOGFILE_STREAM_PORT").unwrap_or("5559".to_string())
454    );
455
456    info!("Starting logfile broadcaster on {}", log_addr);
457
458    tokio::spawn(async move {
459        if let Err(e) = run_logfile_broadcaster(&log_addr, shutdown_token).await {
460            error!("Logfile broadcaster error: {}", e);
461        }
462    });
463}
464
465async fn run_logfile_broadcaster(
466    addr: &str,
467    shutdown_token: tokio_util::sync::CancellationToken,
468) -> Result<(), Box<dyn std::error::Error>> {
469    let listener = TcpListener::bind(addr).await?;
470    info!("Logfile broadcaster listening on {}", addr);
471
472    let logfile_path =
473        PathBuf::from(env::var("LOG_FILE_PATH").unwrap_or("./logs/feonix.log".to_string()));
474
475    loop {
476        tokio::select! {
477            result = listener.accept() => {
478                match result {
479                    Ok((socket, addr)) => {
480                        info!("New logfile subscriber connected: {}", addr);
481                        let path = logfile_path.clone();
482                        let shutdown_token = shutdown_token.clone();
483                        tokio::spawn(async move {
484                            if let Err(e) = stream_logfile(socket, path, shutdown_token).await {
485                                error!("Error streaming logfile to {}: {}", addr, e);
486                            }
487                        });
488                    }
489                    Err(e) => {
490                        error!("Failed to accept connection: {}", e);
491                        break;
492                    }
493                }
494            }
495            _ = shutdown_token.cancelled() => {
496                info!("Logfile broadcaster shutting down");
497                break;
498            }
499        }
500    }
501
502    Ok(())
503}
504
505async fn stream_logfile(
506    mut socket: TcpStream,
507    logfile_path: PathBuf,
508    shutdown_token: tokio_util::sync::CancellationToken,
509) -> Result<(), Box<dyn std::error::Error>> {
510    // Try to open file, create if doesn't exist
511    let file = match File::open(&logfile_path) {
512        Ok(f) => f,
513        Err(_) => {
514            // If file doesn't exist yet, wait and retry
515            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
516            File::open(&logfile_path)?
517        }
518    };
519
520    let mut reader = BufReader::new(file);
521    let mut position = 0u64;
522
523    // Send existing log contents
524    loop {
525        let mut line = String::new();
526        let bytes_read = reader.read_line(&mut line)?;
527
528        if bytes_read == 0 {
529            break;
530        }
531
532        socket.write_all(line.as_bytes()).await?;
533        position += bytes_read as u64;
534    }
535
536    loop {
537        tokio::select! {
538            _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
539                let file = File::open(&logfile_path)?;
540                let mut reader = BufReader::new(file);
541                reader.seek(SeekFrom::Start(position))?;
542
543                loop {
544                    let mut line = String::new();
545                    let bytes_read = reader.read_line(&mut line)?;
546
547                    if bytes_read == 0 {
548                        break;
549                    }
550
551                    socket.write_all(line.as_bytes()).await?;
552                    position += bytes_read as u64;
553                }
554            }
555            _ = shutdown_token.cancelled() => {
556                info!("Logfile streaming cancelled, closing connection");
557                break;
558            }
559        }
560    }
561
562    Ok(())
563}
564
565pub async fn start_telemetry_server(
566    mailbox: MailboxHandle,
567    shutdown_token: tokio_util::sync::CancellationToken,
568) {
569    info!("Telemetry server integrated with mailbox actor");
570    start_gnc_listener(mailbox, shutdown_token).await;
571}