core/
sauron.rs

1//! Sauron is an actor which implements the object detection and localization pipeline.
2
3use crate::database::{create_schema, get_gps, smart_update_database};
4use crate::mailbox::MailboxHandle;
5use crate::utils::get_env;
6
7use chrono::{DateTime, Utc};
8use image::ImageReader;
9use serde_json::json;
10use sqlx::{PgPool, postgres::PgPoolOptions};
11use std::path::Path;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use algorithms::localize;
16use sauron::yolo;
17use types::{
18    cv::{Gps, Model, Object},
19    mailbox::{MailboxMessage, MailboxMessageType},
20};
21
22/// Sauron is an actor which implements the object detection and localization pipeline.
23struct Sauron {
24    receiver: mpsc::Receiver<SauronMessage>,
25    model: Model,
26    pool: PgPool,
27    mailbox: Option<MailboxHandle>,
28}
29
30#[allow(dead_code)]
31enum SauronMessage {
32    RunSauron { file_path: String, timestamp: i64 },
33    SetMailbox { mailbox: MailboxHandle },
34}
35
36impl Sauron {
37    /// Create a new instance of Sauron.
38    ///
39    /// To create a Sauron actor, call `SauronHandle::new()`.
40    async fn new(receiver: mpsc::Receiver<SauronMessage>) -> Self {
41        info!("Sauron Started");
42        let database_url = get_env(
43            "DATABASE_URL",
44            "postgresql://user:password@localhost:5432/local".to_string(),
45        );
46        info!("Sauron Connecting with url: {database_url}");
47
48        let pool = PgPoolOptions::new()
49            .max_connections(5)
50            .connect(&database_url)
51            .await
52            .expect("Failed to create pool.");
53
54        if let Err(e) = create_schema(&pool).await {
55            eprintln!("Failed to create database schema: {}", e);
56        }
57
58        let model = yolo::load_model().expect("Loading Model Failed");
59
60        Sauron {
61            receiver,
62            model,
63            pool,
64            mailbox: None,
65        }
66    }
67
68    /// Process a message received by the Sauron actor.
69    async fn handle_message(&mut self, msg: SauronMessage) {
70        match msg {
71            SauronMessage::RunSauron {
72                file_path,
73                timestamp,
74            } => {
75                self.run_odlc(file_path, timestamp).await.unwrap_or(());
76            }
77            SauronMessage::SetMailbox { mailbox } => {
78                self.mailbox = Some(mailbox);
79                info!("Mailbox connected to Sauron");
80            }
81        }
82    }
83
84    /// Runs the object detection and localization pipeline.
85    ///
86    /// # Arguments
87    ///
88    /// * `file_path` - The path to the image file to localize
89    /// * `timestamp` - The timestamp of the image
90    async fn run_odlc(&self, file_path: String, timestamp: i64) -> anyhow::Result<()> {
91        //Runs Odlc pipeline
92        info!("Running ODLC pipeline for {}", file_path);
93
94        // Run detect
95        let image = ImageReader::open(Path::new(&file_path))?.decode()?;
96        let detections = yolo::detect(&self.model, &image).await?;
97
98        // get gps and interpolate them
99        let (gps1, gps2) = get_gps(&self.pool, &timestamp).await?;
100        let gps = Sauron::interpolate_gps(
101            &gps1.into(),
102            &gps2.into(),
103            DateTime::from_timestamp_millis(timestamp)
104                .expect("Failed to convert image timestamp to datetime"),
105        )
106        .await;
107
108        // localize each detection
109        let objects: Vec<Object> = detections
110            .iter()
111            .map(|detection| localize(gps, detection))
112            .collect();
113
114        // update objects in database
115        smart_update_database(&self.pool, objects.clone(), &file_path).await?;
116
117        // Send target found messages to mailbox
118        if let Some(ref mailbox) = self.mailbox {
119            for object in objects {
120                let message = MailboxMessage {
121                    msg_type: MailboxMessageType::TargetFound,
122                    data: json!({
123                        "lat": object.lat,
124                        "long": object.long,
125                        "class": object.class,
126                        "confidence": object.confidence,
127                        "image": file_path,
128                        "timestamp": timestamp,
129                    }),
130                };
131                mailbox.queue_message(message).await;
132            }
133        }
134
135        Ok(())
136    }
137
138    /// Returns GPS interpolating from the two given GPS points.
139    pub async fn interpolate_gps(a: &Gps, b: &Gps, time: DateTime<Utc>) -> Gps {
140        let difference = time - a.time;
141        let total = b.time - a.time;
142        let scale = (difference.num_milliseconds() as f64) / (total.num_milliseconds() as f64);
143
144        let lat = a.lat + (b.lat - a.lat) * scale;
145        let long = a.lat + (b.lat - a.lat) * scale;
146        let alt = a.alt + (b.alt - a.alt) * scale;
147        let heading = a.heading + (b.heading - a.heading) * scale;
148
149        Gps {
150            lat,
151            long,
152            alt,
153            heading,
154            time,
155        }
156    }
157}
158
159/// Sauron's handle. Used to interact with the actor.
160#[derive(Clone)]
161pub struct SauronHandle {
162    sender: mpsc::Sender<SauronMessage>,
163}
164
165impl SauronHandle {
166    /// Creates a new Sauron actor, and returns its handle.
167    pub async fn new() -> Self {
168        let (sender, receiver) = mpsc::channel(8);
169        let actor = Sauron::new(receiver).await;
170        tokio::spawn(run_sauron(actor));
171
172        Self { sender }
173    }
174
175    /// Sends a photo to Sauron for processing.
176    pub async fn send_photo(&self, file_path: String, timestamp: i64) {
177        let msg = SauronMessage::RunSauron {
178            file_path,
179            timestamp,
180        };
181
182        let _ = self.sender.send(msg).await;
183    }
184
185    /// Sets the handle for the mailbox actor which Sauron will send telemetry to.
186    #[allow(dead_code)]
187    pub async fn set_mailbox(&self, mailbox: MailboxHandle) {
188        let msg = SauronMessage::SetMailbox { mailbox };
189        let _ = self.sender.send(msg).await;
190    }
191}
192
193/// Internal function which runs the Sauron actor.
194async fn run_sauron(mut actor: Sauron) {
195    while let Some(msg) = actor.receiver.recv().await {
196        actor.handle_message(msg).await;
197    }
198}