Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 69293dd

Browse files
committedApr 11, 2021
add default checkpointer for backward compatibility to azure eventhub scaler
Signed-off-by: Christian Leinweber <[email protected]>
1 parent 92e6439 commit 69293dd

File tree

3 files changed

+195
-35
lines changed

3 files changed

+195
-35
lines changed
 

Diff for: ‎go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
github.com/go-logr/logr v0.4.0
2020
github.com/go-logr/zapr v0.4.0 // indirect
2121
github.com/go-openapi/spec v0.20.3
22+
github.com/go-playground/assert/v2 v2.0.1
2223
github.com/go-redis/redis v6.15.9+incompatible
2324
github.com/go-sql-driver/mysql v1.6.0
2425
github.com/golang/mock v1.5.0

Diff for: ‎pkg/scalers/azure/azure_eventhub_checkpoint.go

+83-27
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strconv"
1010

1111
"github.com/Azure/azure-storage-blob-go/azblob"
12+
"github.com/imdario/mergo"
1213
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
1314
"github.com/kedacore/keda/v2/pkg/util"
1415
)
@@ -22,18 +23,26 @@ type goCheckpoint struct {
2223
PartitionID string `json:"partitionId"`
2324
}
2425

25-
// Checkpoint is the object eventhub processor stores in storage
26-
// for checkpointing event processors. This matches the object
27-
// stored by the eventhub C# sdk and Java sdk
26+
type baseCheckpoint struct {
27+
Epoch int64 `json:"Epoch"`
28+
Offset string `json:"Offset"`
29+
Owner string `json:"Owner"`
30+
Token string `json:"Token"`
31+
}
32+
2833
type Checkpoint struct {
29-
Epoch int64 `json:"Epoch"`
30-
Offset string `json:"Offset"`
31-
Owner string `json:"Owner"`
32-
Token string `json:"Token"`
34+
baseCheckpoint
3335
PartitionID string `json:"PartitionId"`
3436
SequenceNumber int64 `json:"SequenceNumber"`
3537
}
3638

39+
// Older python sdk stores the checkpoint differently
40+
type pythonCheckpoint struct {
41+
baseCheckpoint
42+
PartitionID string `json:"partition_id"`
43+
SequenceNumber int64 `json:"sequence_number"`
44+
}
45+
3746
type checkpointer interface {
3847
resolvePath(info EventHubInfo) (*url.URL, error)
3948
extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error)
@@ -44,7 +53,7 @@ type azureWebjobCheckpointer struct {
4453
containerName string
4554
}
4655

47-
type defaultCheckpointer struct {
56+
type blobMetadataCheckpointer struct {
4857
partitionID string
4958
containerName string
5059
}
@@ -54,6 +63,11 @@ type goSdkCheckpointer struct {
5463
containerName string
5564
}
5665

66+
type defaultCheckpointer struct {
67+
partitionID string
68+
containerName string
69+
}
70+
5771
// GetCheckpointFromBlobStorage reads depending of the CheckpointType the checkpoint from a azure storage
5872
func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) {
5973

@@ -67,6 +81,11 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer {
6781
containerName: info.BlobContainer,
6882
partitionID: partitionID,
6983
}
84+
} else if info.CheckpointType == "BlobMetadata" {
85+
return &blobMetadataCheckpointer{
86+
containerName: info.BlobContainer,
87+
partitionID: partitionID,
88+
}
7089
} else if info.CheckpointType == "AzureWebJob" || info.BlobContainer == "" {
7190
return &azureWebjobCheckpointer{
7291
containerName: "azure-webjobs-eventhub",
@@ -80,63 +99,100 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer {
8099
}
81100
}
82101

102+
// resolve path for AzureWebJobCheckpointer
83103
func (checkpointer *azureWebjobCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
84104
eventHubNamespace, eventHubName, err := getHubAndNamespace(info)
85105
if err != nil {
86106
return nil, err
87107
}
88108

89-
// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
90109
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))
91110

92111
return path, nil
93112
}
94113

95-
func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
114+
// extract checkpoint for AzureWebJobCheckpointer
115+
func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
116+
var checkpoint Checkpoint
117+
err := readToCheckpointFromBody(get, &checkpoint)
118+
if err != nil {
119+
return Checkpoint{}, err
120+
}
121+
122+
return checkpoint, nil
123+
}
124+
125+
// resolve path for BlobMetadataCheckpointer
126+
func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
96127
eventHubNamespace, eventHubName, err := getHubAndNamespace(info)
97128
if err != nil {
98129
return nil, err
99130
}
100131

101-
// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
102132
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))
103133

104134
return path, nil
105135
}
106136

107-
// Resolve Path for AzureWebJob Checkpoint
137+
// extract checkpoint for BlobMetadataCheckpointer
138+
func (checkpointer *blobMetadataCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
139+
return getCheckpointFromStorageMetadata(get, checkpointer.partitionID)
140+
}
141+
142+
// resolve path for GoSdkCheckpointer
108143
func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
109144
path, _ := url.Parse(fmt.Sprintf("/%s/%s", info.BlobContainer, checkpointer.partitionID))
110145

111146
return path, nil
112147
}
113148

114-
func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
115-
var checkpoint Checkpoint
149+
// extract checkpoint for GoSdkCheckpointer
150+
func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
151+
var checkpoint goCheckpoint
116152
err := readToCheckpointFromBody(get, &checkpoint)
117153
if err != nil {
118154
return Checkpoint{}, err
119155
}
120156

121-
return checkpoint, nil
157+
return Checkpoint{
158+
SequenceNumber: checkpoint.Checkpoint.SequenceNumber,
159+
baseCheckpoint: baseCheckpoint{
160+
Offset: checkpoint.Checkpoint.Offset,
161+
},
162+
PartitionID: checkpoint.PartitionID,
163+
}, nil
122164
}
123165

124-
func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
125-
return getCheckpointFromStorageMetadata(get, checkpointer.partitionID)
166+
// resolve path for DefaultCheckpointer
167+
func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
168+
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, checkpointer.partitionID))
169+
170+
return path, nil
126171
}
127172

128-
func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
129-
var checkpoint goCheckpoint
130-
err := readToCheckpointFromBody(get, &checkpoint)
131-
if err != nil {
132-
return Checkpoint{}, err
173+
//extract checkpoint with deprecated Python sdk checkpoint for backward compatibility
174+
func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
175+
var checkpoint Checkpoint
176+
var pyCheckpoint pythonCheckpoint
177+
blobData := &bytes.Buffer{}
178+
179+
reader := get.Body(azblob.RetryReaderOptions{})
180+
if _, err := blobData.ReadFrom(reader); err != nil {
181+
return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err)
133182
}
183+
defer reader.Close() // The client must close the response body when finished with it
134184

135-
return Checkpoint{
136-
SequenceNumber: checkpoint.Checkpoint.SequenceNumber,
137-
Offset: checkpoint.Checkpoint.Offset,
138-
PartitionID: checkpoint.PartitionID,
139-
}, nil
185+
if err := json.Unmarshal(blobData.Bytes(), &checkpoint); err != nil {
186+
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
187+
}
188+
189+
if err := json.Unmarshal(blobData.Bytes(), &pyCheckpoint); err != nil {
190+
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
191+
}
192+
193+
err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint))
194+
195+
return checkpoint, err
140196
}
141197

142198
func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) {

Diff for: ‎pkg/scalers/azure/azure_eventhub_test.go

+111-8
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) {
3737
assert.Equal(t, err, nil)
3838

3939
expectedCheckpoint := Checkpoint{
40-
Offset: offset,
40+
baseCheckpoint: baseCheckpoint{
41+
Offset: offset,
42+
},
4143
PartitionID: partitionID,
4244
SequenceNumber: sequencenumber,
4345
}
@@ -55,7 +57,89 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) {
5557
assert.Equal(t, check, expectedCheckpoint)
5658
}
5759

58-
func TestCheckpointFromBlobStorageWithDefault(t *testing.T) {
60+
func TestCheckpointFromBlobStorageDefault(t *testing.T) {
61+
if StorageConnectionString == "" {
62+
return
63+
}
64+
65+
partitionID := "0"
66+
offset := "1005"
67+
consumerGroup := "$Default"
68+
69+
sequencenumber := int64(1)
70+
71+
containerName := "defaultcontainer"
72+
checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
73+
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
74+
urlPath := fmt.Sprintf("%s/", consumerGroup)
75+
76+
ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
77+
assert.Equal(t, err, nil)
78+
79+
expectedCheckpoint := Checkpoint{
80+
baseCheckpoint: baseCheckpoint{
81+
Offset: offset,
82+
},
83+
PartitionID: partitionID,
84+
SequenceNumber: sequencenumber,
85+
}
86+
87+
eventHubInfo := EventHubInfo{
88+
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub",
89+
StorageConnection: StorageConnectionString,
90+
EventHubConsumerGroup: consumerGroup,
91+
EventHubName: "hub",
92+
BlobContainer: containerName,
93+
}
94+
95+
check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
96+
_ = check.Offset
97+
_ = expectedCheckpoint.Offset
98+
assert.Equal(t, check, expectedCheckpoint)
99+
}
100+
101+
func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T) {
102+
if StorageConnectionString == "" {
103+
return
104+
}
105+
106+
partitionID := "0"
107+
offset := "1006"
108+
consumerGroup := "$Default"
109+
110+
sequencenumber := int64(1)
111+
112+
containerName := "defaultcontainerpython"
113+
checkpointFormat := "{\"Offset\":\"%s\",\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
114+
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
115+
urlPath := fmt.Sprintf("%s/", consumerGroup)
116+
117+
ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
118+
assert.Equal(t, err, nil)
119+
120+
expectedCheckpoint := Checkpoint{
121+
baseCheckpoint: baseCheckpoint{
122+
Offset: offset,
123+
},
124+
PartitionID: partitionID,
125+
SequenceNumber: sequencenumber,
126+
}
127+
128+
eventHubInfo := EventHubInfo{
129+
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub",
130+
StorageConnection: StorageConnectionString,
131+
EventHubConsumerGroup: consumerGroup,
132+
EventHubName: "hub",
133+
BlobContainer: containerName,
134+
}
135+
136+
check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
137+
_ = check.Offset
138+
_ = expectedCheckpoint.Offset
139+
assert.Equal(t, check, expectedCheckpoint)
140+
}
141+
142+
func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
59143
if StorageConnectionString == "" {
60144
return
61145
}
@@ -71,14 +155,16 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) {
71155
"sequencenumber": strconv.FormatInt(sequencenumber, 10),
72156
}
73157

74-
containerName := "defaultcontainer"
158+
containerName := "blobmetadatacontainer"
75159
urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/checkpoint/", consumerGroup)
76160

77161
ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, "", metadata)
78162
assert.Equal(t, err, nil)
79163

80164
expectedCheckpoint := Checkpoint{
81-
Offset: offset,
165+
baseCheckpoint: baseCheckpoint{
166+
Offset: offset,
167+
},
82168
PartitionID: partitionID,
83169
SequenceNumber: sequencenumber,
84170
}
@@ -89,6 +175,7 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) {
89175
EventHubConsumerGroup: consumerGroup,
90176
EventHubName: "hub",
91177
BlobContainer: containerName,
178+
CheckpointType: "BlobMetadata",
92179
}
93180

94181
check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
@@ -118,7 +205,9 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {
118205
assert.Equal(t, err, nil)
119206

120207
expectedCheckpoint := Checkpoint{
121-
Offset: offset,
208+
baseCheckpoint: baseCheckpoint{
209+
Offset: offset,
210+
},
122211
PartitionID: partitionID,
123212
SequenceNumber: sequencenumber,
124213
}
@@ -162,11 +251,24 @@ func TestShouldParseCheckpointForWebJobWithCheckpointType(t *testing.T) {
162251
assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0")
163252
}
164253

165-
func TestShouldParseCheckpointForDefaultWithCheckpointType(t *testing.T) {
254+
func TestShouldParseCheckpointForDefault(t *testing.T) {
255+
eventHubInfo := EventHubInfo{
256+
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
257+
EventHubConsumerGroup: "$Default",
258+
BlobContainer: "DefaultContainer",
259+
}
260+
261+
cp := newCheckpointer(eventHubInfo, "0")
262+
url, _ := cp.resolvePath(eventHubInfo)
263+
264+
assert.Equal(t, url.Path, "/DefaultContainer/$Default/0")
265+
}
266+
267+
func TestShouldParseCheckpointForBlobMetadataWithCheckpointType(t *testing.T) {
166268
eventHubInfo := EventHubInfo{
167269
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
168270
EventHubConsumerGroup: "$Default",
169-
CheckpointType: "Default",
271+
CheckpointType: "BlobMetadata",
170272
BlobContainer: "containername",
171273
}
172274

@@ -176,11 +278,12 @@ func TestShouldParseCheckpointForDefaultWithCheckpointType(t *testing.T) {
176278
assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0")
177279
}
178280

179-
func TestShouldParseCheckpointForDefault(t *testing.T) {
281+
func TestShouldParseCheckpointForBlobMetadata(t *testing.T) {
180282
eventHubInfo := EventHubInfo{
181283
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
182284
EventHubConsumerGroup: "$Default",
183285
BlobContainer: "containername",
286+
CheckpointType: "BlobMetadata",
184287
}
185288

186289
cp := newCheckpointer(eventHubInfo, "0")

0 commit comments

Comments
 (0)
Please sign in to comment.