Program Listing for File digestible.hpp
#ifndef DIGESTIBLE_HPP_
#define DIGESTIBLE_HPP_
/*
* Licensed to Ted Dunning under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cassert>
#include <cmath>
#include <functional>
#include <limits>
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <vector>
namespace digestible
{
// XXX: yes, this could be a std::pair. But being able to refer to values by name
// instead of .first and .second makes merge(), quantile(), and
// cumulative_distribution() way more readable.
template <typename Values = float, typename Weight = unsigned>
struct centroid
{
Values mean;
Weight weight;
centroid(Values new_mean, Weight new_weight) : mean(new_mean), weight(new_weight) {}
};
template <typename Values = float, typename Weight = unsigned>
inline bool operator<(const centroid<Values, Weight> & lhs, const centroid<Values, Weight> & rhs)
{
return (lhs.mean < rhs.mean);
}
template <typename Values = float, typename Weight = unsigned>
inline bool operator>(const centroid<Values, Weight> & lhs, const centroid<Values, Weight> & rhs)
{
return (lhs.mean > rhs.mean);
}
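// A t-digest: a fixed-size, mergeable sketch for estimating quantiles and the
// cumulative distribution of streamed data. Inserted points accumulate in a
// buffer and are periodically compressed into one of two centroid arrays
// ("one" and "two"); "active" always points at the array holding the current
// digest, while the other array receives the next compression pass.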
template <typename Values = float, typename Weight = unsigned>
class tdigest
{
using centroid_t = centroid<Values, Weight>;
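// Backing storage for one set of centroids plus their running total weight.
// insert() reports NEED_COMPRESS once the reserved capacity has been filled.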
struct tdigest_impl
{
std::vector<centroid_t> values;
Weight total_weight;
explicit tdigest_impl(size_t size) : total_weight(0) { values.reserve(size); }
tdigest_impl() = delete;
enum class insert_result { OK, NEED_COMPRESS };
insert_result insert(Values value, Weight weight)
{
assert(weight);
values.emplace_back(value, weight);
total_weight += weight;
return (
values.size() != values.capacity() ? insert_result::OK : insert_result::NEED_COMPRESS);
}
insert_result insert(const centroid_t & val) { return (insert(val.mean, val.weight)); }
void reset()
{
values.clear();
total_weight = 0;
}
size_t capacity() const { return (values.capacity()); }
};
tdigest_impl one;
tdigest_impl two;
// XXX: the buffer multiplier must be > 0. Larger values trade memory for
// fewer merge passes, a size-versus-speed balance that has not been studied
// in depth. Set to 2 to favor size; informal benchmarks with this value
// yielded acceptable results.
static constexpr size_t buffer_multiplier = 2;
tdigest_impl buffer;
tdigest_impl * active;
Values min_val, max_val;
bool run_forward;
void swap(tdigest &);
public:
explicit tdigest(size_t size);
tdigest() = delete;
tdigest(const tdigest &);
tdigest(tdigest &&) noexcept;
tdigest & operator=(tdigest);
void insert(Values value) { insert(value, 1); }
void insert(Values value, Weight weight)
{
if (buffer.insert(value, weight) == tdigest_impl::insert_result::NEED_COMPRESS) {
merge();
}
}
void insert(const tdigest<Values, Weight> & src);
void reset();
std::vector<std::pair<Values, Weight>> get() const;
void merge();
double cumulative_distribution(Values x) const;
double quantile(double p) const;
size_t centroid_count() const { return (active->values.size()); }
size_t size() const { return (active->total_weight); }
Values max() const { return (max_val); }
Values min() const { return (min_val); }
};
template <typename Values, typename Weight>
tdigest<Values, Weight>::tdigest(size_t size)
: one(size),
two(size),
buffer(size * buffer_multiplier),
active(&one),
min_val(std::numeric_limits<Values>::max()),
max_val(std::numeric_limits<Values>::lowest()),
run_forward(true)
{
}
template <typename Values, typename Weight>
tdigest<Values, Weight>::tdigest(const tdigest<Values, Weight> & other)
: one(other.one),
two(other.two),
buffer(other.buffer),
active(other.active == &other.one ? &one : &two),
min_val(other.min_val),
max_val(other.max_val),
run_forward(other.run_forward)
{
}
template <typename Values, typename Weight>
void tdigest<Values, Weight>::swap(tdigest<Values, Weight> & other)
{
std::swap(one, other.one);
std::swap(two, other.two);
std::swap(buffer, other.buffer);
active = other.active == &other.one ? &one : &two;
other.active = active == &one ? &other.one : &other.two;
std::swap(min_val, other.min_val);
std::swap(max_val, other.max_val);
std::swap(run_forward, other.run_forward);
}
template <typename Values, typename Weight>
tdigest<Values, Weight>::tdigest(tdigest<Values, Weight> && other) noexcept
: tdigest(other.one.capacity())
{
swap(other);
}
template <typename Values, typename Weight>
tdigest<Values, Weight> & tdigest<Values, Weight>::operator=(tdigest<Values, Weight> other)
{
swap(other);
return (*this);
}
template <typename Values, typename Weight>
void tdigest<Values, Weight>::insert(const tdigest<Values, Weight> & src)
{
max_val = std::max(max_val, src.max_val);
min_val = std::min(min_val, src.min_val);
auto insert_fn = [this](const auto & val) {
if (buffer.insert(val) == tdigest_impl::insert_result::NEED_COMPRESS) {
merge();
}
};
std::for_each(src.active->values.begin(), src.active->values.end(), insert_fn);
// Explicitly merge any unmerged data for a consistent end state.
merge();
}
template <typename Values, typename Weight>
void tdigest<Values, Weight>::reset()
{
one.reset();
two.reset();
buffer.reset();
active = &one;
min_val = std::numeric_limits<Values>::max();
max_val = std::numeric_limits<Values>::lowest();
}
template <typename Values, typename Weight>
std::vector<std::pair<Values, Weight>> tdigest<Values, Weight>::get() const
{
std::vector<std::pair<Values, Weight>> to_return;
std::transform(
active->values.begin(), active->values.end(), std::back_inserter(to_return),
[](const centroid_t & val) { return (std::make_pair(val.mean, val.weight)); });
return (to_return);
}
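// Scale-function helpers used by merge(): k() maps a quantile to scale space
// (log-odds scaled by the normalizer) and q() is its inverse. The normalizer
// bounds how much weight a merged centroid may absorb, so resolution stays
// finest near the tails.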
inline double Z(double compression, double n)
{
return (4 * std::log(n / compression) + 24);
}
inline double normalizer_fn(double compression, double n)
{
return (compression / Z(compression, n));
}
inline double k(double q, double normalizer)
{
const double q_min = 1e-15;
const double q_max = 1 - q_min;
if (q < q_min) {
return (2 * k(q_min, normalizer));
} else if (q > q_max) {
return (2 * k(q_max, normalizer));
}
return (std::log(q / (1 - q)) * normalizer);
}
inline double q(double k, double normalizer)
{
double w = std::exp(k / normalizer);
return (w / (1 + w));
}
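// Compress the buffered points into the inactive centroid array: sort the
// buffer, then walk it in order, folding each centroid into the current
// accumulator while the running weight stays below the next scale-function
// limit, and emitting the accumulator otherwise. The merge direction
// alternates between passes (run_forward) to avoid biasing one tail. On
// completion the inactive array becomes active and the buffer is re-seeded
// with the new digest's centroids.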
template <typename Values, typename Weight>
void tdigest<Values, Weight>::merge()
{
auto & inactive = (&one == active) ? two : one;
if (buffer.values.empty()) {
return;
}
auto & inputs = buffer.values;
if (run_forward) {
std::sort(inputs.begin(), inputs.end(), std::less<centroid_t>());
// Update min/max values only if sorted first/last centroids are single points.
min_val = std::min(
min_val,
inputs.front().weight == 1 ? inputs.front().mean : std::numeric_limits<Values>::max());
max_val = std::max(
max_val, inputs.back().weight == 1 ? inputs.back().mean : std::numeric_limits<Values>::lowest());
} else {
std::sort(inputs.begin(), inputs.end(), std::greater<centroid_t>());
// Update min/max values only if sorted first/last centroids are single points.
min_val = std::min(
min_val, inputs.back().weight == 1 ? inputs.back().mean : std::numeric_limits<Values>::max());
max_val = std::max(
max_val,
inputs.front().weight == 1 ? inputs.front().mean : std::numeric_limits<Values>::lowest());
}
const Weight new_total_weight = buffer.total_weight + active->total_weight;
const double normalizer = normalizer_fn(inactive.values.capacity(), new_total_weight);
double k1 = k(0, normalizer);
double next_q_limit_weight = new_total_weight * q(k1 + 1, normalizer);
double weight_so_far = 0;
double weight_to_add = inputs.front().weight;
double mean_to_add = inputs.front().mean;
auto compress_fn = [&inactive, new_total_weight, &k1, normalizer, &next_q_limit_weight,
&weight_so_far, &weight_to_add, &mean_to_add](const centroid_t & current) {
if ((weight_so_far + weight_to_add + current.weight) <= next_q_limit_weight) {
weight_to_add += current.weight;
assert(weight_to_add);
mean_to_add = mean_to_add + (current.mean - mean_to_add) * current.weight / weight_to_add;
} else {
weight_so_far += weight_to_add;
double new_q = static_cast<double>(weight_so_far) / static_cast<double>(new_total_weight);
k1 = k(new_q, normalizer);
next_q_limit_weight = new_total_weight * q(k1 + 1, normalizer);
if constexpr (std::is_integral<Values>::value) {
mean_to_add = std::round(mean_to_add);
}
inactive.insert(mean_to_add, weight_to_add);
mean_to_add = current.mean;
weight_to_add = current.weight;
}
};
std::for_each(inputs.begin() + 1, inputs.end(), compress_fn);
if (weight_to_add != 0) {
inactive.insert(mean_to_add, weight_to_add);
}
if (!run_forward) {
std::sort(inactive.values.begin(), inactive.values.end());
}
run_forward = !run_forward;
buffer.reset();
// Seed buffer with the current t-digest in preparation for the next
// merge.
inputs.assign(inactive.values.begin(), inactive.values.end());
auto new_inactive = active;
active = &inactive;
new_inactive->reset();
}
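// Linear interpolation between a and b at parameter t, written to stay
// monotonic and to return exactly b when t == 1.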
inline double lerp(double a, double b, double t) noexcept
{
if ((a <= 0 && b >= 0) || (a >= 0 && b <= 0)) return t * b + (1 - t) * a;
if (t == 1) return b;
const double x = a + t * (b - a);
if ((t > 1) == (b > a))
return b < x ? x : b;
else
return x < b ? x : b;
}
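// Estimate the value at quantile p, where p is a percentage in [0, 100].
// The extremes return the tracked min/max; interior quantiles interpolate
// between the means of the two centroids that bracket the requested weight.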
template <typename Values, typename Weight>
double tdigest<Values, Weight>::quantile(double p) const
{
if (p < 0 || p > 100) {
throw std::out_of_range("Requested quantile must be between 0 and 100.");
}
if (active->values.empty()) {
return (0);
} else if (active->values.size() == 1) {
return (active->values.front().mean);
}
const Weight index = (p / 100) * active->total_weight;
if (index < 1) {
return (min_val);
}
// For small quantiles, interpolate between the minimum value and the first
// centroid.
const auto & first = active->values.front();
if (first.weight > 1 && index < (first.weight / 2)) {
return (lerp(min_val, first.mean, static_cast<double>(index - 1) / (first.weight / 2 - 1)));
}
if (index > active->total_weight - 1) {
return (max_val);
}
// For large quantiles, interpolate between the maximum value and the last
// centroid.
const auto & last = active->values.back();
if (last.weight > 1 && active->total_weight - index <= last.weight / 2) {
return (
max_val - static_cast<double>(active->total_weight - index - 1) / (last.weight / 2 - 1) *
(max_val - last.mean));
}
Weight weight_so_far = active->values.front().weight / 2;
double quantile = 0;
auto quantile_fn = [index, &weight_so_far, &quantile](
const centroid_t & left, const centroid_t & right) {
Weight delta_weight = (left.weight + right.weight) / 2;
if (weight_so_far + delta_weight > index) {
Weight lower = index - weight_so_far;
Weight upper = weight_so_far + delta_weight - index;
quantile = (left.mean * upper + right.mean * lower) / (lower + upper);
return (true);
}
weight_so_far += delta_weight;
return (false);
};
// Even though we're using adjacent_find here, we don't actually intend to find
// anything. We just want to iterate over pairs of centroids until we calculate
// the quantile.
auto it = std::adjacent_find(active->values.begin(), active->values.end(), quantile_fn);
// Did we fail to find a pair of bracketing centroids?
if (it == active->values.end()) {
// Must be between max_val and the last centroid.
if (max_val == active->values.back().mean) {
return active->values[active->values.size() - 2].mean;
}
return active->values.back().mean;
}
return (quantile);
}
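// Estimate the fraction of inserted weight that is <= x. Values below the
// first centroid interpolate from min_val, values above the last centroid
// interpolate toward max_val, and interior values interpolate within the
// bracketing pair of centroids.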
template <typename Values, typename Weight>
double tdigest<Values, Weight>::cumulative_distribution(Values x) const
{
if (active->values.empty()) {
return (1.0);
}
if (active->values.size() == 1) {
if (x < min_val) {
return (0);
}
if (x > max_val) {
return (1.0);
}
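// With a single centroid, min_val and max_val are effectively the same
// point, so no meaningful interpolation is possible; report the median.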
if (x - min_val <= (max_val - min_val)) {
return (0.5);
}
return ((x - min_val) / (max_val - min_val));
}
// From here on out we divide by active->total_weight in multiple places
// along several code paths.
// Let's make sure we're not going to divide by zero.
assert(active->total_weight);
// Is x at one of the extremes?
if (x < active->values.front().mean) {
const auto & first = active->values.front();
if (first.mean - min_val > 0) {
return (
lerp(1, first.weight / 2 - 1, (x - min_val) / (first.mean - min_val)) /
active->total_weight);
}
return (0);
}
if (x > active->values.back().mean) {
const auto & last = active->values.back();
if (max_val - last.mean > 0) {
return (
1 -
lerp(1, last.weight / 2 - 1, (max_val - x) / (max_val - last.mean)) /
active->total_weight);
}
return (1.0);
}
Weight weight_so_far = 0;
double cdf = 0;
auto cdf_fn = [x, &weight_so_far, &cdf, total_weight = active->total_weight](
const centroid_t & left, const centroid_t & right) {
assert(total_weight);
if (left.mean <= x && x < right.mean) {
// x is bracketed between left and right.
Weight delta_weight = (right.weight + left.weight) / static_cast<Weight>(2);
double base = weight_so_far + (left.weight / static_cast<Weight>(2));
cdf = lerp(
base, base + delta_weight,
static_cast<double>((x - left.mean)) / (right.mean - left.mean)) /
total_weight;
return (true);
}
weight_so_far += left.weight;
return (false);
};
auto it = std::adjacent_find(active->values.begin(), active->values.end(), cdf_fn);
// Did we fail to find a pair of bracketing centroids?
if (it == active->values.end()) {
// Might be between max_val and the last centroid.
if (x == active->values.back().mean) {
return ((1 - 0.5 / active->total_weight));
}
}
return (cdf);
}
} // namespace digestible
#endif // DIGESTIBLE_HPP_
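
A minimal usage sketch (not part of the header) based on the public API above; the compression size of 100 and the sample values are arbitrary choices, and the include path assumes the header is installed as include/digestible/digestible.hpp:

#include <cstdio>
#include "digestible/digestible.hpp"

int main()
{
    digestible::tdigest<double, unsigned> digest(100);

    // Stream in observations; the digest compresses itself whenever its
    // internal buffer fills.
    for (int i = 1; i <= 10000; ++i) { digest.insert(i * 0.1); }

    // Flush any still-buffered points before querying.
    digest.merge();

    std::printf("p50 ~= %f\n", digest.quantile(50));
    std::printf("p99 ~= %f\n", digest.quantile(99));
    std::printf("P(x <= 500) ~= %f\n", digest.cumulative_distribution(500.0));
    return (0);
}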