
feat: Batch resources into a single record on source side #1642

Merged
merged 68 commits into from
Jun 20, 2024
Changes from all commits (68 commits)
eeaa7d7
init
candiduslynx Jun 5, 2024
7acc015
slice
candiduslynx Jun 5, 2024
7c85ec8
use in batchwriter
candiduslynx Jun 5, 2024
0b8687c
restructure
candiduslynx Jun 5, 2024
1d752eb
mixedbatchwriter
candiduslynx Jun 5, 2024
c4b18a1
streamingbatchwriter
candiduslynx Jun 5, 2024
55b3f39
tests
candiduslynx Jun 5, 2024
2adb40f
lint
candiduslynx Jun 5, 2024
a6e73a1
move to internal
candiduslynx Jun 5, 2024
151b54c
tests
candiduslynx Jun 5, 2024
de5993f
Merge branch 'main' into feat/batch-slicer
candiduslynx Jun 5, 2024
ce4f915
less timeouts
candiduslynx Jun 5, 2024
7af6baa
feat: Account for bytes limit properly when batching records for writ…
candiduslynx Jun 5, 2024
f7eeb40
simple worker
candiduslynx Apr 22, 2024
8dae0f3
tweaks
candiduslynx Apr 22, 2024
f301499
extra send
candiduslynx Apr 22, 2024
2e4930b
flush only on len(rows) > 0
candiduslynx Apr 22, 2024
36d4c77
rename func
candiduslynx Apr 22, 2024
2ccaa9a
Update scheduler/batch.go
candiduslynx Apr 22, 2024
48ceb7f
no need to check size
candiduslynx Apr 22, 2024
de7d438
make tests work
candiduslynx Apr 22, 2024
dcc6d3b
ease GC, too
candiduslynx Apr 22, 2024
caab5ae
try simplified a bit
candiduslynx Apr 23, 2024
626cc2f
staged approach + sync.Map reasoning
candiduslynx Apr 23, 2024
f6557d9
prealloc chan
candiduslynx Apr 23, 2024
23cab99
Revert "prealloc chan"
candiduslynx Apr 23, 2024
fdf6bc4
use select more efficiently
candiduslynx Apr 23, 2024
8bc9ff6
forgot close
candiduslynx Apr 23, 2024
7628946
less calls for channels
candiduslynx Apr 24, 2024
d34930b
shared tick
candiduslynx Apr 24, 2024
9cf1555
prealloc builder once
candiduslynx Apr 24, 2024
dda4b78
use 5s batch timeout
candiduslynx Apr 24, 2024
6df38ce
own ticker, 5s int
candiduslynx Apr 24, 2024
ec91f5a
lint
candiduslynx Apr 24, 2024
470c576
v16
candiduslynx May 11, 2024
feef533
optimize the worker alloc
candiduslynx May 13, 2024
6853081
ease gc
candiduslynx May 13, 2024
9a6c023
add ByteSIze to scalars & vector
candiduslynx May 21, 2024
e4dd4e9
size batch
candiduslynx Jun 5, 2024
8c2cd19
lint
candiduslynx Jun 5, 2024
f0e4306
lint
candiduslynx Jun 5, 2024
e81b2b8
less test pain
candiduslynx Jun 5, 2024
6b65292
Merge branch 'multirow' into feat/batch-source
candiduslynx Jun 5, 2024
3f5fd54
Merge branch 'main' into feat/batch-source
candiduslynx Jun 13, 2024
4576f7c
leave byte size out of scope
candiduslynx Jun 13, 2024
b2a9678
less diff
candiduslynx Jun 13, 2024
4691b77
less diff
candiduslynx Jun 13, 2024
be9c869
no need for batch pkg
candiduslynx Jun 13, 2024
a06aacd
revert writers to their original form
candiduslynx Jun 13, 2024
1718350
Revert "revert writers to their original form"
candiduslynx Jun 13, 2024
de1d8a8
Reapply "revert writers to their original form"
candiduslynx Jun 13, 2024
03761a4
Revert "leave byte size out of scope"
candiduslynx Jun 13, 2024
4a54446
rm extra transformation
candiduslynx Jun 13, 2024
4440744
Reapply "leave byte size out of scope"
candiduslynx Jun 13, 2024
dec00ad
fix
candiduslynx Jun 13, 2024
6f8e67f
experiment with flush on demand
candiduslynx Jun 13, 2024
e33e2af
Revert "experiment with flush on demand"
candiduslynx Jun 13, 2024
8ab74d9
Reapply "experiment with flush on demand"
candiduslynx Jun 13, 2024
374ff0c
less allocs
candiduslynx Jun 13, 2024
e4ec14d
Revert "less allocs"
candiduslynx Jun 13, 2024
f19062f
Revert "Reapply "experiment with flush on demand""
candiduslynx Jun 13, 2024
3f825cc
Merge branch 'main' into feat/batch-source
candiduslynx Jun 20, 2024
7d1ef4e
expose settings to control source batching
candiduslynx Jun 20, 2024
df5162b
test
candiduslynx Jun 20, 2024
85e8963
logs
candiduslynx Jun 20, 2024
d8c8f8b
Merge branch 'main' into feat/batch-source
candiduslynx Jun 20, 2024
ed5e8a6
add timeout to logs
candiduslynx Jun 20, 2024
7f5c7ec
Merge branch 'main' into feat/batch-source
kodiakhq[bot] Jun 20, 2024
206 changes: 206 additions & 0 deletions scheduler/batch.go
@@ -0,0 +1,206 @@
package scheduler

import (
    "context"
    "sync"
    "time"

    "github.com/apache/arrow/go/v16/arrow/array"
    "github.com/apache/arrow/go/v16/arrow/memory"
    "github.com/cloudquery/plugin-sdk/v4/message"
    "github.com/cloudquery/plugin-sdk/v4/scalar"
    "github.com/cloudquery/plugin-sdk/v4/schema"
    "github.com/cloudquery/plugin-sdk/v4/writers"
    "github.com/rs/zerolog"
)

type (
    BatchSettings struct {
        MaxRows int
        Timeout time.Duration
    }

    BatchOption func(settings *BatchSettings)
)

func WithBatchOptions(options ...BatchOption) Option {
    return func(s *Scheduler) {
        if s.batchSettings == nil {
            s.batchSettings = new(BatchSettings)
        }
        for _, o := range options {
            o(s.batchSettings)
        }
    }
}

func WithBatchMaxRows(rows int) BatchOption {
    return func(s *BatchSettings) {
        s.MaxRows = rows
    }
}

func WithBatchTimeout(timeout time.Duration) BatchOption {
    return func(s *BatchSettings) {
        s.Timeout = timeout
    }
}
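
// Example (illustrative, caller-side; the values here are arbitrary, not
// defaults from this PR). Note that both options must be set for batching to
// kick in (MaxRows > 1 and Timeout > 0), otherwise getBatcher falls back to
// the nopBatcher:
//
//	s := scheduler.NewScheduler(
//		scheduler.WithBatchOptions(
//			scheduler.WithBatchMaxRows(100),
//			scheduler.WithBatchTimeout(5*time.Second),
//		),
//	)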

func (s *BatchSettings) getBatcher(ctx context.Context, res chan<- message.SyncMessage, logger zerolog.Logger) batcherInterface {
    if s.Timeout > 0 && s.MaxRows > 1 {
        return &batcher{
            done:    ctx.Done(),
            res:     res,
            maxRows: s.MaxRows,
            timeout: s.Timeout,
            logger:  logger.With().Int("max_rows", s.MaxRows).Dur("timeout", s.Timeout).Logger(),
        }
    }

    return &nopBatcher{res: res}
}

type batcherInterface interface {
    process(res *schema.Resource)
    close()
}

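// nopBatcher preserves the pre-batching behavior: every resource is emitted
// right away as its own single-row record.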
type nopBatcher struct {
    res chan<- message.SyncMessage
}

func (n *nopBatcher) process(resource *schema.Resource) {
    n.res <- &message.SyncInsert{Record: resource.GetValues().ToArrowRecord(resource.Table.ToArrowSchema())}
}

func (*nopBatcher) close() {}

var _ batcherInterface = (*nopBatcher)(nil)

type batcher struct {
    done <-chan struct{}

    res chan<- message.SyncMessage

    maxRows int
    timeout time.Duration

    // Using sync primitives by value here implies that batcher is to be used by pointer only.
    // workers is a sync.Map rather than a map + mutex pair
    // because worker allocation & lookup fall into one of the sync.Map use cases,
    // namely an ever-growing cache (write once, read many times).
    workers sync.Map // k = table name, v = *worker
    wg      sync.WaitGroup

    logger zerolog.Logger
}

type worker struct {
    ch               chan *schema.Resource
    flush            chan chan struct{}
    curRows, maxRows int
    builder          *array.RecordBuilder // we can reuse this across batches
    res              chan<- message.SyncMessage
    logger           zerolog.Logger
}

// send must only be called when w.curRows > 0
func (w *worker) send() {
    w.logger.Debug().Int("current_rows", w.curRows).Msg("send")
    w.res <- &message.SyncInsert{Record: w.builder.NewRecord()}
    // we need to reserve here, as NewRecord (and the underlying NewArray calls) resets the memory
    w.builder.Reserve(w.maxRows)
    w.curRows = 0 // reset
}
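
// Illustration (not part of this file): NewRecord resets the builder's
// buffers, so capacity must be re-reserved before appending the next batch:
//
//	b := array.NewRecordBuilder(memory.DefaultAllocator, sc)
//	b.Reserve(3)
//	b.Field(0).(*array.Int64Builder).Append(1) // ...append rows...
//	rec := b.NewRecord() // rec now owns the data; the builder is reset
//	b.Reserve(3)         // restore capacity for the next batch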

func (w *worker) work(done <-chan struct{}, timeout time.Duration) {
    ticker := writers.NewTicker(timeout)
    defer ticker.Stop()
    tickerCh := ticker.Chan()

    for {
        select {
        case r, ok := <-w.ch:
            if !ok {
                if w.curRows > 0 {
                    w.send()
                }
                return
            }

            // append to builder
            scalar.AppendToRecordBuilder(w.builder, r.GetValues())
            w.curRows++
            // check if we need to flush
            if w.maxRows > 0 && w.curRows == w.maxRows {
                w.send()
                ticker.Reset(timeout)
            }

        case <-tickerCh:
            if w.curRows > 0 {
                w.send()
            }

        case ch := <-w.flush:
            if w.curRows > 0 {
                w.send()
                ticker.Reset(timeout)
            }
            close(ch)

        case <-done:
            // this means the request was cancelled
            return // after this NO other call will succeed
        }
    }
}

func (b *batcher) process(res *schema.Resource) {
    table := res.Table
    // fast path: the worker for this table is already running
    v, loaded := b.workers.Load(table.Name)
    if loaded {
        v.(*worker).ch <- res
        return
    }

    // we allocate only ch here, as it may be needed right away,
    // for instance, if another goroutine gets the value we allocated
    wr := &worker{ch: make(chan *schema.Resource, 5)} // 5 is quite enough
    v, loaded = b.workers.LoadOrStore(table.Name, wr)
    if loaded {
        // the worker was already in the sync.Map, so we just discard the wr value
        close(wr.ch) // for GC
        v.(*worker).ch <- res // send res to the already allocated worker
        return
    }

    // fill in the required data
    // start wr
    b.wg.Add(1)
    go func() {
        defer b.wg.Done()

        // fill in the worker fields
        wr.flush = make(chan chan struct{})
        wr.maxRows = b.maxRows
        wr.builder = array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
        wr.res = b.res
        wr.builder.Reserve(b.maxRows)
        wr.logger = b.logger.With().Str("table", table.Name).Logger()

        // start processing
        wr.work(b.done, b.timeout)
    }()

    wr.ch <- res
}

func (b *batcher) close() {
    b.workers.Range(func(_, v any) bool {
        close(v.(*worker).ch)
        return true
    })
    b.wg.Wait()
}
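
The per-table worker allocation in process() above is a write-once cache built
on sync.Map: optimistically Load, then LoadOrStore a minimally initialized
value, and only the goroutine that wins the race finishes initialization and
starts the worker, while the loser closes its own channel so the unused
allocation can be collected. A minimal, self-contained sketch of the same
pattern (hypothetical names, not part of this PR):

package main

import (
    "fmt"
    "sync"
)

type pool struct {
    workers sync.Map // k = table name, v = chan string (write once, read many)
    wg      sync.WaitGroup
}

func (p *pool) dispatch(table, row string) {
    if v, ok := p.workers.Load(table); ok { // fast path: worker already running
        v.(chan string) <- row
        return
    }
    ch := make(chan string, 5) // allocate only the channel; it may be needed right away
    if v, loaded := p.workers.LoadOrStore(table, ch); loaded {
        close(ch) // lost the race: discard our channel for GC
        v.(chan string) <- row
        return
    }
    p.wg.Add(1)
    go func() { // won the race: we own ch and run the worker
        defer p.wg.Done()
        for r := range ch {
            fmt.Println(table, r)
        }
    }()
    ch <- row
}

func (p *pool) close() {
    p.workers.Range(func(_, v any) bool {
        close(v.(chan string))
        return true
    })
    p.wg.Wait()
}

func main() {
    p := new(pool)
    p.dispatch("aws_s3_buckets", "bucket-1")
    p.dispatch("aws_s3_buckets", "bucket-2")
    p.close()
}
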
2 changes: 1 addition & 1 deletion scheduler/benchmark_test.go.backup
@@ -9,7 +9,7 @@ import (
     "testing"
     "time"
 
-    "github.com/apache/arrow/go/v15/arrow"
+    "github.com/apache/arrow/go/v16/arrow"
     "github.com/cloudquery/plugin-pb-go/specs"
     "github.com/cloudquery/plugin-sdk/v4/schema"
     "github.com/rs/zerolog"
28 changes: 11 additions & 17 deletions scheduler/scheduler.go
@@ -9,7 +9,6 @@ import (
     "sync/atomic"
     "time"
 
-    "github.com/apache/arrow/go/v16/arrow"
     "github.com/cloudquery/plugin-sdk/v4/caser"
     "github.com/cloudquery/plugin-sdk/v4/message"
     "github.com/cloudquery/plugin-sdk/v4/schema"
@@ -105,6 +104,9 @@ type Scheduler struct {
 
     // The maximum number of go routines that can be spawned for a specific resource
     singleResourceMaxConcurrency int64
+
+    // Controls how records are constructed on the source side.
+    batchSettings *BatchSettings
 }
 
 type syncClient struct {
@@ -124,6 +126,7 @@ func NewScheduler(opts ...Option) *Scheduler {
         maxDepth:                        DefaultMaxDepth,
         singleResourceMaxConcurrency:    DefaultSingleResourceMaxConcurrency,
         singleNestedTableMaxConcurrency: DefaultSingleNestedTableMaxConcurrency,
+        batchSettings:                   new(BatchSettings),
     }
     for _, opt := range opts {
         opt(&s)
@@ -207,22 +210,22 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s
             panic(fmt.Errorf("unknown scheduler %s", s.strategy.String()))
         }
     }()
+
+    b := s.batchSettings.getBatcher(ctx, res, s.logger)
+    defer b.close() // wait for all resources to be processed
+    done := ctx.Done() // no need to do the lookups in loop
     for resource := range resources {
         select {
-        case res <- &message.SyncInsert{Record: resourceToRecord(resource)}:
-        case <-ctx.Done():
+        case <-done:
             s.logger.Debug().Msg("sync context cancelled")
             return context.Cause(ctx)
+        default:
+            b.process(resource)
         }
     }
     return context.Cause(ctx)
 }
 
-func resourceToRecord(resource *schema.Resource) arrow.Record {
-    vector := resource.GetValues()
-    return vector.ToArrowRecord(resource.Table.ToArrowSchema())
-}
-
 func (s *syncClient) logTablesMetrics(tables schema.Tables, client Client) {
     clientName := client.ID()
     for _, table := range tables {
@@ -310,12 +313,3 @@ func maxDepth(tables schema.Tables) uint64 {
     }
     return depth
 }
-
-// unparam's suggestion to remove the second parameter is not good advice here.
-// nolint:unparam
-func max(a, b int) int {
-    if a > b {
-        return a
-    }
-    return b
-}