1use chrono::Utc;
6use sqlx::{PgPool, postgres::PgPoolOptions};
7use std::cmp::Ordering;
8use std::collections::BinaryHeap;
9use std::env;
10use std::fs::File;
11use std::io::{BufRead, BufReader, Seek, SeekFrom};
12use std::path::PathBuf;
13use tokio::io::AsyncWriteExt;
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::mpsc;
16use tracing::{debug, error, info};
17use types::{
18 cv::Gps,
19 mailbox::{GNCTelemetry, MailboxMessage, MailboxMessageType},
20};
21
/// A mailbox message paired with the numeric priority it was queued under.
///
/// Instances live in the actor's `BinaryHeap`. Ordering (see the `Ord` impl
/// for this type) looks only at `priority`.
///
/// NOTE(review): the derived `PartialEq` also compares `message`, while the
/// ordering ignores it, so equality and ordering are not strictly consistent.
#[derive(Debug, Clone, Eq, PartialEq)]
struct QueueItem {
    /// Priority rank; numerically smaller values are popped from the heap first.
    priority: u8,
    /// The message carried through the queue.
    message: MailboxMessage,
}
28
29impl Ord for QueueItem {
30 fn cmp(&self, other: &Self) -> Ordering {
31 other.priority.cmp(&self.priority)
32 }
33}
34
impl PartialOrd for QueueItem {
    /// Delegates to `Ord::cmp`, so the partial ordering is always total and
    /// agrees with the ordering the heap uses.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
40
/// Requests understood by the mailbox actor.
///
/// Values are sent over the actor's `mpsc` channel and dispatched by
/// `Mailbox::handle_message`.
// NOTE(review): `dead_code` is presumably allowed because not every variant is
// constructed by all callers — confirm and narrow if possible.
#[allow(dead_code)]
enum MailboxActorMessage {
    /// Broadcast (where applicable), persist, and enqueue `message`.
    QueueMessage {
        message: MailboxMessage,
    },
    /// Drain the in-memory queue.
    ProcessQueue,
    /// Persist a GPS fix to the database.
    SaveGpsToDb {
        gps: Gps,
    },
    /// Pop the next queued message and send it back on `respond_to`
    /// (`None` when the queue is empty).
    GetNextMessage {
        respond_to: tokio::sync::oneshot::Sender<Option<MailboxMessage>>,
    },
}
55
/// State owned by the mailbox actor task.
struct Mailbox {
    /// Incoming requests from `MailboxHandle` clones.
    receiver: mpsc::Receiver<MailboxActorMessage>,
    /// Pending messages, ordered by the `Ord` impl of `QueueItem`.
    queue: BinaryHeap<QueueItem>,
    /// Postgres connection pool used for message and GPS persistence.
    pool: PgPool,
    /// Feeds serialized `GPTelemetry` messages to the telemetry broadcaster.
    telemetry_sender: mpsc::Sender<String>,
    /// Feeds serialized `Log` messages to the log broadcaster.
    log_sender: mpsc::Sender<String>,
}
64
65impl Mailbox {
66 async fn new(receiver: mpsc::Receiver<MailboxActorMessage>) -> Self {
67 info!("Mailbox Actor Started");
68
69 let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
70 debug!("Mailbox connecting to database: {}", database_url);
71
72 let pool = PgPoolOptions::new()
73 .max_connections(5)
74 .connect(&database_url)
75 .await
76 .expect("Failed to create pool for Mailbox");
77
78 if let Err(e) = Self::create_mailbox_schema(&pool).await {
79 error!("Failed to create mailbox schema: {}", e);
80 }
81
82 let (telemetry_sender, telemetry_receiver) = mpsc::channel::<String>(100);
84
85 let (log_sender, log_receiver) = mpsc::channel::<String>(100);
87
88 tokio::task::spawn_blocking(move || {
90 Self::run_telemetry_broadcaster(telemetry_receiver);
91 });
92
93 tokio::task::spawn_blocking(move || {
95 Self::run_log_broadcaster(log_receiver);
96 });
97
98 Mailbox {
99 receiver,
100 queue: BinaryHeap::new(),
101 pool,
102 telemetry_sender,
103 log_sender,
104 }
105 }
106
107 fn run_telemetry_broadcaster(mut receiver: mpsc::Receiver<String>) {
108 let context = zmq::Context::new();
109 let publisher = context
110 .socket(zmq::PUB)
111 .expect("Failed to create ZMQ PUB socket for telemetry");
112
113 let telemetry_port = env::var("TELEMETRY_PORT").unwrap_or("5557".to_string());
114 let telemetry_addr = format!("tcp://0.0.0.0:{}", telemetry_port);
115
116 publisher
117 .bind(&telemetry_addr)
118 .expect("Failed to bind telemetry publisher");
119 info!(
120 "General-Purpose Telemetry port opened on {}",
121 telemetry_addr
122 );
123
124 while let Some(msg) = receiver.blocking_recv() {
126 if let Err(e) = publisher.send(&msg, 0) {
127 error!("Failed to broadcast telemetry: {}", e);
128 }
129 }
130 }
131
132 fn run_log_broadcaster(mut receiver: mpsc::Receiver<String>) {
133 let context = zmq::Context::new();
134 let publisher = context
135 .socket(zmq::PUB)
136 .expect("Failed to create ZMQ PUB socket for logs");
137
138 let logs_port = env::var("LOGS_PORT").unwrap_or("5558".to_string());
139 let logs_addr = format!("tcp://0.0.0.0:{}", logs_port);
140
141 publisher
142 .bind(&logs_addr)
143 .expect("Failed to bind log publisher");
144 info!("Log broadcasting port opened on {}", logs_addr);
145
146 while let Some(msg) = receiver.blocking_recv() {
148 if let Err(e) = publisher.send(&msg, 0) {
149 error!("Failed to broadcast log: {}", e);
150 }
151 }
152 }
153
154 async fn create_mailbox_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
155 sqlx::query(
156 r#"
157 CREATE TABLE IF NOT EXISTS mailbox_messages (
158 id SERIAL PRIMARY KEY,
159 timestamp BIGINT NOT NULL,
160 msg_type TEXT NOT NULL,
161 data JSONB NOT NULL,
162 processed BOOLEAN DEFAULT FALSE,
163 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
164 );
165 "#,
166 )
167 .execute(pool)
168 .await?;
169
170 sqlx::query(
171 r#"
172 CREATE INDEX IF NOT EXISTS idx_mailbox_timestamp ON mailbox_messages(timestamp);
173 "#,
174 )
175 .execute(pool)
176 .await?;
177
178 sqlx::query(
179 r#"
180 CREATE INDEX IF NOT EXISTS idx_mailbox_processed ON mailbox_messages(processed);
181 "#,
182 )
183 .execute(pool)
184 .await?;
185
186 info!("Mailbox schema created successfully");
187 Ok(())
188 }
189
190 async fn handle_message(&mut self, msg: MailboxActorMessage) {
191 match msg {
192 MailboxActorMessage::QueueMessage { message } => {
193 self.add_message(message).await;
194 }
195 MailboxActorMessage::ProcessQueue => {
196 self.process_queue().await;
197 }
198 MailboxActorMessage::SaveGpsToDb { gps } => {
199 if let Err(e) = self.save_gps_to_db(gps).await {
200 error!("Failed to save GPS to database: {}", e);
201 }
202 }
203 MailboxActorMessage::GetNextMessage { respond_to } => {
204 let next_msg = self.get_next_message();
205 let _ = respond_to.send(next_msg);
206 }
207 }
208 }
209
210 async fn add_message(&mut self, message: MailboxMessage) {
211 let priority = Self::get_priority(&message.msg_type);
212 debug!(
213 "Queuing message: {:?} with priority {}",
214 message.msg_type, priority
215 );
216
217 if matches!(message.msg_type, MailboxMessageType::GPTelemetry)
219 && let Ok(msg_json) = serde_json::to_string(&message)
220 {
221 let _ = self.telemetry_sender.send(msg_json).await;
222 }
223
224 if matches!(message.msg_type, MailboxMessageType::Log)
226 && let Ok(msg_json) = serde_json::to_string(&message)
227 {
228 let _ = self.log_sender.send(msg_json).await;
229 }
230
231 if let Err(e) = self.save_message_to_db(&message).await {
233 error!("Failed to save message to database: {}", e);
234 }
235
236 self.queue.push(QueueItem { priority, message });
237 }
238
239 fn get_next_message(&mut self) -> Option<MailboxMessage> {
240 self.queue.pop().map(|item| item.message)
241 }
242
243 async fn save_message_to_db(&self, message: &MailboxMessage) -> Result<(), sqlx::Error> {
244 let timestamp = Utc::now().timestamp_millis();
245 let msg_type = format!("{:?}", message.msg_type);
246
247 sqlx::query(
248 r#"
249 INSERT INTO mailbox_messages (timestamp, msg_type, data)
250 VALUES ($1, $2, $3)
251 "#,
252 )
253 .bind(timestamp)
254 .bind(msg_type)
255 .bind(&message.data)
256 .execute(&self.pool)
257 .await?;
258
259 Ok(())
260 }
261
262 async fn save_gps_to_db(&self, gps: Gps) -> Result<(), sqlx::Error> {
263 let timestamp = gps.time.timestamp_millis();
264
265 sqlx::query(
266 r#"
267 INSERT INTO gps (timestamp, lat, long, alt, heading)
268 VALUES ($1, $2, $3, $4, $5)
269 ON CONFLICT (timestamp) DO UPDATE
270 SET lat = EXCLUDED.lat,
271 long = EXCLUDED.long,
272 alt = EXCLUDED.alt,
273 heading = EXCLUDED.heading
274 "#,
275 )
276 .bind(timestamp)
277 .bind(gps.lat)
278 .bind(gps.long)
279 .bind(gps.alt)
280 .bind(gps.heading)
281 .execute(&self.pool)
282 .await?;
283
284 debug!("GPS data saved to database: timestamp={}", timestamp);
285 Ok(())
286 }
287
288 async fn process_queue(&mut self) {
289 while let Some(message) = self.get_next_message() {
290 debug!("Processing message: {:?}", message.msg_type);
291 }
292 }
293
294 fn get_priority(msg_type: &MailboxMessageType) -> u8 {
295 match msg_type {
296 MailboxMessageType::Error => 1,
297 MailboxMessageType::TargetFound => 2,
298 MailboxMessageType::Image => 3,
299 MailboxMessageType::GNCTelemetry => 2,
300 MailboxMessageType::GPTelemetry => 3,
301 MailboxMessageType::Log => 5,
302 }
303 }
304}
305
/// Cloneable handle used to send requests to the mailbox actor.
///
/// Every clone shares the same underlying `mpsc` sender.
#[derive(Clone)]
pub struct MailboxHandle {
    /// Sending half of the actor's request channel.
    sender: mpsc::Sender<MailboxActorMessage>,
}
310
311#[allow(dead_code)]
312impl MailboxHandle {
313 pub async fn new() -> Self {
314 let (sender, receiver) = mpsc::channel(100);
315 let actor = Mailbox::new(receiver).await;
316
317 tokio::spawn(run_mailbox(actor));
318
319 Self { sender }
320 }
321
322 pub async fn queue_message(&self, message: MailboxMessage) {
323 let msg = MailboxActorMessage::QueueMessage { message };
324 let _ = self.sender.send(msg).await;
325 }
326
327 pub async fn save_gps(&self, gps: Gps) {
328 let msg = MailboxActorMessage::SaveGpsToDb { gps };
329 let _ = self.sender.send(msg).await;
330 }
331
332 pub async fn process_queue(&self) {
333 let msg = MailboxActorMessage::ProcessQueue;
334 let _ = self.sender.send(msg).await;
335 }
336
337 pub async fn get_next_message(&self) -> Option<MailboxMessage> {
338 let (send, recv) = tokio::sync::oneshot::channel();
339 let msg = MailboxActorMessage::GetNextMessage { respond_to: send };
340
341 if self.sender.send(msg).await.is_ok() {
342 recv.await.ok().flatten()
343 } else {
344 None
345 }
346 }
347}
348
349async fn run_mailbox(mut actor: Mailbox) {
350 while let Some(msg) = actor.receiver.recv().await {
351 actor.handle_message(msg).await;
352 }
353}
354
355pub async fn start_gnc_listener(
356 mailbox: MailboxHandle,
357 shutdown_token: tokio_util::sync::CancellationToken,
358) {
359 info!("Starting GNC listener");
360
361 tokio::task::spawn_blocking(move || {
362 if let Err(e) = run_gnc_listener_blocking(mailbox, shutdown_token) {
363 error!("GNC listener error: {}", e);
364 }
365 });
366}
367
368fn run_gnc_listener_blocking(
369 mailbox: MailboxHandle,
370 shutdown_token: tokio_util::sync::CancellationToken,
371) -> Result<(), Box<dyn std::error::Error>> {
372 let context = zmq::Context::new();
373 let subscriber = context.socket(zmq::SUB)?;
374
375 let gnc_port = env::var("GNC_PORT").unwrap_or("5556".to_string());
376 let gnc_addr = format!("tcp://0.0.0.0:{}", gnc_port);
377
378 subscriber.connect(&gnc_addr)?;
379 subscriber.set_subscribe(b"")?; info!("GNC listener connected to {}", gnc_addr);
382
383 subscriber.set_rcvtimeo(1000)?;
384
385 loop {
386 if shutdown_token.is_cancelled() {
387 info!("GNC listener shutting down");
388 break;
389 }
390
391 match subscriber.recv_bytes(0) {
392 Ok(msg_bytes) => {
393 match serde_json::from_slice::<MailboxMessage>(&msg_bytes) {
394 Ok(message) => {
395 debug!("[GNC] Received message: {:?}", message.msg_type);
396
397 if let MailboxMessageType::GNCTelemetry = message.msg_type
399 && let Ok(gnc_telem) =
400 serde_json::from_value::<GNCTelemetry>(message.data.clone())
401 {
402 let gps = Gps {
404 lat: gnc_telem.lat,
405 long: gnc_telem.long,
406 alt: gnc_telem.alt,
407 heading: gnc_telem.heading,
408 time: chrono::DateTime::from_timestamp_millis(
409 (gnc_telem.time * 1000.0) as i64,
410 )
411 .unwrap_or_else(chrono::Utc::now),
412 };
413
414 let mailbox_clone = mailbox.clone();
416 tokio::runtime::Handle::current().block_on(async {
417 mailbox_clone.save_gps(gps).await;
418 });
419 }
420
421 let mailbox_clone = mailbox.clone();
423 tokio::runtime::Handle::current().block_on(async {
424 mailbox_clone.queue_message(message).await;
425 });
426 }
427 Err(e) => {
428 error!("Failed to parse GNC message: {}", e);
429 }
430 }
431 }
432 Err(e) => {
433 if e == zmq::Error::EAGAIN {
434 continue;
435 } else {
436 error!("Failed to receive from GNC: {}", e);
437 std::thread::sleep(std::time::Duration::from_millis(100));
438 }
439 }
440 }
441 }
442
443 Ok(())
444}
445
446pub async fn start_logfile_broadcaster(
448 _mailbox: MailboxHandle,
449 shutdown_token: tokio_util::sync::CancellationToken,
450) {
451 let log_addr = format!(
452 "0.0.0.0:{}",
453 env::var("LOGFILE_STREAM_PORT").unwrap_or("5559".to_string())
454 );
455
456 info!("Starting logfile broadcaster on {}", log_addr);
457
458 tokio::spawn(async move {
459 if let Err(e) = run_logfile_broadcaster(&log_addr, shutdown_token).await {
460 error!("Logfile broadcaster error: {}", e);
461 }
462 });
463}
464
465async fn run_logfile_broadcaster(
466 addr: &str,
467 shutdown_token: tokio_util::sync::CancellationToken,
468) -> Result<(), Box<dyn std::error::Error>> {
469 let listener = TcpListener::bind(addr).await?;
470 info!("Logfile broadcaster listening on {}", addr);
471
472 let logfile_path =
473 PathBuf::from(env::var("LOG_FILE_PATH").unwrap_or("./logs/feonix.log".to_string()));
474
475 loop {
476 tokio::select! {
477 result = listener.accept() => {
478 match result {
479 Ok((socket, addr)) => {
480 info!("New logfile subscriber connected: {}", addr);
481 let path = logfile_path.clone();
482 let shutdown_token = shutdown_token.clone();
483 tokio::spawn(async move {
484 if let Err(e) = stream_logfile(socket, path, shutdown_token).await {
485 error!("Error streaming logfile to {}: {}", addr, e);
486 }
487 });
488 }
489 Err(e) => {
490 error!("Failed to accept connection: {}", e);
491 break;
492 }
493 }
494 }
495 _ = shutdown_token.cancelled() => {
496 info!("Logfile broadcaster shutting down");
497 break;
498 }
499 }
500 }
501
502 Ok(())
503}
504
505async fn stream_logfile(
506 mut socket: TcpStream,
507 logfile_path: PathBuf,
508 shutdown_token: tokio_util::sync::CancellationToken,
509) -> Result<(), Box<dyn std::error::Error>> {
510 let file = match File::open(&logfile_path) {
512 Ok(f) => f,
513 Err(_) => {
514 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
516 File::open(&logfile_path)?
517 }
518 };
519
520 let mut reader = BufReader::new(file);
521 let mut position = 0u64;
522
523 loop {
525 let mut line = String::new();
526 let bytes_read = reader.read_line(&mut line)?;
527
528 if bytes_read == 0 {
529 break;
530 }
531
532 socket.write_all(line.as_bytes()).await?;
533 position += bytes_read as u64;
534 }
535
536 loop {
537 tokio::select! {
538 _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
539 let file = File::open(&logfile_path)?;
540 let mut reader = BufReader::new(file);
541 reader.seek(SeekFrom::Start(position))?;
542
543 loop {
544 let mut line = String::new();
545 let bytes_read = reader.read_line(&mut line)?;
546
547 if bytes_read == 0 {
548 break;
549 }
550
551 socket.write_all(line.as_bytes()).await?;
552 position += bytes_read as u64;
553 }
554 }
555 _ = shutdown_token.cancelled() => {
556 info!("Logfile streaming cancelled, closing connection");
557 break;
558 }
559 }
560 }
561
562 Ok(())
563}
564
565pub async fn start_telemetry_server(
566 mailbox: MailboxHandle,
567 shutdown_token: tokio_util::sync::CancellationToken,
568) {
569 info!("Telemetry server integrated with mailbox actor");
570 start_gnc_listener(mailbox, shutdown_token).await;
571}