glue/
lib.rs

1//! This crate adds networking functionality and builds upon the functions provided in ZeroMQ.
2
3// Private Modules
4mod publisher;
5mod subscriber;
6
7// Public Modules
8pub mod pipeline;
9pub mod sync;
10pub mod types;
11pub mod util;
12
13// Re-exports
14pub use crate::publisher::GluePublisher;
15pub use crate::subscriber::GlueSubscriber;
16
17// Imports
18use std::sync::LazyLock;
19
20pub static ZMQ_CONTEXT: LazyLock<zmq::Context> = LazyLock::new(zmq::Context::new);
21
22/// Unit Tests
23#[cfg(test)]
24mod tests {
25    use chrono::Utc;
26    use std::any::Any;
27    use std::thread;
28    use std::time::Duration;
29    use std::time::Instant;
30    use types::*;
31
32    use super::*;
33
34    #[test]
35    /// Tests JSON serialization and deserialization methods within TryFrom<zmq::Message> and Into<zmq::Message>
36    fn json_serialize_and_deserialize_works() {
37        let test_form_msg = Form {
38            state: "According to all known laws of aviation,".to_string(),
39        };
40
41        let test_form_msg_processed = Form::from(test_form_msg.try_into().unwrap());
42
43        println!("{:?}", test_form_msg_processed);
44        assert_eq!(
45            format!("{:?}", test_form_msg_processed),
46            "Form { state: \"According to all known laws of aviation,\" }"
47        );
48    }
49
50    #[test]
51    /// Tests byte array serialization and deserialization methods within TryFrom<zmq::Message> and Into<zmq::Message>
52    fn byte_array_serialize_and_deserialize_works() {
53        let test_image_msg = Image {
54            image: "data:image/png;base64,Itswingsaretoosmalltogetitsfatlittlebodyofftheground."
55                .to_string(),
56            time: Utc::now(),
57        };
58
59        let test_image_msg_processed = Image::from(test_image_msg.try_into().unwrap());
60
61        println!("{:?}", test_image_msg_processed);
62        assert_eq!(
63            format!("{:?}", test_image_msg_processed.image),
64            "\"data:image/png;base64,Itswingsaretoosmalltogetitsfatlittlebodyofftheground.\""
65        );
66    }
67
68    #[test]
69    /// Tests type inference via magic bytes
70    fn test_dynamic_msg_type_inference() {
71        let form_msg: Form = Form {
72            state: "i am in a state of disarray".to_string(),
73        };
74
75        let msg: zmq::Message = GlueMsg::Form(form_msg).into();
76        let msg: GlueMsg = msg.try_into().unwrap();
77
78        let inferred_msg_boxed: Box<dyn Any> = match msg {
79            GlueMsg::Form(m) => Box::new(m),
80            _ => Box::new(""),
81        };
82
83        let inferred_msg: &Form = inferred_msg_boxed.downcast_ref::<Form>().unwrap();
84
85        assert_eq!(
86            format!("{:?}", inferred_msg),
87            "Form { state: \"i am in a state of disarray\" }".to_string()
88        );
89
90        println!("{:?}", inferred_msg);
91    }
92
93    /// Simulate ODLC processing / GPS reading time
94    fn _do_work() {
95        thread::sleep(Duration::from_millis(100));
96    }
97
98    /// Start publisher thread that runs infinitely until `timeout_duration` is reached.
99    fn start_publisher_thread(
100        timeout_duration: Duration,
101        publisher_address: String,
102    ) -> thread::JoinHandle<()> {
103        thread::spawn(move || {
104            let publisher_socket = zmq::Socket::glue_new_pub(publisher_address)
105                .expect("Failed to open pub connection.");
106
107            let start_time = Instant::now();
108            while Instant::now().duration_since(start_time) < timeout_duration {
109                _do_work();
110
111                let test_form_msg = Form {
112                    state: "Idle".to_string(),
113                };
114                let test_image_msg = Image {
115                    image: "iVBORw0KGgoAAAANSUhEUgAAAAgAAAAIAQMAAAD+wSzIAAAABlBMVEX///+/v7+jQ3Y5AAAADklEQVQI12P4AIX8EAgALgAD/aNpbtEAAAAASUVORK5CYII".to_string(),
116                    time: Utc::now(),
117                };
118                let test_image_tagged_msg = ImageTagged {
119                    image: "iVBORw0KGgoAAAANSUhEUgAAAAgAAAAIAQMAAAD+wSzIAAAABlBMVEX///+/v7+jQ3Y5AAAADklEQVQI12P4AIX8EAgALgAD/aNpbtEAAAAASUVORK5CYII".to_string(),
120                    lat: 40.4237,
121                    long: 86.9212,
122                    time: Utc::now(),
123                };
124                let test_gps_msg = Gps {
125                    lat: 40.4237,
126                    long: 86.9212,
127                    alt: 130.4,
128                    heading: 0.0,
129                    time: Utc::now(),
130                };
131                let test_target_msg = Target {
132                    lat: 40.4173,
133                    long: 82.9071,
134                };
135                let test_heartbeat_msg = Heartbeat { time: Utc::now() };
136
137                let messages_to_send = [
138                    GlueMsg::Form(test_form_msg),
139                    GlueMsg::Image(test_image_msg),
140                    GlueMsg::ImageTagged(test_image_tagged_msg),
141                    GlueMsg::GPS(test_gps_msg),
142                    GlueMsg::Target(test_target_msg),
143                    GlueMsg::Heartbeat(test_heartbeat_msg),
144                ];
145
146                for msg in messages_to_send {
147                    publisher_socket.glue_send(msg).unwrap();
148                }
149            }
150        })
151    }
152
153    /// Start subscriber thread that runs infinitely until `timeout_duration` is reached.
154    fn start_subscriber_thread(
155        timeout_duration: Duration,
156        subscriber_address: String,
157    ) -> thread::JoinHandle<()> {
158        return thread::spawn(move || {
159            let subscriber_socket = zmq::Socket::glue_new_sub(subscriber_address)
160                .expect("Failed to open sub connection.");
161
162            let start_time = Instant::now();
163            while Instant::now().duration_since(start_time) < timeout_duration {
164                let msg: GlueMsg = subscriber_socket.glue_recv().unwrap();
165                println!("Recv'd message: {:?}", msg);
166            }
167        });
168    }
169
170    #[test]
171    /// Tests sending and receiving messages over TCP protocol.
172    /// Publisher will send a message every 100ms and live for 2 seconds before exiting.
173    /// Subscriber will print any received message to stdout and live for 1 seconds before exiting.
174    fn test_tcp_send_receive() {
175        let publisher_address = "tcp://*:21345".to_string();
176        let subscriber_address = "tcp://127.0.0.1:21345".to_string();
177
178        let pub_handle = start_publisher_thread(Duration::from_secs(2), publisher_address);
179        let sub_handle = start_subscriber_thread(Duration::from_secs(1), subscriber_address);
180
181        pub_handle
182            .join()
183            .expect("Failed to rejoin pub thread (TCP)");
184        sub_handle
185            .join()
186            .expect("Failed to rejoin sub thread (TCP)");
187    }
188
189    #[test]
190    /// Tests sending and receiving messages over IPC protocol.
191    /// Publisher will send a message every 100ms and live for 2 seconds before exiting.
192    /// Subscriber will print any received message to stdout and live for 1 seconds before exiting.
193    fn test_ipc_send_receive() {
194        let path = "/tmp/comm.ipc";
195        let _ = std::fs::remove_file(path);
196
197        let publisher_address = format!("ipc://{}", path);
198        let subscriber_address = format!("ipc://{}", path);
199
200        let pub_handle = start_publisher_thread(Duration::from_secs(2), publisher_address);
201        let sub_handle = start_subscriber_thread(Duration::from_secs(1), subscriber_address);
202
203        pub_handle
204            .join()
205            .expect("Failed to rejoin pub thread (IPC)");
206        sub_handle
207            .join()
208            .expect("Failed to rejoin sub thread (IPC)");
209
210        let _ = std::fs::remove_file(path);
211    }
212}