
Commit 4082710

Dynamic send and batch send splits for milestone 1.5 (#367)
* Improve producer edge cases:
  * Remove the code duplication when the producer is closed correctly or not.
  * Make the unconfirmed operations more atomic, to avoid rare cases where they were handled outside the mutex lock.
  * Increase the timeout for the Reliable producers and consumers; it was too low. There is now one more wait for the producer to flush pending messages when it is closed.
  * Refactor the publish confirm channel with a lock to check whether it is still valid, avoiding race conditions during closing.
  * Handle the case where the producer is not found during confirmation; it can happen when the producer is closed before all the messages are confirmed.
* Bump the Windows CI to RabbitMQ 4.0.5 and Erlang 27.2.
* Update dependencies.
* Temporarily remove the Windows test due to actions/checkout#1186.

Signed-off-by: Gabriele Santomaggio <[email protected]>
Co-authored-by: Alberto Moretti <[email protected]>
1 parent ab4d470 commit 4082710


43 files changed: +1293 −883 lines

Diff for: .ci/versions.json

+2-2
@@ -1,4 +1,4 @@
 {
-  "erlang": "26.2.3",
-  "rabbitmq": "3.13.0"
+  "erlang": "27.2",
+  "rabbitmq": "4.0.5"
 }

Diff for: .github/workflows/build_and_test.yml

+25-24
@@ -63,30 +63,31 @@ jobs:
           verbose: true # optional (default = false)
         env:
           CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
-  test-win32:
-    runs-on: windows-latest
-    strategy:
-      matrix:
-        go: [ '1.22']
-    steps:
-      - uses: actions/checkout@v4
-      - uses: actions/setup-go@v5
-        id: setup_go
-        with:
-          go-version: ${{ matrix.go }}
-          check-latest: true
-      - name: Cache installers
-        uses: actions/cache@v4
-        with:
-          # Note: the cache path is relative to the workspace directory
-          # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action
-          path: ~/installers
-          key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }}
-      - name: Install and start RabbitMQ
-        run: ./.ci/install.ps1
-      - name: Install GNU make
-        run: choco install make
-      - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }}
+  # temporany removed due of https://github.com/actions/checkout/issues/1186
+  #  test-win32:
+  #    runs-on: windows-latest
+  #    strategy:
+  #      matrix:
+  #        go: [ '1.22']
+  #    steps:
+  #      - uses: actions/checkout@v4
+  #      - uses: actions/setup-go@v5
+  #        id: setup_go
+  #        with:
+  #          go-version: ${{ matrix.go }}
+  #          check-latest: true
+  #      - name: Cache installers
+  #        uses: actions/cache@v4
+  #        with:
+  #          # Note: the cache path is relative to the workspace directory
+  #          # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action
+  #          path: ~/installers
+  #          key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }}
+  #      - name: Install and start RabbitMQ
+  #        run: ./.ci/install.ps1
+  #      - name: Install GNU make
+  #        run: choco install make
+  #      - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }}
   publish:
     runs-on: ubuntu-latest
     needs: [test]

Diff for: README.md

+38-76
@@ -16,6 +16,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
 - [Run server with Docker](#run-server-with-docker)
 - [Getting started for impatient](#getting-started-for-impatient)
 - [Examples](#examples)
+- [Client best practices](#client-best-practices)
 - [Usage](#usage)
     * [Connect](#connect)
     * [Multi hosts](#multi-hosts)
@@ -67,7 +68,7 @@ You may need a server to test locally. Let's start the broker:
 ```shell
 docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\
     -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \
-    rabbitmq:3-management
+    rabbitmq:4-management
 ```
 The broker should start in a few seconds. When it’s ready, enable the `stream` plugin and `stream_management`:
 ```shell
@@ -85,6 +86,11 @@ See [getting started](./examples/getting_started.go) example.
 
 See [examples](./examples/) directory for more use cases.
 
+### Client best practices
+
+This client provides a set of best practices to use the client in the best way. </br>
+See [best practices](./best_practices/README.md) for more details.
+
 # Usage
 
 ### Connect
@@ -251,15 +257,8 @@ To publish a message you need a `*stream.Producer` instance:
 producer, err := env.NewProducer("my-stream", nil)
 ```
 
-With `ProducerOptions` is possible to customize the Producer behaviour:
-```golang
-type ProducerOptions struct {
-	Name string // Producer name, it is useful to handle deduplication messages
-	QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
-	BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
-	BatchPublishingDelay int // Period to send a batch of messages.
-}
-```
+With `ProducerOptions` is possible to customize the Producer behaviour.
+
 
 The client provides two interfaces to send messages.
 `send`:
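As a rough illustration of the `ProducerOptions` mentioned in the hunk above (not part of the commit), a producer can be customized through the fluent setters; `SetProducerName` and `SetBatchSize` are taken from the library's other examples, and the values here are arbitrary:

```golang
package main

import (
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
	env, err := stream.NewEnvironment(stream.NewEnvironmentOptions())
	if err != nil {
		panic(err)
	}

	// The name is only needed when deduplication is wanted; BatchSize is the
	// aggregation hint referenced in the Send vs BatchSend section of this diff.
	producer, err := env.NewProducer("my-stream",
		stream.NewProducerOptions().
			SetProducerName("my_producer").
			SetBatchSize(200))
	if err != nil {
		panic(err)
	}
	defer func() { _ = producer.Close() }()
}
```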
@@ -277,30 +276,35 @@ for z := 0; z < 10; z++ {
 err = producer.BatchSend(messages)
 ```
 
+### `Send` vs `BatchSend`
 
-`producer.Send`:
-- accepts one message as parameter
-- automatically aggregates the messages
-- automatically splits the messages in case the size is bigger than `requestedMaxFrameSize`
-- automatically splits the messages based on batch-size
-- sends the messages in case nothing happens in `producer-send-timeout`
-- is asynchronous
+The `BatchSend` is the primitive to send the messages. It is up to the user to manage the aggregation.
+`Send` introduces a smart layer to publish messages and internally uses `BatchSend`.
 
-`producer.BatchSend`:
-- accepts an array messages as parameter
-- is synchronous
+Starting from version 1.5.0, the `Send` uses a dynamic send.
+The client sends the message buffer regardless of any timeout.</br>
 
-Close the producer:
-`producer.Close()` the producer is removed from the server. TCP connection is closed if there aren't </b>
-other producers
+What should you use? </br>
+The `Send` method is the best choice for most of the cases:</br>
+- It is asynchronous
+- It is smart to aggregate the messages in a batch with a low-latency
+- It is smart to split the messages in case the size is bigger than `requestedMaxFrameSize`
+- You can play with `BatchSize` parameter to increase the throughput
 
-### `Send` vs `BatchSend`
+The `BatchSend` is useful in case you need to manage the aggregation by yourself. </br>
+It gives you more control over the aggregation process: </br>
+- It is synchronous
+- It is up to the user to manage the aggregation
+- It is up to the user to split the messages in case the size is bigger than `requestedMaxFrameSize`
+- It can be faster than `Send` in case the aggregation is managed by the user.
 
-The `BatchSend` is the primitive to send the messages, `Send` introduces a smart layer to publish messages and internally uses `BatchSend`.
+#### Throughput vs Latency</br>
+With both methods you can have low-latency and/or high-throughput. </br>
+The `Send` is the best choice for low-latency without care about aggregation.
+With `BatchSend` you have more control.</br>
 
-The `Send` interface works in most of the cases, In some condition is about 15/20 slower than `BatchSend`. See also this [thread](https://groups.google.com/g/rabbitmq-users/c/IO_9-BbCzgQ).
-
-See also "Client performances" example in the [examples](./examples/performances/) directory
+Performance test tool can help you to test `Send` and `BatchSend` </br>
+See also the [Performance test tool](#performance-test-tool) section.
 
 ### Publish Confirmation
 
@@ -350,10 +354,13 @@ the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.Get
350354

351355
See also "Getting started" example in the [examples](./examples/) directory
352356

353-
354-
355357
### Deduplication
356358

359+
The deduplication is a feature that allows to avoid the duplication of messages. </br>
360+
It is enabled by the user by setting the producer name with the options: </br>
361+
```golang
362+
producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetName("my_producer"))
363+
```
357364
The stream plugin can handle deduplication data, see this blog post for more details:
358365
https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/ </br>
359366
You can find a "Deduplication" example in the [examples](./examples/) directory. </br>
@@ -596,55 +603,10 @@ Like the standard stream, you should avoid to store the offset for each single m
596603
### Performance test tool
597604
598605
Performance test tool it is useful to execute tests.
606+
The performance test tool is in the [perfTest](./perfTest) directory. </br>
599607
See also the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) tool
600608
601609
602-
To install you can download the version from github:
603-
604-
Mac:
605-
```
606-
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
607-
```
608-
609-
Linux:
610-
```
611-
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
612-
```
613-
614-
Windows
615-
```
616-
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
617-
```
618-
619-
execute `stream-perf-test --help` to see the parameters. By default it executes a test with one producer, one consumer.
620-
621-
here an example:
622-
```shell
623-
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
624-
```
625-
626-
### Performance test tool Docker
627-
A docker image is available: `pivotalrabbitmq/go-stream-perf-test`, to test it:
628-
629-
Run the server is host mode:
630-
```shell
631-
docker run -it --rm --name rabbitmq --network host \
632-
rabbitmq:3-management
633-
```
634-
enable the plugin:
635-
```
636-
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
637-
```
638-
then run the docker image:
639-
```shell
640-
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
641-
```
642-
643-
To see all the parameters:
644-
```shell
645-
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
646-
```
647-
648610
### Build form source
649611
650612
```shell

Diff for: best_practices/README.md

+106
@@ -0,0 +1,106 @@
+Client best practices
+=====================
+
+The scope of this document is to provide a set of best practices for the client applications that use the Go client library.</br>
+
+
+#### General recommendations
+- Messages are not thread-safe, you should not share the same message between different go-routines or different Send/BatchSend calls.
+- Use the producer name only if you need deduplication.
+- Avoid to store the consumer offset to the server too often.
+- `Send` works well in most of the cases, use `BatchSend` when you need more control.
+- Connections/producers/consumers are designed to be long-lived. You should avoid creating and closing them too often.
+- The library is generally thread-safe,even it is better to use one producer/consumer per go-routine.
+
+#### Default configuration
+
+The default configuration of the client library is designed to be used in most of the cases.
+No particular tuning is required. Just follow the [Getting started](../examples/getting_started.go) example.
+
+#### Multiple producers and consumers
+
+Each connection can support multiple producers and consumers, you can reduce the number of connections by using the same connection for multiple producers and consumers.</br>
+With:
+```golang
+	SetMaxConsumersPerClient(10).
+	SetMaxConsumersPerClient(10)
+```
+The TCP connection will be shared between the producers and consumers.
+Note about consumers: One slow consumer can block the others, so it is important:
+- To have a good balance between the number of consumers and the speed of the consumers.
+- work application side to avoid slow consumers, for example, by using a go-routines/buffers.
+
+#### High throughput
+
+To achieve high throughput, you should use one producer per connection, and one consumer per connection.
+This will avoid lock contention between the producers when sending messages and between the consumers when receiving messages.
+
+The method `Send` is usually enough to achieve high throughput.
+In some case you can use the `BatchSend` method. See the `Send` vs `BatchSend` documentation for more details.
+
+#### Low latency
+
+To achieve Low latency, you should use one producer per connection, and one consumer per connection.
+
+The method `Send` is the best choice to achieve low latency. Default values are tuned for low latency.
+You can change the `BatchSize` parameter to increase or reduce the max number of messages sent in one batch.
+Note: Since the client uses dynamic send, the `BatchSize` parameter is a hint to the client, the client can send less than the `BatchSize`.
+
+#### Store several text based messages
+
+In case you want to store logs, text-based or big messages, you can use the `Sub Entries Batching` method.
+Where it is possible to store multiple messages in one entry and compress the entry with different algorithms.
+It is useful to reduce the disk space and the network bandwidth.
+See the `Sub Entries Batching` documentation for more details.</br>
+
+#### Store several small messages
+
+In case you want to store a lot of small messages, you can use the `BatchSend` method.
+Where it is possible to store multiple messages in one entry. This will avoid creating small chunks on the server side.</br>
+
+
+#### Avoid duplications
+
+In case you want to store messages with deduplication, you need to set the producer name and the deduplication id.
+See the `Deduplication` documentation for more details.</br>
+
+
+#### Consumer fail over
+
+In case you want to have a consumer fail over, you can use the `Single Active Consumer` method.
+Where only one consumer is active at a time, and the other consumers are in standby mode.
+
+#### Reliable producer and consumer
+
+The client library provides a reliable producer and consumer, where the producer and consumer can recover from a connection failure.
+See the `Reliable` documentation for more details.</br>
+
+
+#### Scaling the streams
+
+In case you want to scale the streams, you can use the `Super Stream` method.
+Where you can have multiple streams and only one stream is active at a time.
+See the `Super Stream` documentation for more details.</br>
+
+
+#### Filtering the data when consuming
+
+In case you want to filter the data when consuming, you can use the `Stream Filtering` method.
+Where you can filter the data based on the metadata.
+See the `Stream Filtering` documentation for more details.</br>
+
+
+#### Using a load balancer
+
+In case you want to use a load balancer, you can use the `Using a load balancer` method.
+In Kubernetes, you can use the service name as load balancer dns.
+See the `Using a load balancer` documentation for more details.</br>
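The "Multiple producers and consumers" section of the new best-practices file shows only the two setter calls; a self-contained sketch of connection sharing (assuming `EnvironmentOptions` exposes both `SetMaxProducersPerClient` and `SetMaxConsumersPerClient`, which is not shown in the diff) could look like this:

```golang
package main

import (
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
	// Up to 10 producers and 10 consumers will multiplex over each TCP connection.
	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetMaxProducersPerClient(10).
			SetMaxConsumersPerClient(10))
	if err != nil {
		panic(err)
	}
	defer func() { _ = env.Close() }()

	// Producers and consumers created from env now share connections, so keep
	// consumers fast (or buffer application-side) to avoid one blocking the others.
}
```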

Diff for: examples/README.md

-2
@@ -16,5 +16,3 @@ Stream examples
 - [Single Active Consumer](./single_active_consumer) - Single Active Consumer example
 - [Reliable](./reliable) - Reliable Producer and Reliable Consumer example
 - [Super Stream](./super_stream) - Super Stream example with Single Active Consumer
-- [Client performances](./performances) - Client performances example
-
Diff for: examples/deduplication/deduplication.go

+4-1
@@ -35,7 +35,10 @@ func main() {
 	}
 
 	producer, err := env.NewProducer(streamName,
-		stream.NewProducerOptions().SetProducerName("myProducer")) // producer name is mandatory to handle the deduplication
+		stream.NewProducerOptions().
+			// producer name is mandatory to handle the deduplication
+			// don't use the producer name if you don't need the deduplication
+			SetProducerName("myProducer"))
 
 	CheckErr(err)
 
Diff for: examples/getting_started.go

+3-1
@@ -111,7 +111,9 @@ func main() {
 	channelClose := consumer.NotifyClose()
 	// channelClose receives all the closing events, here you can handle the
 	// client reconnection or just log
-	defer consumerClose(channelClose)
+	go func() {
+		consumerClose(channelClose)
+	}()
 
 	fmt.Println("Press any key to stop ")
 	_, _ = reader.ReadString('\n')
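For reference, the `consumerClose` handler invoked above is defined elsewhere in `getting_started.go`; a minimal sketch of such a handler (assuming `NotifyClose` returns a `stream.ChannelClose` whose events carry the consumer name, stream, and reason, as in the library's getting-started example):

```golang
package main

import (
	"fmt"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// consumerClose drains the close-event channel; running it in a goroutine, as
// the diff above now does, keeps it from blocking a deferred call at exit.
func consumerClose(channelClose stream.ChannelClose) {
	event := <-channelClose
	fmt.Printf("consumer %s closed on stream %s, reason: %s\n",
		event.Name, event.StreamName, event.Reason)
	// Reconnect here if the application needs to keep consuming.
}
```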
