Skip to content

Commit 92e6439

Browse files
committed
language agnostic checkpointing for azure eventhub scaler
Signed-off-by: Christian Leinweber <[email protected]>
1 parent 9987d15 commit 92e6439

5 files changed

+477
-127
lines changed

Diff for: pkg/scalers/azure/azure_eventhub.go

+18-101
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,15 @@
11
package azure
22

33
import (
4-
"bytes"
5-
"context"
6-
"encoding/json"
74
"errors"
85
"fmt"
9-
"net/url"
106
"strings"
117

12-
"github.com/imdario/mergo"
13-
148
"github.com/Azure/azure-amqp-common-go/v3/aad"
159
eventhub "github.com/Azure/azure-event-hubs-go/v3"
16-
"github.com/Azure/azure-storage-blob-go/azblob"
1710
"github.com/Azure/go-autorest/autorest/azure"
18-
19-
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
20-
"github.com/kedacore/keda/v2/pkg/util"
2111
)
2212

23-
type baseCheckpoint struct {
24-
Epoch int64 `json:"Epoch"`
25-
Offset string `json:"Offset"`
26-
Owner string `json:"Owner"`
27-
Token string `json:"Token"`
28-
}
29-
30-
// Checkpoint is the object eventhub processor stores in storage
31-
// for checkpointing event processors. This matches the object
32-
// stored by the eventhub C# sdk and Java sdk
33-
type Checkpoint struct {
34-
baseCheckpoint
35-
PartitionID string `json:"PartitionId"`
36-
SequenceNumber int64 `json:"SequenceNumber"`
37-
}
38-
39-
// Eventhub python sdk stores the checkpoint differently
40-
type pythonCheckpoint struct {
41-
baseCheckpoint
42-
PartitionID string `json:"partition_id"`
43-
SequenceNumber int64 `json:"sequence_number"`
44-
}
45-
4613
// EventHubInfo to keep event hub connection and resources
4714
type EventHubInfo struct {
4815
EventHubConnection string
@@ -51,6 +18,7 @@ type EventHubInfo struct {
5118
BlobContainer string
5219
Namespace string
5320
EventHubName string
21+
CheckpointType string
5422
}
5523

5624
// GetEventHubClient returns eventhub client
@@ -80,74 +48,6 @@ func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) {
8048
return nil, aadErr
8149
}
8250

83-
// GetCheckpointFromBlobStorage accesses Blob storage and gets checkpoint information of a partition
84-
func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) {
85-
blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "")
86-
if err != nil {
87-
return Checkpoint{}, err
88-
}
89-
90-
var eventHubNamespace string
91-
var eventHubName string
92-
if info.EventHubConnection != "" {
93-
eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection)
94-
if err != nil {
95-
return Checkpoint{}, err
96-
}
97-
} else {
98-
eventHubNamespace = info.Namespace
99-
eventHubName = info.EventHubName
100-
}
101-
102-
// TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats
103-
var baseURL *url.URL
104-
// Checking blob store for C# and Java applications
105-
if info.BlobContainer != "" {
106-
// URL format - <storageEndpoint>/<blobContainer>/<eventHubConsumerGroup>/<partitionID>
107-
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, partitionID))
108-
baseURL = storageEndpoint.ResolveReference(path)
109-
} else {
110-
// Checking blob store for Azure functions
111-
// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
112-
path, _ := url.Parse(fmt.Sprintf("/azure-webjobs-eventhub/%s/%s/%s/%s", eventHubNamespace, eventHubName, info.EventHubConsumerGroup, partitionID))
113-
baseURL = storageEndpoint.ResolveReference(path)
114-
}
115-
116-
// Create a BlockBlobURL object to a blob in the container.
117-
blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{}))
118-
119-
get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
120-
if err != nil {
121-
return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %w", err)
122-
}
123-
124-
blobData := &bytes.Buffer{}
125-
reader := get.Body(azblob.RetryReaderOptions{})
126-
if _, err := blobData.ReadFrom(reader); err != nil {
127-
return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err)
128-
}
129-
defer reader.Close() // The client must close the response body when finished with it
130-
131-
return getCheckpoint(blobData.Bytes())
132-
}
133-
134-
func getCheckpoint(bytes []byte) (Checkpoint, error) {
135-
var checkpoint Checkpoint
136-
var pyCheckpoint pythonCheckpoint
137-
138-
if err := json.Unmarshal(bytes, &checkpoint); err != nil {
139-
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
140-
}
141-
142-
if err := json.Unmarshal(bytes, &pyCheckpoint); err != nil {
143-
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
144-
}
145-
146-
err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint))
147-
148-
return checkpoint, err
149-
}
150-
15151
// ParseAzureEventHubConnectionString parses Event Hub connection string into (namespace, name)
15252
// Connection string should be in following format:
15353
// Endpoint=sb://eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secretKey123;EntityPath=eventhub-name
@@ -177,3 +77,20 @@ func ParseAzureEventHubConnectionString(connectionString string) (string, string
17777

17878
return eventHubNamespace, eventHubName, nil
17979
}
80+
81+
func getHubAndNamespace(info EventHubInfo) (string, string, error) {
82+
var eventHubNamespace string
83+
var eventHubName string
84+
var err error
85+
if info.EventHubConnection != "" {
86+
eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection)
87+
if err != nil {
88+
return "", "", err
89+
}
90+
} else {
91+
eventHubNamespace = info.Namespace
92+
eventHubName = info.EventHubName
93+
}
94+
95+
return eventHubNamespace, eventHubName, nil
96+
}

Diff for: pkg/scalers/azure/azure_eventhub_checkpoint.go

+220
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package azure
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"net/url"
9+
"strconv"
10+
11+
"github.com/Azure/azure-storage-blob-go/azblob"
12+
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
13+
"github.com/kedacore/keda/v2/pkg/util"
14+
)
15+
16+
// goCheckpoint is the JSON shape decoded by goSdkCheckpointer: a nested
// "checkpoint" object carrying the sequence number and offset, next to a
// top-level "partitionId". It is converted into a Checkpoint after decoding.
17+
type goCheckpoint struct {
18+
// Checkpoint maps the nested "checkpoint" JSON object.
Checkpoint struct {
19+
SequenceNumber int64 `json:"sequenceNumber"`
20+
Offset string `json:"offset"`
21+
} `json:"checkpoint"`
22+
PartitionID string `json:"partitionId"`
23+
}
24+
25+
// Checkpoint is the object eventhub processor stores in storage
26+
// for checkpointing event processors. This matches the object
27+
// stored by the eventhub C# sdk and Java sdk
28+
//
// It is also the common result type of every checkpointer in this file; the
// GoSdk and blob-metadata based readers only fill PartitionID, SequenceNumber
// and Offset and leave the remaining fields at their zero value.
type Checkpoint struct {
29+
Epoch int64 `json:"Epoch"`
30+
Offset string `json:"Offset"`
31+
Owner string `json:"Owner"`
32+
Token string `json:"Token"`
33+
PartitionID string `json:"PartitionId"`
34+
SequenceNumber int64 `json:"SequenceNumber"`
35+
}
36+
37+
// checkpointer abstracts one checkpoint storage format: where the checkpoint
// blob of a partition lives and how a Checkpoint is obtained from the
// downloaded blob.
type checkpointer interface {
38+
// resolvePath returns the blob path for the checkpoint; getCheckpoint
// resolves it against the storage endpoint.
resolvePath(info EventHubInfo) (*url.URL, error)
39+
// extractCheckpoint converts the blob download response into a Checkpoint.
extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error)
40+
}
41+
42+
// azureWebjobCheckpointer reads checkpoints whose JSON body decodes directly
// into Checkpoint. newCheckpointer selects it for CheckpointType "AzureWebJob"
// or when no blob container is configured, always with the container name
// "azure-webjobs-eventhub".
type azureWebjobCheckpointer struct {
43+
partitionID string
44+
containerName string
45+
}
46+
47+
// defaultCheckpointer reads checkpoints from the metadata of the checkpoint
// blob rather than from its body. newCheckpointer falls back to it when no
// other checkpoint type matches and a blob container is configured.
type defaultCheckpointer struct {
48+
partitionID string
49+
containerName string
50+
}
51+
52+
// goSdkCheckpointer reads checkpoints stored as a goCheckpoint JSON document.
// newCheckpointer selects it for CheckpointType "GoSdk".
type goSdkCheckpointer struct {
53+
partitionID string
54+
containerName string
55+
}
56+
57+
// GetCheckpointFromBlobStorage reads depending of the CheckpointType the checkpoint from a azure storage
58+
func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) {
59+
60+
checkpointer := newCheckpointer(info, partitionID)
61+
return getCheckpoint(ctx, httpClient, info, checkpointer)
62+
}
63+
64+
func newCheckpointer(info EventHubInfo, partitionID string) checkpointer {
65+
if info.CheckpointType == "GoSdk" {
66+
return &goSdkCheckpointer{
67+
containerName: info.BlobContainer,
68+
partitionID: partitionID,
69+
}
70+
} else if info.CheckpointType == "AzureWebJob" || info.BlobContainer == "" {
71+
return &azureWebjobCheckpointer{
72+
containerName: "azure-webjobs-eventhub",
73+
partitionID: partitionID,
74+
}
75+
} else {
76+
return &defaultCheckpointer{
77+
containerName: info.BlobContainer,
78+
partitionID: partitionID,
79+
}
80+
}
81+
}
82+
83+
func (checkpointer *azureWebjobCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
84+
eventHubNamespace, eventHubName, err := getHubAndNamespace(info)
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
90+
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))
91+
92+
return path, nil
93+
}
94+
95+
func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
96+
eventHubNamespace, eventHubName, err := getHubAndNamespace(info)
97+
if err != nil {
98+
return nil, err
99+
}
100+
101+
// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
102+
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))
103+
104+
return path, nil
105+
}
106+
107+
// Resolve Path for AzureWebJob Checkpoint
108+
func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
109+
path, _ := url.Parse(fmt.Sprintf("/%s/%s", info.BlobContainer, checkpointer.partitionID))
110+
111+
return path, nil
112+
}
113+
114+
func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
115+
var checkpoint Checkpoint
116+
err := readToCheckpointFromBody(get, &checkpoint)
117+
if err != nil {
118+
return Checkpoint{}, err
119+
}
120+
121+
return checkpoint, nil
122+
}
123+
124+
func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
125+
return getCheckpointFromStorageMetadata(get, checkpointer.partitionID)
126+
}
127+
128+
func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
129+
var checkpoint goCheckpoint
130+
err := readToCheckpointFromBody(get, &checkpoint)
131+
if err != nil {
132+
return Checkpoint{}, err
133+
}
134+
135+
return Checkpoint{
136+
SequenceNumber: checkpoint.Checkpoint.SequenceNumber,
137+
Offset: checkpoint.Checkpoint.Offset,
138+
PartitionID: checkpoint.PartitionID,
139+
}, nil
140+
}
141+
142+
func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) {
143+
blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "")
144+
if err != nil {
145+
return Checkpoint{}, err
146+
}
147+
148+
path, err := checkpointer.resolvePath(info)
149+
if err != nil {
150+
return Checkpoint{}, err
151+
}
152+
153+
baseURL := storageEndpoint.ResolveReference(path)
154+
155+
get, err := downloadBlob(ctx, baseURL, blobCreds)
156+
if err != nil {
157+
return Checkpoint{}, err
158+
}
159+
160+
return checkpointer.extractCheckpoint(get)
161+
}
162+
163+
func getCheckpointFromStorageMetadata(get *azblob.DownloadResponse, partitionID string) (Checkpoint, error) {
164+
checkpoint := Checkpoint{
165+
PartitionID: partitionID,
166+
}
167+
168+
metadata := get.NewMetadata()
169+
170+
if sequencenumber, ok := metadata["sequencenumber"]; ok {
171+
if !ok {
172+
if sequencenumber, ok = metadata["Sequencenumber"]; !ok {
173+
return Checkpoint{}, fmt.Errorf("sequencenumber on blob not found")
174+
}
175+
}
176+
177+
if sn, err := strconv.ParseInt(sequencenumber, 10, 64); err == nil {
178+
checkpoint.SequenceNumber = sn
179+
} else {
180+
return Checkpoint{}, fmt.Errorf("sequencenumber is not a valid int64 value: %w", err)
181+
}
182+
}
183+
184+
if offset, ok := metadata["offset"]; ok {
185+
if !ok {
186+
if offset, ok = metadata["Offset"]; !ok {
187+
return Checkpoint{}, fmt.Errorf("offset on blob not found")
188+
}
189+
}
190+
checkpoint.Offset = offset
191+
}
192+
193+
return checkpoint, nil
194+
}
195+
196+
func readToCheckpointFromBody(get *azblob.DownloadResponse, checkpoint interface{}) error {
197+
blobData := &bytes.Buffer{}
198+
199+
reader := get.Body(azblob.RetryReaderOptions{})
200+
if _, err := blobData.ReadFrom(reader); err != nil {
201+
return fmt.Errorf("failed to read blob data: %s", err)
202+
}
203+
defer reader.Close() // The client must close the response body when finished with it
204+
205+
if err := json.Unmarshal(blobData.Bytes(), &checkpoint); err != nil {
206+
return fmt.Errorf("failed to decode blob data: %s", err)
207+
}
208+
209+
return nil
210+
}
211+
212+
func downloadBlob(ctx context.Context, baseURL *url.URL, blobCreds azblob.Credential) (*azblob.DownloadResponse, error) {
213+
blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{}))
214+
215+
get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
216+
if err != nil {
217+
return nil, fmt.Errorf("unable to download file from blob storage: %w", err)
218+
}
219+
return get, nil
220+
}

0 commit comments

Comments
 (0)