1use chrono::Utc;
4use sqlx::{PgPool, postgres::PgPoolOptions};
5use std::cmp::Ordering;
6use std::collections::BinaryHeap;
7use std::env;
8use std::fs::File;
9use std::io::{BufRead, BufReader, Seek, SeekFrom};
10use std::path::PathBuf;
11use tokio::io::AsyncWriteExt;
12use tokio::net::{TcpListener, TcpStream};
13use tokio::sync::mpsc;
14use tracing::{debug, error, info};
15use types::{
16 cv::Gps,
17 mailbox::{GNCTelemetry, MailboxMessage, MailboxMessageType},
18};
19
20#[derive(Debug, Clone, Eq, PartialEq)]
22struct QueueItem {
23 priority: u8,
24 message: MailboxMessage,
25}
26
27impl Ord for QueueItem {
28 fn cmp(&self, other: &Self) -> Ordering {
29 other.priority.cmp(&self.priority)
30 }
31}
32
33impl PartialOrd for QueueItem {
34 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
35 Some(self.cmp(other))
36 }
37}
38
39#[allow(dead_code)]
41enum MailboxActorMessage {
42 QueueMessage {
43 message: MailboxMessage,
44 },
45 ProcessQueue,
46 SaveGpsToDb {
47 gps: Gps,
48 },
49 GetNextMessage {
50 respond_to: tokio::sync::oneshot::Sender<Option<MailboxMessage>>,
51 },
52}
53
54struct Mailbox {
56 receiver: mpsc::Receiver<MailboxActorMessage>,
57 queue: BinaryHeap<QueueItem>,
58 pool: PgPool,
59 telemetry_sender: mpsc::Sender<String>,
60 log_sender: mpsc::Sender<String>,
61}
62
63impl Mailbox {
64 async fn new(receiver: mpsc::Receiver<MailboxActorMessage>) -> Self {
65 info!("Mailbox Actor Started");
66
67 let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
68 debug!("Mailbox connecting to database: {}", database_url);
69
70 let pool = PgPoolOptions::new()
71 .max_connections(constants::CONNECTIONS_PER_DATABASE_POOL)
72 .connect(&database_url)
73 .await
74 .expect("Failed to create pool for Mailbox");
75
76 if let Err(e) = Self::create_mailbox_schema(&pool).await {
77 error!("Failed to create mailbox schema: {}", e);
78 }
79
80 let (telemetry_sender, telemetry_receiver) = mpsc::channel::<String>(100);
82
83 let (log_sender, log_receiver) = mpsc::channel::<String>(100);
85
86 tokio::task::spawn_blocking(move || {
88 Self::run_telemetry_broadcaster(telemetry_receiver);
89 });
90
91 tokio::task::spawn_blocking(move || {
93 Self::run_log_broadcaster(log_receiver);
94 });
95
96 Mailbox {
97 receiver,
98 queue: BinaryHeap::new(),
99 pool,
100 telemetry_sender,
101 log_sender,
102 }
103 }
104
105 fn run_telemetry_broadcaster(mut receiver: mpsc::Receiver<String>) {
106 let context = zmq::Context::new();
107 let publisher = context
108 .socket(zmq::PUB)
109 .expect("Failed to create ZMQ PUB socket for telemetry");
110
111 let telemetry_port = env::var("TELEMETRY_PORT").unwrap_or("5557".to_string());
112 let telemetry_addr = format!("tcp://0.0.0.0:{}", telemetry_port);
113
114 publisher
115 .bind(&telemetry_addr)
116 .expect("Failed to bind telemetry publisher");
117 info!(
118 "General-Purpose Telemetry port opened on {}",
119 telemetry_addr
120 );
121
122 while let Some(msg) = receiver.blocking_recv() {
124 if let Err(e) = publisher.send(&msg, 0) {
125 error!("Failed to broadcast telemetry: {}", e);
126 }
127 }
128 }
129
130 fn run_log_broadcaster(mut receiver: mpsc::Receiver<String>) {
131 let context = zmq::Context::new();
132 let publisher = context
133 .socket(zmq::PUB)
134 .expect("Failed to create ZMQ PUB socket for logs");
135
136 let logs_port = env::var("LOGS_PORT").unwrap_or("5558".to_string());
137 let logs_addr = format!("tcp://0.0.0.0:{}", logs_port);
138
139 publisher
140 .bind(&logs_addr)
141 .expect("Failed to bind log publisher");
142 info!("Log broadcasting port opened on {}", logs_addr);
143
144 while let Some(msg) = receiver.blocking_recv() {
146 if let Err(e) = publisher.send(&msg, 0) {
147 error!("Failed to broadcast log: {}", e);
148 }
149 }
150 }
151
152 async fn create_mailbox_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
153 sqlx::query(
154 r#"
155 CREATE TABLE IF NOT EXISTS mailbox_messages (
156 id SERIAL PRIMARY KEY,
157 timestamp BIGINT NOT NULL,
158 msg_type TEXT NOT NULL,
159 data JSONB NOT NULL,
160 processed BOOLEAN DEFAULT FALSE,
161 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
162 );
163 "#,
164 )
165 .execute(pool)
166 .await?;
167
168 sqlx::query(
169 r#"
170 CREATE INDEX IF NOT EXISTS idx_mailbox_timestamp ON mailbox_messages(timestamp);
171 "#,
172 )
173 .execute(pool)
174 .await?;
175
176 sqlx::query(
177 r#"
178 CREATE INDEX IF NOT EXISTS idx_mailbox_processed ON mailbox_messages(processed);
179 "#,
180 )
181 .execute(pool)
182 .await?;
183
184 info!("Mailbox schema created successfully");
185 Ok(())
186 }
187
188 async fn handle_message(&mut self, msg: MailboxActorMessage) {
189 match msg {
190 MailboxActorMessage::QueueMessage { message } => {
191 self.add_message(message).await;
192 }
193 MailboxActorMessage::ProcessQueue => {
194 self.process_queue().await;
195 }
196 MailboxActorMessage::SaveGpsToDb { gps } => {
197 if let Err(e) = self.save_gps_to_db(gps).await {
198 error!("Failed to save GPS to database: {}", e);
199 }
200 }
201 MailboxActorMessage::GetNextMessage { respond_to } => {
202 let next_msg = self.get_next_message();
203 let _ = respond_to.send(next_msg);
204 }
205 }
206 }
207
208 async fn add_message(&mut self, message: MailboxMessage) {
209 let priority = Self::get_priority(&message.msg_type);
210 debug!(
211 "Queuing message: {:?} with priority {}",
212 message.msg_type, priority
213 );
214
215 if matches!(message.msg_type, MailboxMessageType::GPTelemetry)
217 && let Ok(msg_json) = serde_json::to_string(&message)
218 {
219 let _ = self.telemetry_sender.send(msg_json).await;
220 }
221
222 if matches!(message.msg_type, MailboxMessageType::Log)
224 && let Ok(msg_json) = serde_json::to_string(&message)
225 {
226 let _ = self.log_sender.send(msg_json).await;
227 }
228
229 if let Err(e) = self.save_message_to_db(&message).await {
231 error!("Failed to save message to database: {}", e);
232 }
233
234 self.queue.push(QueueItem { priority, message });
235 }
236
237 fn get_next_message(&mut self) -> Option<MailboxMessage> {
238 self.queue.pop().map(|item| item.message)
239 }
240
241 async fn save_message_to_db(&self, message: &MailboxMessage) -> Result<(), sqlx::Error> {
242 let timestamp = Utc::now().timestamp_millis();
243 let msg_type = format!("{:?}", message.msg_type);
244
245 sqlx::query(
246 r#"
247 INSERT INTO mailbox_messages (timestamp, msg_type, data)
248 VALUES ($1, $2, $3)
249 "#,
250 )
251 .bind(timestamp)
252 .bind(msg_type)
253 .bind(&message.data)
254 .execute(&self.pool)
255 .await?;
256
257 Ok(())
258 }
259
260 async fn save_gps_to_db(&self, gps: Gps) -> Result<(), sqlx::Error> {
261 let timestamp = gps.time.timestamp_millis();
262
263 sqlx::query(
264 r#"
265 INSERT INTO gps (timestamp, lat, long, alt, heading)
266 VALUES ($1, $2, $3, $4, $5)
267 ON CONFLICT (timestamp) DO UPDATE
268 SET lat = EXCLUDED.lat,
269 long = EXCLUDED.long,
270 alt = EXCLUDED.alt,
271 heading = EXCLUDED.heading
272 "#,
273 )
274 .bind(timestamp)
275 .bind(gps.lat)
276 .bind(gps.long)
277 .bind(gps.alt)
278 .bind(gps.heading)
279 .execute(&self.pool)
280 .await?;
281
282 debug!("GPS data saved to database: timestamp={}", timestamp);
283 Ok(())
284 }
285
286 async fn process_queue(&mut self) {
287 while let Some(message) = self.get_next_message() {
288 debug!("Processing message: {:?}", message.msg_type);
289 }
290 }
291
292 fn get_priority(msg_type: &MailboxMessageType) -> u8 {
293 match msg_type {
294 MailboxMessageType::Error => 1,
295 MailboxMessageType::TargetFound => 2,
296 MailboxMessageType::Image => 3,
297 MailboxMessageType::GNCTelemetry => 2,
298 MailboxMessageType::GPTelemetry => 3,
299 MailboxMessageType::Log => 5,
300 }
301 }
302}
303
304#[derive(Clone)]
305pub struct MailboxHandle {
306 sender: mpsc::Sender<MailboxActorMessage>,
307}
308
309#[allow(dead_code)]
310impl MailboxHandle {
311 pub async fn new() -> Self {
312 let (sender, receiver) = mpsc::channel(100);
313 let actor = Mailbox::new(receiver).await;
314
315 tokio::spawn(run_mailbox(actor));
316
317 Self { sender }
318 }
319
320 pub async fn queue_message(&self, message: MailboxMessage) {
321 let msg = MailboxActorMessage::QueueMessage { message };
322 let _ = self.sender.send(msg).await;
323 }
324
325 pub async fn save_gps(&self, gps: Gps) {
326 let msg = MailboxActorMessage::SaveGpsToDb { gps };
327 let _ = self.sender.send(msg).await;
328 }
329
330 pub async fn process_queue(&self) {
331 let msg = MailboxActorMessage::ProcessQueue;
332 let _ = self.sender.send(msg).await;
333 }
334
335 pub async fn get_next_message(&self) -> Option<MailboxMessage> {
336 let (send, recv) = tokio::sync::oneshot::channel();
337 let msg = MailboxActorMessage::GetNextMessage { respond_to: send };
338
339 if self.sender.send(msg).await.is_ok() {
340 recv.await.ok().flatten()
341 } else {
342 None
343 }
344 }
345}
346
347async fn run_mailbox(mut actor: Mailbox) {
348 while let Some(msg) = actor.receiver.recv().await {
349 actor.handle_message(msg).await;
350 }
351}
352
353pub async fn start_gnc_listener(
354 mailbox: MailboxHandle,
355 shutdown_token: tokio_util::sync::CancellationToken,
356) {
357 info!("Starting GNC listener");
358
359 tokio::task::spawn_blocking(move || {
360 if let Err(e) = run_gnc_listener_blocking(mailbox, shutdown_token) {
361 error!("GNC listener error: {}", e);
362 }
363 });
364}
365
366fn run_gnc_listener_blocking(
367 mailbox: MailboxHandle,
368 shutdown_token: tokio_util::sync::CancellationToken,
369) -> Result<(), Box<dyn std::error::Error>> {
370 let context = zmq::Context::new();
371 let subscriber = context.socket(zmq::SUB)?;
372
373 let gnc_port = env::var("GNC_PORT").unwrap_or("5556".to_string());
374 let gnc_addr = format!("tcp://0.0.0.0:{}", gnc_port);
375
376 subscriber.connect(&gnc_addr)?;
377 subscriber.set_subscribe(b"")?; info!("GNC listener connected to {}", gnc_addr);
380
381 subscriber.set_rcvtimeo(1000)?;
382
383 loop {
384 if shutdown_token.is_cancelled() {
385 info!("GNC listener shutting down");
386 break;
387 }
388
389 match subscriber.recv_bytes(0) {
390 Ok(msg_bytes) => {
391 match serde_json::from_slice::<MailboxMessage>(&msg_bytes) {
392 Ok(message) => {
393 debug!("[GNC] Received message: {:?}", message.msg_type);
394
395 if let MailboxMessageType::GNCTelemetry = message.msg_type
397 && let Ok(gnc_telem) =
398 serde_json::from_value::<GNCTelemetry>(message.data.clone())
399 {
400 let gps = Gps {
402 lat: gnc_telem.lat,
403 long: gnc_telem.long,
404 alt: gnc_telem.alt,
405 heading: gnc_telem.heading,
406 time: chrono::DateTime::from_timestamp_millis(
407 (gnc_telem.time * 1000.0) as i64,
408 )
409 .unwrap_or_else(chrono::Utc::now),
410 };
411
412 let mailbox_clone = mailbox.clone();
414 tokio::runtime::Handle::current().block_on(async {
415 mailbox_clone.save_gps(gps).await;
416 });
417 }
418
419 let mailbox_clone = mailbox.clone();
421 tokio::runtime::Handle::current().block_on(async {
422 mailbox_clone.queue_message(message).await;
423 });
424 }
425 Err(e) => {
426 error!("Failed to parse GNC message: {}", e);
427 }
428 }
429 }
430 Err(e) => {
431 if e == zmq::Error::EAGAIN {
432 continue;
433 } else {
434 error!("Failed to receive from GNC: {}", e);
435 std::thread::sleep(std::time::Duration::from_millis(100));
436 }
437 }
438 }
439 }
440
441 Ok(())
442}
443
444pub async fn start_logfile_broadcaster(
446 _mailbox: MailboxHandle,
447 shutdown_token: tokio_util::sync::CancellationToken,
448) {
449 let log_addr = format!(
450 "0.0.0.0:{}",
451 env::var("LOGFILE_STREAM_PORT").unwrap_or("5559".to_string())
452 );
453
454 info!("Starting logfile broadcaster on {}", log_addr);
455
456 tokio::spawn(async move {
457 if let Err(e) = run_logfile_broadcaster(&log_addr, shutdown_token).await {
458 error!("Logfile broadcaster error: {}", e);
459 }
460 });
461}
462
463async fn run_logfile_broadcaster(
464 addr: &str,
465 shutdown_token: tokio_util::sync::CancellationToken,
466) -> Result<(), Box<dyn std::error::Error>> {
467 let listener = TcpListener::bind(addr).await?;
468 info!("Logfile broadcaster listening on {}", addr);
469
470 let logfile_path =
471 PathBuf::from(env::var("LOG_FILE_PATH").unwrap_or("./logs/feonix.log".to_string()));
472
473 loop {
474 tokio::select! {
475 result = listener.accept() => {
476 match result {
477 Ok((socket, addr)) => {
478 info!("New logfile subscriber connected: {}", addr);
479 let path = logfile_path.clone();
480 let shutdown_token = shutdown_token.clone();
481 tokio::spawn(async move {
482 if let Err(e) = stream_logfile(socket, path, shutdown_token).await {
483 error!("Error streaming logfile to {}: {}", addr, e);
484 }
485 });
486 }
487 Err(e) => {
488 error!("Failed to accept connection: {}", e);
489 break;
490 }
491 }
492 }
493 _ = shutdown_token.cancelled() => {
494 info!("Logfile broadcaster shutting down");
495 break;
496 }
497 }
498 }
499
500 Ok(())
501}
502
503async fn stream_logfile(
504 mut socket: TcpStream,
505 logfile_path: PathBuf,
506 shutdown_token: tokio_util::sync::CancellationToken,
507) -> Result<(), Box<dyn std::error::Error>> {
508 let file = match File::open(&logfile_path) {
510 Ok(f) => f,
511 Err(_) => {
512 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
514 File::open(&logfile_path)?
515 }
516 };
517
518 let mut reader = BufReader::new(file);
519 let mut position = 0u64;
520
521 loop {
523 let mut line = String::new();
524 let bytes_read = reader.read_line(&mut line)?;
525
526 if bytes_read == 0 {
527 break;
528 }
529
530 socket.write_all(line.as_bytes()).await?;
531 position += bytes_read as u64;
532 }
533
534 loop {
535 tokio::select! {
536 _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
537 let file = File::open(&logfile_path)?;
538 let mut reader = BufReader::new(file);
539 reader.seek(SeekFrom::Start(position))?;
540
541 loop {
542 let mut line = String::new();
543 let bytes_read = reader.read_line(&mut line)?;
544
545 if bytes_read == 0 {
546 break;
547 }
548
549 socket.write_all(line.as_bytes()).await?;
550 position += bytes_read as u64;
551 }
552 }
553 _ = shutdown_token.cancelled() => {
554 info!("Logfile streaming cancelled, closing connection");
555 break;
556 }
557 }
558 }
559
560 Ok(())
561}
562
563pub async fn start_telemetry_server(
564 mailbox: MailboxHandle,
565 shutdown_token: tokio_util::sync::CancellationToken,
566) {
567 info!("Telemetry server integrated with mailbox actor");
568 start_gnc_listener(mailbox, shutdown_token).await;
569}