Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🗂️ SinkConsumerFlushedWalCursor #935

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

RTLS
Copy link
Contributor

@RTLS RTLS commented Jan 31, 2025

No description provided.

Copy link
Contributor Author

RTLS commented Jan 31, 2025

This stack of pull requests is managed by Graphite. Learn more about stacking.

@RTLS RTLS force-pushed the 01-31-_sinkconsumerflushedwalcursor branch 2 times, most recently from b97df28 to 1857743 Compare January 31, 2025 22:39
@RTLS RTLS force-pushed the 01-31-_sinkconsumerflushedwalcursor branch from 1857743 to c6cb18d Compare February 11, 2025 22:11
@RTLS RTLS force-pushed the 01-31-_sinkconsumerflushedwalcursor branch 2 times, most recently from 9fa7fb8 to 8fd0786 Compare February 12, 2025 02:42
@RTLS RTLS force-pushed the 01-31-_sinkconsumerflushedwalcursor branch from 8fd0786 to 836565c Compare February 12, 2025 02:46
@@ -0,0 +1,49 @@
defmodule Sequin.Consumers.SinkConsumerHighWatermarkWalCursor do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change from flushed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DurableWalCursor

@@ -66,13 +67,14 @@ defmodule Sequin.ConsumersRuntime.ConsumerProducer do
@impl GenStage
def handle_info(:init, state) do
consumer = Repo.lazy_preload(state.consumer, postgres_database: [:replication_slot])
{:ok, low_watermark} = Replication.find_watermark(consumer.replication_slot_id, :low)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's my durable wal cursor?

Comment on lines +199 to +201
less_than_low_watermark =
message.commit_lsn < state.low_watermark.commit_lsn or
(message.commit_lsn == state.low_watermark.commit_lsn and message.commit_idx < state.low_watermark.commit_idx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incorrect, because failed messages can have <

:ok

high_watermark_wal_cursor ->
{:ok, _} =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{:ok, cursor} =

Comment on lines +124 to +125
@spec put_high_watermark_wal_cursor(State.t(), Replication.wal_cursor()) :: {:ok, State.t()}
def put_high_watermark_wal_cursor(%State{} = state, wal_cursor) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slot_high_watermark
slot_non_durable_cursor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incoming_wal_cursor

Comment on lines +361 to +362
# Force update from the database so we advance the low watermark for acknowledgement
replication_slot = Repo.preload(state.replication_slot, [:low_watermark_wal_cursor], force: true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this under reply == 1

messages =
if reply == 1 and not is_nil(state.last_commit_lsn) do
if reply == 1 and not is_nil(replication_slot.low_watermark_wal_cursor) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if reply == 1 and not is_nil(replication_slot.low_watermark_wal_cursor) do
if reply == 1 do

@@ -730,7 +752,13 @@ defmodule Sequin.DatabasesRuntime.SlotProcessor do
send(state.test_pid, {__MODULE__, :heartbeat_received})
end

state
if accumulated_messages?(state) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you always advance this?

@RTLS RTLS force-pushed the main branch 3 times, most recently from 6657098 to 459bc87 Compare March 3, 2025 17:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants