forked from selectdb/ccr-syncer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrename_rollup.go
66 lines (55 loc) · 2.01 KB
/
rename_rollup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package handle
import (
"github.com/selectdb/ccr_syncer/pkg/ccr"
"github.com/selectdb/ccr_syncer/pkg/ccr/record"
festruct "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/frontendservice"
log "github.com/sirupsen/logrus"
)
func init() {
ccr.RegisterJobHandle[*record.RenameRollup](festruct.TBinlogType_RENAME_ROLLUP, &RenameRollupHandle{})
}
// RenameRollupHandle is the job handle for rename-rollup binlogs.
// It has no fields; all inputs arrive through method arguments.
type RenameRollupHandle struct{}
// IsBinlogCommitted reports whether the rename described by record is already
// visible on the destination table.
//
// It resolves the destination table name from record.TableId, describes that
// table through the destination meta, and returns true when the description
// contains an entry keyed by record.NewRollupName. An error from either
// lookup is returned together with a false result.
func (h *RenameRollupHandle) IsBinlogCommitted(j *ccr.Job, record *record.RenameRollup) (bool, error) {
	tableName, err := j.GetDestNameBySrcId(record.TableId)
	if err != nil {
		log.Errorf("get dest table name by src id %d failed, err: %v", record.TableId, err)
		return false, err
	}

	tableDesc, err := j.GetDestMeta().DescribeTableAll(tableName)
	if err != nil {
		return false, err
	}

	// The presence of the new rollup name is what marks the binlog as applied.
	if _, renamed := tableDesc[record.NewRollupName]; renamed {
		log.Infof("rollup %s is renamed to %s in dest table %s, this binlog is committed",
			record.OldRollupName, record.NewRollupName, tableName)
		return true, nil
	}

	log.Infof("rollup %s is not renamed to %s in dest table %s, this binlog is not committed",
		record.OldRollupName, record.NewRollupName, tableName)
	return false, nil
}
// IsIdempotent always returns false for rename-rollup binlogs.
// NOTE(review): presumably this tells the job that replaying the binlog is
// not safe and IsBinlogCommitted should be consulted first — confirm against
// the ccr job-handle contract.
func (h *RenameRollupHandle) IsIdempotent() bool {
	const idempotent = false
	return idempotent
}
// Handle applies a rename-rollup binlog to the destination.
//
// The destination table name is resolved from renameRollup.TableId. If that
// lookup fails the binlog is skipped and nil is returned; the failure is
// logged so the skip is visible.
//
// When renameRollup.OldRollupName is empty the rollup cannot be renamed by
// name, so the table is re-synced through a partial snapshot instead. In that
// path the snapshot is taken on j.Src.Table when the job syncs the table
// under an alias, and on the destination table name otherwise.
//
// In the normal case the rename is forwarded to j.IDest.RenameRollup and its
// error, if any, is returned.
func (h *RenameRollupHandle) Handle(j *ccr.Job, commitSeq int64, renameRollup *record.RenameRollup) error {
	destTableName, err := j.GetDestNameBySrcId(renameRollup.TableId)
	if err != nil {
		// NOTE(review): skipping (returning nil) is kept from the original, which
		// dropped this error silently. It is now logged; confirm whether the
		// error should instead be returned, as IsBinlogCommitted does.
		log.Warnf("skip rename rollup binlog, get dest table name by src id %d failed, "+
			"commit seq: %d, err: %v", renameRollup.TableId, commitSeq, err)
		return nil
	}

	newRollup := renameRollup.NewRollupName
	oldRollup := renameRollup.OldRollupName
	if oldRollup == "" {
		log.Warnf("old rollup name is empty, sync rollup via partial snapshot, "+
			"new rollup: %s, index id: %d, table id: %d, commit seq: %d",
			newRollup, renameRollup.IndexId, renameRollup.TableId, commitSeq)

		tableName := destTableName
		if j.IsTableSyncWithAlias() {
			tableName = j.Src.Table
		}

		// Named locals document the positional flags passed to NewPartialSnapshot.
		replace := true
		isView := false
		return j.NewPartialSnapshot(renameRollup.TableId, tableName, nil, replace, isView)
	}

	return j.IDest.RenameRollup(destTableName, oldRollup, newRollup)
}