1mod publisher;
5mod subscriber;
6
7pub mod pipeline;
9pub mod sync;
10pub mod types;
11pub mod util;
12
13pub use crate::publisher::GluePublisher;
15pub use crate::subscriber::GlueSubscriber;
16
17use std::sync::LazyLock;
19
20pub static ZMQ_CONTEXT: LazyLock<zmq::Context> = LazyLock::new(zmq::Context::new);
21
22#[cfg(test)]
24mod tests {
25 use chrono::Utc;
26 use std::any::Any;
27 use std::thread;
28 use std::time::Duration;
29 use std::time::Instant;
30 use types::*;
31
32 use super::*;
33
34 #[test]
35 fn json_serialize_and_deserialize_works() {
37 let test_form_msg = Form {
38 state: "According to all known laws of aviation,".to_string(),
39 };
40
41 let test_form_msg_processed = Form::from(test_form_msg.try_into().unwrap());
42
43 println!("{:?}", test_form_msg_processed);
44 assert_eq!(
45 format!("{:?}", test_form_msg_processed),
46 "Form { state: \"According to all known laws of aviation,\" }"
47 );
48 }
49
50 #[test]
51 fn byte_array_serialize_and_deserialize_works() {
53 let test_image_msg = Image {
54 image: "."
55 .to_string(),
56 time: Utc::now(),
57 };
58
59 let test_image_msg_processed = Image::from(test_image_msg.try_into().unwrap());
60
61 println!("{:?}", test_image_msg_processed);
62 assert_eq!(
63 format!("{:?}", test_image_msg_processed.image),
64 "\".\""
65 );
66 }
67
68 #[test]
69 fn test_dynamic_msg_type_inference() {
71 let form_msg: Form = Form {
72 state: "i am in a state of disarray".to_string(),
73 };
74
75 let msg: zmq::Message = GlueMsg::Form(form_msg).into();
76 let msg: GlueMsg = msg.try_into().unwrap();
77
78 let inferred_msg_boxed: Box<dyn Any> = match msg {
79 GlueMsg::Form(m) => Box::new(m),
80 _ => Box::new(""),
81 };
82
83 let inferred_msg: &Form = inferred_msg_boxed.downcast_ref::<Form>().unwrap();
84
85 assert_eq!(
86 format!("{:?}", inferred_msg),
87 "Form { state: \"i am in a state of disarray\" }".to_string()
88 );
89
90 println!("{:?}", inferred_msg);
91 }
92
93 fn _do_work() {
95 thread::sleep(Duration::from_millis(100));
96 }
97
98 fn start_publisher_thread(
100 timeout_duration: Duration,
101 publisher_address: String,
102 ) -> thread::JoinHandle<()> {
103 thread::spawn(move || {
104 let publisher_socket = zmq::Socket::glue_new_pub(publisher_address)
105 .expect("Failed to open pub connection.");
106
107 let start_time = Instant::now();
108 while Instant::now().duration_since(start_time) < timeout_duration {
109 _do_work();
110
111 let test_form_msg = Form {
112 state: "Idle".to_string(),
113 };
114 let test_image_msg = Image {
115 image: "iVBORw0KGgoAAAANSUhEUgAAAAgAAAAIAQMAAAD+wSzIAAAABlBMVEX///+/v7+jQ3Y5AAAADklEQVQI12P4AIX8EAgALgAD/aNpbtEAAAAASUVORK5CYII".to_string(),
116 time: Utc::now(),
117 };
118 let test_image_tagged_msg = ImageTagged {
119 image: "iVBORw0KGgoAAAANSUhEUgAAAAgAAAAIAQMAAAD+wSzIAAAABlBMVEX///+/v7+jQ3Y5AAAADklEQVQI12P4AIX8EAgALgAD/aNpbtEAAAAASUVORK5CYII".to_string(),
120 lat: 40.4237,
121 long: 86.9212,
122 time: Utc::now(),
123 };
124 let test_gps_msg = Gps {
125 lat: 40.4237,
126 long: 86.9212,
127 alt: 130.4,
128 heading: 0.0,
129 time: Utc::now(),
130 };
131 let test_target_msg = Target {
132 lat: 40.4173,
133 long: 82.9071,
134 };
135 let test_heartbeat_msg = Heartbeat { time: Utc::now() };
136
137 let messages_to_send = [
138 GlueMsg::Form(test_form_msg),
139 GlueMsg::Image(test_image_msg),
140 GlueMsg::ImageTagged(test_image_tagged_msg),
141 GlueMsg::GPS(test_gps_msg),
142 GlueMsg::Target(test_target_msg),
143 GlueMsg::Heartbeat(test_heartbeat_msg),
144 ];
145
146 for msg in messages_to_send {
147 publisher_socket.glue_send(msg).unwrap();
148 }
149 }
150 })
151 }
152
153 fn start_subscriber_thread(
155 timeout_duration: Duration,
156 subscriber_address: String,
157 ) -> thread::JoinHandle<()> {
158 return thread::spawn(move || {
159 let subscriber_socket = zmq::Socket::glue_new_sub(subscriber_address)
160 .expect("Failed to open sub connection.");
161
162 let start_time = Instant::now();
163 while Instant::now().duration_since(start_time) < timeout_duration {
164 let msg: GlueMsg = subscriber_socket.glue_recv().unwrap();
165 println!("Recv'd message: {:?}", msg);
166 }
167 });
168 }
169
170 #[test]
171 fn test_tcp_send_receive() {
175 let publisher_address = "tcp://*:21345".to_string();
176 let subscriber_address = "tcp://127.0.0.1:21345".to_string();
177
178 let pub_handle = start_publisher_thread(Duration::from_secs(2), publisher_address);
179 let sub_handle = start_subscriber_thread(Duration::from_secs(1), subscriber_address);
180
181 pub_handle
182 .join()
183 .expect("Failed to rejoin pub thread (TCP)");
184 sub_handle
185 .join()
186 .expect("Failed to rejoin sub thread (TCP)");
187 }
188
189 #[test]
190 fn test_ipc_send_receive() {
194 let path = "/tmp/comm.ipc";
195 let _ = std::fs::remove_file(path);
196
197 let publisher_address = format!("ipc://{}", path);
198 let subscriber_address = format!("ipc://{}", path);
199
200 let pub_handle = start_publisher_thread(Duration::from_secs(2), publisher_address);
201 let sub_handle = start_subscriber_thread(Duration::from_secs(1), subscriber_address);
202
203 pub_handle
204 .join()
205 .expect("Failed to rejoin pub thread (IPC)");
206 sub_handle
207 .join()
208 .expect("Failed to rejoin sub thread (IPC)");
209
210 let _ = std::fs::remove_file(path);
211 }
212}