Awaiter.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <swarmio/Mailbox.h>
4 #include <swarmio/Exception.h>
5 #include <mutex>
6 #include <chrono>
7 #include <atomic>
8 #include <condition_variable>
9 #include <exception>
10 
12 {
18  template <class T>
19  class Awaiter : public Mailbox
20  {
21  private:
22 
28 
34 
40  bool _valid;
41 
47  std::exception_ptr _exception = nullptr;
48 
53  std::mutex _mutex;
54 
60  std::condition_variable _conditionVariable;
61 
62  protected:
63 
70  virtual T ExtractResponse(const Node* node, const data::Message* message) = 0;
71 
77  virtual bool IsFinished()
78  {
79  return _valid;
80  }
81 
82  public:
83 
90  Awaiter(Endpoint* endpoint, uint64_t requestIdentifier)
91  : Mailbox(endpoint), _requestIdentifier(requestIdentifier), _valid(false) { }
92 
98  Awaiter(const T& value)
99  : Mailbox(), _requestIdentifier(0), _valid(true), _response(value) { }
100 
106  Awaiter(Awaiter&& other)
107  : Mailbox(std::move(other))
108  {
109  // Lock
110  std::unique_lock<std::mutex> guard(other._mutex);
111 
112  // Pass event handling to this instance
113  other.FinishMovingTo(this);
114 
115  // Copy over values
116  _valid = other._valid;
117  _response = other._response;
118  _requestIdentifier = other._requestIdentifier;
119  }
120 
128  {
129  std::unique_lock<std::mutex> guard(_mutex);
130  if (_valid)
131  {
132  if (_exception)
133  {
134  std::rethrow_exception(_exception);
135  }
136  else
137  {
138  return _response;
139  }
140  }
141  else
142  {
143  throw Exception("Response is not yet available");
144  }
145  }
146 
155  virtual bool ReceiveMessage(const Node* sender, const data::Message* message) override
156  {
157  // Check if we still need the response and then check the reply_to field
158  if (!IsFinished() &&
159  message->header().reply_to() == _requestIdentifier)
160  {
161  // Acquire lock
162  std::unique_lock<std::mutex> guard(_mutex);
163 
164  // Extract the response
165  try
166  {
167  _response = ExtractResponse(sender, message);
168  _exception = nullptr;
169  }
170  catch (...)
171  {
172  _exception = std::current_exception();
173  }
174 
175  // Mark as valid and notify all clients
176  _valid = true;
177  guard.unlock();
178  _conditionVariable.notify_all();
179 
180  // Handled
181  return true;
182  }
183  else
184  {
185  // Not handled
186  return false;
187  }
188  }
189 
197  bool WaitForResponse(const std::chrono::milliseconds& timeout)
198  {
199  // Acquire lock
200  std::unique_lock<std::mutex> guard(_mutex);
201 
202  // Wait
203  return _conditionVariable.wait_for(guard, timeout, [this]{ return _valid; });
204  }
205 
213  {
214  std::unique_lock<std::mutex> guard(_mutex);
215  if (_valid)
216  {
217  return (bool)_exception;
218  }
219  else
220  {
221  throw Exception("Response is not yet available");
222  }
223  }
224 
230  uint64_t GetIdentifier() const
231  {
232  return _requestIdentifier;
233  }
234  };
235 }
std::exception_ptr _exception
The exception that was thrown while processing the response.
Definition: Awaiter.h:47
std::mutex _mutex
Mutex for the condition variable.
Definition: Awaiter.h:53
A special mailbox that handles responses to a message.
Definition: Awaiter.h:19
T _response
The value of the response.
Definition: Awaiter.h:33
Exception class thrown by all library classes.
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) override
Delivery point of all messages.
Definition: Awaiter.h:155
Awaiter(Endpoint *endpoint, uint64_t requestIdentifier)
Construct a new ResponseAwaiter object.
Definition: Awaiter.h:90
bool HasException()
Checks whether the result of the processing has ended with an exception.
Definition: Awaiter.h:212
virtual bool IsFinished()
Check whether the last message has been received.
Definition: Awaiter.h:77
bool _valid
Set to true after the initial response has been received.
Definition: Awaiter.h:40
std::condition_variable _conditionVariable
Condition variable to signal when the response arrives.
Definition: Awaiter.h:60
Abstract base class for Endpoint implementations.
Definition: Endpoint.h:25
virtual T ExtractResponse(const Node *node, const data::Message *message)=0
Called when a response for the original message has been received.
Awaiter(const T &value)
Construct an awaiter with a cached value.
Definition: Awaiter.h:98
T GetResponse()
Get the response value. Will throw an exception if called before the response is received.
Definition: Awaiter.h:127
uint64_t GetIdentifier() const
Get request identifier.
Definition: Awaiter.h:230
bool WaitForResponse(const std::chrono::milliseconds &timeout)
Wait for the response to become available or util the timeout period ellapses.
Definition: Awaiter.h:197
Awaiter(Awaiter &&other)
Move an Awaiter object.
Definition: Awaiter.h:106
Represents a Node the Endpoint knows about and can send messages to.
Abstract base class for Mailbox implementations.
Definition: Mailbox.h:13
uint64_t _requestIdentifier
Request identifier.
Definition: Awaiter.h:27


swarmros
Author(s):
autogenerated on Fri Apr 3 2020 03:42:47