1use crate::sauron::{self, SauronHandle};
5use crate::{database::insert_image, utils::get_env};
6
7use a8mini_camera_rs::A8Mini;
8use anyhow::anyhow;
9use chrono::Utc;
10use constants::CAMERA_HEARTBEAT_FAILS_PER_RESTART;
11use sqlx::PgPool;
12use sqlx::postgres::PgPoolOptions;
13use std::time::Duration;
14use tokio::sync::mpsc;
15use tracing::{debug, error, info};
16
17use peripherals::peripherals::DroneCamera;
18
19struct Camera {
21 receiver: mpsc::Receiver<CameraMessage>, pool: PgPool, a8mini: A8Mini, sauron: Option<SauronHandle>, }
26
27enum CameraMessage {
29 TakePhoto,
30 SetSauron { sauron: sauron::SauronHandle },
31}
32
33impl Camera {
34 async fn new(receiver: mpsc::Receiver<CameraMessage>) -> Self {
38 info!("Camera Actor started");
39
40 let database_url = get_env(
41 "DATABASE_URL",
42 "postgresql://user:password@localhost:5432/local".to_string(),
43 );
44
45 let pool = PgPoolOptions::new()
46 .max_connections(constants::CONNECTIONS_PER_DATABASE_POOL)
47 .connect(&database_url)
48 .await
49 .expect("Failed to create pool.");
50
51 let camera = A8Mini::connect().await.unwrap();
52
53 Camera {
54 receiver,
55 pool: pool,
56 a8mini: camera,
57 sauron: None,
58 }
59 }
60
61 async fn handle_message(&mut self, msg: CameraMessage) -> anyhow::Result<()> {
63 match msg {
64 CameraMessage::TakePhoto => self.take_photo().await?,
65 CameraMessage::SetSauron { sauron } => self.sauron = Some(sauron),
66 };
67 Ok(())
68 }
69
70 async fn take_photo(&self) -> anyhow::Result<()> {
72 debug!("Taking photo...");
73
74 debug!("Verifying a8mini connection with heartbeat message...");
76 let mut heartbeat_fail_counter: u64 = 0;
77 while let Err(e) = self.a8mini.check_heartbeat().await {
78 heartbeat_fail_counter += 1;
79 error!("Camera failed with {}", e);
80 error!(
81 "Camera heartbeat failed {} times. Retrying...",
82 heartbeat_fail_counter
83 );
84 if heartbeat_fail_counter >= CAMERA_HEARTBEAT_FAILS_PER_RESTART {
85 return Err(anyhow!(
86 "Camera heartbeat fail counter exceeded limit ({}).",
87 heartbeat_fail_counter
88 ));
89 }
90 }
91
92 let time_before = Utc::now();
93
94 self.a8mini.take_photo().await?;
96
97 let time_after = Utc::now();
98 let average_time = time_before + (time_after - time_before) / 2;
99 let average_time_ms = average_time.timestamp_millis();
100 debug!("A8Mini took photo successfully");
101
102 let photo_path = self
104 .a8mini
105 .save_latest_photo("./untagged_image_folder".to_string()) .await?;
107 debug!("A8Mini saved photo to {}", photo_path);
108
109 let insert_image_promise = insert_image(&self.pool, &photo_path, &average_time_ms);
111
112 if let Some(sauron) = &self.sauron {
114 debug!(
115 "Sending photo_path {} and timestamp {} to Sauron",
116 photo_path, average_time_ms
117 );
118 sauron
119 .clone()
120 .send_photo(photo_path.clone(), average_time_ms)
121 .await;
122 }
123
124 insert_image_promise.await?;
128
129 Ok(())
130 }
131}
132
133async fn camera_listen(mut actor: Camera) {
135 while let Some(msg) = actor.receiver.recv().await {
136 if let Err(e) = actor.handle_message(msg).await {
137 error!("Failed to handle CameraMessage {}", e);
138 }
139 }
140}
141
142#[derive(Clone)]
144pub struct CameraHandle {
145 sender: mpsc::Sender<CameraMessage>,
146}
147
148impl CameraHandle {
149 pub async fn new() -> Self {
151 let (sender, receiver) = mpsc::channel(8);
152 let actor = Camera::new(receiver).await;
153 tokio::spawn(camera_listen(actor));
154
155 Self { sender }
156 }
157
158 pub async fn take_photo(&self) -> anyhow::Result<()> {
160 Ok(self.sender.send(CameraMessage::TakePhoto).await?)
161 }
162
163 pub async fn set_sauron(&self, sauron: SauronHandle) -> anyhow::Result<()> {
165 Ok(self
166 .sender
167 .send(CameraMessage::SetSauron { sauron })
168 .await?)
169 }
170}
171
172pub async fn start_camera_phototaking_loop(
174 camera: CameraHandle,
175 shutdown_token: tokio_util::sync::CancellationToken,
176) {
177 info!("Starting camera phototaking loop...");
178 tokio::spawn(async move {
179 run_camera_phototaking_loop(camera.clone(), shutdown_token).await;
180 });
181}
182
183async fn run_camera_phototaking_loop(
185 camera: CameraHandle,
186 shutdown_token: tokio_util::sync::CancellationToken,
187) {
188 info!("Camera phototaking loop started.");
189 loop {
190 tokio::select! {
192 biased;
193
194 _ = shutdown_token.cancelled() => {
195 info!("Camera phototaker shutting down");
196 break;
197 }
198
199 result = camera.take_photo() => {
200 if let Err(e) = result {
201 error!("Critical camera error in phototaking loop: {:?}. Retrying...", e);
202 tokio::time::sleep(Duration::from_secs(2)).await;
203 }
204 }
205 }
206 }
207}