glue/
sync.rs

1use crate::pipeline::PipelineBuilder;
2use crate::pipeline::PipelineLayer;
3use crate::publisher::GluePublisher;
4use crate::subscriber::GlueSubscriber;
5use anyhow::Result;
6use std::any::Any;
7use std::error::Error;
8use std::sync::mpsc::{Receiver, Sender, channel};
9use std::thread::{JoinHandle, spawn};
10
11pub struct Publisher<T>
12where
13    T: Into<zmq::Message> + Send + Sync + 'static,
14{
15    thread: JoinHandle<()>,
16    tx: Sender<T>,
17}
18
19impl<T> Publisher<T>
20where
21    T: Into<zmq::Message> + Send + Sync + 'static,
22{
23    pub fn new(send_address: String) -> Result<Self> {
24        let publisher = zmq::Socket::glue_new_pub(send_address)?;
25
26        let (tx, rx) = channel::<T>();
27        let thread = spawn(move || {
28            loop {
29                // Use blocking call recv() since its a pipeline and only a single pubisher
30                match rx.recv() {
31                    Ok(data) => match publisher.glue_send(data) {
32                        Ok(_) => {}
33                        Err(e) => eprintln!("Pipeline layer error: {}", e),
34                    },
35                    Err(e) => {
36                        eprintln!("Publisher Send Error: {}", e);
37                        return;
38                    }
39                };
40            }
41        });
42
43        Ok(Self { tx, thread })
44    }
45
46    pub fn send(&self, msg: T) -> Result<()> {
47        Ok(self.tx.send(msg)?)
48    }
49
50    pub fn gen_send(self) -> impl FnMut(T) -> Result<(), Box<dyn std::error::Error>> {
51        move |c| Ok(self.send(c)?)
52    }
53
54    pub fn join(self) -> Result<(), Box<dyn Any + Send + 'static>> {
55        drop(self.tx);
56        self.thread.join()
57    }
58}
59
60pub struct Subscriber<T>
61where
62    T: TryFrom<zmq::Message> + Send + Sync + 'static,
63    T::Error: std::error::Error + 'static,
64{
65    thread: JoinHandle<()>,
66    rx: Receiver<T>,
67}
68
69impl<T> Subscriber<T>
70where
71    T: TryFrom<zmq::Message> + Send + Sync + 'static,
72    T::Error: std::error::Error + 'static,
73{
74    pub fn new(recv_address: String) -> Result<Self> {
75        let subscriber = zmq::Socket::glue_new_sub(recv_address)?;
76
77        let (tx, rx) = channel::<T>();
78        let thread = spawn(move || {
79            loop {
80                // Use blocking call recv() since its a pipeline and only a single pubisher
81                match subscriber.glue_recv() {
82                    Ok(data) => match tx.send(data) {
83                        Ok(_) => {}
84                        Err(e) => eprintln!("Pipeline layer error: {}", e),
85                    },
86                    Err(e) => {
87                        eprintln!("Publisher Receive Error: {}", e);
88                        return;
89                    }
90                };
91            }
92        });
93
94        Ok(Self { rx, thread })
95    }
96
97    pub fn recv(&self) -> Result<T> {
98        Ok(self.rx.recv()?)
99    }
100
101    pub fn gen_recv(self) -> impl FnMut() -> Result<T, Box<dyn std::error::Error>> {
102        move || Ok(self.recv()?)
103    }
104
105    pub fn join(self) -> Result<(), Box<dyn Any + Send + 'static>> {
106        drop(self.rx);
107        self.thread.join()
108    }
109}
110
111impl<T> PipelineLayer<T> for Subscriber<T>
112where
113    T: TryFrom<zmq::Message> + Send + Sync + 'static,
114    T::Error: std::error::Error + 'static,
115{
116    fn to<R: Sync + Send + 'static, C>(self, c: C) -> impl PipelineLayer<R>
117    where
118        C: Fn(T) -> Result<R, Box<dyn Error>> + Sync + Send + 'static,
119    {
120        let (tx, rx) = channel::<R>();
121
122        let thread = spawn(move || {
123            let my_rx = self.rx;
124
125            loop {
126                // Use blocking call recv() since its a pipeline and only a single pubisher
127                match my_rx.recv() {
128                    Ok(data) => match c(data) {
129                        Ok(ret) => tx.send(ret).unwrap(),
130                        Err(e) => eprintln!("Pipeline layer error: {}", e),
131                    },
132                    Err(e) => {
133                        eprintln!("Pipeline Receive Error: {}", e);
134                        return;
135                    }
136                };
137            }
138        });
139
140        PipelineBuilder {
141            rx,
142            threads: vec![thread],
143        }
144    }
145
146    fn map_to<R: Sync + Send + 'static, C>(self, c: C) -> impl PipelineLayer<R>
147    where
148        C: Fn(T) -> Vec<R> + Sync + Send + 'static,
149    {
150        let (tx, rx) = channel::<R>();
151
152        let thread = spawn(move || {
153            let my_rx = self.rx;
154
155            loop {
156                // Use blocking call recv() since its a pipeline and only a single pubisher
157                match my_rx.recv() {
158                    Ok(data) => {
159                        for d in c(data) {
160                            tx.send(d).unwrap();
161                        }
162                    }
163                    Err(e) => {
164                        eprintln!("Pipeline Receive Error: {}", e);
165                        return;
166                    }
167                };
168            }
169        });
170
171        PipelineBuilder {
172            rx,
173            threads: vec![thread],
174        }
175    }
176
177    fn finish<C>(self, mut c: C) -> Vec<JoinHandle<()>>
178    where
179        C: FnMut(T) -> Result<(), Box<dyn Error>> + Send + 'static,
180    {
181        let thread = spawn(move || {
182            let my_rx = self.rx;
183
184            loop {
185                // Use blocking call recv() since its a pipeline and only a single pubisher
186                match my_rx.recv() {
187                    Ok(data) => match c(data) {
188                        Ok(()) => {}
189                        Err(e) => {
190                            eprintln!("Pipeline Finish Err: {}", e);
191                            return;
192                        }
193                    },
194                    Err(e) => {
195                        eprintln!("Pipeline Receive Error: {}", e);
196                        return;
197                    }
198                };
199            }
200        });
201
202        vec![thread]
203    }
204}