glue/
subscriber.rs

1use crate::ZMQ_CONTEXT;
2use anyhow::Result;
3
4pub trait GlueSubscriber: Send {
5    fn glue_new_sub(receive_address: String) -> Result<impl GlueSubscriber>;
6    fn glue_recv<T>(&self) -> Result<T, Box<dyn std::error::Error>>
7    where
8        T: TryFrom<zmq::Message> + Send + Sync + 'static,
9        T::Error: std::error::Error + 'static;
10    fn glue_gen_recv<T>(self) -> impl FnMut() -> Result<T, Box<dyn std::error::Error>>
11    where
12        T: TryFrom<zmq::Message> + Send + Sync + 'static,
13        T::Error: std::error::Error + 'static;
14}
15
16impl GlueSubscriber for zmq::Socket {
17    /// Opens a new subscriber port for receiving messages connected to `receive_address` given a `context`.
18    fn glue_new_sub(receive_address: String) -> Result<impl GlueSubscriber> {
19        let socket = ZMQ_CONTEXT.socket(zmq::SUB)?;
20        socket.connect(&receive_address)?;
21        socket.set_subscribe(b"")?;
22        socket.set_conflate(true)?;
23
24        Ok(socket)
25    }
26
27    /// ### A more convenient wrapper for `receive_serialized()`.
28    /// Receives an arbitrary `msg` that implements TryFrom<zmq::Message> from designated `publisher` socket.
29    /// Requires type annotation.
30    fn glue_recv<T>(&self) -> Result<T, Box<dyn std::error::Error>>
31    where
32        T: TryFrom<zmq::Message> + Send + Sync + 'static,
33        T::Error: std::error::Error + 'static,
34    {
35        let mut serialized_msg = zmq::Message::new();
36        self.recv(&mut serialized_msg, 0)?;
37        let msg: T = serialized_msg.try_into()?;
38        Ok(msg)
39    }
40
41    fn glue_gen_recv<T>(self) -> impl FnMut() -> Result<T, Box<dyn std::error::Error>>
42    where
43        T: TryFrom<zmq::Message> + Send + Sync + 'static,
44        T::Error: std::error::Error + 'static,
45    {
46        move || self.glue_recv()
47    }
48}