Skip to content

Commit f86dcb5

Browse files
authored
feat: Batch resources into a single record on source side (#1642)
1 parent 10db5f3 commit f86dcb5

File tree

4 files changed

+273
-53
lines changed

4 files changed

+273
-53
lines changed

Diff for: scheduler/batch.go

+206
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/apache/arrow/go/v16/arrow/array"
9+
"github.com/apache/arrow/go/v16/arrow/memory"
10+
"github.com/cloudquery/plugin-sdk/v4/message"
11+
"github.com/cloudquery/plugin-sdk/v4/scalar"
12+
"github.com/cloudquery/plugin-sdk/v4/schema"
13+
"github.com/cloudquery/plugin-sdk/v4/writers"
14+
"github.com/rs/zerolog"
15+
)
16+
17+
// BatchSettings controls how resources are batched into records on the source side.
//
// getBatcher enables batching only when Timeout > 0 and MaxRows > 1;
// with any other combination every resource is sent as its own record.
type BatchSettings struct {
	// MaxRows is the number of rows a per-table worker accumulates
	// before it sends the record.
	MaxRows int
	// Timeout is the ticker interval after which a worker sends
	// whatever rows it has accumulated so far.
	Timeout time.Duration
}

// BatchOption adjusts a single aspect of BatchSettings.
type BatchOption func(settings *BatchSettings)
25+
26+
func WithBatchOptions(options ...BatchOption) Option {
27+
return func(s *Scheduler) {
28+
if s.batchSettings == nil {
29+
s.batchSettings = new(BatchSettings)
30+
}
31+
for _, o := range options {
32+
o(s.batchSettings)
33+
}
34+
}
35+
}
36+
37+
func WithBatchMaxRows(rows int) BatchOption {
38+
return func(s *BatchSettings) {
39+
s.MaxRows = rows
40+
}
41+
}
42+
43+
func WithBatchTimeout(timeout time.Duration) BatchOption {
44+
return func(s *BatchSettings) {
45+
s.Timeout = timeout
46+
}
47+
}
48+
49+
func (s *BatchSettings) getBatcher(ctx context.Context, res chan<- message.SyncMessage, logger zerolog.Logger) batcherInterface {
50+
if s.Timeout > 0 && s.MaxRows > 1 {
51+
return &batcher{
52+
done: ctx.Done(),
53+
res: res,
54+
maxRows: s.MaxRows,
55+
timeout: s.Timeout,
56+
logger: logger.With().Int("max_rows", s.MaxRows).Dur("timeout", s.Timeout).Logger(),
57+
}
58+
}
59+
60+
return &nopBatcher{res: res}
61+
}
62+
63+
type batcherInterface interface {
64+
process(res *schema.Resource)
65+
close()
66+
}
67+
68+
type nopBatcher struct {
69+
res chan<- message.SyncMessage
70+
}
71+
72+
func (n *nopBatcher) process(resource *schema.Resource) {
73+
n.res <- &message.SyncInsert{Record: resource.GetValues().ToArrowRecord(resource.Table.ToArrowSchema())}
74+
}
75+
76+
func (*nopBatcher) close() {}
77+
78+
var _ batcherInterface = (*nopBatcher)(nil)
79+
80+
type batcher struct {
81+
done <-chan struct{}
82+
83+
res chan<- message.SyncMessage
84+
85+
maxRows int
86+
timeout time.Duration
87+
88+
// using sync primitives by value here implies that batcher is to be used by pointer only
89+
// workers is a sync.Map rather than a map + mutex pair
90+
// because worker allocation & lookup falls into one of the sync.Map use-cases,
91+
// namely, ever-growing cache (write once, read many times).
92+
workers sync.Map // k = table name, v = *worker
93+
wg sync.WaitGroup
94+
95+
logger zerolog.Logger
96+
}
97+
98+
type worker struct {
99+
ch chan *schema.Resource
100+
flush chan chan struct{}
101+
curRows, maxRows int
102+
builder *array.RecordBuilder // we can reuse that
103+
res chan<- message.SyncMessage
104+
logger zerolog.Logger
105+
}
106+
107+
// send must be called on len(rows) > 0
108+
func (w *worker) send() {
109+
w.logger.Debug().Int("current_rows", w.curRows).Msg("send")
110+
w.res <- &message.SyncInsert{Record: w.builder.NewRecord()}
111+
// we need to reserve here as NewRecord (& underlying NewArray calls) reset the memory
112+
w.builder.Reserve(w.maxRows)
113+
w.curRows = 0 // reset
114+
}
115+
116+
func (w *worker) work(done <-chan struct{}, timeout time.Duration) {
117+
ticker := writers.NewTicker(timeout)
118+
defer ticker.Stop()
119+
tickerCh := ticker.Chan()
120+
121+
for {
122+
select {
123+
case r, ok := <-w.ch:
124+
if !ok {
125+
if w.curRows > 0 {
126+
w.send()
127+
}
128+
return
129+
}
130+
131+
// append to builder
132+
scalar.AppendToRecordBuilder(w.builder, r.GetValues())
133+
w.curRows++
134+
// check if we need to flush
135+
if w.maxRows > 0 && w.curRows == w.maxRows {
136+
w.send()
137+
ticker.Reset(timeout)
138+
}
139+
140+
case <-tickerCh:
141+
if w.curRows > 0 {
142+
w.send()
143+
}
144+
145+
case ch := <-w.flush:
146+
if w.curRows > 0 {
147+
w.send()
148+
ticker.Reset(timeout)
149+
}
150+
close(ch)
151+
152+
case <-done:
153+
// this means the request was cancelled
154+
return // after this NO other call will succeed
155+
}
156+
}
157+
}
158+
159+
func (b *batcher) process(res *schema.Resource) {
160+
table := res.Table
161+
// already running worker
162+
v, loaded := b.workers.Load(table.Name)
163+
if loaded {
164+
v.(*worker).ch <- res
165+
return
166+
}
167+
168+
// we alloc only ch here, as it may be needed right away
169+
// for instance, if another goroutine will get the value allocated by us
170+
wr := &worker{ch: make(chan *schema.Resource, 5)} // 5 is quite enough
171+
v, loaded = b.workers.LoadOrStore(table.Name, wr)
172+
if loaded {
173+
// means that the worker was already in tne sync.Map, so we just discard the wr value
174+
close(wr.ch) // for GC
175+
v.(*worker).ch <- res // send res to the already allocated worker
176+
return
177+
}
178+
179+
// fill in the required data
180+
// start wr
181+
b.wg.Add(1)
182+
go func() {
183+
defer b.wg.Done()
184+
185+
// fill in the worker fields
186+
wr.flush = make(chan chan struct{})
187+
wr.maxRows = b.maxRows
188+
wr.builder = array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
189+
wr.res = b.res
190+
wr.builder.Reserve(b.maxRows)
191+
wr.logger = b.logger.With().Str("table", table.Name).Logger()
192+
193+
// start processing
194+
wr.work(b.done, b.timeout)
195+
}()
196+
197+
wr.ch <- res
198+
}
199+
200+
func (b *batcher) close() {
201+
b.workers.Range(func(_, v any) bool {
202+
close(v.(*worker).ch)
203+
return true
204+
})
205+
b.wg.Wait()
206+
}

Diff for: scheduler/benchmark_test.go.backup

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12-
"github.com/apache/arrow/go/v15/arrow"
12+
"github.com/apache/arrow/go/v16/arrow"
1313
"github.com/cloudquery/plugin-pb-go/specs"
1414
"github.com/cloudquery/plugin-sdk/v4/schema"
1515
"github.com/rs/zerolog"

Diff for: scheduler/scheduler.go

+11-17
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"sync/atomic"
1010
"time"
1111

12-
"github.com/apache/arrow/go/v16/arrow"
1312
"github.com/cloudquery/plugin-sdk/v4/caser"
1413
"github.com/cloudquery/plugin-sdk/v4/message"
1514
"github.com/cloudquery/plugin-sdk/v4/schema"
@@ -105,6 +104,9 @@ type Scheduler struct {
105104

106105
// The maximum number of go routines that can be spawned for a specific resource
107106
singleResourceMaxConcurrency int64
107+
108+
// Controls how records are constructed on the source side.
109+
batchSettings *BatchSettings
108110
}
109111

110112
type syncClient struct {
@@ -124,6 +126,7 @@ func NewScheduler(opts ...Option) *Scheduler {
124126
maxDepth: DefaultMaxDepth,
125127
singleResourceMaxConcurrency: DefaultSingleResourceMaxConcurrency,
126128
singleNestedTableMaxConcurrency: DefaultSingleNestedTableMaxConcurrency,
129+
batchSettings: new(BatchSettings),
127130
}
128131
for _, opt := range opts {
129132
opt(&s)
@@ -207,22 +210,22 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s
207210
panic(fmt.Errorf("unknown scheduler %s", s.strategy.String()))
208211
}
209212
}()
213+
214+
b := s.batchSettings.getBatcher(ctx, res, s.logger)
215+
defer b.close() // wait for all resources to be processed
216+
done := ctx.Done() // no need to do the lookups in loop
210217
for resource := range resources {
211218
select {
212-
case res <- &message.SyncInsert{Record: resourceToRecord(resource)}:
213-
case <-ctx.Done():
219+
case <-done:
214220
s.logger.Debug().Msg("sync context cancelled")
215221
return context.Cause(ctx)
222+
default:
223+
b.process(resource)
216224
}
217225
}
218226
return context.Cause(ctx)
219227
}
220228

221-
func resourceToRecord(resource *schema.Resource) arrow.Record {
222-
vector := resource.GetValues()
223-
return vector.ToArrowRecord(resource.Table.ToArrowSchema())
224-
}
225-
226229
func (s *syncClient) logTablesMetrics(tables schema.Tables, client Client) {
227230
clientName := client.ID()
228231
for _, table := range tables {
@@ -310,12 +313,3 @@ func maxDepth(tables schema.Tables) uint64 {
310313
}
311314
return depth
312315
}
313-
314-
// unparam's suggestion to remove the second parameter is not good advice here.
315-
// nolint:unparam
316-
func max(a, b int) int {
317-
if a > b {
318-
return a
319-
}
320-
return b
321-
}

0 commit comments

Comments
 (0)