pubsub_multithread_inproc.cpp
Go to the documentation of this file.
1 #include <future>
2 #include <iostream>
3 #include <string>
4 #include <thread>
5 
6 #include "zmq.hpp"
7 #include "zmq_addon.hpp"
8 
10  // Prepare publisher
11  zmq::socket_t publisher(*ctx, zmq::socket_type::pub);
12  publisher.bind("inproc://#1");
13 
14  // Give the subscribers a chance to connect, so they don't lose any messages
15  std::this_thread::sleep_for(std::chrono::milliseconds(20));
16 
17  while (true) {
18  // Write three messages, each with an envelope and content
19  publisher.send(zmq::str_buffer("A"), zmq::send_flags::sndmore);
20  publisher.send(zmq::str_buffer("Message in A envelope"));
21  publisher.send(zmq::str_buffer("B"), zmq::send_flags::sndmore);
22  publisher.send(zmq::str_buffer("Message in B envelope"));
23  publisher.send(zmq::str_buffer("C"), zmq::send_flags::sndmore);
24  publisher.send(zmq::str_buffer("Message in C envelope"));
25  std::this_thread::sleep_for(std::chrono::milliseconds(100));
26  }
27 }
28 
30  // Prepare subscriber
31  zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
32  subscriber.connect("inproc://#1");
33 
34  // Thread2 opens "A" and "B" envelopes
35  subscriber.set(zmq::sockopt::subscribe, "A");
36  subscriber.set(zmq::sockopt::subscribe, "B");
37 
38  while (1) {
39  // Receive all parts of the message
40  std::vector<zmq::message_t> recv_msgs;
41  zmq::recv_result_t result =
42  zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
43  assert(result && "recv failed");
44  assert(*result == 2);
45 
46  std::cout << "Thread2: [" << recv_msgs[0].to_string() << "] "
47  << recv_msgs[1].to_string() << std::endl;
48  }
49 }
50 
52  // Prepare our context and subscriber
53  zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
54  subscriber.connect("inproc://#1");
55 
56  // Thread3 opens ALL envelopes
57  subscriber.set(zmq::sockopt::subscribe, "");
58 
59  while (1) {
60  // Receive all parts of the message
61  std::vector<zmq::message_t> recv_msgs;
62  zmq::recv_result_t result =
63  zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
64  assert(result && "recv failed");
65  assert(*result == 2);
66 
67  std::cout << "Thread3: [" << recv_msgs[0].to_string() << "] "
68  << recv_msgs[1].to_string() << std::endl;
69  }
70 }
71 
72 int main() {
73  /*
74  * No I/O threads are involved in passing messages using the inproc transport.
75  * Therefore, if you are using a ØMQ context for in-process messaging only you
76  * can initialise the context with zero I/O threads.
77  *
78  * Source: http://api.zeromq.org/4-3:zmq-inproc
79  */
80  zmq::context_t ctx(0);
81 
82  auto thread1 = std::async(std::launch::async, PublisherThread, &ctx);
83 
84  // Give the publisher a chance to bind, since inproc requires it
85  std::this_thread::sleep_for(std::chrono::milliseconds(10));
86 
87  auto thread2 = std::async(std::launch::async, SubscriberThread1, &ctx);
88  auto thread3 = std::async(std::launch::async, SubscriberThread2, &ctx);
89  thread1.wait();
90  thread2.wait();
91  thread3.wait();
92 
93  /*
94  * Output:
95  * An infinite loop of a mix of:
96  * Thread2: [A] Message in A envelope
97  * Thread2: [B] Message in B envelope
98  * Thread3: [A] Message in A envelope
99  * Thread3: [B] Message in B envelope
100  * Thread3: [C] Message in C envelope
101  */
102 }
zmq::socket_t
Definition: zmq.hpp:2188
main
int main()
Definition: pubsub_multithread_inproc.cpp:72
zmq::detail::socket_base::send
size_t send(const void *buf_, size_t len_, int flags_=0)
Definition: zmq.hpp:1923
zmq.hpp
zmq::context_t
Definition: zmq.hpp:799
PublisherThread
void PublisherThread(zmq::context_t *ctx)
Definition: pubsub_multithread_inproc.cpp:9
SubscriberThread2
void SubscriberThread2(zmq::context_t *ctx)
Definition: pubsub_multithread_inproc.cpp:51
zmq::detail::socket_base::connect
void connect(std::string const &addr)
Definition: zmq.hpp:1901
zmq::detail::socket_base::bind
void bind(std::string const &addr)
Definition: zmq.hpp:1883
SubscriberThread1
void SubscriberThread1(zmq::context_t *ctx)
Definition: pubsub_multithread_inproc.cpp:29
zmq_addon.hpp


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58