1use crate::pipeline::PipelineBuilder;
2use crate::pipeline::PipelineLayer;
3use crate::publisher::GluePublisher;
4use crate::subscriber::GlueSubscriber;
5use anyhow::Result;
6use std::any::Any;
7use std::error::Error;
8use std::sync::mpsc::{Receiver, Sender, channel};
9use std::thread::{JoinHandle, spawn};
10
11pub struct Publisher<T>
12where
13 T: Into<zmq::Message> + Send + Sync + 'static,
14{
15 thread: JoinHandle<()>,
16 tx: Sender<T>,
17}
18
19impl<T> Publisher<T>
20where
21 T: Into<zmq::Message> + Send + Sync + 'static,
22{
23 pub fn new(send_address: String) -> Result<Self> {
24 let publisher = zmq::Socket::glue_new_pub(send_address)?;
25
26 let (tx, rx) = channel::<T>();
27 let thread = spawn(move || {
28 loop {
29 match rx.recv() {
31 Ok(data) => match publisher.glue_send(data) {
32 Ok(_) => {}
33 Err(e) => eprintln!("Pipeline layer error: {}", e),
34 },
35 Err(e) => {
36 eprintln!("Publisher Send Error: {}", e);
37 return;
38 }
39 };
40 }
41 });
42
43 Ok(Self { tx, thread })
44 }
45
46 pub fn send(&self, msg: T) -> Result<()> {
47 Ok(self.tx.send(msg)?)
48 }
49
50 pub fn gen_send(self) -> impl FnMut(T) -> Result<(), Box<dyn std::error::Error>> {
51 move |c| Ok(self.send(c)?)
52 }
53
54 pub fn join(self) -> Result<(), Box<dyn Any + Send + 'static>> {
55 drop(self.tx);
56 self.thread.join()
57 }
58}
59
60pub struct Subscriber<T>
61where
62 T: TryFrom<zmq::Message> + Send + Sync + 'static,
63 T::Error: std::error::Error + 'static,
64{
65 thread: JoinHandle<()>,
66 rx: Receiver<T>,
67}
68
69impl<T> Subscriber<T>
70where
71 T: TryFrom<zmq::Message> + Send + Sync + 'static,
72 T::Error: std::error::Error + 'static,
73{
74 pub fn new(recv_address: String) -> Result<Self> {
75 let subscriber = zmq::Socket::glue_new_sub(recv_address)?;
76
77 let (tx, rx) = channel::<T>();
78 let thread = spawn(move || {
79 loop {
80 match subscriber.glue_recv() {
82 Ok(data) => match tx.send(data) {
83 Ok(_) => {}
84 Err(e) => eprintln!("Pipeline layer error: {}", e),
85 },
86 Err(e) => {
87 eprintln!("Publisher Receive Error: {}", e);
88 return;
89 }
90 };
91 }
92 });
93
94 Ok(Self { rx, thread })
95 }
96
97 pub fn recv(&self) -> Result<T> {
98 Ok(self.rx.recv()?)
99 }
100
101 pub fn gen_recv(self) -> impl FnMut() -> Result<T, Box<dyn std::error::Error>> {
102 move || Ok(self.recv()?)
103 }
104
105 pub fn join(self) -> Result<(), Box<dyn Any + Send + 'static>> {
106 drop(self.rx);
107 self.thread.join()
108 }
109}
110
111impl<T> PipelineLayer<T> for Subscriber<T>
112where
113 T: TryFrom<zmq::Message> + Send + Sync + 'static,
114 T::Error: std::error::Error + 'static,
115{
116 fn to<R: Sync + Send + 'static, C>(self, c: C) -> impl PipelineLayer<R>
117 where
118 C: Fn(T) -> Result<R, Box<dyn Error>> + Sync + Send + 'static,
119 {
120 let (tx, rx) = channel::<R>();
121
122 let thread = spawn(move || {
123 let my_rx = self.rx;
124
125 loop {
126 match my_rx.recv() {
128 Ok(data) => match c(data) {
129 Ok(ret) => tx.send(ret).unwrap(),
130 Err(e) => eprintln!("Pipeline layer error: {}", e),
131 },
132 Err(e) => {
133 eprintln!("Pipeline Receive Error: {}", e);
134 return;
135 }
136 };
137 }
138 });
139
140 PipelineBuilder {
141 rx,
142 threads: vec![thread],
143 }
144 }
145
146 fn map_to<R: Sync + Send + 'static, C>(self, c: C) -> impl PipelineLayer<R>
147 where
148 C: Fn(T) -> Vec<R> + Sync + Send + 'static,
149 {
150 let (tx, rx) = channel::<R>();
151
152 let thread = spawn(move || {
153 let my_rx = self.rx;
154
155 loop {
156 match my_rx.recv() {
158 Ok(data) => {
159 for d in c(data) {
160 tx.send(d).unwrap();
161 }
162 }
163 Err(e) => {
164 eprintln!("Pipeline Receive Error: {}", e);
165 return;
166 }
167 };
168 }
169 });
170
171 PipelineBuilder {
172 rx,
173 threads: vec![thread],
174 }
175 }
176
177 fn finish<C>(self, mut c: C) -> Vec<JoinHandle<()>>
178 where
179 C: FnMut(T) -> Result<(), Box<dyn Error>> + Send + 'static,
180 {
181 let thread = spawn(move || {
182 let my_rx = self.rx;
183
184 loop {
185 match my_rx.recv() {
187 Ok(data) => match c(data) {
188 Ok(()) => {}
189 Err(e) => {
190 eprintln!("Pipeline Finish Err: {}", e);
191 return;
192 }
193 },
194 Err(e) => {
195 eprintln!("Pipeline Receive Error: {}", e);
196 return;
197 }
198 };
199 }
200 });
201
202 vec![thread]
203 }
204}