glue/pipeline/
pipeline_builder.rs1use crate::pipeline::PipelineLayer;
2use std::error::Error;
3use std::sync::mpsc::{Receiver, channel};
4use std::thread::{JoinHandle, spawn};
5
/// One stage of a thread-per-stage pipeline under construction.
///
/// Holds the receiving end of the channel fed by the most recently added
/// stage, together with the join handles of every worker thread spawned so
/// far.
pub struct PipelineBuilder<T>
where
    T: Sync + Send + 'static,
{
    /// Receiving end of the channel written to by the latest stage.
    pub(crate) rx: Receiver<T>,
    /// Join handles of the worker threads, in the order they were spawned.
    pub(crate) threads: Vec<JoinHandle<()>>,
}
10
11impl<T: Sync + Send + 'static> PipelineBuilder<T> {
12 pub fn new<C>(mut c: C) -> Self
13 where
14 C: FnMut() -> Result<T, Box<dyn Error>> + Send + 'static,
15 {
16 let (tx, rx) = channel::<T>();
17 let threads = vec![spawn(move || {
18 loop {
19 match c() {
20 Ok(data) => tx.send(data).unwrap(),
21 Err(e) => eprintln!("Pipeline Gen Error: {}", e),
22 };
23 }
24 })];
25
26 PipelineBuilder { rx, threads }
27 }
28}
29
30impl<T: Sync + Send + 'static> PipelineLayer<T> for PipelineBuilder<T> {
31 fn to<R: Sync + Send + 'static, C>(mut self, c: C) -> impl PipelineLayer<R>
32 where
33 C: Fn(T) -> Result<R, Box<dyn Error>> + Sync + Send + 'static,
34 {
35 let (tx, rx) = channel::<R>();
36
37 self.threads.push(spawn(move || {
38 let my_rx = self.rx;
39
40 loop {
41 match my_rx.recv() {
43 Ok(data) => match c(data) {
44 Ok(ret) => tx.send(ret).unwrap(),
45 Err(e) => eprintln!("Pipeline layer error: {}", e),
46 },
47 Err(e) => {
48 eprintln!("Pipeline Receive Error: {}", e);
49 return;
50 }
51 };
52 }
53 }));
54
55 PipelineBuilder {
56 rx,
57 threads: self.threads,
58 }
59 }
60
61 fn map_to<R: Sync + Send + 'static, C>(mut self, c: C) -> impl PipelineLayer<R>
62 where
63 C: Fn(T) -> Vec<R> + Sync + Send + 'static,
64 {
65 let (tx, rx) = channel::<R>();
66
67 self.threads.push(spawn(move || {
68 let my_rx = self.rx;
69
70 loop {
71 match my_rx.recv() {
73 Ok(data) => {
74 for d in c(data) {
75 tx.send(d).unwrap();
76 }
77 }
78 Err(e) => {
79 eprintln!("Pipeline Receive Error: {}", e);
80 return;
81 }
82 };
83 }
84 }));
85
86 PipelineBuilder {
87 rx,
88 threads: self.threads,
89 }
90 }
91
92 fn finish<C>(mut self, mut c: C) -> Vec<JoinHandle<()>>
93 where
94 C: FnMut(T) -> Result<(), Box<dyn Error>> + Send + 'static,
95 {
96 self.threads.push(spawn(move || {
97 let my_rx = self.rx;
98
99 loop {
100 match my_rx.recv() {
102 Ok(data) => match c(data) {
103 Ok(()) => {}
104 Err(e) => {
105 eprintln!("Pipeline Finish Err: {}", e);
106 return;
107 }
108 },
109 Err(e) => {
110 eprintln!("Pipeline Receive Error: {}", e);
111 return;
112 }
113 };
114 }
115 }));
116
117 self.threads
118 }
119}