diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..3b06f3c --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,16 @@ +version: 2 + +jobs: + test: + working_directory: ~/go/src/github.com/lightstep/varopt + docker: + - image: circleci/golang:1.15 + steps: + - checkout + - run: go test -v ./... + +workflows: + version: 2 + test: + jobs: + - test diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
index c9770ee..131b6b3 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,58 @@
+[![Docs](https://godoc.org/github.com/lightstep/varopt?status.svg)](https://godoc.org/github.com/lightstep/varopt)
+
+# VarOpt Sampling Algorithm
+
 This is an implementation of VarOpt, an unbiased weighted sampling
 algorithm described in the paper [Stream sampling for variance-optimal
-estimation of subset sums](https://arxiv.org/pdf/0803.0473.pdf).
+estimation of subset sums](https://arxiv.org/pdf/0803.0473.pdf) (2008)
+by Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, and Mikkel
+Thorup.
+
+VarOpt is a reservoir-type sampler that maintains a fixed-size sample
+and provides a mechanism for merging unequal-weight samples.
+
+This repository also includes a simple reservoir sampling algorithm,
+often useful in conjunction with weighted reservoir sampling, that
+implements Algorithm R from [Random sampling with a
+reservoir](https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R)
+(1985) by Jeffrey Vitter.
+
+## Usage: Natural Weights
+
+A typical use of VarOpt sampling is to estimate network flows using
+sampled packets. In this use-case, the weight applied to each sample
+is the size of the packet. Because VarOpt computes an unbiased
+sample, sample data points can be summarized along secondary
+dimensions. For example, we can select a subset of sampled packets
+according to a secondary attribute and sum the adjusted sample
+weights; the result is an unbiased estimate of the total size of
+packets with that attribute in the original population.
+
+See [weighted_test.go](https://github.com/lightstep/varopt/blob/master/weighted_test.go) for an example.
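+
+The sketch below shows the shape of this computation; the `packet`
+type, its fields, and the capacity of 1000 are illustrative, not part
+of the library:
+
+```go
+import (
+	"math/rand"
+
+	"github.com/lightstep/varopt"
+)
+
+type packet struct {
+	size     int
+	protocol string
+}
+
+func estimateUDPBytes(packets []packet) float64 {
+	rnd := rand.New(rand.NewSource(1))
+	sampler := varopt.New(1000, rnd)
+	for _, pkt := range packets {
+		// Weight each packet by its size.
+		sampler.Add(pkt, float64(pkt.size))
+	}
+
+	// Sum adjusted sample weights over a secondary attribute;
+	// the sum is an unbiased estimate of the true total.
+	estimate := 0.0
+	for i := 0; i < sampler.Size(); i++ {
+		sample, weight := sampler.Get(i)
+		if sample.(packet).protocol == "udp" {
+			estimate += weight
+		}
+	}
+	return estimate
+}
+```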
+
+## Usage: Inverse-probability Weights
+
+Another use for VarOpt sampling uses inverse-probability weights to
+estimate frequencies while simultaneously controlling sample
+diversity. Suppose a sequence of observations can be naturally
+categorized into N different buckets. The goal in this case is to
+compute a sample where each bucket is well represented, while
+maintaining frequency estimates.
+
+In this use-case, the weight assigned to each observation is the
+inverse probability of the bucket it belongs to. The result of
+weighted sampling with inverse-probability weights is a uniform
+expected value; in this example we expect an equal number of
+observations falling into each bucket. Each observation represents a
+frequency equal to its sample weight (computed by VarOpt) divided by
+its original weight (the inverse-probability).
+
+See [frequency_test.go](https://github.com/lightstep/varopt/blob/master/frequency_test.go) for an example.
+
+## Usage: Merging Samples
+
+VarOpt supports merging independently collected samples one
+observation at a time. This is useful for building distributed
+sampling schemes. In this use-case, each node in a distributed system
+computes a weighted sample. To combine samples, simply input all the
+observations and their corresponding weights into a new VarOpt
+sample, as sketched below.
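+
+A minimal sketch, assuming `nodeA` and `nodeB` are previously filled
+`*varopt.Varopt` samplers (the names and capacity are illustrative):
+
+```go
+merged := varopt.New(1000, rand.New(rand.NewSource(1)))
+for _, node := range []*varopt.Varopt{nodeA, nodeB} {
+	for i := 0; i < node.Size(); i++ {
+		item, weight := node.Get(i) // weight is the adjusted output weight
+		merged.Add(item, weight)
+	}
+}
+```
\ No newline at end of file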
diff --git a/benchmark_test.go b/benchmark_test.go
new file mode 100644
index 0000000..131a08d
--- /dev/null
+++ b/benchmark_test.go
@@ -0,0 +1,72 @@
+// Copyright 2019, LightStep Inc.
+//
+// The benchmark results point to a performance drop when the
+// largeHeap starts to be used, because of interface conversions in
+// and out of the heap (primarily due to the heap interface). This
+// suggests room for improvement by avoiding the built-in heap.
+
+/*
+BenchmarkAdd_Norm_100-8        37540165   32.1 ns/op    8 B/op   0 allocs/op
+BenchmarkAdd_Norm_10000-8      39850280   30.6 ns/op    8 B/op   0 allocs/op
+BenchmarkAdd_Norm_1000000-8     7958835    183 ns/op   52 B/op   0 allocs/op
+BenchmarkAdd_Exp_100-8         41565934   28.5 ns/op    8 B/op   0 allocs/op
+BenchmarkAdd_Exp_10000-8       43622184   29.2 ns/op    8 B/op   0 allocs/op
+BenchmarkAdd_Exp_1000000-8      8103663    220 ns/op   55 B/op   0 allocs/op
+*/
+
+package varopt_test
+
+import (
+	"math"
+	"math/rand"
+	"testing"
+
+	"github.com/lightstep/varopt"
+)
+
+func normValue(rnd *rand.Rand) float64 {
+	return 1 + math.Abs(rnd.NormFloat64())
+}
+
+func expValue(rnd *rand.Rand) float64 {
+	return rnd.ExpFloat64()
+}
+
+func BenchmarkAdd_Norm_100(b *testing.B) {
+	benchmarkAdd(b, 100, normValue)
+}
+
+func BenchmarkAdd_Norm_10000(b *testing.B) {
+	benchmarkAdd(b, 10000, normValue)
+}
+
+func BenchmarkAdd_Norm_1000000(b *testing.B) {
+	benchmarkAdd(b, 1000000, normValue)
+}
+
+func BenchmarkAdd_Exp_100(b *testing.B) {
+	benchmarkAdd(b, 100, expValue)
+}
+
+func BenchmarkAdd_Exp_10000(b *testing.B) {
+	benchmarkAdd(b, 10000, expValue)
+}
+
+func BenchmarkAdd_Exp_1000000(b *testing.B) {
+	benchmarkAdd(b, 1000000, expValue)
+}
+
+func benchmarkAdd(b *testing.B, size int, f func(rnd *rand.Rand) float64) {
+	b.ReportAllocs()
+
+	rnd := rand.New(rand.NewSource(3331))
+	v := varopt.New(size, rnd)
+	weights := make([]float64, b.N)
+	for i := 0; i < b.N; i++ {
+		weights[i] = f(rnd)
+	}
+
+	// Exclude the setup above from the measurement; the timer runs
+	// by default, so StartTimer alone would not reset it.
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.Add(nil, weights[i])
+	}
+}
diff --git a/doc.go b/doc.go
new file mode 100644
index 0000000..fe0e2e6
--- /dev/null
+++ b/doc.go
@@ -0,0 +1,22 @@
+// Copyright 2019, LightStep Inc.
+
+/*
+Package varopt contains an implementation of VarOpt, an unbiased weighted
+sampling algorithm described in the paper "Stream sampling for
+variance-optimal estimation of subset sums"
+https://arxiv.org/pdf/0803.0473.pdf (2008), by Edith Cohen, Nick
+Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup.
+
+VarOpt is a reservoir-type sampler that maintains a fixed-size sample
+and provides a mechanism for merging unequal-weight samples.
+
+This package also includes a simple reservoir sampling algorithm,
+often useful in conjunction with weighted reservoir sampling, using
+Algorithm R from "Random sampling with a
+reservoir", https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R
+(1985), by Jeffrey Vitter.
+
+See https://github.com/lightstep/varopt/blob/master/README.md for
+more detail.
+*/
+package varopt
diff --git a/examples/tdigest/flat.go b/examples/tdigest/flat.go
new file mode 100644
index 0000000..26246d7
--- /dev/null
+++ b/examples/tdigest/flat.go
@@ -0,0 +1,39 @@
+package tdigest
+
+import (
+	"sort"
+)
+
+// Flat uses a uniform probability density across each centroid.
+type Flat struct {
+	t *TDigest
+}
+
+var _ ProbabilityFunc = &Flat{}
+
+func newFlatFunc(t *TDigest) *Flat {
+	return &Flat{t: t}
+}
+
+func (f *Flat) Probability(value float64) float64 {
+	digest := f.t.Digest
+	sumw := f.t.SumWeight
+	if value <= digest[0].Mean {
+		return digest[0].Weight / (2 * sumw)
+	}
+	if value >= digest[len(digest)-1].Mean {
+		return digest[len(digest)-1].Weight / (2 * sumw)
+	}
+
+	// Find the first centroid whose mean exceeds value, then
+	// attribute value to the nearer of the two neighbors.
+	idx := sort.Search(len(digest), func(i int) bool {
+		return value < digest[i].Mean
+	})
+
+	lower := value - digest[idx-1].Mean
+	upper := digest[idx].Mean - value
+
+	if lower > upper {
+		return digest[idx-1].Weight / sumw
+	}
+	return digest[idx].Weight / sumw
+}
diff --git a/examples/tdigest/stream.go b/examples/tdigest/stream.go
new file mode 100644
index 0000000..6adb78f
--- /dev/null
+++ b/examples/tdigest/stream.go
@@ -0,0 +1,300 @@
+package tdigest
+
+import (
+	"fmt"
+	"math"
+	"math/rand"
+	"sort"
+
+	"github.com/lightstep/varopt"
+)
+
+type (
+	QuantileFunc interface {
+		Quantile(value float64) float64
+	}
+
+	Config struct {
+		// quality determines the number of centroids used to
+		// compute the digest.
+		quality Quality
+
+		// currentSize is the size of the current-window sample.
+		currentSize int
+
+		// completeSize is the size of the complete-stream sample.
+		completeSize int
+
+		// windowSize is the number of points between
+		// re-computing the digest.
+		windowSize int
+	}
+
+	currentWindow struct {
+		sample    *varopt.Varopt
+		temporary []float64
+	}
+
+	completeWindow struct {
+		sample    *varopt.Varopt
+		temporary []float64
+		freelist  []*float64
+	}
+
+	Stream struct {
+		// Config stores parameters.
+		Config
+
+		// rnd is used by both samplers.
+		rnd *rand.Rand
+
+		// buffer and tinput are used as inputs to the T-digest.
+		// buffer is ordered by this code; tinput is sorted
+		// while computing the digest. This is twice the
+		// currentSize, since at each round we combine the
+		// prior distribution with the current. Note that
+		// T-digest supports zero-weight inputs, so we always
+		// pass entire slices, even when partially filled.
+		buffer []Item
+		tinput Input
+
+		// cycle counts the number of T-digest iterations,
+		// indicating which half of the buffer to fill.
+		cycle int
+
+		// digest is the current estimated distribution.
+		digest *TDigest
+
+		// flushed prevents Add() after Quantile() is called.
+		flushed bool
+
+		// current stores a sample of the latest window of
+		// data.
+		current currentWindow
+
+		// complete stores a sample of the complete stream of data.
+ complete completeWindow + } +) + +var _ QuantileFunc = &Stream{} + +func NewConfig(quality Quality, currentSize, completeSize, windowSize int) Config { + return Config{ + quality: quality, + currentSize: currentSize, + completeSize: completeSize, + windowSize: windowSize, + } +} + +func NewStream(config Config, rnd *rand.Rand) *Stream { + bufsz := 2 * config.currentSize + if config.completeSize > bufsz { + bufsz = config.completeSize + } + + stream := &Stream{ + Config: config, + digest: New(config.quality), + rnd: rnd, + buffer: make([]Item, bufsz), + tinput: make([]*Item, bufsz), + } + + for i := range stream.buffer { + stream.tinput[i] = &stream.buffer[i] + } + + stream.current = stream.newCurrentWindow(config.currentSize) + stream.complete = stream.newCompleteWindow(config.completeSize) + return stream +} + +func (s *Stream) newCurrentWindow(sampleSize int) currentWindow { + return currentWindow{ + sample: varopt.New(sampleSize, s.rnd), + temporary: make([]float64, 0, s.windowSize), + } +} + +func (v *currentWindow) add(value, weight float64) error { + v.temporary = append(v.temporary, value) + _, err := v.sample.Add(&v.temporary[len(v.temporary)-1], weight) + return err +} + +func (v *currentWindow) full() bool { + return len(v.temporary) == cap(v.temporary) +} + +func (v *currentWindow) size() int { + return v.sample.Size() +} + +func (v *currentWindow) get(i int) (interface{}, float64) { + value, weight := v.sample.Get(i) + freq := weight / v.sample.GetOriginalWeight(i) + + if math.IsNaN(freq) { + panic(fmt.Sprintln("NaN here", value, weight)) + } + + return value, freq +} + +func (v *currentWindow) restart() { + v.temporary = v.temporary[:0] + v.sample.Reset() +} + +func (s *Stream) newCompleteWindow(sampleSize int) completeWindow { + cw := completeWindow{ + sample: varopt.New(sampleSize, s.rnd), + temporary: make([]float64, sampleSize+1), + freelist: make([]*float64, sampleSize+1), + } + for i := range cw.temporary { + cw.freelist[i] = &cw.temporary[i] + } + return cw +} + +func (v *completeWindow) add(value, weight float64) error { + lastdata := len(v.freelist) - 1 + data := v.freelist[lastdata] + v.freelist = v.freelist[:lastdata] + + *data = value + eject, err := v.sample.Add(data, weight) + + if err != nil { + v.freelist = append(v.freelist, data) + } else if eject != nil { + v.freelist = append(v.freelist, eject.(*float64)) + } + return err +} + +func (v *completeWindow) size() int { + return v.sample.Size() +} + +func (v *completeWindow) get(i int) (interface{}, float64) { + value, weight := v.sample.Get(i) + freq := weight / v.sample.GetOriginalWeight(i) + return value, freq +} + +func (s *Stream) Add(x float64) error { + var weight float64 + + if s.cycle == 0 { + weight = 1 + } else { + prob := s.digest.Probability(x) + weight = 1 / prob + } + + if err := s.current.add(x, weight); err != nil { + return err + } + + if !s.current.full() { + return nil + } + + return s.recompute() +} + +func (s *Stream) recompute() error { + // Recompute the T-Digest from the new weighted sample + // combined with the prior data. This computes a MAP + // estimate. + offset := s.currentSize * (s.cycle % 2) + + for i := 0; i < s.current.size(); i++ { + valueI, freq := s.current.get(i) + value := *(valueI.(*float64)) + + //fmt.Println("New W", value, freq) + + s.buffer[offset+i].Value = value + s.buffer[offset+i].Weight = freq + + s.complete.add(value, freq) + } + + // Fill in zero-weight values in case the sample size was + // smaller than capacity at the end of the stream. 
+ for i := s.current.size(); i < s.currentSize; i++ { + s.buffer[offset+i] = Item{} + } + + if err := s.digest.Compute(s.tinput[0 : 2*s.currentSize]); err != nil { + return err + } + + s.current.restart() + s.cycle++ + return nil +} + +func (s *Stream) flush() error { + if s.flushed { + // Note: this could be fixed if needed. + return fmt.Errorf("Do not Add() after calling Quantile()") + } + s.flushed = true + + for i := 0; i < s.current.size(); i++ { + valueI, freq := s.current.get(i) + value := *(valueI.(*float64)) + + s.complete.add(value, freq) + } + + sumWeight := 0.0 + for i := 0; i < s.complete.size(); i++ { + valueI, freq := s.complete.get(i) + value := *(valueI.(*float64)) + + s.buffer[i].Value = value + s.buffer[i].Weight = freq + s.tinput[i] = &s.buffer[i] + sumWeight += freq + } + + s.tinput = s.tinput[0:s.complete.size()] + sort.Sort(&s.tinput) + + sum := 0.0 + for _, t := range s.tinput { + sum += t.Weight + t.Weight = sum / sumWeight + } + + return nil +} + +// Quantile returns the estimated value for a given quantile. +// +// Note: TDigest can implement QuantileFunc itself, but in this +// implementation we use the stream sample directly. See section 2.9 +// of the T-digest paper for recommendations about interpolating the +// CDF of a T-digest. +func (s *Stream) Quantile(quantile float64) float64 { + if !s.flushed { + s.flush() + } + + idx := sort.Search(len(s.tinput), func(i int) bool { + return quantile <= s.tinput[i].Weight + }) + + if idx == len(s.tinput) { + return s.tinput[len(s.tinput)-1].Value + } + return s.tinput[idx].Value +} diff --git a/examples/tdigest/stream_test.go b/examples/tdigest/stream_test.go new file mode 100644 index 0000000..f6a4ecd --- /dev/null +++ b/examples/tdigest/stream_test.go @@ -0,0 +1,178 @@ +package tdigest_test + +import ( + "math" + "math/rand" + "sort" + "testing" + + "github.com/lightstep/varopt/examples/tdigest" + "github.com/stretchr/testify/require" +) + +func TestStreamNormal(t *testing.T) { + const ( + mean = 10 + stddev = 10 + ) + + testStreamDigest(t, + []float64{.1, .25, .5, .75, .9, .99, .999}, + []float64{.5, .50, .1, .50, .5, .05, .050}, + func(rnd *rand.Rand) float64 { + return mean + stddev*rnd.NormFloat64() + }) +} + +func TestStreamExponential(t *testing.T) { + testStreamDigest(t, + []float64{.1, .25, .5, .75, .9, .99, .999}, + []float64{.5, .50, 1., 1.5, 1., .20, .050}, + func(rnd *rand.Rand) float64 { + return rnd.ExpFloat64() + }) +} + +func TestStreamAlternating(t *testing.T) { + count := 0 + testStreamDigest(t, + // Quantiles <0.66 == -1 + // Quantiles >=0.66 == +1 + []float64{.1, .5, .66, .67, .7, .8, .87, .9}, + // This performs poorly, but it's hard to show + // concisely with this test framework. 
At the 90th
+		// percentile, we finally see a +1.
+		[]float64{10, 50, 66, 67, 70, 80, 21, 24},
+		func(rnd *rand.Rand) float64 {
+			c := count
+			count++
+			count = count % 3
+			switch c {
+			case 0, 2:
+				return -1
+			default:
+				return +1
+			}
+		})
+}
+
+func TestStreamLinear(t *testing.T) {
+	count := 0
+	testStreamDigest(t,
+		[]float64{.1, .25, .5, .75, .9, .99, .999},
+		[]float64{.5, .5, .5, 1., .5, .5, .5},
+		func(rnd *rand.Rand) float64 {
+			c := count
+			count++
+			return float64(c)
+		})
+}
+
+func TestStreamUniform(t *testing.T) {
+	testStreamDigest(t,
+		[]float64{.1, .25, .5, .75, .9, .99, .999},
+		[]float64{.5, .5, .5, 1., .5, .5, .5},
+		func(rnd *rand.Rand) float64 {
+			return rnd.Float64()
+		})
+}
+
+func testStreamDigest(t *testing.T, quantiles, tolerance []float64, f func(rnd *rand.Rand) float64) {
+	const (
+		quality     = 5000
+		sampleSize  = 5000
+		windowSize  = 25000
+		streamCount = 1000000
+	)
+
+	rnd := rand.New(rand.NewSource(31181))
+	correct := &CorrectQuantile{}
+
+	stream := tdigest.NewStream(tdigest.NewConfig(quality, sampleSize, sampleSize, windowSize), rnd)
+
+	for i := 0; i < streamCount; i++ {
+		value := f(rnd)
+		correct.Add(value)
+		err := stream.Add(value)
+		if err != nil {
+			t.Error("Stream add error", err)
+		}
+	}
+
+	for i, q := range quantiles {
+		require.GreaterOrEqual(t, tolerance[i], math.Abs(correct.Distance(stream, q)),
+			"at quantile=%v", q)
+	}
+}
+
+type Quantiler interface {
+	Quantile(float64) float64
+}
+
+type CorrectQuantile struct {
+	values []float64
+	sorted bool
+}
+
+// Distance returns quantile distance in percent units.
+func (l *CorrectQuantile) Distance(qq Quantiler, quant float64) float64 {
+	value := qq.Quantile(quant)
+	actual := l.LookupQuantile(value)
+	return 100 * (actual - quant)
+}
+
+func (l *CorrectQuantile) Add(f float64) {
+	l.values = append(l.values, f)
+	l.sorted = false
+}
+
+func (l *CorrectQuantile) Quantile(f float64) float64 {
+	if len(l.values) == 0 {
+		return math.NaN()
+	}
+	if !l.sorted {
+		sort.Float64s(l.values)
+		l.sorted = true
+	}
+	quantileLocation := float64(len(l.values)) * f
+	if quantileLocation <= 0 {
+		return l.values[0]
+	}
+	if quantileLocation >= float64(len(l.values)-1) {
+		return l.values[len(l.values)-1]
+	}
+	beforeIndex := int(math.Floor(quantileLocation))
+	afterIndex := beforeIndex + 1
+	delta := l.values[afterIndex] - l.values[beforeIndex]
+	if delta == 0 {
+		return l.values[beforeIndex]
+	}
+	return l.values[beforeIndex] + delta*(quantileLocation-float64(beforeIndex))
+}
+
+func (l *CorrectQuantile) LookupQuantile(value float64) float64 {
+	if !l.sorted {
+		sort.Float64s(l.values)
+		l.sorted = true
+	}
+
+	idx := sort.Search(len(l.values), func(i int) bool {
+		return l.values[i] >= value
+	})
+
+	if idx == 0 {
+		return 0
+	}
+
+	if idx == len(l.values) {
+		return 1
+	}
+
+	above := l.values[idx]
+	below := l.values[idx-1]
+	diff := above - below
+	if diff == 0 {
+		panic("impossible")
+	}
+	return (float64(idx) + (value-below)/diff) / float64(len(l.values)-1)
+}
diff --git a/examples/tdigest/tdigest.go b/examples/tdigest/tdigest.go
new file mode 100644
index 0000000..ae51a4e
--- /dev/null
+++ b/examples/tdigest/tdigest.go
@@ -0,0 +1,180 @@
+// Package tdigest uses the t-digest Algorithm 1 to compute an ordered
+// list of non-overlapping centroids, which can be used to estimate
+// quantiles and probability density.
+//
+// What the t-digest paper calls a "compression" parameter is called
+// Quality here.
+//
+// See the 2019 paper here: https://arxiv.org/pdf/1902.04023.pdf
+package tdigest
+
+import (
+	"fmt"
+	"math"
+	"sort"
+)
+
+type (
+	ProbabilityFunc interface {
+		Probability(value float64) float64
+	}
+
+	Centroid struct {
+		Sum    float64 // Total sum of values
+		Weight float64 // Total weight of values
+		Mean   float64 // Sum / Weight
+		Count  int     // Number of distinct values
+	}
+
+	// Quality is the compression parameter; it must be > 0.
+	//
+	// Forcing the compression parameter to be an integer
+	// simplifies the code, because each centroid / quantile has
+	// k(i) == 1.
+	Quality int
+
+	// Item is a value/weight pair for input to the digest.
+	Item struct {
+		Value  float64
+		Weight float64
+	}
+
+	// Input is a slice of Item pointers that sorts by Value.
+	Input []*Item
+
+	// TDigest is computed using Algorithm 1 from the T-digest
+	// paper.
+	TDigest struct {
+		Quality   Quality
+		SumWeight float64
+		Digest    []Centroid
+
+		ProbabilityFunc
+	}
+)
+
+var _ ProbabilityFunc = &TDigest{}
+
+var (
+	ErrEmptyDataSet  = fmt.Errorf("Empty data set")
+	ErrInvalidWeight = fmt.Errorf("Negative, NaN or Inf weight")
+	ErrInvalidInterp = fmt.Errorf("Unknown interpolation")
+)
+
+// The T-digest scaling function maps a quantile to an index, where
+// 0 <= index < quality.
+func (quality Quality) digestK(q float64) float64 {
+	return float64(quality) * (0.5 + math.Asin(2*q-1)/math.Pi)
+}
+
+// The T-digest inverse-scaling function maps an index back to a
+// quantile.
+func (quality Quality) inverseK(k float64) float64 {
+	if k > float64(quality) {
+		return 1
+	}
+	return 0.5 * (math.Sin(math.Pi*(k/float64(quality)-0.5)) + 1)
+}
+
+func New(quality Quality) *TDigest {
+	return &TDigest{
+		Quality: quality,
+		Digest:  make([]Centroid, 0, quality),
+	}
+}
+
+func (t *TDigest) Compute(in Input) error {
+	t.Digest = t.Digest[:0]
+
+	if len(in) == 0 {
+		return ErrEmptyDataSet
+	}
+
+	// Compute the total weight and check for invalid weights.
+	sumWeight := 0.0
+
+	for _, it := range in {
+		we := it.Weight
+		if we < 0 || math.IsNaN(we) || math.IsInf(we, +1) {
+			return ErrInvalidWeight
+		}
+
+		sumWeight += we
+	}
+
+	// Both of the following loops require sorted data.
+	sort.Sort(&in)
+
+	// Combine weights for equal values and drop zero-weight items.
+	outIndex := 0
+	for i := 0; i < len(in); i++ {
+		va := in[i].Value
+		we := in[i].Weight
+
+		if we == 0 {
+			continue
+		}
+
+		if outIndex != 0 && i != 0 && in[i-1].Value == va {
+			in[outIndex-1].Weight += we
+			continue
+		}
+
+		in[outIndex].Value = va
+		in[outIndex].Weight = we
+		outIndex++
+	}
+
+	in = in[0:outIndex]
+
+	// T-digest's Algorithm 1. The de-duplication step above
+	// ensures that each bucket has a non-zero width.
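+	//
+	// Points are accumulated into the current centroid while the
+	// running quantile q stays within qlimit; once q would cross
+	// it, the centroid is emitted, qleft advances, and the next
+	// limit is computed as k^-1(k(qleft) + 1), so each centroid
+	// spans one unit of the scale function k.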
+	digest := t.Digest
+
+	qleft := 0.0
+	qlimit := t.Quality.inverseK(1)
+
+	current := Centroid{}
+
+	for pos := 0; pos < len(in); pos++ {
+		vpos := in[pos].Value
+		wpos := in[pos].Weight
+
+		q := qleft + (current.Weight+wpos)/sumWeight
+
+		if q <= qlimit {
+			current.Sum += vpos * wpos
+			current.Weight += wpos
+			current.Count++
+			continue
+		}
+
+		if current.Count > 0 {
+			digest = append(digest, current)
+		}
+		qleft += current.Weight / sumWeight
+
+		qlimit = t.Quality.inverseK(t.Quality.digestK(qleft) + 1)
+		current.Sum = vpos * wpos
+		current.Weight = wpos
+		current.Count = 1
+	}
+	digest = append(digest, current)
+
+	// Finalize the centroid means; Mean is documented as
+	// Sum / Weight and is required by the probability functions.
+	for i := range digest {
+		digest[i].Mean = digest[i].Sum / digest[i].Weight
+	}
+
+	t.Digest = digest
+	t.SumWeight = sumWeight
+	t.ProbabilityFunc = newFlatFunc(t)
+
+	return nil
+}
+
+func (in *Input) Len() int {
+	return len(*in)
+}
+
+func (in *Input) Swap(i, j int) {
+	(*in)[i], (*in)[j] = (*in)[j], (*in)[i]
+}
+
+func (in *Input) Less(i, j int) bool {
+	return (*in)[i].Value < (*in)[j].Value
+}
diff --git a/frequency_test.go b/frequency_test.go
new file mode 100644
index 0000000..e4b3429
--- /dev/null
+++ b/frequency_test.go
@@ -0,0 +1,143 @@
+// Copyright 2019, LightStep Inc.
+
+package varopt_test
+
+import (
+	"fmt"
+	"math"
+	"math/rand"
+
+	"github.com/lightstep/varopt"
+)
+
+type curve struct {
+	color  string
+	mean   float64
+	stddev float64
+}
+
+type point struct {
+	color  int
+	xvalue float64
+}
+
+var colors = []curve{
+	{color: "red", mean: 10, stddev: 15},
+	{color: "green", mean: 30, stddev: 10},
+	{color: "blue", mean: 50, stddev: 20},
+}
+
+// This example shows how to use Varopt sampling to estimate
+// frequencies with the use of inverse probability weights. The use
+// of inverse probability creates a uniform expected value, in this
+// case of the number of sample points per second.
+//
+// While the number of expected points per second is uniform, the
+// output sample weights are expected to match the original
+// frequencies.
+func ExampleVaropt_GetOriginalWeight() {
+	// Number of points.
+	const totalCount = 1e6
+
+	// Relative size of the sample.
+	const sampleRatio = 0.01
+
+	// Ensure this test is deterministic.
+	rnd := rand.New(rand.NewSource(104729))
+
+	// Construct a timeseries consisting of three colored signals,
+	// for x=0 to x=60 seconds.
+	var points []point
+
+	// origCounts stores the original signals at second granularity.
+	origCounts := make([][]int, len(colors))
+	for i := range colors {
+		origCounts[i] = make([]int, 60)
+	}
+
+	// Construct the signals by choosing a random color, then
+	// using its Gaussian to compute a timestamp.
+	for len(points) < totalCount {
+		choose := rnd.Intn(len(colors))
+		series := colors[choose]
+		xvalue := rnd.NormFloat64()*series.stddev + series.mean
+
+		if xvalue < 0 || xvalue > 60 {
+			continue
+		}
+		origCounts[choose][int(math.Floor(xvalue))]++
+		points = append(points, point{
+			color:  choose,
+			xvalue: xvalue,
+		})
+	}
+
+	// Compute the total number of points per second. This will be
+	// used to establish the per-second probability.
+	xcount := make([]int, 60)
+	for _, point := range points {
+		xcount[int(math.Floor(point.xvalue))]++
+	}
+
+	// Compute the sample using the inverse probability as a
+	// weight. This ensures a uniform distribution of points in
+	// each second.
+ sampleSize := int(sampleRatio * float64(totalCount)) + sampler := varopt.New(sampleSize, rnd) + for _, point := range points { + second := int(math.Floor(point.xvalue)) + prob := float64(xcount[second]) / float64(totalCount) + sampler.Add(point, 1/prob) + } + + // sampleCounts stores the reconstructed signals. + sampleCounts := make([][]float64, len(colors)) + for i := range colors { + sampleCounts[i] = make([]float64, 60) + } + + // pointCounts stores the number of points per second. + pointCounts := make([]int, 60) + + // Reconstruct the signals using the output sample weights. + // The effective count of each sample point is its output + // weight divided by its original weight. + for i := 0; i < sampler.Size(); i++ { + sample, weight := sampler.Get(i) + origWeight := sampler.GetOriginalWeight(i) + point := sample.(point) + second := int(math.Floor(point.xvalue)) + sampleCounts[point.color][second] += (weight / origWeight) + pointCounts[second]++ + } + + // Compute standard deviation of sample points per second. + sum := 0.0 + mean := float64(sampleSize) / 60 + for s := 0; s < 60; s++ { + e := float64(pointCounts[s]) - mean + sum += e * e + } + stddev := math.Sqrt(sum / (60 - 1)) + + fmt.Printf("Samples per second mean %.2f\n", mean) + fmt.Printf("Samples per second standard deviation %.2f\n", stddev) + + // Compute mean absolute percentage error between sampleCounts + // and origCounts for each signal. + for c := range colors { + mae := 0.0 + for s := 0; s < 60; s++ { + mae += math.Abs(sampleCounts[c][s]-float64(origCounts[c][s])) / float64(origCounts[c][s]) + } + mae /= 60 + fmt.Printf("Mean absolute percentage error (%s) = %.2f%%\n", colors[c].color, mae*100) + } + + // Output: + // Samples per second mean 166.67 + // Samples per second standard deviation 13.75 + // Mean absolute percentage error (red) = 25.16% + // Mean absolute percentage error (green) = 14.30% + // Mean absolute percentage error (blue) = 14.23% +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e61e0b6 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/lightstep/varopt + +go 1.15 + +require github.com/stretchr/testify v1.4.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8fdee58 --- /dev/null +++ b/go.sum @@ -0,0 +1,11 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/sampleheap.go b/internal/sampleheap.go new file mode 100644 index 0000000..c3d82a5 --- /dev/null +++ 
b/internal/sampleheap.go @@ -0,0 +1,57 @@ +// Copyright 2019, LightStep Inc. + +package internal + +type Vsample struct { + Sample interface{} + Weight float64 +} + +type SampleHeap []Vsample + +func (sh *SampleHeap) Push(v Vsample) { + l := append(*sh, v) + n := len(l) - 1 + + // This copies the body of heap.up(). + j := n + for { + i := (j - 1) / 2 // parent + if i == j || l[j].Weight >= l[i].Weight { + break + } + l[i], l[j] = l[j], l[i] + j = i + } + + *sh = l +} + +func (sh *SampleHeap) Pop() Vsample { + l := *sh + n := len(l) - 1 + result := l[0] + l[0] = l[n] + l = l[:n] + + // This copies the body of heap.down(). + i := 0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && l[j2].Weight < l[j1].Weight { + j = j2 // = 2*i + 2 // right child + } + if l[j].Weight >= l[i].Weight { + break + } + l[i], l[j] = l[j], l[i] + i = j + } + + *sh = l + return result +} diff --git a/internal/sampleheap_test.go b/internal/sampleheap_test.go new file mode 100644 index 0000000..50615c0 --- /dev/null +++ b/internal/sampleheap_test.go @@ -0,0 +1,58 @@ +// Copyright 2019, LightStep Inc. + +package internal_test + +import ( + "container/heap" + "math/rand" + "testing" + + "github.com/lightstep/varopt/internal" + "github.com/stretchr/testify/require" +) + +type simpleHeap []float64 + +func (s *simpleHeap) Len() int { + return len(*s) +} + +func (s *simpleHeap) Swap(i, j int) { + (*s)[i], (*s)[j] = (*s)[j], (*s)[i] +} + +func (s *simpleHeap) Less(i, j int) bool { + return (*s)[i] < (*s)[j] +} + +func (s *simpleHeap) Push(x interface{}) { + *s = append(*s, x.(float64)) +} + +func (s *simpleHeap) Pop() interface{} { + old := *s + n := len(old) + x := old[n-1] + *s = old[0 : n-1] + return x +} + +func TestLargeHeap(t *testing.T) { + var L internal.SampleHeap + var S simpleHeap + + for i := 0; i < 1e6; i++ { + v := rand.NormFloat64() + L.Push(internal.Vsample{Weight: v}) + heap.Push(&S, v) + } + + for len(S) > 0 { + v1 := heap.Pop(&S).(float64) + v2 := L.Pop().Weight + + require.Equal(t, v1, v2) + } + + require.Equal(t, 0, len(L)) +} diff --git a/simple/doc.go b/simple/doc.go new file mode 100644 index 0000000..0d79ebe --- /dev/null +++ b/simple/doc.go @@ -0,0 +1,4 @@ +// Copyright 2019, LightStep Inc. + +/* Package simple implements an unweighted reservoir sampling algorithm. */ +package simple diff --git a/simple/simple.go b/simple/simple.go new file mode 100644 index 0000000..e3fb340 --- /dev/null +++ b/simple/simple.go @@ -0,0 +1,65 @@ +// Copyright 2019, LightStep Inc. + +package simple + +import ( + "math/rand" + + "github.com/lightstep/varopt" +) + +// Simple implements unweighted reservoir sampling using Algorithm R +// from "Random sampling with a reservoir" by Jeffrey Vitter (1985) +// https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R +type Simple struct { + capacity int + observed int + buffer []varopt.Sample + rnd *rand.Rand +} + +// New returns a simple reservoir sampler with given capacity +// (i.e., reservoir size) and random number generator. +func New(capacity int, rnd *rand.Rand) *Simple { + return &Simple{ + capacity: capacity, + rnd: rnd, + } +} + +// Add considers a new observation for the sample. Items have unit +// weight. 
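+//
+// After n observations have been added, each observation remains in
+// the reservoir with the same probability, capacity/n: a new item is
+// accepted with probability capacity/n, and acceptance evicts a
+// uniformly chosen existing entry. This is the defining invariant of
+// Algorithm R.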
+func (s *Simple) Add(span varopt.Sample) {
+	s.observed++
+
+	if s.buffer == nil {
+		s.buffer = make([]varopt.Sample, 0, s.capacity)
+	}
+
+	if len(s.buffer) < s.capacity {
+		s.buffer = append(s.buffer, span)
+		return
+	}
+
+	// Give this a capacity/observed chance of replacing an existing entry.
+	index := s.rnd.Intn(s.observed)
+	if index < s.capacity {
+		s.buffer[index] = span
+	}
+}
+
+// Get returns the i'th selected item from the sample.
+func (s *Simple) Get(i int) varopt.Sample {
+	return s.buffer[i]
+}
+
+// Size returns the number of items in the sample. If the reservoir is
+// full, Size() equals Capacity().
+func (s *Simple) Size() int {
+	return len(s.buffer)
+}
+
+// Count returns the number of items that were observed.
+func (s *Simple) Count() int {
+	return s.observed
+}
diff --git a/simple/simple_test.go b/simple/simple_test.go
new file mode 100644
index 0000000..4ea9592
--- /dev/null
+++ b/simple/simple_test.go
@@ -0,0 +1,41 @@
+// Copyright 2019, LightStep Inc.
+
+package simple_test
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/lightstep/varopt/simple"
+	"github.com/stretchr/testify/require"
+)
+
+type iRec int
+
+func TestSimple(t *testing.T) {
+	const (
+		popSize        = 1e6
+		sampleProb     = 0.1
+		sampleSize int = popSize * sampleProb
+		epsilon        = 0.01
+	)
+
+	rnd := rand.New(rand.NewSource(17167))
+
+	ss := simple.New(sampleSize, rnd)
+
+	psum := 0.
+	for i := 0; i < popSize; i++ {
+		ss.Add(iRec(i))
+		psum += float64(i)
+	}
+
+	require.Equal(t, ss.Size(), sampleSize)
+
+	ssum := 0.0
+	for i := 0; i < sampleSize; i++ {
+		ssum += float64(ss.Get(i).(iRec))
+	}
+
+	require.InEpsilon(t, ssum/float64(ss.Size()), psum/popSize, epsilon)
+}
diff --git a/spline.go.dead b/spline.go.dead
new file mode 100644
index 0000000..973a2e8
--- /dev/null
+++ b/spline.go.dead
@@ -0,0 +1,72 @@
+package tdigest
+
+import (
+	"fmt"
+	"math"
+
+	"github.com/jmacd/gospline"
+)
+
+// MonotoneSpline uses cubic monotone interpolation.
+// TODO: If this works out, the gospline package can be optimized a bit.
+type MonotoneSpline struct {
+	t      *TDigest
+	spline gospline.Spline
+	iwidth float64
+}
+
+var _ ProbabilityFunc = &MonotoneSpline{}
+
+func newSplineFunc(t *TDigest) *MonotoneSpline {
+	xvalues := make([]float64, len(t.Digest)+2)
+	yvalues := make([]float64, len(t.Digest)+2)
+
+	// Place the 0 and 1 probabilities at extreme x values.
+	xvalues[0] = -1e100
+	xvalues[len(t.Digest)+1] = +1e100
+	yvalues[0] = 0
+	yvalues[len(t.Digest)+1] = 1
+
+	cum := 0.0
+	min := math.MaxFloat64
+	max := -math.MaxFloat64
+	for i, c := range t.Digest {
+		mean := c.Sum / c.Weight
+		c.Mean = mean
+		cum += c.Weight / t.SumWeight
+		xvalues[i+1] = mean
+		yvalues[i+1] = cum
+		min = math.Min(min, mean)
+		max = math.Max(max, mean)
+	}
+
+	return &MonotoneSpline{
+		t:      t,
+		spline: gospline.NewMonotoneSpline(xvalues, yvalues),
+		iwidth: (max - min) / (8 * float64(len(t.Digest))),
+	}
+}
+
+// Probability interpolates the PDF using cubic monotone spline
+// interpolation.
+//
+// TODO: Instead of computing At() twice, can we interpolate the PDF
+// directly?
+func (m *MonotoneSpline) Probability(value float64) float64 {
+	if value < m.t.Digest[0].Mean {
+		return m.t.Digest[0].Weight / (2 * m.t.SumWeight)
+	}
+	if value > m.t.Digest[len(m.t.Digest)-1].Mean {
+		return m.t.Digest[len(m.t.Digest)-1].Weight / (2 * m.t.SumWeight)
+	}
+
+	// TODO: Note that this is not symmetrical, but it should be.
+	// Requires care on the boundaries to center this (vs. the
+	// above special cases).
+	below := m.spline.At(value)
+	above := m.spline.At(value + m.iwidth)
+	if above == below {
+		fmt.Println("Zero prob at", value, below, above)
+	}
+	return (above - below) / m.iwidth
+}
diff --git a/varopt.go b/varopt.go
index d3cd2f1..dbe605b 100644
--- a/varopt.go
+++ b/varopt.go
@@ -1,25 +1,32 @@
-// Stream sampling for variance-optimal estimation of subset sums
-// Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, Mikkel Thorup
-// 2008
-// https://arxiv.org/pdf/0803.0473.pdf
+// Copyright 2019, LightStep Inc.
 
 package varopt
 
 import (
-	"container/heap"
 	"fmt"
+	"math"
 	"math/rand"
+
+	"github.com/lightstep/varopt/internal"
 )
 
+// Varopt implements the algorithm from "Stream sampling for
+// variance-optimal estimation of subset sums" by Edith Cohen, Nick
+// Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup (2008).
+//
+// https://arxiv.org/pdf/0803.0473.pdf
 type Varopt struct {
-	// Large-weight items
-	L largeHeap
+	// Random number generator
+	rnd *rand.Rand
+
+	// Large-weight items stored in a min-heap.
+	L internal.SampleHeap
 
 	// Light-weight items.
-	T []vsample
+	T []internal.Vsample
 
 	// Temporary buffer.
-	X []vsample
+	X []internal.Vsample
 
 	// Current threshold
 	tau float64
@@ -31,40 +38,56 @@
 	totalWeight float64
 }
 
-type vsample struct {
-	sample Sample
-	weight float64
-}
+// Sample is an empty interface that represents a sample item.
+// Sampling algorithms treat these as opaque, as their weight is
+// passed in separately.
+type Sample interface{}
 
-type largeHeap []vsample
+var ErrInvalidWeight = fmt.Errorf("Negative, Zero, Inf or NaN weight")
 
-func NewVaropt(capacity int) *Varopt {
-	v := InitVaropt(capacity)
-	return &v
-}
-
-func InitVaropt(capacity int) Varopt {
-	return Varopt{
+// New returns a new Varopt sampler with given capacity (i.e.,
+// reservoir size) and random number generator.
+func New(capacity int, rnd *rand.Rand) *Varopt {
+	return &Varopt{
 		capacity: capacity,
+		rnd:      rnd,
+		L:        make(internal.SampleHeap, 0, capacity),
+		T:        make(internal.SampleHeap, 0, capacity),
 	}
 }
 
-func (s *Varopt) Add(sample Sample, weight float64) {
-	individual := vsample{
-		sample: sample,
-		weight: weight,
+// Reset returns the sampler to its initial state, maintaining its
+// capacity and random number source.
+func (s *Varopt) Reset() {
+	s.L = s.L[:0]
+	s.T = s.T[:0]
+	s.X = s.X[:0]
+	s.tau = 0
+	s.totalCount = 0
+	s.totalWeight = 0
+}
+
+// Add considers a new observation for the sample with given weight.
+// If an item is ejected from the sample as a result, the item is
+// returned to allow re-use of memory.
+//
+// An error will be returned if the weight is negative, zero,
+// infinite, or NaN.
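+//
+// Internally, items whose weight exceeds the current threshold tau
+// are kept in the min-heap L with their exact weights, while the
+// remaining light items are kept in T and reported with adjusted
+// weight tau. Once the reservoir is full, each Add raises tau and
+// evicts exactly one item, chosen according to the variance-optimal
+// distribution.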
+func (s *Varopt) Add(sample Sample, weight float64) (Sample, error) { + individual := internal.Vsample{ + Sample: sample, + Weight: weight, } - if weight <= 0 { - panic(fmt.Sprint("Invalid weight <= 0: ", weight)) + if weight <= 0 || math.IsNaN(weight) || math.IsInf(weight, 1) { + return nil, ErrInvalidWeight } s.totalCount++ s.totalWeight += weight if s.Size() < s.capacity { - heap.Push(&s.L, individual) - return + s.L.Push(individual) + return nil, nil } // the X <- {} step from the paper is not done here, @@ -73,16 +96,16 @@ func (s *Varopt) Add(sample Sample, weight float64) { W := s.tau * float64(len(s.T)) if weight > s.tau { - heap.Push(&s.L, individual) + s.L.Push(individual) } else { s.X = append(s.X, individual) W += weight } - for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].weight { - h := heap.Pop(&s.L).(vsample) + for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].Weight { + h := s.L.Pop() s.X = append(s.X, h) - W += h.weight + W += h.Weight } s.tau = W / float64(len(s.T)+len(s.X)-1) @@ -90,27 +113,31 @@ func (s *Varopt) Add(sample Sample, weight float64) { d := 0 for d < len(s.X) && r >= 0 { - wxd := s.X[d].weight + wxd := s.X[d].Weight r -= (1 - wxd/s.tau) d++ } + var eject Sample if r < 0 { if d < len(s.X) { s.X[d], s.X[len(s.X)-1] = s.X[len(s.X)-1], s.X[d] } + eject = s.X[len(s.X)-1].Sample s.X = s.X[:len(s.X)-1] } else { - ti := rand.Intn(len(s.T)) + ti := s.rnd.Intn(len(s.T)) s.T[ti], s.T[len(s.T)-1] = s.T[len(s.T)-1], s.T[ti] + eject = s.T[len(s.T)-1].Sample s.T = s.T[:len(s.T)-1] } s.T = append(s.T, s.X...) s.X = s.X[:0] + return eject, nil } func (s *Varopt) uniform() float64 { for { - r := rand.Float64() + r := s.rnd.Float64() if r != 0.0 { return r } @@ -122,60 +149,48 @@ func (s *Varopt) uniform() float64 { // GetOriginalWeight(i). func (s *Varopt) Get(i int) (Sample, float64) { if i < len(s.L) { - return s.L[i].sample, s.L[i].weight + return s.L[i].Sample, s.L[i].Weight } - return s.T[i-len(s.L)].sample, s.tau + return s.T[i-len(s.L)].Sample, s.tau } +// GetOriginalWeight returns the original input weight of the sample +// item that was passed to Add(). This can be useful for computing a +// frequency from the adjusted sample weight. func (s *Varopt) GetOriginalWeight(i int) float64 { if i < len(s.L) { - return s.L[i].weight + return s.L[i].Weight } - return s.T[i-len(s.L)].weight + return s.T[i-len(s.L)].Weight } +// Capacity returns the size of the reservoir. This is the maximum +// size of the sample. func (s *Varopt) Capacity() int { return s.capacity } +// Size returns the current number of items in the sample. If the +// reservoir is full, this returns Capacity(). func (s *Varopt) Size() int { return len(s.L) + len(s.T) } +// TotalWeight returns the sum of weights that were passed to Add(). func (s *Varopt) TotalWeight() float64 { return s.totalWeight } +// TotalCount returns the number of calls to Add(). func (s *Varopt) TotalCount() int { return s.totalCount } +// Tau returns the current large-weight threshold. Weights larger +// than Tau() carry their exact weight in the sample. See the VarOpt +// paper for details. 
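+//
+// Because VarOpt is variance-optimal for subset sums, the sum of
+// adjusted sample weights tracks TotalWeight() exactly, up to
+// floating-point error.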
func (s *Varopt) Tau() float64 { return s.tau } - -func (b largeHeap) Len() int { - return len(b) -} - -func (b largeHeap) Swap(i, j int) { - b[i], b[j] = b[j], b[i] -} - -func (b largeHeap) Less(i, j int) bool { - return b[i].weight < b[j].weight -} - -func (b *largeHeap) Push(x interface{}) { - *b = append(*b, x.(vsample)) -} - -func (b *largeHeap) Pop() interface{} { - old := *b - n := len(old) - x := old[n-1] - *b = old[0 : n-1] - return x -} diff --git a/varopt_test.go b/varopt_test.go index 4a19b80..a56c691 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -1,9 +1,14 @@ +// Copyright 2019, LightStep Inc. + package varopt_test import ( "math" + "math/rand" "testing" + "github.com/lightstep/varopt" + "github.com/lightstep/varopt/simple" "github.com/stretchr/testify/require" ) @@ -11,18 +16,15 @@ import ( // There are odd and even numbers, in equal amount // There are last-digits 0-9 in equal amount // -// Much like simple_test.go, we will test the mean is correct and, -// because unbiased, also the odd/even and last-digit-0-9 groupings -// will be balanced. +// We will test the mean is correct and, because unbiased, also the +// odd/even and last-digit-0-9 groupings will be balanced. const ( numBlocks = 100 popSize = 1e7 sampleProb = 0.001 sampleSize int = popSize * sampleProb - // TODO epsilon is somewhat variable b/c we're using the - // static rand w/o a fixed seed for the test. - epsilon = 0.06 + epsilon = 0.08 ) func TestUnbiased(t *testing.T) { @@ -53,7 +55,7 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { extra = popSize - bigSize*numBig - smallSize*numSmall ) - population := make([]Sample, popSize) + population := make([]varopt.Sample, popSize) psum := 0.0 @@ -67,17 +69,17 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { // population[i], population[j] = population[j], population[i] // }) - smallBlocks := make([][]Sample, numSmall) - bigBlocks := make([][]Sample, numBig) + smallBlocks := make([][]varopt.Sample, numSmall) + bigBlocks := make([][]varopt.Sample, numBig) for i := 0; i < numSmall; i++ { - smallBlocks[i] = make([]Sample, smallSize) + smallBlocks[i] = make([]varopt.Sample, smallSize) } for i := 0; i < numBig; i++ { if i == 0 { - bigBlocks[0] = make([]Sample, bigSize+extra) + bigBlocks[0] = make([]varopt.Sample, bigSize+extra) } else { - bigBlocks[i] = make([]Sample, bigSize) + bigBlocks[i] = make([]varopt.Sample, bigSize) } } @@ -97,22 +99,23 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { require.Equal(t, len(population), pos) maxDiff := 0.0 + rnd := rand.New(rand.NewSource(98887)) - func(allBlockLists ...[][][]Sample) { + func(allBlockLists ...[][][]varopt.Sample) { for _, blockLists := range allBlockLists { - varopt := NewVaropt(sampleSize) + vsample := varopt.New(sampleSize, rnd) for _, blockList := range blockLists { for _, block := range blockList { - simple := NewSimple(sampleSize) + ss := simple.New(sampleSize, rnd) for _, s := range block { - simple.Add(s) + ss.Add(s) } - weight := simple.Weight() - for i := 0; i < simple.Size(); i++ { - varopt.Add(simple.Get(i), weight) + weight := float64(ss.Count()) / float64(ss.Size()) + for i := 0; i < ss.Size(); i++ { + vsample.Add(ss.Get(i), weight) } } } @@ -121,8 +124,8 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { odd := 0.0 even := 0.0 - for i := 0; i < varopt.Size(); i++ { - v, w := varopt.Get(i) + for i := 0; i < vsample.Size(); i++ { + v, w := vsample.Get(i) vi := v.(int) if vi%2 == 0 { even++ @@ -140,7 +143,98 @@ func 
testUnbiased(t *testing.T, bbr, bsr float64) {
 		require.InEpsilon(t, odd, even, epsilon)
 	}
 	}(
-		[][][]Sample{bigBlocks, smallBlocks},
-		[][][]Sample{smallBlocks, bigBlocks},
+		[][][]varopt.Sample{bigBlocks, smallBlocks},
+		[][][]varopt.Sample{smallBlocks, bigBlocks},
 	)
 }
+
+func TestInvalidWeight(t *testing.T) {
+	rnd := rand.New(rand.NewSource(98887))
+	v := varopt.New(1, rnd)
+
+	_, err := v.Add(nil, math.NaN())
+	require.Equal(t, varopt.ErrInvalidWeight, err)
+
+	_, err = v.Add(nil, -1)
+	require.Equal(t, varopt.ErrInvalidWeight, err)
+
+	_, err = v.Add(nil, 0)
+	require.Equal(t, varopt.ErrInvalidWeight, err)
+}
+
+func TestReset(t *testing.T) {
+	const capacity = 10
+	const insert = 100
+	rnd := rand.New(rand.NewSource(98887))
+	v := varopt.New(capacity, rnd)
+
+	sum := 0.
+	for i := 1.; i <= insert; i++ {
+		v.Add(nil, i)
+		sum += i
+	}
+
+	require.Equal(t, capacity, v.Size())
+	require.Equal(t, insert, v.TotalCount())
+	require.Equal(t, sum, v.TotalWeight())
+	require.Less(t, 0., v.Tau())
+
+	v.Reset()
+
+	require.Equal(t, 0, v.Size())
+	require.Equal(t, 0, v.TotalCount())
+	require.Equal(t, 0., v.TotalWeight())
+	require.Equal(t, 0., v.Tau())
+}
+
+func TestEject(t *testing.T) {
+	const capacity = 100
+	const rounds = 10000
+	const maxvalue = 10000
+
+	entries := make([]int, capacity+1)
+	freelist := make([]*int, capacity+1)
+
+	for i := range entries {
+		freelist[i] = &entries[i]
+	}
+
+	// Make two deterministically equal samplers.
+	rnd1 := rand.New(rand.NewSource(98887))
+	rnd2 := rand.New(rand.NewSource(98887))
+	vsrc := rand.New(rand.NewSource(98887))
+
+	expected := varopt.New(capacity, rnd1)
+	ejector := varopt.New(capacity, rnd2)
+
+	for i := 0; i < rounds; i++ {
+		value := vsrc.Intn(maxvalue)
+		weight := vsrc.ExpFloat64()
+
+		_, _ = expected.Add(&value, weight)
+
+		lastitem := len(freelist) - 1
+		item := freelist[lastitem]
+		freelist = freelist[:lastitem]
+
+		*item = value
+		eject, _ := ejector.Add(item, weight)
+
+		if eject != nil {
+			freelist = append(freelist, eject.(*int))
+		}
+	}
+
+	require.Equal(t, expected.Size(), ejector.Size())
+	require.Equal(t, expected.TotalCount(), ejector.TotalCount())
+	require.Equal(t, expected.TotalWeight(), ejector.TotalWeight())
+	require.Equal(t, expected.Tau(), ejector.Tau())
+
+	for i := 0; i < capacity; i++ {
+		expectItem, expectWeight := expected.Get(i)
+		ejectItem, ejectWeight := ejector.Get(i)
+
+		require.Equal(t, *expectItem.(*int), *ejectItem.(*int))
+		require.Equal(t, expectWeight, ejectWeight)
+	}
+}
diff --git a/weighted_test.go b/weighted_test.go
new file mode 100644
index 0000000..1e82043
--- /dev/null
+++ b/weighted_test.go
@@ -0,0 +1,82 @@
+// Copyright 2019, LightStep Inc.
+
+package varopt_test
+
+import (
+	"fmt"
+	"math"
+	"math/rand"
+
+	"github.com/lightstep/varopt"
+)
+
+type packet struct {
+	size     int
+	color    string
+	protocol string
+}
+
+func ExampleNew() {
+	const totalPackets = 1e6
+	const sampleRatio = 0.01
+
+	colors := []string{"red", "green", "blue"}
+	protocols := []string{"http", "tcp", "udp"}
+
+	sizeByColor := map[string]int{}
+	sizeByProtocol := map[string]int{}
+	trueTotalWeight := 0.0
+
+	rnd := rand.New(rand.NewSource(32491))
+	sampler := varopt.New(totalPackets*sampleRatio, rnd)
+
+	for i := 0; i < totalPackets; i++ {
+		packet := packet{
+			size:     1 + rnd.Intn(100000),
+			color:    colors[rnd.Intn(len(colors))],
+			protocol: protocols[rnd.Intn(len(protocols))],
+		}
+
+		sizeByColor[packet.color] += packet.size
+		sizeByProtocol[packet.protocol] += packet.size
+		trueTotalWeight += float64(packet.size)
+
+		sampler.Add(packet, float64(packet.size))
+	}
+
+	estSizeByColor := map[string]float64{}
+	estSizeByProtocol := map[string]float64{}
+	estTotalWeight := 0.0
+
+	for i := 0; i < sampler.Size(); i++ {
+		sample, weight := sampler.Get(i)
+		packet := sample.(packet)
+		estSizeByColor[packet.color] += weight
+		estSizeByProtocol[packet.protocol] += weight
+		estTotalWeight += weight
+	}
+
+	// Compute the mean absolute percentage error for colors.
+	colorMape := 0.0
+	for _, c := range colors {
+		colorMape += math.Abs(float64(sizeByColor[c])-estSizeByColor[c]) / float64(sizeByColor[c])
+	}
+	colorMape /= float64(len(colors))
+
+	// Compute the mean absolute percentage error for protocols.
+	protocolMape := 0.0
+	for _, p := range protocols {
+		protocolMape += math.Abs(float64(sizeByProtocol[p])-estSizeByProtocol[p]) / float64(sizeByProtocol[p])
+	}
+	protocolMape /= float64(len(protocols))
+
+	// Print the total-sum error and the per-dimension errors.
+	fmt.Printf("Total sum error %.2g%%\n", 100*math.Abs(estTotalWeight-trueTotalWeight)/trueTotalWeight)
+	fmt.Printf("Color mean absolute percentage error %.2f%%\n", 100*colorMape)
+	fmt.Printf("Protocol mean absolute percentage error %.2f%%\n", 100*protocolMape)
+
+	// Output:
+	// Total sum error 2.4e-11%
+	// Color mean absolute percentage error 0.73%
+	// Protocol mean absolute percentage error 1.62%
+}