Skip to content

Commit 518b926

Browse files
feat: add transfer service (#8201)
--story=119241405
1 parent cc6d05b commit 518b926

File tree

54 files changed

+5292
-122
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+5292
-122
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
### 请求方式
2+
3+
POST /transfer/v3/sync/cmdb/data
4+
5+
### 描述
6+
7+
触发同步cmdb数据任务
8+
9+
注意:
10+
11+
- 该接口仅用于在源环境触发同步指定的cmdb数据任务
12+
- 该接口是异步接口,触发后就会返回,可以通过rid追踪触发的同步任务
13+
- 同一时间只允许一个同步任务
14+
15+
### 输入参数
16+
17+
| 参数名称 | 参数类型 | 必选 | 描述 |
18+
|---------------|------------------|----|----------------------------------------------------------------------------------------------------|
19+
| resource_type | string || 同步数据的资源类型,枚举值:biz,set,module,host,host_relation,object_instance,inst_asst,service_instance,process |
20+
| sub_resource | string || 下级数据类型。resource为object_instance和inst_asst时代表需要同步的模型的bk_obj_id |
21+
| is_all | bool || 是否同步全量数据,is_all为false时必须传start或end |
22+
| start | map[string]int64 || 同步的起始区间ID信息,同步的数据不包含该ID代表的数据 |
23+
| end | map[string]int64 || 同步的结束区间ID信息,同步的数据包含该ID代表的数据 |
24+
25+
### 调用示例
26+
27+
```json
28+
{
29+
"resource_type": "object_instance",
30+
"sub_resource": "bk_switch",
31+
"is_all": false,
32+
"start": {
33+
"bk_inst_id": 10
34+
},
35+
"end": {
36+
"bk_inst_id": 100
37+
}
38+
}
39+
```
40+
41+
### 响应示例
42+
43+
```json
44+
{
45+
"result": true,
46+
"code": 0,
47+
"message": "",
48+
"permission": null,
49+
"data": null
50+
}
51+
```
52+
53+
### 响应参数说明
54+
55+
| 参数名称 | 参数类型 | 描述 |
56+
|------------|--------|----------------------------|
57+
| result | bool | 请求成功与否。true:请求成功;false请求失败 |
58+
| code | int | 错误编码。 0表示success,>0表示失败错误 |
59+
| message | string | 请求失败返回的错误信息 |
60+
| permission | object | 权限信息 |
61+
| data | object | 请求返回的数据 |
Loading
69.4 KB
Loading
18.8 KB
Loading
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
FROM hub.bktencent.com/blueking/centos7-cmdb:base
2+
ENV container docker
3+
COPY cmdb_transferservice /data/cmdb/cmdb_transferservice
4+
RUN mkdir /data/cmdb/cmdb_transferservice/logs
5+
RUN chmod +x /data/cmdb/cmdb_transferservice/cmdb_transferservice
6+
#time zone setting
7+
ENV TimeZone=Asia/Shanghai
8+
RUN ln -snf /usr/share/zoneinfo/$TimeZone /etc/localtime && echo $TimeZone > /etc/timezone

pkg/synchronize/types/medium.go

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making
3+
* 蓝鲸智云 - 配置平台 (BlueKing - Configuration System) available.
4+
* Copyright (C) 2017 THL A29 Limited,
5+
* a Tencent company. All rights reserved.
6+
* Licensed under the MIT License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at http://opensource.org/licenses/MIT
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the License is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12+
* either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
* We undertake not to change the open source license (MIT license) applicable
15+
* to the current version of the project delivered to anyone in the future.
16+
*/
17+
18+
package types
19+
20+
import (
21+
"encoding/json"
22+
)
23+
24+
// PushSyncDataOpt is the push full sync data option
25+
type PushSyncDataOpt struct {
26+
ResType ResType `json:"resource_type"`
27+
SubRes string `json:"sub_resource"`
28+
IsIncrement bool `json:"is_increment"`
29+
Data any `json:"data"`
30+
}
31+
32+
// PullSyncDataOpt is the pull sync data option
33+
type PullSyncDataOpt struct {
34+
ResType ResType `json:"resource_type"`
35+
SubRes string `json:"sub_resource"`
36+
IsIncrement bool `json:"is_increment"`
37+
Ack bool `json:"ack"`
38+
}
39+
40+
// TransferMediumResp is the transfer medium response
41+
type TransferMediumResp[T any] struct {
42+
Result bool `json:"result"`
43+
Code int `json:"code"`
44+
Message string `json:"message"`
45+
Data T `json:"data"`
46+
}
47+
48+
// PullSyncDataRes is the pull sync data result data
49+
type PullSyncDataRes struct {
50+
Total int64 `json:"total"`
51+
Info json.RawMessage `json:"info"`
52+
}
53+
54+
// FullSyncTransData is the full sync transfer data
55+
type FullSyncTransData struct {
56+
Name string `json:"name"`
57+
Start map[string]int64 `json:"start"`
58+
End map[string]int64 `json:"end"`
59+
Data any `json:"data"`
60+
}
61+
62+
// IncrSyncTransData is the incremental sync transfer data
63+
type IncrSyncTransData struct {
64+
Name string `json:"name"`
65+
UpsertInfo map[string][]json.RawMessage `json:"upsert_info"`
66+
DeleteInfo map[string][]json.RawMessage `json:"delete_info"`
67+
}

pkg/synchronize/types/protocol.go

+38
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,41 @@ func (o *CreateSyncDataOption) Validate() errors.RawErrorInfo {
5454

5555
return errors.RawErrorInfo{}
5656
}
57+
58+
// SyncCmdbDataOption defines sync cmdb data option
59+
type SyncCmdbDataOption struct {
60+
ResType ResType `json:"resource_type"`
61+
SubRes string `json:"sub_resource"`
62+
IsAll bool `json:"is_all"`
63+
Start map[string]int64 `json:"start"`
64+
End map[string]int64 `json:"end"`
65+
}
66+
67+
// Validate sync cmdb data option
68+
func (o *SyncCmdbDataOption) Validate() errors.RawErrorInfo {
69+
if rawErr := o.ResType.Validate(o.SubRes); rawErr.ErrCode != 0 {
70+
return rawErr
71+
}
72+
73+
if o.IsAll {
74+
if len(o.Start) != 0 || len(o.End) != 0 {
75+
return errors.RawErrorInfo{
76+
ErrCode: common.CCErrCommParamsIsInvalid,
77+
Args: []interface{}{"is_all", "start", "end"},
78+
}
79+
}
80+
return errors.RawErrorInfo{}
81+
}
82+
83+
if len(o.Start) == 0 && len(o.End) == 0 {
84+
return errors.RawErrorInfo{
85+
ErrCode: common.CCErrCommParamsIsInvalid,
86+
Args: []interface{}{"start", "end"},
87+
}
88+
}
89+
90+
return errors.RawErrorInfo{}
91+
}
92+
93+
// InfiniteEndID represent infinity for end id of id rule info
94+
const InfiniteEndID int64 = -1

pkg/synchronize/types/resource.go

+39-10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* to the current version of the project delivered to anyone in the future.
1616
*/
1717

18+
// Package types defines cmdb data syncer types
1819
package types
1920

2021
import (
@@ -38,30 +39,58 @@ const (
3839
HostRelation ResType = "host_relation"
3940
// ObjectInstance is the object instance synchronize resource type
4041
ObjectInstance ResType = "object_instance"
42+
// QuotedInstance is the quoted instance synchronize resource type
43+
QuotedInstance ResType = "quoted_instance"
4144
// InstAsst is the instance association synchronize resource type
4245
InstAsst ResType = "inst_asst"
46+
// ServiceInstance is the service instance synchronize resource type
47+
ServiceInstance ResType = "service_instance"
48+
// Process is the process synchronize resource type
49+
Process ResType = "process"
50+
// ProcessRelation is the process relation synchronize resource type
51+
ProcessRelation ResType = "process_relation"
4352
)
4453

45-
// AllResTypeMap stores all synchronize resource type
46-
var AllResTypeMap = map[ResType]struct{}{
47-
Biz: {},
48-
Set: {},
49-
Module: {},
50-
Host: {},
51-
HostRelation: {},
52-
ObjectInstance: {},
53-
InstAsst: {},
54+
var (
55+
// allResType is all synchronize resource type in the order of dependency
56+
allResType = []ResType{Biz, ObjectInstance, Set, Module, Host, HostRelation, InstAsst, ServiceInstance, Process,
57+
ProcessRelation, QuotedInstance}
58+
allResTypeMap = make(map[ResType]struct{})
59+
)
60+
61+
func init() {
62+
for _, resType := range allResType {
63+
allResTypeMap[resType] = struct{}{}
64+
}
65+
}
66+
67+
// ListAllResType list all synchronize resource type
68+
func ListAllResType() []ResType {
69+
return allResType
70+
}
71+
72+
// ListAllResTypeForIncrSync list all synchronize resource type for incremental sync
73+
func ListAllResTypeForIncrSync() []ResType {
74+
incrResTypes := make([]ResType, 0)
75+
for _, resType := range allResType {
76+
if resType == QuotedInstance {
77+
continue
78+
}
79+
incrResTypes = append(incrResTypes, resType)
80+
}
81+
return incrResTypes
5482
}
5583

5684
// ResTypeWithSubResMap stores all synchronize resource type with sub resource
5785
var ResTypeWithSubResMap = map[ResType]struct{}{
5886
ObjectInstance: {},
5987
InstAsst: {},
88+
QuotedInstance: {},
6089
}
6190

6291
// Validate resource type
6392
func (r ResType) Validate(subRes string) ccErr.RawErrorInfo {
64-
_, exists := AllResTypeMap[r]
93+
_, exists := allResTypeMap[r]
6594
if !exists {
6695
return ccErr.RawErrorInfo{
6796
ErrCode: common.CCErrCommParamsIsInvalid,

pkg/synchronize/types/syncer.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making
3+
* 蓝鲸智云 - 配置平台 (BlueKing - Configuration System) available.
4+
* Copyright (C) 2017 THL A29 Limited,
5+
* a Tencent company. All rights reserved.
6+
* Licensed under the MIT License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at http://opensource.org/licenses/MIT
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the License is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12+
* either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
* We undertake not to change the open source license (MIT license) applicable
15+
* to the current version of the project delivered to anyone in the future.
16+
*/
17+
18+
package types
19+
20+
import (
21+
"encoding/json"
22+
23+
"configcenter/src/common/watch"
24+
)
25+
26+
// ListDataOpt is the list data option
27+
type ListDataOpt struct {
28+
SubRes string
29+
Start map[string]int64
30+
End map[string]int64
31+
}
32+
33+
// ListDataRes is the list data result
34+
type ListDataRes struct {
35+
IsAll bool
36+
Data any
37+
NextStart map[string]int64
38+
}
39+
40+
// CompDataRes is the compare data result
41+
type CompDataRes struct {
42+
Insert any
43+
Update any
44+
Delete any
45+
RemainingSrc any
46+
}
47+
48+
// FullSyncLockKey is the full sync lock key
49+
const FullSyncLockKey = "cmdb_syncer:full_sync_lock"
50+
51+
// EventInfo is the incremental sync event info
52+
type EventInfo struct {
53+
EventType watch.EventType
54+
ResType ResType
55+
Oid string
56+
SubRes []string
57+
Detail json.RawMessage
58+
}

scripts/init.py

+18-1
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,22 @@ def generate_config_file(
535535
caFile:
536536
# 用于解密根据RFC1423加密的证书密钥的PEM块
537537
password:
538+
539+
# CMDB同步服务配置
540+
transferService:
541+
# 是否开启同步, 默认为false
542+
enableSync: false
543+
# 是否开启增量同步, 默认为false
544+
enableIncrSync: false
545+
# 同步服务的名称
546+
name:
547+
# 同步角色,src表示源环境(将数据同步到目标环境),dest表示目标环境(接收源环境的同步数据)
548+
role:
549+
# 全量同步周期,单位:小时,仅源环境需要配置
550+
syncIntervalHours:
551+
# 传输介质地址
552+
transferMediumAddress:
553+
- transfer.example.com
538554
'''
539555

540556
template = FileTemplate(common_file_template_str)
@@ -725,7 +741,8 @@ def main(argv):
725741
"cmdb_taskserver": 60012,
726742
"cmdb_cloudserver": 60013,
727743
"cmdb_authserver": 60014,
728-
"cmdb_cacheservice": 50010
744+
"cmdb_cacheservice": 50010,
745+
"cmdb_transferservice": 50011
729746
}
730747
arr = [
731748
"help", "discovery=", "database=", "redis_ip=", "redis_port=",

src/apimachinery/cacheservice/cache/event/api.go

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
// Interface TODO
2525
type Interface interface {
2626
WatchEvent(ctx context.Context, h http.Header, opts *watch.WatchEventOptions) (*string, errors.CCErrorCoder)
27+
InnerWatchEvent(ctx context.Context, h http.Header, opts *watch.WatchEventOptions) (*watch.WatchResp,
28+
errors.CCErrorCoder)
2729
}
2830

2931
// NewCacheClient TODO

src/apimachinery/cacheservice/cache/event/cache.go

+23
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,26 @@ func (e *eventCache) WatchEvent(ctx context.Context, h http.Header, opts *watch.
4242
}
4343
return &resp.Data, nil
4444
}
45+
46+
// InnerWatchEvent watch event for inner api
47+
func (e *eventCache) InnerWatchEvent(ctx context.Context, h http.Header, opts *watch.WatchEventOptions) (
48+
*watch.WatchResp, errors.CCErrorCoder) {
49+
50+
resp := new(watch.WatchEventResp)
51+
err := e.client.Post().
52+
WithContext(ctx).
53+
Body(opts).
54+
SubResourcef("/inner/watch/cache/event").
55+
WithHeaders(h).
56+
Do().
57+
Into(resp)
58+
59+
if err != nil {
60+
return nil, errors.CCHttpError
61+
}
62+
63+
if err := resp.CCError(); err != nil {
64+
return nil, err
65+
}
66+
return resp.Data, nil
67+
}

src/common/definitions.go

+3
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ const (
169169
// BKDBOR the db operator
170170
BKDBOR = "$or"
171171

172+
// BKDBNOR the not or db operator
173+
BKDBNOR = "$nor"
174+
172175
// BKDBAND the db operator
173176
BKDBAND = "$and"
174177

0 commit comments

Comments
 (0)