Skip to content

Commit 1d6a792

Browse files
committed
✨ AtLeastOnceVerification redis module
1 parent 06f9206 commit 1d6a792

File tree

3 files changed

+243
-0
lines changed

3 files changed

+243
-0
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,6 @@ sequin.yaml
6262
.aider*
6363
.env
6464
.iex.local.exs
65+
66+
# Cursor shenanigans
67+
*plan.md
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
defmodule Sequin.ConsumersRuntime.AtLeastOnceVerification do
2+
@moduledoc """
3+
Verifies at-least-once delivery guarantees by tracking commit tuples in Redis.
4+
5+
As messages are fanned out to SlotMessageStores, commit tuples are written to a Redis sorted set
6+
with the commit timestamp as the score. When messages are acknowledged, the corresponding commit
7+
tuples are removed.
8+
9+
Remaining commit tuples in the sorted set that are older than a certain threshold may indicate
10+
missed deliveries, which can be verified against the ConsumerEvent/ConsumerRecord tables.
11+
"""
12+
13+
alias Sequin.Error
14+
alias Sequin.Redis
15+
16+
require Logger
17+
18+
@type consumer_id :: String.t()
19+
# {xid, lsn}
20+
@type commit_tuple :: {integer(), integer()}
21+
@type commit_timestamp :: integer()
22+
23+
@spec record_commit(consumer_id(), commit_tuple(), commit_timestamp()) :: :ok | {:error, Error.t()}
24+
def record_commit(consumer_id, {xid, lsn}, timestamp) do
25+
key = commit_key(consumer_id)
26+
member = "#{xid}:#{lsn}"
27+
28+
case Redis.command(["ZADD", key, timestamp, member]) do
29+
{:ok, _} -> :ok
30+
{:error, error} -> {:error, error}
31+
end
32+
end
33+
34+
@spec remove_commit(consumer_id(), commit_tuple()) :: :ok | {:error, Error.t()}
35+
def remove_commit(consumer_id, {xid, lsn}) do
36+
key = commit_key(consumer_id)
37+
member = "#{xid}:#{lsn}"
38+
39+
case Redis.command(["ZREM", key, member]) do
40+
{:ok, _} -> :ok
41+
{:error, error} -> {:error, error}
42+
end
43+
end
44+
45+
@spec get_unverified_commits(consumer_id(), commit_timestamp()) ::
46+
{:ok, [{commit_tuple(), commit_timestamp()}]} | {:error, Error.t()}
47+
def get_unverified_commits(consumer_id, older_than_timestamp) do
48+
key = commit_key(consumer_id)
49+
50+
with {:ok, results} <- Redis.command(["ZRANGEBYSCORE", key, "-inf", older_than_timestamp, "WITHSCORES"]) do
51+
commits =
52+
results
53+
|> Enum.chunk_every(2)
54+
|> Enum.map(fn [member, score] ->
55+
[xid, lsn] = String.split(member, ":")
56+
{{String.to_integer(xid), String.to_integer(lsn)}, String.to_integer(score)}
57+
end)
58+
59+
{:ok, commits}
60+
end
61+
end
62+
63+
@spec trim_commits(consumer_id(), commit_timestamp()) :: :ok | {:error, Error.t()}
64+
def trim_commits(consumer_id, older_than_timestamp) do
65+
key = commit_key(consumer_id)
66+
67+
with {:ok, initial_size} <- Redis.command(["ZCARD", key]),
68+
{:ok, trimmed} <- Redis.command(["ZREMRANGEBYSCORE", key, "-inf", older_than_timestamp]),
69+
{:ok, final_size} <- Redis.command(["ZCARD", key]) do
70+
Logger.info("[AtLeastOnceVerification] Trimmed commits older than #{older_than_timestamp}",
71+
consumer_id: consumer_id,
72+
initial_size: initial_size,
73+
records_removed: trimmed,
74+
final_size: final_size
75+
)
76+
77+
:ok
78+
else
79+
{:error, error} when is_exception(error) ->
80+
Logger.error("[AtLeastOnceVerification] Error trimming commits", error: error)
81+
{:error, error}
82+
end
83+
end
84+
85+
@spec all_commits(consumer_id()) :: {:ok, [{commit_tuple(), commit_timestamp()}]} | {:error, Error.t()}
86+
def all_commits(consumer_id) do
87+
key = commit_key(consumer_id)
88+
89+
with {:ok, results} <- Redis.command(["ZRANGE", key, 0, -1, "WITHSCORES"]) do
90+
commits =
91+
results
92+
|> Enum.chunk_every(2)
93+
|> Enum.map(fn [member, score] ->
94+
[xid, lsn] = String.split(member, ":")
95+
{{String.to_integer(xid), String.to_integer(lsn)}, String.to_integer(score)}
96+
end)
97+
98+
{:ok, commits}
99+
end
100+
end
101+
102+
@spec count_commits(consumer_id()) :: {:ok, non_neg_integer()} | {:error, Error.t()}
103+
def count_commits(consumer_id) do
104+
key = commit_key(consumer_id)
105+
Redis.command(["ZCARD", key])
106+
end
107+
108+
defp commit_key(consumer_id), do: "consumer:#{consumer_id}:commit_verification"
109+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
defmodule Sequin.ConsumersRuntime.AtLeastOnceVerificationTest do
2+
use ExUnit.Case, async: true
3+
4+
alias Sequin.ConsumersRuntime.AtLeastOnceVerification
5+
alias Sequin.Factory
6+
alias Sequin.Redis
7+
8+
setup do
9+
consumer_id = Factory.uuid()
10+
commit_key = "consumer:#{consumer_id}:commit_verification"
11+
12+
on_exit(fn ->
13+
Redis.command(["DEL", commit_key])
14+
end)
15+
16+
%{consumer_id: consumer_id}
17+
end
18+
19+
describe "record_commit/3" do
20+
test "records a commit tuple with timestamp", %{consumer_id: consumer_id} do
21+
commit = {123, 456}
22+
timestamp = System.system_time(:second)
23+
24+
assert :ok = AtLeastOnceVerification.record_commit(consumer_id, commit, timestamp)
25+
26+
# Verify the commit was recorded with correct score
27+
{:ok, commits} = AtLeastOnceVerification.all_commits(consumer_id)
28+
assert length(commits) == 1
29+
assert List.first(commits) == {commit, timestamp}
30+
end
31+
32+
test "can record multiple commits", %{consumer_id: consumer_id} do
33+
commits = [
34+
{{100, 200}, 1000},
35+
{{101, 201}, 1001},
36+
{{102, 202}, 1002}
37+
]
38+
39+
Enum.each(commits, fn {commit, ts} ->
40+
assert :ok = AtLeastOnceVerification.record_commit(consumer_id, commit, ts)
41+
end)
42+
43+
# Verify all commits were recorded
44+
{:ok, count} = AtLeastOnceVerification.count_commits(consumer_id)
45+
assert String.to_integer(count) == 3
46+
end
47+
end
48+
49+
describe "remove_commit/2" do
50+
test "removes a specific commit tuple", %{consumer_id: consumer_id} do
51+
commit = {123, 456}
52+
timestamp = System.system_time(:second)
53+
54+
:ok = AtLeastOnceVerification.record_commit(consumer_id, commit, timestamp)
55+
assert :ok = AtLeastOnceVerification.remove_commit(consumer_id, commit)
56+
57+
# Verify the commit was removed
58+
{:ok, members} = Redis.command(["ZRANGE", "consumer:#{consumer_id}:commit_verification", 0, -1])
59+
assert members == []
60+
end
61+
62+
test "returns ok when removing non-existent commit", %{consumer_id: consumer_id} do
63+
assert :ok = AtLeastOnceVerification.remove_commit(consumer_id, {999, 999})
64+
end
65+
end
66+
67+
describe "get_unverified_commits/2" do
68+
test "returns commits older than specified timestamp", %{consumer_id: consumer_id} do
69+
now = System.system_time(:second)
70+
71+
old_commits = [
72+
{{100, 200}, now - 100},
73+
{{101, 201}, now - 50}
74+
]
75+
76+
new_commit = {{102, 202}, now}
77+
78+
# Record all commits
79+
Enum.each(old_commits, fn {commit, ts} ->
80+
:ok = AtLeastOnceVerification.record_commit(consumer_id, commit, ts)
81+
end)
82+
83+
:ok = AtLeastOnceVerification.record_commit(consumer_id, elem(new_commit, 0), elem(new_commit, 1))
84+
85+
# Get commits older than (now - 25)
86+
{:ok, unverified} = AtLeastOnceVerification.get_unverified_commits(consumer_id, now - 25)
87+
88+
assert length(unverified) == 2
89+
assert Enum.all?(unverified, fn {_commit, ts} -> ts < now - 25 end)
90+
end
91+
92+
test "returns empty list when no commits exist", %{consumer_id: consumer_id} do
93+
{:ok, unverified} = AtLeastOnceVerification.get_unverified_commits(consumer_id, System.system_time(:second))
94+
assert unverified == []
95+
end
96+
end
97+
98+
describe "trim_commits/2" do
99+
test "removes commits older than specified timestamp", %{consumer_id: consumer_id} do
100+
now = System.system_time(:second)
101+
102+
commits = [
103+
# old
104+
{{100, 200}, now - 100},
105+
# old
106+
{{101, 201}, now - 50},
107+
# current
108+
{{102, 202}, now}
109+
]
110+
111+
# Record all commits
112+
Enum.each(commits, fn {commit, ts} ->
113+
:ok = AtLeastOnceVerification.record_commit(consumer_id, commit, ts)
114+
end)
115+
116+
# Trim commits older than (now - 25)
117+
assert :ok = AtLeastOnceVerification.trim_commits(consumer_id, now - 25)
118+
119+
# Verify only newer commit remains
120+
{:ok, remaining} = AtLeastOnceVerification.count_commits(consumer_id)
121+
assert String.to_integer(remaining) == 1
122+
123+
{:ok, [member]} = AtLeastOnceVerification.all_commits(consumer_id)
124+
assert member == {{102, 202}, now}
125+
end
126+
127+
test "handles empty set gracefully", %{consumer_id: consumer_id} do
128+
assert :ok = AtLeastOnceVerification.trim_commits(consumer_id, System.system_time(:second))
129+
end
130+
end
131+
end

0 commit comments

Comments
 (0)