core/
camera.rs

1//! Camera Actor is an actor which is responsible for taking photos and sending the filepath to the Sauron Actor
2//! and is the start of the whole image-processing pipeline.
3
4use crate::sauron::{self, SauronHandle};
5use crate::{database::insert_image, utils::get_env};
6
7use a8mini_camera_rs::A8Mini;
8use anyhow::anyhow;
9use chrono::Utc;
10use constants::CAMERA_HEARTBEAT_FAILS_PER_RESTART;
11use sqlx::PgPool;
12use sqlx::postgres::PgPoolOptions;
13use std::time::Duration;
14use tokio::sync::mpsc;
15use tracing::{debug, error, info};
16
17use peripherals::peripherals::DroneCamera;
18
19/// The Camera Actor
20struct Camera {
21    receiver: mpsc::Receiver<CameraMessage>, // this actor receives CameraMessages to communicate with it
22    pool: PgPool,                            // used to communicate to database
23    a8mini: A8Mini,                          // camera hardware
24    sauron: Option<SauronHandle>, // we send photo filepaths to Sauron Actor via the SauronHandle
25}
26
27/// Messages that can be sent to the Camera actor
28enum CameraMessage {
29    TakePhoto,
30    SetSauron { sauron: sauron::SauronHandle },
31}
32
33impl Camera {
34    /// Create a new instance of Camera Actor.
35    ///
36    /// This is an internal constructor, to create a Camera actor, call `CameraHandle::new()`.
37    async fn new(receiver: mpsc::Receiver<CameraMessage>) -> Self {
38        info!("Camera Actor started");
39
40        let database_url = get_env(
41            "DATABASE_URL",
42            "postgresql://user:password@localhost:5432/local".to_string(),
43        );
44
45        let pool = PgPoolOptions::new()
46            .max_connections(constants::CONNECTIONS_PER_DATABASE_POOL)
47            .connect(&database_url)
48            .await
49            .expect("Failed to create pool.");
50
51        let camera = A8Mini::connect().await.unwrap();
52
53        Camera {
54            receiver,
55            pool: pool,
56            a8mini: camera,
57            sauron: None,
58        }
59    }
60
61    /// Process a message received by the Camera actor.
62    async fn handle_message(&mut self, msg: CameraMessage) -> anyhow::Result<()> {
63        match msg {
64            CameraMessage::TakePhoto => self.take_photo().await?,
65            CameraMessage::SetSauron { sauron } => self.sauron = Some(sauron),
66        };
67        Ok(())
68    }
69
70    /// Calls the A8Mini API to take a photo and send the filepath to Sauron
71    async fn take_photo(&self) -> anyhow::Result<()> {
72        debug!("Taking photo...");
73
74        // Verify A8Mini camera connectivity with heartbeat message
75        debug!("Verifying a8mini connection with heartbeat message...");
76        let mut heartbeat_fail_counter: u64 = 0;
77        while let Err(e) = self.a8mini.check_heartbeat().await {
78            heartbeat_fail_counter += 1;
79            error!("Camera failed with {}", e);
80            error!(
81                "Camera heartbeat failed {} times. Retrying...",
82                heartbeat_fail_counter
83            );
84            if heartbeat_fail_counter >= CAMERA_HEARTBEAT_FAILS_PER_RESTART {
85                return Err(anyhow!(
86                    "Camera heartbeat fail counter exceeded limit ({}).",
87                    heartbeat_fail_counter
88                ));
89            }
90        }
91
92        let time_before = Utc::now();
93
94        // Use A8Mini API to take photo
95        self.a8mini.take_photo().await?;
96
97        let time_after = Utc::now();
98        let average_time = time_before + (time_after - time_before) / 2;
99        let average_time_ms = average_time.timestamp_millis();
100        debug!("A8Mini took photo successfully");
101
102        // Save the photo
103        let photo_path = self
104            .a8mini
105            .save_latest_photo("./untagged_image_folder".to_string()) // doesn't matter as this will be run in a docker container and everything will be wiped
106            .await?;
107        debug!("A8Mini saved photo to {}", photo_path);
108
109        // Insert the photo filepath to `images` table in postgres database
110        let insert_image_promise = insert_image(&self.pool, &photo_path, &average_time_ms);
111
112        // Send the photo filepath to Sauron Actor
113        if let Some(sauron) = &self.sauron {
114            debug!(
115                "Sending photo_path {} and timestamp {} to Sauron",
116                photo_path, average_time_ms
117            );
118            sauron
119                .clone()
120                .send_photo(photo_path.clone(), average_time_ms)
121                .await;
122        }
123
124        // Note: since the database insert is relatively expensive and may take a while to complete, we start the async
125        //   process early and only consume the promise after completing the relatively cheaper process of sending
126        //   filepath to sauron.
127        insert_image_promise.await?;
128
129        Ok(())
130    }
131}
132
133/// Internal function which makes the Camera Actor start listening for `CameraMessage`.
134async fn camera_listen(mut actor: Camera) {
135    while let Some(msg) = actor.receiver.recv().await {
136        if let Err(e) = actor.handle_message(msg).await {
137            error!("Failed to handle CameraMessage {}", e);
138        }
139    }
140}
141
142/// Camera's handle. Used to interact with the actor.
143#[derive(Clone)]
144pub struct CameraHandle {
145    sender: mpsc::Sender<CameraMessage>,
146}
147
148impl CameraHandle {
149    /// Creates a new Camera actor, and returns its handle.
150    pub async fn new() -> Self {
151        let (sender, receiver) = mpsc::channel(8);
152        let actor = Camera::new(receiver).await;
153        tokio::spawn(camera_listen(actor));
154
155        Self { sender }
156    }
157
158    /// Tell Camera Actor to take a photo.
159    pub async fn take_photo(&self) -> anyhow::Result<()> {
160        Ok(self.sender.send(CameraMessage::TakePhoto).await?)
161    }
162
163    /// Sets the handle for the Sauron Actor which Camera will send photos to.
164    pub async fn set_sauron(&self, sauron: SauronHandle) -> anyhow::Result<()> {
165        Ok(self
166            .sender
167            .send(CameraMessage::SetSauron { sauron })
168            .await?)
169    }
170}
171
172/// This spawns the camera phototaking infinite loop as a tokio blocking thread
173pub async fn start_camera_phototaking_loop(
174    camera: CameraHandle,
175    shutdown_token: tokio_util::sync::CancellationToken,
176) {
177    info!("Starting camera phototaking loop...");
178    tokio::spawn(async move {
179        run_camera_phototaking_loop(camera.clone(), shutdown_token).await;
180    });
181}
182
183/// Internal function which actually runs the actual camera phototaking infinite loop.
184async fn run_camera_phototaking_loop(
185    camera: CameraHandle,
186    shutdown_token: tokio_util::sync::CancellationToken,
187) {
188    info!("Camera phototaking loop started.");
189    loop {
190        // `biased` keyword favors listening to shutdown token
191        tokio::select! {
192            biased;
193
194            _ = shutdown_token.cancelled() => {
195                info!("Camera phototaker shutting down");
196                break;
197            }
198
199            result = camera.take_photo() => {
200                if let Err(e) = result {
201                    error!("Critical camera error in phototaking loop: {:?}. Retrying...", e);
202                    tokio::time::sleep(Duration::from_secs(2)).await;
203                }
204            }
205        }
206    }
207}