26 #include <unordered_map> 56 std::chrono::microseconds try_enqueue_duration = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::seconds(2))) {
79 std::lock_guard<std::recursive_mutex> lk(
mtx);
92 if (mbs != kDefaultTriggerSize && this->
batched_data_->size() >= mbs) {
104 std::lock_guard<std::recursive_mutex> lk(
mtx);
113 std::lock_guard<std::recursive_mutex> lk(
mtx);
204 if (0 == batch_max_queue_size || 0 == batch_trigger_publish_size) {
205 throw std::invalid_argument(
"0 is not a valid size");
208 if(kDefaultTriggerSize != batch_trigger_publish_size && batch_trigger_publish_size >= batch_max_queue_size) {
209 throw std::invalid_argument(
"batch_trigger_publish_size must be less than batch_max_queue_size");
219 std::lock_guard<std::recursive_mutex> lk(
mtx);
232 std::lock_guard<std::recursive_mutex> lk(
mtx);
237 mutable std::recursive_mutex
mtx;
static void validateConfigurableSizes(size_t batch_max_queue_size, size_t batch_trigger_publish_size)
void setTriggerBatchSize(size_t new_value)
std::atomic< std::chrono::microseconds > try_enqueue_duration_
void setMaxAllowableBatchSize(int new_value)
virtual bool batchData(const T &data_to_batch)
static const size_t kDefaultTriggerSize
virtual bool publishBatchedData()=0
virtual void emptyCollection()
std::atomic< size_t > max_allowable_batch_size_
static const size_t kDefaultMaxBatchSize
void setTryEnqueueDuration(std::chrono::microseconds duration)
size_t getTriggerBatchSize()
void resetTriggerBatchSize()
size_t getMaxAllowableBatchSize()
std::atomic< size_t > trigger_batch_size_
std::chrono::microseconds getTryEnqueueDuration()
size_t getCurrentBatchSize()
std::shared_ptr< std::list< T > > batched_data_
~DataBatcher() override=default
DataBatcher(size_t max_allowable_batch_size=DataBatcher::kDefaultMaxBatchSize, size_t trigger_size=DataBatcher::kDefaultTriggerSize, std::chrono::microseconds try_enqueue_duration=std::chrono::duration_cast< std::chrono::microseconds >(std::chrono::seconds(2)))