Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Handle rename rollup binlog #538

Merged
merged 3 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions pkg/ccr/handle/rename_rollup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package handle

import (
"github.com/selectdb/ccr_syncer/pkg/ccr"
"github.com/selectdb/ccr_syncer/pkg/ccr/record"
festruct "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/frontendservice"
log "github.com/sirupsen/logrus"
)

func init() {
ccr.RegisterJobHandle[*record.RenameRollup](festruct.TBinlogType_RENAME_ROLLUP, &RenameRollupHandle{})
}

type RenameRollupHandle struct {
}

func (h *RenameRollupHandle) IsBinlogCommitted(j *ccr.Job, record *record.RenameRollup) (bool, error) {
destTableName, err := j.GetDestNameBySrcId(record.TableId)
if err != nil {
log.Errorf("get dest table name by src id %d failed, err: %v", record.TableId, err)
return false, err
}

descResult, err := j.GetDestMeta().DescribeTableAll(destTableName)
if err != nil {
return false, err
}

if _, ok := descResult[record.NewRollupName]; !ok {
log.Infof("rollup %s is not renamed to %s in dest table %s, this binlog is not committed",
record.OldRollupName, record.NewRollupName, destTableName)
return false, nil
}

log.Infof("rollup %s is renamed to %s in dest table %s, this binlog is committed",
record.OldRollupName, record.NewRollupName, destTableName)
return true, nil
}

func (h *RenameRollupHandle) IsIdempotent() bool {
return false
}

func (h *RenameRollupHandle) Handle(j *ccr.Job, commitSeq int64, renameRollup *record.RenameRollup) error {
destTableName, err := j.GetDestNameBySrcId(renameRollup.TableId)
if err != nil {
return nil
}

newRollup := renameRollup.NewRollupName
oldRollup := renameRollup.OldRollupName
if oldRollup == "" {
log.Warnf("old rollup name is empty, sync rollup via partial snapshot, "+
"new rollup: %s, index id: %d, table id: %d, commit seq: %d",
newRollup, renameRollup.IndexId, renameRollup.TableId, commitSeq)
replace := true
tableName := destTableName
if j.IsTableSyncWithAlias() {
tableName = j.Src.Table
}
isView := false
return j.NewPartialSnapshot(renameRollup.TableId, tableName, nil, replace, isView)
}

return j.IDest.RenameRollup(destTableName, oldRollup, newRollup)
}
64 changes: 32 additions & 32 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (j *Job) isIncrementalSync() bool {
}
}

func (j *Job) isTableSyncWithAlias() bool {
func (j *Job) IsTableSyncWithAlias() bool {
return j.SyncType == TableSync && j.Src.Table != j.Dest.Table
}

Expand Down Expand Up @@ -446,7 +446,7 @@ func (j *Job) handlePartialSyncTableNotFound() error {
log.Warnf("force new partial snapshot, since table %d has renamed from %s to %s", tableId, table, newTableName)
replace := true // replace the old data to avoid blocking reading
isView := false
return j.newPartialSnapshot(tableId, newTableName, nil, replace, isView)
return j.NewPartialSnapshot(tableId, newTableName, nil, replace, isView)
} else {
return xerror.Errorf(xerror.Normal, "table sync but table has renamed from %s to %s, table id %d",
table, newTableName, tableId)
Expand Down Expand Up @@ -476,7 +476,7 @@ func (j *Job) partialSync() error {
log.Infof("partial sync status: done")
withAlias := len(j.progress.TableAliases) > 0
isView := j.progress.PartialSyncData.IsView
if err := j.newPartialSnapshot(tableId, table, partitions, withAlias, isView); err != nil {
if err := j.NewPartialSnapshot(tableId, table, partitions, withAlias, isView); err != nil {
return err
}

Expand Down Expand Up @@ -504,7 +504,7 @@ func (j *Job) partialSync() error {
log.Warnf("partial sync status: partition not found in the upstream, step to table partial sync")
replace := true // replace the old data to avoid blocking reading
isView := false // partition not found, so it's not a view
return j.newPartialSnapshot(tableId, table, nil, replace, isView)
return j.NewPartialSnapshot(tableId, table, nil, replace, isView)
} else if err != nil && err == base.ErrBackupTableNotFound {
return j.handlePartialSyncTableNotFound()
} else if err != nil {
Expand Down Expand Up @@ -555,7 +555,7 @@ func (j *Job) partialSync() error {
snapshotResp.Status.GetStatusCode())
replace := len(j.progress.TableAliases) > 0
isView := j.progress.PartialSyncData.IsView
return j.newPartialSnapshot(tableId, table, partitions, replace, isView)
return j.NewPartialSnapshot(tableId, table, partitions, replace, isView)
} else if snapshotResp.Status.GetStatusCode() != tstatus.TStatusCode_OK {
err = xerror.Errorf(xerror.FE, "get snapshot failed, status: %v", snapshotResp.Status)
return err
Expand Down Expand Up @@ -692,7 +692,7 @@ func (j *Job) partialSync() error {
AliasName: &aliasName,
}
tableRefs = append(tableRefs, tableRef)
} else if j.isTableSyncWithAlias() {
} else if j.IsTableSyncWithAlias() {
log.Infof("table sync snapshot not same name, table: %s, dest table: %s", j.Src.Table, j.Dest.Table)
tableRefs = make([]*festruct.TTableRef, 0)
tableRef := &festruct.TTableRef{
Expand Down Expand Up @@ -740,13 +740,13 @@ func (j *Job) partialSync() error {
log.Infof("force partial sync, because the snapshot %s is expired", restoreSnapshotName)
replace := len(j.progress.TableAliases) > 0
isView := j.progress.PartialSyncData.IsView
return j.newPartialSnapshot(tableId, table, partitions, replace, isView)
return j.NewPartialSnapshot(tableId, table, partitions, replace, isView)
}

restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName)
if errors.Is(err, base.ErrRestoreSignatureNotMatched) {
log.Warnf("force partial sync with replace, because the snapshot %s signature is not matched", restoreSnapshotName)
return j.newPartialSnapshot(tableId, table, nil, true, false) // only in partition sync.
return j.NewPartialSnapshot(tableId, table, nil, true, false) // only in partition sync.
} else if err != nil {
j.progress.NextSubVolatile(RestoreSnapshot, inMemoryData)
return err
Expand All @@ -773,7 +773,7 @@ func (j *Job) partialSync() error {
// Step 7: Update job progress && dest table id
// update job info, only for dest table id
var targetName = table
if j.isTableSyncWithAlias() {
if j.IsTableSyncWithAlias() {
targetName = j.Dest.Table
}
if alias, ok := j.progress.TableAliases[table]; ok {
Expand Down Expand Up @@ -1082,7 +1082,7 @@ func (j *Job) fullSync() error {
log.Infof("fullsync status: begin restore snapshot %s to %s", snapshotName, restoreSnapshotName)

var tableRefs []*festruct.TTableRef
if j.isTableSyncWithAlias() {
if j.IsTableSyncWithAlias() {
log.Debugf("table sync snapshot not same name, table: %s, dest table: %s", j.Src.Table, j.Dest.Table)
tableRefs = make([]*festruct.TTableRef, 0)
tableRef := &festruct.TTableRef{
Expand Down Expand Up @@ -1284,7 +1284,7 @@ func (j *Job) fullSync() error {
for _, table := range tables {
alias := j.progress.TableAliases[table]
targetName := table
if j.isTableSyncWithAlias() {
if j.IsTableSyncWithAlias() {
targetName = j.Dest.Table
}

Expand Down Expand Up @@ -2139,7 +2139,7 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
if createTable.IsCreateTableWithInvertedIndex() {
log.Infof("create table %s with inverted index, force partial snapshot, commit seq : %d", createTable.TableName, binlog.GetCommitSeq())
// we need to force replace table to ensure the index id is consistent
return j.newPartialSnapshot(createTable.TableId, createTable.TableName, nil, true, false)
return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, true, false)
}

// Some operations, such as DROP TABLE, will be skiped in the partial/full snapshot,
Expand All @@ -2155,7 +2155,7 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
createTable.TableName, binlog.GetCommitSeq())
replace := true
isView := false
return j.newPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
}
}

Expand All @@ -2177,14 +2177,14 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
binlog.GetCommitSeq(), errMsg)
replace := false // new view no need to replace
isView := true
return j.newPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
}
if len(createTable.TableName) > 0 && IsSessionVariableRequired(errMsg) { // ignore doris 2.0.3
log.Infof("a session variable is required to create table %s, force partial snapshot, commit seq: %d, msg: %s",
createTable.TableName, binlog.GetCommitSeq(), errMsg)
replace := false // new table no need to replace
isView := false
return j.newPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
}
return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId)
}
Expand Down Expand Up @@ -2378,7 +2378,7 @@ func (j *Job) handleAlterRollup(alterJob *record.AlterJobV2) error {

replace := true
isView := false
return j.newPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, replace, isView)
return j.NewPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, replace, isView)
}

func (j *Job) handleSchemaChange(alterJob *record.AlterJobV2) error {
Expand Down Expand Up @@ -2420,7 +2420,7 @@ func (j *Job) handleSchemaChange(alterJob *record.AlterJobV2) error {

replaceTable := true
isView := false
return j.newPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, replaceTable, isView)
return j.NewPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, replaceTable, isView)
}

var allViewDeleted bool = false
Expand Down Expand Up @@ -2473,7 +2473,7 @@ func (j *Job) handleLightningSchemaChange(binlog *festruct.TBinlog) error {
}

tableAlias := ""
if j.isTableSyncWithAlias() {
if j.IsTableSyncWithAlias() {
tableAlias = j.Dest.Table
}
return j.IDest.LightningSchemaChange(j.Src.Database, tableAlias, lightningSchemaChange)
Expand Down Expand Up @@ -2585,14 +2585,14 @@ func (j *Job) handleReplacePartitions(binlog *festruct.TBinlog) error {
log.Warnf("replace partitions with non strict range is not supported yet, replace partition record: %s", string(data))
replace := true
isView := false
return j.newPartialSnapshot(replacePartition.TableId, replacePartition.TableName, nil, replace, isView)
return j.NewPartialSnapshot(replacePartition.TableId, replacePartition.TableName, nil, replace, isView)
}

if replacePartition.UseTempName {
log.Warnf("replace partitions with use tmp name is not supported yet, replace partition record: %s", string(data))
replace := true
isView := false
return j.newPartialSnapshot(replacePartition.TableId, replacePartition.TableName, nil, replace, isView)
return j.NewPartialSnapshot(replacePartition.TableId, replacePartition.TableName, nil, replace, isView)
}

oldPartitions := strings.Join(replacePartition.Partitions, ",")
Expand All @@ -2602,7 +2602,7 @@ func (j *Job) handleReplacePartitions(binlog *festruct.TBinlog) error {

partitions := replacePartition.Partitions
isView := false
return j.newPartialSnapshot(replacePartition.TableId, replacePartition.TableName, partitions, false, isView)
return j.NewPartialSnapshot(replacePartition.TableId, replacePartition.TableName, partitions, false, isView)
}

func (j *Job) handleModifyPartitions(binlog *festruct.TBinlog) error {
Expand Down Expand Up @@ -2711,15 +2711,15 @@ func (j *Job) handleReplaceTable(binlog *festruct.TBinlog) error {
} else if originTableSynced && record.SwapTable {
log.Infof("force new partial snapshot, origin table %s id %d already synced, commit seq: %d",
record.OriginTableName, record.OriginTableId, commitSeq)
return j.newPartialSnapshot(record.NewTableId, record.OriginTableName, nil, false, false)
return j.NewPartialSnapshot(record.NewTableId, record.OriginTableName, nil, false, false)
} else if newTableSynced && !record.SwapTable {
log.Infof("filter replace table binlog, the new table %s id %d already synced, commit seq: %d, swap = false",
record.NewTableName, record.NewTableId, commitSeq)
return nil
} else if newTableSynced && record.SwapTable {
log.Infof("force new partial snapshot, new table %s id %d already synced, commit seq: %d",
record.NewTableName, record.NewTableId, commitSeq)
return j.newPartialSnapshot(record.OriginTableId, record.NewTableName, nil, false, false)
return j.NewPartialSnapshot(record.OriginTableId, record.NewTableName, nil, false, false)
}
}

Expand Down Expand Up @@ -2785,7 +2785,7 @@ func (j *Job) handleModifyTableAddOrDropInvertedIndices(binlog *festruct.TBinlog

replace := true
isView := false
return j.newPartialSnapshot(record.TableId, tableName, nil, replace, isView)
return j.NewPartialSnapshot(record.TableId, tableName, nil, replace, isView)
}

func (j *Job) handleIndexChangeJob(binlog *festruct.TBinlog) error {
Expand Down Expand Up @@ -2872,11 +2872,11 @@ func (j *Job) handleRenamePartition(binlog *festruct.TBinlog) error {
newPartition, renamePartition.PartitionId, renamePartition.TableId, commitSeq)
replace := true
tableName := destTableName
if j.isTableSyncWithAlias() {
if j.IsTableSyncWithAlias() {
tableName = j.Src.Table
}
isView := false
return j.newPartialSnapshot(renamePartition.TableId, tableName, nil, replace, isView)
return j.NewPartialSnapshot(renamePartition.TableId, tableName, nil, replace, isView)
}
return j.IDest.RenamePartition(destTableName, oldPartition, newPartition)
}
Expand Down Expand Up @@ -2909,11 +2909,11 @@ func (j *Job) handleRenameRollup(binlog *festruct.TBinlog) error {
newRollup, renameRollup.IndexId, renameRollup.TableId, commitSeq)
replace := true
tableName := destTableName
if j.isTableSyncWithAlias() {
if j.IsTableSyncWithAlias() {
tableName = j.Src.Table
}
isView := false
return j.newPartialSnapshot(renameRollup.TableId, tableName, nil, replace, isView)
return j.NewPartialSnapshot(renameRollup.TableId, tableName, nil, replace, isView)
}

return j.IDest.RenameRollup(destTableName, oldRollup, newRollup)
Expand Down Expand Up @@ -2976,7 +2976,7 @@ func (j *Job) handleRecoverInfo(binlog *festruct.TBinlog) error {
}
log.Infof("recover info with for table %s, will trigger partial sync", tableName)
isView := false
return j.newPartialSnapshot(recoverInfo.TableId, tableName, nil, true, isView)
return j.NewPartialSnapshot(recoverInfo.TableId, tableName, nil, true, isView)
}

var partitions []string
Expand All @@ -2990,7 +2990,7 @@ func (j *Job) handleRecoverInfo(binlog *festruct.TBinlog) error {
// if source does multiple recover of partition, then there is a race
// condition and some recover might miss due to commitseq change after snapshot.
isView := false
return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, nil, true, isView)
return j.NewPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, nil, true, isView)
}

func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
Expand Down Expand Up @@ -3035,7 +3035,7 @@ func (j *Job) handleBinlogs(binlogs []*festruct.TBinlog) (error, bool) {
// Step 2: check job state, if not incrementalSync, such as DBPartialSync, break
if j.Extra.PartialSnapshotParams != nil {
params := j.Extra.PartialSnapshotParams
if err := j.newPartialSnapshot(params.TableId, params.TableName, params.Partitions, params.Replace, params.IsView); err != nil {
if err := j.NewPartialSnapshot(params.TableId, params.TableName, params.Partitions, params.Replace, params.IsView); err != nil {
return err, false
}
return nil, true
Expand Down Expand Up @@ -3813,7 +3813,7 @@ func (j *Job) NewSnapshot(commitSeq int64, fullSyncInfo string) error {
//
// If the replace is true, the restore task will load data into a new table and replaces the old
// one when restore finished. So replace requires whole table partial sync.
func (j *Job) newPartialSnapshot(tableId int64, table string, partitions []string, replace, isView bool) error {
func (j *Job) NewPartialSnapshot(tableId int64, table string, partitions []string, replace, isView bool) error {
if !isView && j.SyncType == TableSync && table != j.Src.Table {
return xerror.Errorf(xerror.Normal,
"partial sync table name is not equals to the source name %s, table: %s, sync type: table", j.Src.Table, table)
Expand Down
28 changes: 19 additions & 9 deletions pkg/ccr/record/rename_rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,36 @@ type RenameRollup struct {
OldRollupName string `json:"oR"`
}

func NewRenameRollupFromJson(data string) (*RenameRollup, error) {
var record RenameRollup
err := json.Unmarshal([]byte(data), &record)
func (renameRollup *RenameRollup) Deserialize(data string) error {
err := json.Unmarshal([]byte(data), &renameRollup)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "unmarshal rename rollup record error")
return xerror.Wrap(err, xerror.Normal, "unmarshal rename rollup record error")
}

if record.TableId == 0 {
return nil, xerror.Errorf(xerror.Normal, "rename rollup record table id not found")
if renameRollup.TableId == 0 {
return xerror.Errorf(xerror.Normal, "rename rollup record table id not found")
}

if record.NewRollupName == "" {
return nil, xerror.Errorf(xerror.Normal, "rename rollup record old rollup name not found")
if renameRollup.NewRollupName == "" {
return xerror.Errorf(xerror.Normal, "rename rollup record old rollup name not found")
}
return nil
}

return &record, nil
func NewRenameRollupFromJson(data string) (*RenameRollup, error) {
var renameRollup RenameRollup
if err := renameRollup.Deserialize(data); err != nil {
return nil, err
}
return &renameRollup, nil
}

// Stringer
func (r *RenameRollup) String() string {
return fmt.Sprintf("RenameRollup: DbId: %d, TableId: %d, IndexId: %d, NewRollupName: %s, OldRollupName: %s",
r.DbId, r.TableId, r.IndexId, r.NewRollupName, r.OldRollupName)
}

func (renameRollup *RenameRollup) GetTableId() int64 {
return renameRollup.TableId
}
Loading