@@ -31,28 +31,22 @@ type batcher struct {
 }
 
 type worker struct {
-	ch      chan *schema.Resource
-	flush   chan chan struct{}
-	rows    schema.Resources
-	builder *array.RecordBuilder // we can reuse that
-	res     chan<- message.SyncMessage
+	ch      chan *schema.Resource
+	flush   chan chan struct{}
+	curRows, maxRows int // todo: consider using capped int64 from https://github.com/cloudquery/plugin-sdk/pull/1647
+	builder *array.RecordBuilder // we can reuse that
+	res     chan<- message.SyncMessage
 }
 
 // send must be called on len(rows) > 0
 func (w *worker) send() {
-	for _, row := range w.rows {
-		scalar.AppendToRecordBuilder(w.builder, row.GetValues())
-	}
-
 	w.res <- &message.SyncInsert{Record: w.builder.NewRecord()}
 	// we need to reserve here as NewRecord (& underlying NewArray calls) reset the memory
-	w.builder.Reserve(cap(w.rows))
-
-	clear(w.rows) // ease GC
-	w.rows = w.rows[:0]
+	w.builder.Reserve(w.maxRows)
+	w.curRows = 0 // reset
 }
 
-func (w *worker) work(done <-chan struct{}, size int, timeout time.Duration) {
+func (w *worker) work(done <-chan struct{}, timeout time.Duration) {
 	ticker := writers.NewTicker(timeout)
 	defer ticker.Stop()
 	tickerCh := ticker.Chan()
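
Not part of the diff itself, just context on the Arrow call pattern it relies on: NewRecord hands off the accumulated rows and resets the builder, which is why send() has to call w.builder.Reserve(w.maxRows) again right after emitting the record. A minimal standalone sketch of that behavior, assuming the apache/arrow Go module (the v15 import path and the toy int64 schema are illustrative, not taken from this PR):

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/arrow/array"
	"github.com/apache/arrow/go/v15/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{{Name: "id", Type: arrow.PrimitiveTypes.Int64}}, nil)
	bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer bldr.Release()

	maxRows := 3
	bldr.Reserve(maxRows) // pre-allocate space for a full batch

	// append rows directly into the builder, as the worker now does
	for i := 0; i < maxRows; i++ {
		bldr.Field(0).(*array.Int64Builder).Append(int64(i))
	}

	// NewRecord hands over the accumulated rows and resets the builder's memory
	rec := bldr.NewRecord()
	defer rec.Release()
	fmt.Println(rec.NumRows()) // 3

	bldr.Reserve(maxRows) // re-reserve for the next batch, as send() does
}
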
@@ -61,25 +55,27 @@ func (w *worker) work(done <-chan struct{}, size int, timeout time.Duration) {
 		select {
 		case r, ok := <-w.ch:
 			if !ok {
-				if len(w.rows) > 0 {
+				if w.curRows > 0 {
 					w.send()
 				}
 				return
 			}
 
-			w.rows = append(w.rows, r)
-			if len(w.rows) == size {
+			// append to builder right away
+			scalar.AppendToRecordBuilder(w.builder, r.GetValues())
+			w.curRows++
+			if w.curRows == w.maxRows {
 				w.send()
 				ticker.Reset(timeout)
 			}
 
 		case <-tickerCh:
-			if len(w.rows) > 0 {
+			if w.curRows > 0 {
 				w.send()
 			}
 
 		case ch := <-w.flush:
-			if len(w.rows) > 0 {
+			if w.curRows > 0 {
 				w.send()
 				ticker.Reset(timeout)
 			}
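
The flush field is a chan chan struct{}: a caller hands the worker a fresh channel and blocks on it until the pending batch has been drained. The hunk above is cut off before showing how the worker signals back; the sketch below assumes it closes the per-request channel once the flush is done, which is the usual shape of this handshake, and all names here are illustrative rather than copied from the PR:

package main

import "fmt"

func main() {
	flush := make(chan chan struct{})

	// worker side (stand-in for (*worker).work)
	go func() {
		for done := range flush {
			fmt.Println("flushing pending rows") // would call w.send() if curRows > 0
			close(done)                          // tell the caller the flush completed
		}
	}()

	// caller side, e.g. at a sync checkpoint
	done := make(chan struct{})
	flush <- done
	<-done // returns once the worker has flushed
}
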
@@ -103,7 +99,7 @@ func (b *batcher) process(res *schema.Resource) {
 
 	// we alloc only ch here, as it may be needed right away
 	// for instance, if another goroutine will get the value allocated by us
-	wr := &worker{ch: make(chan *schema.Resource, b.size)}
+	wr := &worker{ch: make(chan *schema.Resource, 5)} // 5 is quite enough
 	v, loaded = b.workers.LoadOrStore(table.Name, wr)
 	if loaded {
 		// means that the worker was already in the sync.Map, so we just discard the wr value
@@ -120,13 +116,13 @@ func (b *batcher) process(res *schema.Resource) {
 
 		// fill in the worker fields
 		wr.flush = make(chan chan struct{})
-		wr.rows = make(schema.Resources, 0, b.size)
+		wr.maxRows = b.size
 		wr.builder = array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
 		wr.res = b.res
 		wr.builder.Reserve(b.size)
 
 		// start processing
-		wr.work(b.ctxDone, b.size, b.timeout)
+		wr.work(b.ctxDone, b.timeout)
 	}()
 
 	wr.ch <- res