core/
sauron.rs

1//! Sauron Actor is an actor which implements the object detection and localization pipeline.
2
3use crate::database::{create_schema, get_gps, smart_update_database};
4use crate::mailbox::MailboxHandle;
5use crate::utils::get_env;
6
7use chrono::{DateTime, Utc};
8use image::ImageReader;
9use serde_json::json;
10use sqlx::{PgPool, postgres::PgPoolOptions};
11use std::path::Path;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use algorithms::localize;
16use sauron::yolo;
17use types::{
18    cv::{Gps, Model, Object},
19    mailbox::{MailboxMessage, MailboxMessageType},
20};
21
22/// Sauron is an actor which implements the object detection and localization pipeline.
23struct Sauron {
24    receiver: mpsc::Receiver<SauronMessage>,
25    model: Model,
26    pool: PgPool,
27    mailbox: Option<MailboxHandle>,
28}
29
30enum SauronMessage {
31    RunSauron { file_path: String, timestamp: i64 },
32    SetMailbox { mailbox: MailboxHandle },
33}
34
35impl Sauron {
36    /// Create a new instance of Sauron.
37    ///
38    /// This is an internal constructor, to create a Sauron actor, call `SauronHandle::new()`.
39    async fn new(receiver: mpsc::Receiver<SauronMessage>) -> Self {
40        info!("Sauron Actor started");
41        let database_url = get_env(
42            "DATABASE_URL",
43            "postgresql://user:password@localhost:5432/local".to_string(),
44        );
45        info!("Sauron Connecting with url: {database_url}");
46
47        let pool = PgPoolOptions::new()
48            .max_connections(constants::CONNECTIONS_PER_DATABASE_POOL)
49            .connect(&database_url)
50            .await
51            .expect("Failed to create pool.");
52
53        if let Err(e) = create_schema(&pool).await {
54            eprintln!("Failed to create database schema: {}", e);
55        }
56
57        let model = yolo::load_model().expect("Loading Model Failed");
58
59        Sauron {
60            receiver,
61            model,
62            pool,
63            mailbox: None,
64        }
65    }
66
67    /// Process a message received by the Sauron actor.
68    async fn handle_message(&mut self, msg: SauronMessage) {
69        match msg {
70            SauronMessage::RunSauron {
71                file_path,
72                timestamp,
73            } => {
74                self.run_odlc(file_path, timestamp).await.unwrap_or(());
75            }
76            SauronMessage::SetMailbox { mailbox } => {
77                self.mailbox = Some(mailbox);
78                info!("Mailbox connected to Sauron");
79            }
80        }
81    }
82
83    /// Runs the object detection and localization pipeline.
84    ///
85    /// # Arguments
86    ///
87    /// * `file_path` - The path to the image file to localize
88    /// * `timestamp` - The timestamp of the image
89    async fn run_odlc(&self, file_path: String, timestamp: i64) -> anyhow::Result<()> {
90        // Runs Odlc pipeline
91        info!("Running ODLC pipeline for {}", file_path);
92
93        // Run detect
94        let image = ImageReader::open(Path::new(&file_path))?.decode()?;
95        let detections = yolo::detect(&self.model, &image).await?;
96
97        // get gps and interpolate them
98        let (gps1, gps2) = get_gps(&self.pool, &timestamp).await?;
99        let gps = Sauron::interpolate_gps(
100            &gps1.into(),
101            &gps2.into(),
102            DateTime::from_timestamp_millis(timestamp)
103                .expect("Failed to convert image timestamp to datetime"),
104        )
105        .await;
106
107        // localize each detection
108        let objects: Vec<Object> = detections
109            .iter()
110            .map(|detection| localize(gps, detection))
111            .collect();
112
113        // update objects in database
114        smart_update_database(&self.pool, objects.clone(), &file_path).await?;
115
116        // Send target found messages to mailbox
117        if let Some(ref mailbox) = self.mailbox {
118            for object in objects {
119                let message = MailboxMessage {
120                    msg_type: MailboxMessageType::TargetFound,
121                    data: json!({
122                        "lat": object.lat,
123                        "long": object.long,
124                        "class": object.class,
125                        "confidence": object.confidence,
126                        "image": file_path,
127                        "timestamp": timestamp,
128                    }),
129                };
130                mailbox.queue_message(message).await;
131            }
132        }
133
134        Ok(())
135    }
136
137    /// Returns GPS interpolating from the two given GPS points.
138    pub async fn interpolate_gps(a: &Gps, b: &Gps, time: DateTime<Utc>) -> Gps {
139        let difference = time - a.time;
140        let total = b.time - a.time;
141        let scale = (difference.num_milliseconds() as f64) / (total.num_milliseconds() as f64);
142
143        let lat = a.lat + (b.lat - a.lat) * scale;
144        let long = a.lat + (b.lat - a.lat) * scale;
145        let alt = a.alt + (b.alt - a.alt) * scale;
146        let heading = a.heading + (b.heading - a.heading) * scale;
147
148        Gps {
149            lat,
150            long,
151            alt,
152            heading,
153            time,
154        }
155    }
156}
157
158/// Internal function which makes the Sauron Actor start listening for `SauronMessage`.
159async fn run_sauron(mut actor: Sauron) {
160    while let Some(msg) = actor.receiver.recv().await {
161        actor.handle_message(msg).await;
162    }
163}
164
165/// Sauron's handle. Used to interact with the actor.
166#[derive(Clone)]
167pub struct SauronHandle {
168    sender: mpsc::Sender<SauronMessage>,
169}
170
171impl SauronHandle {
172    /// Creates a new Sauron actor, and returns its handle.
173    pub async fn new() -> Self {
174        let (sender, receiver) = mpsc::channel(8);
175        let actor = Sauron::new(receiver).await;
176        tokio::spawn(run_sauron(actor));
177
178        Self { sender }
179    }
180
181    /// Sends a photo to Sauron Actor for processing.
182    pub async fn send_photo(&self, file_path: String, timestamp: i64) {
183        let msg = SauronMessage::RunSauron {
184            file_path,
185            timestamp,
186        };
187
188        let _ = self.sender.send(msg).await;
189    }
190
191    /// Sets the handle for the Mailbox Actor which Sauron will send telemetry to.
192    pub async fn set_mailbox(&self, mailbox: MailboxHandle) {
193        let msg = SauronMessage::SetMailbox { mailbox };
194        let _ = self.sender.send(msg).await;
195    }
196}