Skip to content

Commit cc959e4

Browse files
author
jmacd
committed
Initial checkin
0 parents  commit cc959e4

File tree

3 files changed

+330
-0
lines changed

3 files changed

+330
-0
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
This is an implementation of VarOpt, an unbiased weighted sampling
2+
algorithm described in the paper [Stream sampling for variance-optimal
3+
estimation of subset sums](https://arxiv.org/pdf/0803.0473.pdf).

varopt.go

+181
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// Stream sampling for variance-optimal estimation of subset sums
2+
// Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, Mikkel Thorup
3+
// 2008
4+
// https://arxiv.org/pdf/0803.0473.pdf
5+
6+
package varopt
7+
8+
import (
9+
"container/heap"
10+
"fmt"
11+
"math/rand"
12+
)
13+
14+
type Varopt struct {
15+
// Large-weight items
16+
L largeHeap
17+
18+
// Light-weight items.
19+
T []vsample
20+
21+
// Temporary buffer.
22+
X []vsample
23+
24+
// Current threshold
25+
tau float64
26+
27+
// Size of sample & scale
28+
capacity int
29+
30+
totalCount int
31+
totalWeight float64
32+
}
33+
34+
type vsample struct {
35+
sample Sample
36+
weight float64
37+
}
38+
39+
type largeHeap []vsample
40+
41+
func NewVaropt(capacity int) *Varopt {
42+
v := InitVaropt(capacity)
43+
return &v
44+
}
45+
46+
func InitVaropt(capacity int) Varopt {
47+
return Varopt{
48+
capacity: capacity,
49+
}
50+
}
51+
52+
func (s *Varopt) Add(sample Sample, weight float64) {
53+
individual := vsample{
54+
sample: sample,
55+
weight: weight,
56+
}
57+
58+
if weight <= 0 {
59+
panic(fmt.Sprint("Invalid weight <= 0: ", weight))
60+
}
61+
62+
s.totalCount++
63+
s.totalWeight += weight
64+
65+
if s.Size() < s.capacity {
66+
heap.Push(&s.L, individual)
67+
return
68+
}
69+
70+
// the X <- {} step from the paper is not done here,
71+
// but rather at the bottom of the function
72+
73+
W := s.tau * float64(len(s.T))
74+
75+
if weight > s.tau {
76+
heap.Push(&s.L, individual)
77+
} else {
78+
s.X = append(s.X, individual)
79+
W += weight
80+
}
81+
82+
for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].weight {
83+
h := heap.Pop(&s.L).(vsample)
84+
s.X = append(s.X, h)
85+
W += h.weight
86+
}
87+
88+
s.tau = W / float64(len(s.T)+len(s.X)-1)
89+
r := s.uniform()
90+
d := 0
91+
92+
for d < len(s.X) && r >= 0 {
93+
wxd := s.X[d].weight
94+
r -= (1 - wxd/s.tau)
95+
d++
96+
}
97+
if r < 0 {
98+
if d < len(s.X) {
99+
s.X[d], s.X[len(s.X)-1] = s.X[len(s.X)-1], s.X[d]
100+
}
101+
s.X = s.X[:len(s.X)-1]
102+
} else {
103+
ti := rand.Intn(len(s.T))
104+
s.T[ti], s.T[len(s.T)-1] = s.T[len(s.T)-1], s.T[ti]
105+
s.T = s.T[:len(s.T)-1]
106+
}
107+
s.T = append(s.T, s.X...)
108+
s.X = s.X[:0]
109+
}
110+
111+
func (s *Varopt) uniform() float64 {
112+
for {
113+
r := rand.Float64()
114+
if r != 0.0 {
115+
return r
116+
}
117+
}
118+
}
119+
120+
// Get() returns the i'th sample and its adjusted weight. To obtain
121+
// the sample's original weight (i.e. what was passed to Add), use
122+
// GetOriginalWeight(i).
123+
func (s *Varopt) Get(i int) (Sample, float64) {
124+
if i < len(s.L) {
125+
return s.L[i].sample, s.L[i].weight
126+
}
127+
128+
return s.T[i-len(s.L)].sample, s.tau
129+
}
130+
131+
func (s *Varopt) GetOriginalWeight(i int) float64 {
132+
if i < len(s.L) {
133+
return s.L[i].weight
134+
}
135+
136+
return s.T[i-len(s.L)].weight
137+
}
138+
139+
func (s *Varopt) Capacity() int {
140+
return s.capacity
141+
}
142+
143+
func (s *Varopt) Size() int {
144+
return len(s.L) + len(s.T)
145+
}
146+
147+
func (s *Varopt) TotalWeight() float64 {
148+
return s.totalWeight
149+
}
150+
151+
func (s *Varopt) TotalCount() int {
152+
return s.totalCount
153+
}
154+
155+
func (s *Varopt) Tau() float64 {
156+
return s.tau
157+
}
158+
159+
func (b largeHeap) Len() int {
160+
return len(b)
161+
}
162+
163+
func (b largeHeap) Swap(i, j int) {
164+
b[i], b[j] = b[j], b[i]
165+
}
166+
167+
func (b largeHeap) Less(i, j int) bool {
168+
return b[i].weight < b[j].weight
169+
}
170+
171+
func (b *largeHeap) Push(x interface{}) {
172+
*b = append(*b, x.(vsample))
173+
}
174+
175+
func (b *largeHeap) Pop() interface{} {
176+
old := *b
177+
n := len(old)
178+
x := old[n-1]
179+
*b = old[0 : n-1]
180+
return x
181+
}

varopt_test.go

+146
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package varopt_test
2+
3+
import (
4+
"math"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
// There are 2 unequal sizes of simple block
11+
// There are odd and even numbers, in equal amount
12+
// There are last-digits 0-9 in equal amount
13+
//
14+
// Much like simple_test.go, we will test the mean is correct and,
15+
// because unbiased, also the odd/even and last-digit-0-9 groupings
16+
// will be balanced.
17+
const (
18+
numBlocks = 100
19+
popSize = 1e7
20+
sampleProb = 0.001
21+
sampleSize int = popSize * sampleProb
22+
23+
// TODO epsilon is somewhat variable b/c we're using the
24+
// static rand w/o a fixed seed for the test.
25+
epsilon = 0.06
26+
)
27+
28+
func TestUnbiased(t *testing.T) {
29+
var (
30+
// Ratio of big blocks to small blocks
31+
bigBlockRatios = []float64{0.1, 0.3, 0.5, 0.7, 0.9, 1.0}
32+
// Ratio of big block size to small block size
33+
bigSizeRatios = []float64{0.1, 0.2, 0.4}
34+
)
35+
36+
for _, bbr := range bigBlockRatios {
37+
for _, bsr := range bigSizeRatios {
38+
testUnbiased(t, bbr, bsr)
39+
}
40+
}
41+
}
42+
43+
func testUnbiased(t *testing.T, bbr, bsr float64) {
44+
var (
45+
numBig = int(numBlocks * bbr)
46+
numSmall = numBlocks - numBig
47+
48+
factor = float64(numBig)/bsr + float64(numSmall)
49+
50+
smallSize = int(popSize / factor)
51+
bigSize = int(float64(smallSize) / bsr)
52+
53+
extra = popSize - bigSize*numBig - smallSize*numSmall
54+
)
55+
56+
population := make([]Sample, popSize)
57+
58+
psum := 0.0
59+
60+
for i := range population {
61+
population[i] = i
62+
psum += float64(i)
63+
}
64+
65+
// Note: We're leaving the data unsorted to prove lack of bias
66+
// rand.Shuffle(len(population), func(i, j int) {
67+
// population[i], population[j] = population[j], population[i]
68+
// })
69+
70+
smallBlocks := make([][]Sample, numSmall)
71+
bigBlocks := make([][]Sample, numBig)
72+
73+
for i := 0; i < numSmall; i++ {
74+
smallBlocks[i] = make([]Sample, smallSize)
75+
}
76+
for i := 0; i < numBig; i++ {
77+
if i == 0 {
78+
bigBlocks[0] = make([]Sample, bigSize+extra)
79+
} else {
80+
bigBlocks[i] = make([]Sample, bigSize)
81+
}
82+
}
83+
84+
pos := 0
85+
for i := 0; i < numSmall; i++ {
86+
for j := 0; j < len(smallBlocks[i]); j++ {
87+
smallBlocks[i][j] = population[pos]
88+
pos++
89+
}
90+
}
91+
for i := 0; i < numBig; i++ {
92+
for j := 0; j < len(bigBlocks[i]); j++ {
93+
bigBlocks[i][j] = population[pos]
94+
pos++
95+
}
96+
}
97+
require.Equal(t, len(population), pos)
98+
99+
maxDiff := 0.0
100+
101+
func(allBlockLists ...[][][]Sample) {
102+
for _, blockLists := range allBlockLists {
103+
varopt := NewVaropt(sampleSize)
104+
105+
for _, blockList := range blockLists {
106+
for _, block := range blockList {
107+
simple := NewSimple(sampleSize)
108+
109+
for _, s := range block {
110+
simple.Add(s)
111+
}
112+
113+
weight := simple.Weight()
114+
for i := 0; i < simple.Size(); i++ {
115+
varopt.Add(simple.Get(i), weight)
116+
}
117+
}
118+
}
119+
120+
vsum := 0.0
121+
odd := 0.0
122+
even := 0.0
123+
124+
for i := 0; i < varopt.Size(); i++ {
125+
v, w := varopt.Get(i)
126+
vi := v.(int)
127+
if vi%2 == 0 {
128+
even++
129+
} else {
130+
odd++
131+
}
132+
133+
vsum += w * float64(vi)
134+
}
135+
136+
diff := math.Abs(vsum-psum) / psum
137+
maxDiff = math.Max(maxDiff, diff)
138+
139+
require.InEpsilon(t, vsum, psum, epsilon)
140+
require.InEpsilon(t, odd, even, epsilon)
141+
}
142+
}(
143+
[][][]Sample{bigBlocks, smallBlocks},
144+
[][][]Sample{smallBlocks, bigBlocks},
145+
)
146+
}

0 commit comments

Comments
 (0)