Skip to content

chore: Batch writer benchmarks #1552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 6, 2024
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ lint:

.PHONY: benchmark
benchmark:
go test -bench=Benchmark -run="^$$" ./...
go test -bench=Benchmark -run="^$$" ./... | grep -v 'BenchmarkWriterMemory/'
go test -bench=BenchmarkWriterMemory -run="^$$" ./writers/

benchmark-ci:
go install go.bobheadxi.dev/[email protected]
go test -bench . -benchmem ./... -run="^$$" | gobenchdata --json bench.json
{ go test -bench . -benchmem ./... -run="^$$" | grep -v 'BenchmarkWriterMemory/' && \
go test -bench=BenchmarkWriterMemory -benchmem -test.benchtime 10000x ./writers/ -run="^$$"; } | gobenchdata --json bench.json
rm -rf .delta.* && go run scripts/benchmark-delta/main.go bench.json
1 change: 1 addition & 0 deletions scripts/benchmark-delta/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func main() {
for _, run := range d {
for _, suite := range run.Suites {
for _, bm := range suite.Benchmarks {
bm.Name = strings.ReplaceAll(bm.Name, "/", "_")
if bm.NsPerOp > 0 {
fmt.Println(bm.Name, "ns/op", bm.NsPerOp)
deltaResults = append(deltaResults, deltaResult{
Expand Down
218 changes: 218 additions & 0 deletions writers/writers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package writers_test

import (
"context"
"math/rand"
"runtime"
"sort"
"strconv"
"testing"

"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/cloudquery/plugin-sdk/v4/writers"
"github.com/cloudquery/plugin-sdk/v4/writers/batchwriter"
"github.com/cloudquery/plugin-sdk/v4/writers/mixedbatchwriter"
"github.com/cloudquery/plugin-sdk/v4/writers/streamingbatchwriter"
"golang.org/x/exp/maps"
)

// bCase is a single benchmark case: a display name for b.Run, the writer
// under test, and a factory that produces a fresh record per iteration.
type bCase struct {
	name string             // benchmark sub-test name ("<writer> <opts>")
	wr   writers.Writer     // writer implementation being measured
	rec  func() arrow.Record // builds one record per loop iteration
}

// BenchmarkWriterMemory measures per-record wall time and heap growth for
// each writer implementation across a matrix of batching options. In addition
// to the standard timings it reports a custom "bytes/op" metric derived from
// runtime.MemStats (intentionally different from -benchmem's "B/op", which
// counts cumulative allocations rather than net heap growth).
func BenchmarkWriterMemory(b *testing.B) {
	batchwriterOpts := map[string][]batchwriter.Option{
		"defaults":           nil,
		"batch10k bytes100M": {batchwriter.WithBatchSizeBytes(100000000), batchwriter.WithBatchSize(10000)},
	}
	mixedbatchwriterOpts := map[string][]mixedbatchwriter.Option{
		"defaults":           nil,
		"batch10k bytes100M": {mixedbatchwriter.WithBatchSizeBytes(100000000), mixedbatchwriter.WithBatchSize(10000)},
	}
	streamingbatchwriterOpts := map[string][]streamingbatchwriter.Option{
		"defaults":  nil,
		"bytes100M": {streamingbatchwriter.WithBatchSizeBytes(100000000)},
	}

	var bCases []bCase
	bCases = append(bCases, writerMatrix("BatchWriter", batchwriter.New, newBatchWriterClient(), makeRecord, batchwriterOpts)...)
	bCases = append(bCases, writerMatrix("BatchWriter wide", batchwriter.New, newBatchWriterClient(), makeWideRecord, batchwriterOpts)...)
	bCases = append(bCases, writerMatrix("MixedBatchWriter", mixedbatchwriter.New, newMixedBatchWriterClient(), makeRecord, mixedbatchwriterOpts)...)
	bCases = append(bCases, writerMatrix("MixedBatchWriter wide", mixedbatchwriter.New, newMixedBatchWriterClient(), makeWideRecord, mixedbatchwriterOpts)...)
	bCases = append(bCases, writerMatrix("StreamingBatchWriter", streamingbatchwriter.New, newStreamingBatchWriterClient(), makeRecord, streamingbatchwriterOpts)...)
	bCases = append(bCases, writerMatrix("StreamingBatchWriter wide", streamingbatchwriter.New, newStreamingBatchWriterClient(), makeWideRecord, streamingbatchwriterOpts)...)

	for _, c := range bCases {
		c := c // capture the range variable for the closure (pre-Go 1.22 loop semantics)
		b.Run(c.name, func(b *testing.B) {
			var (
				mStart runtime.MemStats
				mEnd   runtime.MemStats
			)

			// The writer consumes ch in its own goroutine; its terminal error
			// is delivered on errCh after ch is closed.
			ch := make(chan message.WriteMessage)
			errCh := make(chan error)
			go func() {
				defer close(errCh)
				errCh <- c.wr.Write(context.Background(), ch)
			}()

			runtime.ReadMemStats(&mStart)
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				rec := c.rec()
				ch <- &message.WriteInsert{
					Record: rec,
				}
			}
			close(ch)
			err := <-errCh

			b.StopTimer()

			if err != nil {
				b.Fatal(err)
			}

			runtime.ReadMemStats(&mEnd)

			// MemStats.Alloc is the live-heap size and can shrink if a GC
			// cycle ran during the loop; guard the uint64 subtraction so an
			// underflow doesn't report an absurdly large metric.
			var allocatedBytes uint64
			if mEnd.Alloc > mStart.Alloc {
				allocatedBytes = mEnd.Alloc - mStart.Alloc
			}
			b.ReportMetric(float64(allocatedBytes)/float64(b.N), "bytes/op") // this is different from -benchmem result "B/op"
		})
	}
}

// makeRecord returns a factory that builds single-row records for a narrow
// one-column table ("col1" string, always "test"). The schema is computed
// once; each call to the returned closure allocates a fresh record.
func makeRecord() func() arrow.Record {
	sc := (&schema.Table{
		Name: "test_table",
		Columns: schema.ColumnList{
			{Name: "col1", Type: arrow.BinaryTypes.String},
		},
	}).ToArrowSchema()

	return func() arrow.Record {
		rb := array.NewRecordBuilder(memory.DefaultAllocator, sc)
		rb.Field(0).(*array.StringBuilder).Append("test")
		return rb.NewRecord()
	}
}

func makeWideRecord() func() arrow.Record {
table := &schema.Table{
Name: "test_wide_table",
Columns: schema.ColumnList{
{
Name: "col1",
Type: arrow.BinaryTypes.String,
},
},
}

const numWideCols = 200
randVals := make([]int64, numWideCols)
for i := 0; i < numWideCols; i++ {
table.Columns = append(table.Columns, schema.Column{
Name: "wide_col" + strconv.Itoa(i),
Type: arrow.PrimitiveTypes.Int64,
})
randVals[i] = rand.Int63()
}
sc := table.ToArrowSchema()

return func() arrow.Record {
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
bldr.Field(0).(*array.StringBuilder).Append("test")
for i := 0; i < numWideCols; i++ {
bldr.Field(i + 1).(*array.Int64Builder).Append(randVals[i])
Comment on lines +135 to +136
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for i := 0; i < numWideCols; i++ {
bldr.Field(i + 1).(*array.Int64Builder).Append(randVals[i])
for i := 1; i <= numWideCols; i++ {
bldr.Field(i).(*array.Int64Builder).Append(randVals[i])

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you'd need to do randVals[i-1], adds compexity and potential copy/paste errors, just to avoid an + 1

}
return bldr.NewRecord()
}
}

func writerMatrix[T writers.Writer, C any, O ~func(T)](prefix string, constructor func(C, ...O) (T, error), client C, recordMaker func() func() arrow.Record, optsMatrix map[string][]O) []bCase {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I have a problem...

bCases := make([]bCase, 0, len(optsMatrix))

k := maps.Keys(optsMatrix)
sort.Strings(k)

for _, name := range k {
opts := optsMatrix[name]
wr, err := constructor(client, opts...)
if err != nil {
panic(err)
}
bCases = append(bCases, bCase{
name: prefix + " " + name,
wr: wr,
rec: recordMaker(),
})
}
return bCases
}

// mixedbatchwriterClient is a no-op benchmark client for mixedbatchwriter:
// migrations are ignored, deletes are unimplemented, and inserts simply
// release the records they receive.
type mixedbatchwriterClient struct {
	mixedbatchwriter.IgnoreMigrateTableBatch
	mixedbatchwriter.UnimplementedDeleteStaleBatch
	mixedbatchwriter.UnimplementedDeleteRecordsBatch
}

var _ mixedbatchwriter.Client = (*mixedbatchwriterClient)(nil)

// newMixedBatchWriterClient returns a fresh no-op mixedbatchwriter client.
func newMixedBatchWriterClient() mixedbatchwriter.Client {
	client := &mixedbatchwriterClient{}
	return client
}

// InsertBatch releases every record in the batch and reports success.
func (mixedbatchwriterClient) InsertBatch(_ context.Context, msgs message.WriteInserts) error {
	for i := range msgs {
		msgs[i].Record.Release()
	}
	return nil
}

// batchwriterClient is a no-op benchmark client for batchwriter: migrations
// are ignored, deletes are unimplemented, and table-batch writes simply
// release the records they receive.
type batchwriterClient struct {
	batchwriter.IgnoreMigrateTables
	batchwriter.UnimplementedDeleteStale
	batchwriter.UnimplementedDeleteRecord
}

var _ batchwriter.Client = (*batchwriterClient)(nil)

// newBatchWriterClient returns a fresh no-op batchwriter client.
func newBatchWriterClient() batchwriter.Client {
	client := &batchwriterClient{}
	return client
}

// WriteTableBatch releases every record in the batch and reports success.
func (batchwriterClient) WriteTableBatch(_ context.Context, _ string, msgs message.WriteInserts) error {
	for i := range msgs {
		msgs[i].Record.Release()
	}
	return nil
}

// streamingbatchwriterClient is a no-op benchmark client for
// streamingbatchwriter: migrations are ignored, deletes are unimplemented,
// and streamed inserts are drained and released until the channel closes.
type streamingbatchwriterClient struct {
	streamingbatchwriter.IgnoreMigrateTable
	streamingbatchwriter.UnimplementedDeleteStale
	streamingbatchwriter.UnimplementedDeleteRecords
}

var _ streamingbatchwriter.Client = (*streamingbatchwriterClient)(nil)

// newStreamingBatchWriterClient returns a fresh no-op streaming client.
func newStreamingBatchWriterClient() streamingbatchwriter.Client {
	client := &streamingbatchwriterClient{}
	return client
}

// WriteTable drains ch, releasing each record, and returns nil once the
// sender closes the channel.
func (streamingbatchwriterClient) WriteTable(_ context.Context, ch <-chan *message.WriteInsert) error {
	for {
		m, ok := <-ch
		if !ok {
			return nil
		}
		m.Record.Release()
	}
}
Loading