data_batcher.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <atomic>
19 #include <chrono>
20 #include <iostream>
21 #include <list>
22 #include <map>
23 #include <memory>
24 #include <mutex>
25 #include <string>
26 #include <unordered_map>
27 #include <stdexcept>
28 
30 
/**
 * @brief Abstract base for services that accumulate items of type T and hand
 * them off in batches.
 *
 * Items are appended to a std::list owned through batched_data_. Access to the
 * list is serialized by the recursive mutex mtx; the size limits and the
 * try-enqueue duration are std::atomic members, so their getters/setters do
 * not take the lock. Subclasses must implement publishBatchedData().
 *
 * NOTE(review): this text is a Doxygen source-browser extract. The jumps in the
 * leading line numbers show that code-bearing source lines are absent:
 * 64, 86, 124 and 158 (inside method bodies) and 103, 112, 135, 146 and 167
 * (method signatures). Restore them from the real data_batcher.h before
 * relying on this copy.
 *
 * @tparam T the type of item being batched (batchData() stores a copy).
 */
36 template<typename T>
37 class DataBatcher : public Service {
38 public:
39 
// Sentinel trigger size: while the trigger equals this value, batchData()
// never publishes on size alone and validateConfigurableSizes() skips the
// trigger-vs-max comparison.
43  static const size_t kDefaultTriggerSize = SIZE_MAX;
// Default upper bound on the number of batched items (used as the
// constructor's default max_allowable_batch_size).
44  static const size_t kDefaultMaxBatchSize = 1024;
45 
/**
 * @brief Construct a batcher with the given limits.
 *
 * Implicit conversion from size_t is deliberately allowed (see the NOLINT).
 *
 * @param max_allowable_batch_size number of items above which batchData()
 *        reports failure; defaults to kDefaultMaxBatchSize.
 * @param trigger_size batch size at which batchData() calls
 *        publishBatchedData(); the default, kDefaultTriggerSize, disables it.
 * @param try_enqueue_duration value returned by getTryEnqueueDuration();
 *        defaults to 2 seconds expressed in microseconds.
 * @throws std::invalid_argument (from validateConfigurableSizes()) if either
 *         size is 0, or a non-sentinel trigger_size is >= the max size.
 */
53  // NOLINTNEXTLINE(google-explicit-constructor, hicpp-explicit-conversions)
54  DataBatcher(size_t max_allowable_batch_size = DataBatcher::kDefaultMaxBatchSize,
55  size_t trigger_size = DataBatcher::kDefaultTriggerSize,
56  std::chrono::microseconds try_enqueue_duration = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::seconds(2))) {
57 
58  validateConfigurableSizes(max_allowable_batch_size, trigger_size);
59 
60  this->max_allowable_batch_size_.store(max_allowable_batch_size);
61  this->trigger_batch_size_.store(trigger_size);
62  this->try_enqueue_duration_ = try_enqueue_duration;
63 
// NOTE(review): source line 64 is missing from this extract. Nothing visible in
// this constructor allocates batched_data_ (a default-constructed, i.e. null,
// shared_ptr), yet batchData() and emptyCollection() dereference it.
// Presumably line 64 calls resetBatchedData() -- confirm against upstream.
65  }
66 
// Defaulted. `override` only compiles if Service declares a virtual destructor,
// so deleting a derived batcher through a Service pointer is well-defined.
70  ~DataBatcher() override = default;
71 
/**
 * @brief Append a copy of @p data_to_batch to the current batch.
 *
 * Runs under mtx. If the batch then holds more than getMaxAllowableBatchSize()
 * items, returns false. Otherwise, when a trigger size is configured (i.e. not
 * kDefaultTriggerSize) and the batch has reached it, calls
 * publishBatchedData() and ignores its result.
 *
 * @param data_to_batch item to copy into the batch.
 * @return false if the size limit was exceeded, true otherwise.
 */
78  virtual bool batchData(const T &data_to_batch) {
79  std::lock_guard<std::recursive_mutex> lk(mtx);
80 
81  this->batched_data_->push_back(data_to_batch);
82 
83  // check if we have exceeded the allowed bounds
84  auto allowed_max = getMaxAllowableBatchSize();
85  if (getCurrentBatchSize() > allowed_max) {
// NOTE(review): source line 86 is missing from this extract. As visible, the
// item pushed above stays in the batch when false is returned; presumably the
// missing line discards the batch (e.g. via emptyCollection()) -- confirm.
87  return false;
88  }
89 
90  // publish if the size has been configured
91  auto mbs = this->getTriggerBatchSize();
92  if (mbs != kDefaultTriggerSize && this->batched_data_->size() >= mbs) {
93  publishBatchedData(); // don't return publisher success / fail here
94  }
95 
96  return true;
97  }
98 
/**
 * @brief Number of items currently batched (read under mtx).
 *
 * NOTE(review): the signature line (source line 103,
 * `size_t getCurrentBatchSize()` per the member index at the end of this page)
 * is missing from this extract, so the body below has no header here.
 */
104  std::lock_guard<std::recursive_mutex> lk(mtx);
105 
106  return this->batched_data_->size();
107  }
108 
/**
 * @brief Replace batched_data_ with a new, empty list (under mtx).
 *
 * Anyone still holding the previous shared_ptr keeps the old list alive and
 * unchanged.
 * NOTE(review): the signature line (source line 112, `void resetBatchedData()`)
 * is missing from this extract.
 */
113  std::lock_guard<std::recursive_mutex> lk(mtx);
114 
115  this->batched_data_ = std::make_shared<std::list<T>>();
116  }
117 
/**
 * @brief Set the batch size at which batchData() publishes.
 *
 * Passing kDefaultTriggerSize disables size-triggered publishing.
 * @param new_value new trigger size.
 * NOTE(review): source line 124 is missing from this extract. The visible body
 * stores new_value without checking it for 0 or against the max size;
 * presumably the missing line calls validateConfigurableSizes() -- confirm.
 */
123  void setTriggerBatchSize(size_t new_value) {
125 
126  this->trigger_batch_size_.store(new_value);
127  }
128 
/**
 * @brief Current trigger size (kDefaultTriggerSize means "disabled").
 * NOTE(review): the signature line (source line 135,
 * `size_t getTriggerBatchSize()`) is missing from this extract.
 */
136  return this->trigger_batch_size_.load();
137 
138  }
139 
/**
 * @brief Current maximum batch size enforced by batchData().
 * NOTE(review): the signature line (source line 146,
 * `size_t getMaxAllowableBatchSize()`) is missing from this extract.
 */
147  return this->max_allowable_batch_size_.load();
148  }
149 
/**
 * @brief Set the maximum batch size enforced by batchData().
 *
 * @param new_value new limit.
 * NOTE(review): the parameter is `int` while the member and the constructor use
 * size_t. A negative argument is converted to a huge size_t by store() below,
 * effectively disabling the limit. Consider taking size_t.
 * NOTE(review): source line 158 is missing from this extract; the visible body
 * performs no validation -- presumably the missing line does. Confirm.
 */
156  void setMaxAllowableBatchSize(int new_value) {
157 
159 
160  this->max_allowable_batch_size_.store(new_value);
161  }
162 
/**
 * @brief Restore the trigger to kDefaultTriggerSize, disabling size-triggered
 * publishing in batchData().
 * NOTE(review): the signature line (source line 167,
 * `void resetTriggerBatchSize()`) is missing from this extract.
 */
168  this->trigger_batch_size_.store(kDefaultTriggerSize);
169  }
170 
/**
 * @brief Atomically replace the stored try-enqueue duration.
 *
 * The value is not read elsewhere in the visible part of this class;
 * presumably it is consumed via getTryEnqueueDuration() by subclasses/callers.
 * @param duration new duration.
 */
176  void setTryEnqueueDuration(std::chrono::microseconds duration) {
177  this->try_enqueue_duration_.store(duration);
178  }
179 
/**
 * @brief Atomically read the stored try-enqueue duration (constructor default:
 * 2 seconds).
 */
185  std::chrono::microseconds getTryEnqueueDuration() {
186  return this->try_enqueue_duration_.load();
187  }
188 
/**
 * @brief Hand off the current batch; implemented by subclasses.
 *
 * batchData() calls this while holding mtx once the trigger size is reached,
 * and ignores the returned value.
 * @return the implementation's success/failure flag.
 */
194  virtual bool publishBatchedData() = 0;
195 
/**
 * @brief Validate a (max size, trigger size) pair.
 *
 * @param batch_max_queue_size maximum batch size; must be non-zero.
 * @param batch_trigger_publish_size trigger size; must be non-zero and, unless
 *        it equals the kDefaultTriggerSize sentinel, strictly less than
 *        batch_max_queue_size.
 * @throws std::invalid_argument if either rule is violated.
 */
202  static void validateConfigurableSizes(size_t batch_max_queue_size, size_t batch_trigger_publish_size) {
203 
204  if (0 == batch_max_queue_size || 0 == batch_trigger_publish_size) {
205  throw std::invalid_argument("0 is not a valid size");
206  }
207 
208  if(kDefaultTriggerSize != batch_trigger_publish_size && batch_trigger_publish_size >= batch_max_queue_size) {
209  throw std::invalid_argument("batch_trigger_publish_size must be less than batch_max_queue_size");
210  }
211  }
212 
/**
 * @brief Shut down the Service base, then empty the batch.
 *
 * Service::shutdown() runs first, outside mtx; emptyCollection() then runs
 * under mtx. The base emptyCollection() only clears. Per the inline comment,
 * subclasses presumably override it to persist data first.
 * @return the value returned by Service::shutdown().
 */
217  bool shutdown() override {
218  bool is_shutdown = Service::shutdown();
219  std::lock_guard<std::recursive_mutex> lk(mtx);
220  this->emptyCollection(); // attempt to write to disk before discarding
221  return is_shutdown;
222  }
223 
224 protected:
225 
/**
 * @brief Discard all batched items (under mtx).
 *
 * Virtual, so subclasses can presumably persist or publish before clearing.
 * NOTE(review): dereferences batched_data_ without a null check, so it relies on
 * the list having been allocated (see resetBatchedData()).
 */
231  virtual void emptyCollection() {
232  std::lock_guard<std::recursive_mutex> lk(mtx);
233  this->batched_data_->clear();
234  }
235 
// The current batch. Held by shared_ptr so resetBatchedData() can swap in a
// fresh list; null until first allocated.
236  std::shared_ptr<std::list<T>> batched_data_;
// Recursive because locking methods call other locking methods while holding
// it (batchData() -> getCurrentBatchSize(), shutdown() -> emptyCollection()).
// mutable so it can also be locked from const member functions.
237  mutable std::recursive_mutex mtx;
238 
239 private:
// Limits and duration are atomics so their getters/setters run without mtx.
243  std::atomic<size_t> max_allowable_batch_size_{};
244  std::atomic<size_t> trigger_batch_size_{};
245  std::atomic<std::chrono::microseconds> try_enqueue_duration_{};
246 };
247 
static void validateConfigurableSizes(size_t batch_max_queue_size, size_t batch_trigger_publish_size)
Definition: data_batcher.h:202
void resetBatchedData()
Definition: data_batcher.h:112
void setTriggerBatchSize(size_t new_value)
Definition: data_batcher.h:123
std::atomic< std::chrono::microseconds > try_enqueue_duration_
Definition: data_batcher.h:245
void setMaxAllowableBatchSize(int new_value)
Definition: data_batcher.h:156
virtual bool batchData(const T &data_to_batch)
Definition: data_batcher.h:78
static const size_t kDefaultTriggerSize
Definition: data_batcher.h:43
std::recursive_mutex mtx
Definition: data_batcher.h:237
virtual bool publishBatchedData()=0
virtual void emptyCollection()
Definition: data_batcher.h:231
std::atomic< size_t > max_allowable_batch_size_
Definition: data_batcher.h:243
static const size_t kDefaultMaxBatchSize
Definition: data_batcher.h:44
virtual bool shutdown()
Definition: service.h:83
bool shutdown() override
Definition: data_batcher.h:217
void setTryEnqueueDuration(std::chrono::microseconds duration)
Definition: data_batcher.h:176
size_t getTriggerBatchSize()
Definition: data_batcher.h:135
void resetTriggerBatchSize()
Definition: data_batcher.h:167
size_t getMaxAllowableBatchSize()
Definition: data_batcher.h:146
std::atomic< size_t > trigger_batch_size_
Definition: data_batcher.h:244
std::chrono::microseconds getTryEnqueueDuration()
Definition: data_batcher.h:185
size_t getCurrentBatchSize()
Definition: data_batcher.h:103
std::shared_ptr< std::list< T > > batched_data_
Definition: data_batcher.h:236
~DataBatcher() override=default
DataBatcher(size_t max_allowable_batch_size=DataBatcher::kDefaultMaxBatchSize, size_t trigger_size=DataBatcher::kDefaultTriggerSize, std::chrono::microseconds try_enqueue_duration=std::chrono::duration_cast< std::chrono::microseconds >(std::chrono::seconds(2)))
Definition: data_batcher.h:54


dataflow_lite
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:22