glue/pipeline/
pipeline_builder.rs

1use crate::pipeline::PipelineLayer;
2use std::error::Error;
3use std::sync::mpsc::{Receiver, channel};
4use std::thread::{JoinHandle, spawn};
5
/// Head of a thread-per-stage pipeline.
///
/// Holds the receiving end of the channel fed by the most recently spawned
/// thread, together with the join handles of every thread spawned so far.
pub struct PipelineBuilder<T: Sync + Send + 'static> {
    /// Receiving end of the channel written to by the latest spawned thread.
    pub(crate) rx: Receiver<T>,
    /// Join handles of all threads spawned so far, in spawn order.
    pub(crate) threads: Vec<JoinHandle<()>>,
}

impl<T: Sync + Send + 'static> PipelineBuilder<T> {
    /// Starts a pipeline by spawning a generator thread that calls `c` in a loop.
    ///
    /// Every `Ok` value produced by `c` is sent into the pipeline's channel.
    /// An `Err` is logged to stderr and `c` is simply called again, so a
    /// generator that fails persistently will be retried without delay.
    ///
    /// The generator thread stops (without panicking) as soon as the receiving
    /// side of the channel has been dropped, which lets callers `join` the
    /// returned handles cleanly after the rest of the pipeline has shut down.
    pub fn new<C>(mut c: C) -> Self
    where
        C: FnMut() -> Result<T, Box<dyn Error>> + Send + 'static,
    {
        let (tx, rx) = channel::<T>();

        let generator = spawn(move || {
            loop {
                match c() {
                    Ok(data) => {
                        // `send` only fails when the receiver is gone; nobody
                        // can consume further values, so stop producing them.
                        if let Err(e) = tx.send(data) {
                            eprintln!("Pipeline Gen Send Error: {}", e);
                            return;
                        }
                    }
                    Err(e) => eprintln!("Pipeline Gen Error: {}", e),
                }
            }
        });

        PipelineBuilder {
            rx,
            threads: vec![generator],
        }
    }
}
29
30impl<T: Sync + Send + 'static> PipelineLayer<T> for PipelineBuilder<T> {
31    fn to<R: Sync + Send + 'static, C>(mut self, c: C) -> impl PipelineLayer<R>
32    where
33        C: Fn(T) -> Result<R, Box<dyn Error>> + Sync + Send + 'static,
34    {
35        let (tx, rx) = channel::<R>();
36
37        self.threads.push(spawn(move || {
38            let my_rx = self.rx;
39
40            loop {
41                // Use blocking call recv() since its a pipeline and only a single pubisher
42                match my_rx.recv() {
43                    Ok(data) => match c(data) {
44                        Ok(ret) => tx.send(ret).unwrap(),
45                        Err(e) => eprintln!("Pipeline layer error: {}", e),
46                    },
47                    Err(e) => {
48                        eprintln!("Pipeline Receive Error: {}", e);
49                        return;
50                    }
51                };
52            }
53        }));
54
55        PipelineBuilder {
56            rx,
57            threads: self.threads,
58        }
59    }
60
61    fn map_to<R: Sync + Send + 'static, C>(mut self, c: C) -> impl PipelineLayer<R>
62    where
63        C: Fn(T) -> Vec<R> + Sync + Send + 'static,
64    {
65        let (tx, rx) = channel::<R>();
66
67        self.threads.push(spawn(move || {
68            let my_rx = self.rx;
69
70            loop {
71                // Use blocking call recv() since its a pipeline and only a single pubisher
72                match my_rx.recv() {
73                    Ok(data) => {
74                        for d in c(data) {
75                            tx.send(d).unwrap();
76                        }
77                    }
78                    Err(e) => {
79                        eprintln!("Pipeline Receive Error: {}", e);
80                        return;
81                    }
82                };
83            }
84        }));
85
86        PipelineBuilder {
87            rx,
88            threads: self.threads,
89        }
90    }
91
92    fn finish<C>(mut self, mut c: C) -> Vec<JoinHandle<()>>
93    where
94        C: FnMut(T) -> Result<(), Box<dyn Error>> + Send + 'static,
95    {
96        self.threads.push(spawn(move || {
97            let my_rx = self.rx;
98
99            loop {
100                // Use blocking call recv() since its a pipeline and only a single pubisher
101                match my_rx.recv() {
102                    Ok(data) => match c(data) {
103                        Ok(()) => {}
104                        Err(e) => {
105                            eprintln!("Pipeline Finish Err: {}", e);
106                            return;
107                        }
108                    },
109                    Err(e) => {
110                        eprintln!("Pipeline Receive Error: {}", e);
111                        return;
112                    }
113                };
114            }
115        }));
116
117        self.threads
118    }
119}