-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.go
86 lines (77 loc) · 2.14 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package kafkauniverse
import (
"context"
"fmt"
"github.com/IBM/sarama"
"github.com/cloudtrust/kafka-client/misc"
)
// Producer publishes raw byte payloads to the Kafka topic it was configured with.
type Producer interface {
// SendMessageBytes sends content as the value of a message that has no key.
SendMessageBytes(content []byte) error
// SendPartitionedMessageBytes sends content as the value of a message whose
// key is partitionKey.
SendPartitionedMessageBytes(partitionKey string, content []byte) error
// Close releases the resources held by the producer.
Close() error
}
// producer is the Producer implementation backed by a sarama.SyncProducer.
type producer struct {
// initialized is set to true once initialize has completed successfully.
initialized bool
// cluster supplies the broker list and sarama configuration used by initialize.
cluster *cluster
// id identifies the producer in error messages.
id string
// enabled is false when the cluster or the producer itself is disabled;
// a disabled producer drops messages without reporting an error.
enabled bool
// topic points to the name of the topic messages are sent to.
topic *string
// producer is the underlying client set by initialize; it is a no-op
// implementation when the producer is disabled.
producer sarama.SyncProducer
// logger receives the error reported when initialization fails.
logger Logger
}
// newProducer builds a producer for the given cluster from its configuration.
//
// The producer is enabled only when the cluster is enabled and producerRep does
// not explicitly disable it (a nil Enabled flag counts as enabled). The returned
// producer has no underlying client yet: initialize must be called before
// messages can be sent.
//
// An error is returned when cluster is nil or producerRep carries no ID, rather
// than panicking on the nil dereference.
func newProducer(cluster *cluster, producerRep KafkaProducerRepresentation, logger Logger) (*producer, error) {
	if cluster == nil {
		return nil, fmt.Errorf("kafka producer: cluster is required")
	}
	if producerRep.ID == nil {
		return nil, fmt.Errorf("kafka producer: id is required")
	}

	// A producer can only be active inside an active cluster.
	enabled := cluster.enabled && (producerRep.Enabled == nil || *producerRep.Enabled)

	return &producer{
		initialized: false,
		cluster:     cluster,
		id:          *producerRep.ID,
		enabled:     enabled,
		topic:       producerRep.Topic,
		logger:      logger,
	}, nil
}
// Close releases the underlying Kafka producer.
// It does nothing and returns nil when the producer is disabled or has not
// been initialized.
func (p *producer) Close() error {
	if p.initialized && p.enabled {
		return p.producer.Close()
	}
	return nil
}
// initialize creates the underlying client of the producer.
//
// It fails when the producer is already initialized. A disabled producer gets
// a no-op client; an enabled one gets a sarama sync producer built from the
// cluster brokers and configuration. A creation failure is logged and returned,
// and the producer stays uninitialized.
func (p *producer) initialize() error {
	switch {
	case p.initialized:
		return fmt.Errorf("producer %s already initialized", p.id)
	case !p.enabled:
		p.producer = &misc.NoopKafkaProducer{}
		p.initialized = true
		return nil
	}

	syncProducer, err := sarama.NewSyncProducer(p.cluster.brokers, p.cluster.saramaConfig)
	// The field is assigned whether or not creation succeeded.
	p.producer = syncProducer
	if err != nil {
		p.logger.Error(context.Background(), "msg", "Failed to initialize Kafka producer", "err", err)
		return err
	}

	p.initialized = true
	return nil
}
// SendMessageBytes sends content as the value of a key-less message on the
// producer topic.
//
// A disabled producer drops the message and returns nil. An error is returned,
// rather than panicking on a nil dereference, when the producer has no
// underlying client (initialize was not called or failed) or when no topic is
// configured. Otherwise the error of the underlying SendMessage call is
// returned as is.
func (p *producer) SendMessageBytes(content []byte) error {
	if !p.enabled {
		return nil
	}
	if p.producer == nil {
		return fmt.Errorf("producer %s not initialized", p.id)
	}
	if p.topic == nil {
		return fmt.Errorf("producer %s has no topic configured", p.id)
	}

	msg := &sarama.ProducerMessage{Topic: *p.topic, Value: sarama.StringEncoder(content)}
	_, _, err := p.producer.SendMessage(msg)
	return err
}
// SendPartitionedMessageBytes sends content as the value of a message on the
// producer topic, using partitionKey as the message key.
//
// A disabled producer drops the message and returns nil. An error is returned,
// rather than panicking on a nil dereference, when the producer has no
// underlying client (initialize was not called or failed) or when no topic is
// configured. Otherwise the error of the underlying SendMessage call is
// returned as is.
func (p *producer) SendPartitionedMessageBytes(partitionKey string, content []byte) error {
	if !p.enabled {
		return nil
	}
	if p.producer == nil {
		return fmt.Errorf("producer %s not initialized", p.id)
	}
	if p.topic == nil {
		return fmt.Errorf("producer %s has no topic configured", p.id)
	}

	msg := &sarama.ProducerMessage{
		Topic: *p.topic,
		Key:   sarama.StringEncoder(partitionKey),
		Value: sarama.StringEncoder(content),
	}
	_, _, err := p.producer.SendMessage(msg)
	return err
}