-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcluster.go
108 lines (91 loc) · 3.03 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package kafkauniverse
import (
"context"
"fmt"
"os"
"strings"
"github.com/IBM/sarama"
"github.com/cloudtrust/kafka-client/misc"
)
type cluster struct {
enabled bool
brokers []string
saramaConfig *sarama.Config
consumerGroups map[string]sarama.ConsumerGroup
logger Logger
}
func newCluster(ctx context.Context, conf KafkaClusterRepresentation, envKeyPrefix string, logger Logger) (*cluster, error) {
var secret = getEnvVariable(envKeyPrefix, *conf.ID, "_CLIENT_SECRET")
if secret != nil {
conf.Security.ClientSecret = secret
}
var saramaConfig, err = newSaramaConfig(ctx, conf, logger)
if err != nil {
return nil, err
}
if conf.SaramaLogEnabled != nil {
sarama.Logger = misc.NewSaramaLogger(logger.Info, *conf.SaramaLogEnabled)
}
var enabled = conf.Enabled == nil || *conf.Enabled
return &cluster{
enabled: enabled,
brokers: conf.Brokers,
saramaConfig: saramaConfig,
consumerGroups: make(map[string]sarama.ConsumerGroup),
logger: logger,
}, nil
}
func getEnvVariable(prefix string, clusterID string, suffix string) *string {
var key = prefix + getEnvVariableName(clusterID) + suffix
var value = os.Getenv(key)
if value != "" {
return &value
}
return nil
}
func getEnvVariableName(clusterID string) string {
return strings.ReplaceAll(strings.ToUpper(clusterID), "-", "_")
}
func newSaramaConfig(ctx context.Context, conf KafkaClusterRepresentation, logger Logger) (*sarama.Config, error) {
version, err := sarama.ParseKafkaVersion(*conf.Version)
if err != nil {
logger.Warn(ctx, "msg", "Failed to parse Kafka version", "err", err, "version", *conf.Version)
return nil, fmt.Errorf("can't parse kafka version %s", *conf.Version)
}
config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Return.Errors = true
config.Producer.Return.Successes = true
// Enables Oauth2 authentification
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
config.Net.SASL.TokenProvider = misc.NewTokenProvider(*conf.Security.ClientID, *conf.Security.ClientSecret, *conf.Security.TokenURL)
config.Net.TLS.Enable = *conf.TLSEnabled
return config, nil
}
func (c *cluster) Close() error {
var anError error
for name, consumerGroup := range c.consumerGroups {
if err := consumerGroup.Close(); err != nil {
c.logger.Warn(context.Background(), "msg", "Failed to close consumer group", "group", name, "err", err)
anError = err
}
}
return anError
}
func (c *cluster) getConsumerGroup(consumerGroupName string) (sarama.ConsumerGroup, error) {
if !c.enabled {
return &misc.NoopKafkaConsumerGroup{}, nil
}
if cg, ok := c.consumerGroups[consumerGroupName]; ok {
return cg, nil
}
consumer, err := sarama.NewConsumerGroup(c.brokers, consumerGroupName, c.saramaConfig)
if err != nil {
c.logger.Warn(context.Background(), "msg", "Failed to create consumer group", "group", consumerGroupName, "err", err)
return nil, err
}
c.consumerGroups[consumerGroupName] = consumer
return consumer, nil
}