diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index e1e62785ac1cca..79379b883c6d12 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -417,7 +417,15 @@ def ingest_transactions_options() -> list[click.Option]: "process-spans": { "topic": Topic.INGEST_SPANS, "strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory", - "click_options": multiprocessing_options(default_max_batch_size=100), + "click_options": [ + click.Option( + ["--max-flush-segments", "max_flush_segments"], + type=int, + default=100, + help="The number of segments to download from redis at once. Defaults to 100.", + ), + *multiprocessing_options(default_max_batch_size=100), + ], }, "process-segments": { "topic": Topic.BUFFERED_SEGMENTS, diff --git a/src/sentry/scripts/spans/add-buffer.lua b/src/sentry/scripts/spans/add-buffer.lua new file mode 100644 index 00000000000000..9d91285ee5cad1 --- /dev/null +++ b/src/sentry/scripts/spans/add-buffer.lua @@ -0,0 +1,56 @@ +--[[ + +Add a span to the span buffer. + +KEYS: +- "project_id:trace_id" -- just for redis-cluster routing, all keys that the script uses are sharded like this/have this hashtag. + +ARGS: +- payload -- str +- is_root_span -- bool +- span_id -- str +- parent_span_id -- str +- set_timeout -- int + +]]-- + +local project_and_trace = KEYS[1] + +local payload = ARGV[1] +local is_root_span = ARGV[2] == "true" +local span_id = ARGV[3] +local parent_span_id = ARGV[4] +local set_timeout = tonumber(ARGV[5]) + +local span_key = string.format("span-buf:s:{%s}:%s", project_and_trace, span_id) + +local main_redirect_key = string.format("span-buf:sr:{%s}", project_and_trace) +local set_span_id = parent_span_id +for i = 0, 10000 do -- theoretically this limit means that segment trees of depth 10k may not be joined together correctly, if there is e.g. a hole of size 10k. 
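+    -- Follow the redirect chain: each entry maps a span ID to the span ID
+    -- whose set absorbed it, so this loop terminates at the current
+    -- "canonical" set for the subtree (no mapping, or a self-referential one).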
+    local new_set_span = redis.call("hget", main_redirect_key, set_span_id)
+    if not new_set_span or new_set_span == set_span_id then
+        break
+    end
+
+    set_span_id = new_set_span
+end
+
+redis.call("hset", main_redirect_key, span_id, set_span_id)
+local set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, set_span_id)
+
+if not is_root_span then
+    redis.call("sunionstore", set_key, set_key, span_key)
+    redis.call("del", span_key)
+end
+redis.call("sadd", set_key, payload)
+redis.call("expire", set_key, set_timeout)
+
+redis.call("expire", main_redirect_key, set_timeout)
+
+local has_root_span_key = string.format("span-buf:hrs:%s", set_key)
+local has_root_span = redis.call("get", has_root_span_key) == "1"
+if has_root_span or is_root_span then
+    redis.call("setex", has_root_span_key, set_timeout, "1")
+end
+
+return {span_key, set_key, has_root_span or is_root_span}
diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py
new file mode 100644
index 00000000000000..7aad3aa68fc558
--- /dev/null
+++ b/src/sentry/spans/buffer.py
@@ -0,0 +1,327 @@
+"""
+Span buffer is a consumer that takes individual spans from snuba-spans (soon
+ingest-spans, anyway, from Relay) and assembles them to segments of this form:
+
+    {"spans": [<span1>, <span2>, <span3>]}
+
+We have to do this without having such a thing as segment ID:
+
+    span1 = {"span_id": "a...", "parent_span_id": "b..."}
+    span2 = {"span_id": "b...", "parent_span_id": "c..."}
+    span3 = {"span_id": "c...", "parent_span_id": "d..."}
+
+In other words, spans only know their parent spans' IDs, and the segment should
+be assembled according to those relationships and implied transitive ones.
+
+There are a few ways to detect when a span is a root span (aka segment span):
+
+1. It does not have a parent_span_id
+2. It has an explicit is_segment_span marker, or some attribute directly on the span.
+3. For some time, no span comes in that identifies itself as parent.
+4. The parent span exists in another project.
+
+We simplify this set of conditions for the span buffer:
+
+* Relay writes is_segment based on some other attributes for us, so that we don't have to look at N span-local attributes. This simplifies condition 2.
+* The span buffer is sharded by project. Therefore, condition 4 is handled by the code for condition 3, although with some delay.
+
+Segments are flushed out to the `buffered-spans` topic under two conditions:
+
+* If the segment has a root span, it is flushed out after `span_buffer_root_timeout` seconds of inactivity.
+* Otherwise, it is flushed out after `span_buffer_timeout` seconds of inactivity.
+
+Now, what does that look like in Redis? For each incoming span, we:
+
+1. Try to figure out what the name of the respective span buffer is (`set_key` in `add-buffer.lua`)
+   a. We look up any "redirects" from the span buffer's parent_span_id (hashmap at "span-buf:sr:{project_id:trace_id}") to another key.
+   b. Otherwise we use "span-buf:s:{project_id:trace_id}:span_id"
+2. Rename any span buffers keyed under the span's own span ID to `set_key`, merging their contents.
+3. Add the ingested span's payload to the set under `set_key`.
+4. To a "global queue", we write the set's key, sorted by timeout.
+
+Eventually, a flushing cronjob looks at that global queue and removes all
+timed-out keys from it. It then fetches the sets associated with those keys,
+and deletes the sets.
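+
+Conceptually, a single flush cycle for one queue shard boils down to the
+following Redis commands (key names here are hypothetical, for illustration
+only):
+
+    ZRANGEBYSCORE span-buf:q:0 0 <now>           -- find timed-out segment keys
+    SMEMBERS span-buf:s:{1:<trace_id>}:<span_id> -- load one segment's payloads
+    -- ... produce the assembled segment to Kafka ...
+    DEL span-buf:s:{1:<trace_id>}:<span_id>      -- delete the segment's set
+    ZREM span-buf:q:0 span-buf:s:{1:<trace_id>}:<span_id>
+    HDEL span-buf:sr:{1:<trace_id>} <span_id> ...  -- clean up redirects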
+ +This happens in two steps: Get the to-be-flushed segments in `flush_segments`, +then the consumer produces them, then they are deleted from Redis +(`done_flush_segments`) + +On top of this, the global queue is sharded by partition, meaning that each +consumer reads and writes to shards that correspond to its own assigned +partitions. This means that extra care needs to be taken when recreating topics +or using spillover topics, especially when their new partition count is lower +than the original topic. + +Glossary for types of keys: + + * span-buf:s:* -- the actual set keys, containing span payloads. Each key contains all data for a segment. The most memory-intensive kind of key. + * span-buf:q:* -- the priority queue, used to determine which segments are ready to be flushed. + * span-buf:hrs:* -- simple bool key to flag a segment as "has root span" (HRS) + * span-buf:sr:* -- redirect mappings so that each incoming span ID can be mapped to the right span-buf:s: set. +""" + +from __future__ import annotations + +import itertools +from collections.abc import Sequence +from typing import Any, NamedTuple + +import rapidjson +from django.conf import settings +from sentry_redis_tools.clients import RedisCluster, StrictRedis + +from sentry.utils import metrics, redis + +# This SegmentId is an internal identifier used by the redis buffer that is +# also directly used as raw redis key. the format is +# "span-buf:s:{project_id:trace_id}:span_id", and the type is bytes because our redis +# client is bytes. +# +# The segment ID in the Kafka protocol is actually only the span ID. +SegmentId = bytes + + +def _segment_to_span_id(segment_id: SegmentId) -> bytes: + return parse_segment_id(segment_id)[2] + + +def parse_segment_id(segment_id: SegmentId) -> tuple[bytes, bytes, bytes]: + segment_id_parts = segment_id.split(b":") + project_id = segment_id_parts[2][1:] + trace_id = segment_id_parts[3][:-1] + span_id = segment_id_parts[4] + + return project_id, trace_id, span_id + + +def get_redis_client() -> RedisCluster[bytes] | StrictRedis[bytes]: + return redis.redis_clusters.get_binary(settings.SENTRY_SPAN_BUFFER_CLUSTER) + + +add_buffer_script = redis.load_redis_script("spans/add-buffer.lua") + + +# NamedTuples are faster to construct than dataclasses +class Span(NamedTuple): + trace_id: str + span_id: str + parent_span_id: str | None + project_id: int + payload: bytes + is_segment_span: bool = False + + +class OutputSpan(NamedTuple): + payload: dict[str, Any] + + +class SpansBuffer: + def __init__( + self, + assigned_shards: list[int], + span_buffer_timeout_secs: int = 60, + span_buffer_root_timeout_secs: int = 10, + redis_ttl: int = 3600, + ): + self.client: RedisCluster[bytes] | StrictRedis[bytes] = get_redis_client() + self.assigned_shards = list(assigned_shards) + self.span_buffer_timeout_secs = span_buffer_timeout_secs + self.span_buffer_root_timeout_secs = span_buffer_root_timeout_secs + self.redis_ttl = redis_ttl + + # make it pickleable + def __reduce__(self): + return ( + SpansBuffer, + ( + self.assigned_shards, + self.span_buffer_timeout_secs, + self.span_buffer_root_timeout_secs, + self.redis_ttl, + ), + ) + + def process_spans(self, spans: Sequence[Span], now: int): + """ + :param spans: List of to-be-ingested spans. + :param now: The current time to be used for setting expiration/flush + deadlines. Used for unit-testing and managing backlogging behavior. 
+ """ + + queue_keys = [] + queue_delete_items = [] + queue_items = [] + queue_item_has_root_span = [] + + is_root_span_count = 0 + has_root_span_count = 0 + + with metrics.timer("sentry.spans.buffer.process_spans.insert_spans"): + with self.client.pipeline(transaction=False) as p: + for span in spans: + # (parent_span_id) -> [Span] + shard = self.assigned_shards[int(span.trace_id, 16) % len(self.assigned_shards)] + queue_key = f"span-buf:q:{shard}" + + # Note: For the case where the span's parent is in another project, we + # will still flush the segment-without-root-span as one unit, just + # after span_buffer_timeout_secs rather than + # span_buffer_root_timeout_secs. + is_root_span = span.is_segment_span + + if is_root_span: + is_root_span_count += 1 + parent_span_id = span.span_id + else: + parent_span_id = span.parent_span_id or span.span_id + + # hack to make redis-cluster-py pipelines work well with + # scripts. we cannot use EVALSHA or "the normal way to do + # scripts in sentry" easily until we get rid of + # redis-cluster-py sentrywide. this probably leaves a bit of + # perf on the table as we send the full lua sourcecode with every span. + p.eval( + add_buffer_script.script, + 1, + f"{span.project_id}:{span.trace_id}", + span.payload, + "true" if is_root_span else "false", + span.span_id, + parent_span_id, + self.redis_ttl, + ) + + queue_keys.append(queue_key) + + results = iter(p.execute()) + for delete_item, item, has_root_span in results: + queue_delete_items.append(delete_item) + queue_items.append(item) + queue_item_has_root_span.append(has_root_span) + + with metrics.timer("sentry.spans.buffer.process_spans.update_queue"): + with self.client.pipeline(transaction=False) as p: + for key, delete_item, item, has_root_span in zip( + queue_keys, queue_delete_items, queue_items, queue_item_has_root_span + ): + # if the currently processed span is a root span, OR the buffer + # already had a root span inside, use a different timeout than + # usual. 
+ if has_root_span: + has_root_span_count += 1 + timestamp = now + self.span_buffer_root_timeout_secs + else: + timestamp = now + self.span_buffer_timeout_secs + + if delete_item != item: + p.zrem(key, delete_item) + p.zadd(key, {item: timestamp}) + p.expire(key, self.redis_ttl) + + p.execute() + + metrics.timing("sentry.spans.buffer.process_spans.num_spans", len(spans)) + metrics.timing("sentry.spans.buffer.process_spans.num_is_root_spans", is_root_span_count) + metrics.timing("sentry.spans.buffer.process_spans.num_has_root_spans", has_root_span_count) + + def flush_segments( + self, now: int, max_segments: int = 0 + ) -> tuple[int, dict[SegmentId, list[OutputSpan]]]: + cutoff = now + + with metrics.timer("sentry.spans.buffer.flush_segments.load_segment_ids"): + with self.client.pipeline(transaction=False) as p: + for shard in self.assigned_shards: + key = f"span-buf:q:{shard}" + p.zrangebyscore( + key, 0, cutoff, start=0 if max_segments else None, num=max_segments or None + ) + p.zcard(key) + + result = iter(p.execute()) + + segment_ids = [] + queue_sizes = [] + + with metrics.timer("sentry.spans.buffer.flush_segments.load_segment_data"): + with self.client.pipeline(transaction=False) as p: + # ZRANGEBYSCORE output + for segment_span_ids in result: + # process return value of zrevrangebyscore + for segment_id in segment_span_ids: + segment_ids.append(segment_id) + p.smembers(segment_id) + + # ZCARD output + queue_sizes.append(next(result)) + + segments = p.execute() + + for shard_i, queue_size in zip(self.assigned_shards, queue_sizes): + metrics.timing( + "sentry.spans.buffer.flush_segments.queue_size", + queue_size, + tags={"shard_i": shard_i}, + ) + + return_segments = {} + + for segment_id, segment in zip(segment_ids, segments): + segment_span_id = _segment_to_span_id(segment_id).decode("ascii") + + return_segment = [] + metrics.timing("sentry.spans.buffer.flush_segments.num_spans_per_segment", len(segment)) + for payload in segment: + val = rapidjson.loads(payload) + val["segment_id"] = segment_span_id + val["is_segment"] = segment_span_id == val["span_id"] + return_segment.append(OutputSpan(payload=val)) + + return_segments[segment_id] = return_segment + metrics.timing("sentry.spans.buffer.flush_segments.num_segments", len(return_segments)) + + return sum(queue_sizes), return_segments + + def done_flush_segments(self, segment_ids: dict[SegmentId, list[OutputSpan]]): + num_hdels = [] + metrics.timing("sentry.spans.buffer.done_flush_segments.num_segments", len(segment_ids)) + with metrics.timer("sentry.spans.buffer.done_flush_segments"): + with self.client.pipeline(transaction=False) as p: + for segment_id, output_spans in segment_ids.items(): + hrs_key = b"span-buf:hrs:" + segment_id + p.get(hrs_key) + p.delete(hrs_key) + p.delete(segment_id) + + project_id, trace_id, _ = parse_segment_id(segment_id) + redirect_map_key = b"span-buf:sr:{%s:%s}" % (project_id, trace_id) + shard = self.assigned_shards[int(trace_id, 16) % len(self.assigned_shards)] + p.zrem(f"span-buf:q:{shard}".encode("ascii"), segment_id) + + i = 0 + for span_batch in itertools.batched(output_spans, 100): + i += 1 + p.hdel( + redirect_map_key, + *[output_span.payload["span_id"] for output_span in span_batch], + ) + + num_hdels.append(i) + + results = iter(p.execute()) + + has_root_span_count = 0 + for result, num_hdel in zip(results, num_hdels): + if result: + has_root_span_count += 1 + + next(results) # DEL hrs_key + next(results) # DEL segment_id + next(results) # ZREM ... + for _ in range(num_hdel): # HDEL ... 
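+                        # drain one pipeline reply per HDEL issued above, so
+                        # the iterator stays aligned for the next segment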
+ next(results) + + metrics.timing( + "sentry.spans.buffer.done_flush_segments.has_root_span", has_root_span_count + ) diff --git a/src/sentry/spans/buffer/redis.py b/src/sentry/spans/buffer/redis.py deleted file mode 100644 index a90275aaa09dd7..00000000000000 --- a/src/sentry/spans/buffer/redis.py +++ /dev/null @@ -1,159 +0,0 @@ -from __future__ import annotations - -import dataclasses -from collections.abc import Mapping -from typing import NamedTuple - -import sentry_sdk -from django.conf import settings -from sentry_redis_tools.clients import RedisCluster, StrictRedis - -from sentry import options -from sentry.utils import redis -from sentry.utils.iterators import chunked - - -@dataclasses.dataclass -class ProcessSegmentsContext: - timestamp: int - partition: int - should_process_segments: bool - - -class SegmentKey(NamedTuple): - segment_id: str - project_id: int - partition: int - - -def get_redis_client() -> RedisCluster[bytes] | StrictRedis[bytes]: - return redis.redis_clusters.get_binary(settings.SENTRY_SPAN_BUFFER_CLUSTER) - - -def get_segment_key(project_id: str | int, segment_id: str) -> str: - return f"segment:{segment_id}:{project_id}:process-segment" - - -def get_last_processed_timestamp_key(partition_index: int) -> str: - return f"performance-issues:last-processed-timestamp:partition:{partition_index}" - - -def get_unprocessed_segments_key(partition_index: int) -> str: - return f"performance-issues:unprocessed-segments:partition-2:{partition_index}" - - -class RedisSpansBuffer: - def __init__(self): - self.client: RedisCluster | StrictRedis = get_redis_client() - - def batch_write_and_check_processing( - self, - spans_map: Mapping[SegmentKey, list[bytes]], - segment_first_seen_ts: Mapping[SegmentKey, int], - latest_ts_by_partition: Mapping[int, int], - ) -> list[ProcessSegmentsContext]: - """ - 1. Pushes batches of spans to redis - 2. Check if number of spans pushed == to the number of elements that exist on the key. This - tells us if it was the first time we see the key. This works fine because RPUSH is atomic. - 3. If it is the first time we see a particular segment, push the segment id and first seen - timestamp to a bucket so we know when it is ready to be processed. - 3. Checks if 1 second has passed since the last time segments were processed for a partition. 
- """ - keys = list(spans_map.keys()) - spans_written_per_segment = [] - ttl = options.get("standalone-spans.buffer-ttl.seconds") - - # Batch write spans in a segment - with self.client.pipeline() as p: - for key in keys: - segment_id, project_id, partition = key - spans = spans_map[key] - segment_key = get_segment_key(project_id, segment_id) - # RPUSH is atomic - p.rpush(segment_key, *spans) - spans_written_per_segment.append(len(spans)) - - results = p.execute() - - partitions = list(latest_ts_by_partition.keys()) - with self.client.pipeline() as p: - # Get last processed timestamp for each partition processed - # by consumer - for partition in partitions: - timestamp = latest_ts_by_partition[partition] - timestamp_key = get_last_processed_timestamp_key(partition) - # GETSET is atomic - p.getset(timestamp_key, timestamp) - - for result in zip(keys, spans_written_per_segment, results): - # Check if this is a new segment, if yes, add to bucket to be processed - key, num_written, num_total = result - if num_written == num_total: - segment_id, project_id, partition = key - segment_key = get_segment_key(project_id, segment_id) - bucket = get_unprocessed_segments_key(partition) - - timestamp = segment_first_seen_ts[key] - p.expire(segment_key, ttl) - p.rpush(bucket, timestamp, segment_key) - - timestamp_results = p.execute() - - # For each partition this consumer is assigned to, check if it should process segments - process_segments_contexts: list[ProcessSegmentsContext] = [] - for value in zip(timestamp_results[: len(latest_ts_by_partition)], partitions): - last_ts, partition = value - should_process = last_ts is None or int(last_ts) < latest_ts_by_partition[partition] - process_segments_contexts.append( - ProcessSegmentsContext( - timestamp=latest_ts_by_partition[partition], - partition=partition, - should_process_segments=should_process, - ) - ) - - return process_segments_contexts - - def read_and_expire_many_segments(self, keys: list[str]) -> list[list[str | bytes]]: - values = [] - with self.client.pipeline() as p: - for key in keys: - p.lrange(key, 0, -1) - - p.delete(*keys) - response = p.execute() - - for value in response[:-1]: - values.append(value) - - return values - - def get_unprocessed_segments_and_prune_bucket(self, now: int, partition: int) -> list[str]: - key = get_unprocessed_segments_key(partition) - results = self.client.lrange(key, 0, -1) or [] - - buffer_window = options.get("standalone-spans.buffer-window.seconds") - - segment_keys = [] - processed_segment_ts = None - for result in chunked(results, 2): - try: - segment_timestamp, segment_key = result - segment_timestamp = int(segment_timestamp) - if now - segment_timestamp < buffer_window: - break - - processed_segment_ts = segment_timestamp - segment_keys.append(segment_key.decode("utf-8")) - except Exception: - # Just in case something funky happens here - sentry_sdk.capture_exception() - break - - self.client.ltrim(key, len(segment_keys) * 2, -1) - - segment_context = {"current_timestamp": now, "segment_timestamp": processed_segment_ts} - sentry_sdk.set_context("processed_segment", segment_context) - - return segment_keys diff --git a/src/sentry/spans/consumers/process/factory.py b/src/sentry/spans/consumers/process/factory.py index 880a11e4677794..85753cb717d402 100644 --- a/src/sentry/spans/consumers/process/factory.py +++ b/src/sentry/spans/consumers/process/factory.py @@ -1,267 +1,27 @@ -import dataclasses import logging -from collections import defaultdict +import time from collections.abc import Mapping -from 
datetime import datetime -from typing import Any +from functools import partial -import orjson import rapidjson -import sentry_sdk from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaProducer, build_kafka_configuration -from arroyo.backends.kafka.consumer import Headers, KafkaPayload +from arroyo.backends.kafka.consumer import KafkaPayload from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory from arroyo.processing.strategies.batching import BatchStep, ValuesBatch -from arroyo.processing.strategies.produce import Produce +from arroyo.processing.strategies.commit import CommitOffsets from arroyo.processing.strategies.run_task import RunTask -from arroyo.processing.strategies.unfold import Unfold -from arroyo.types import ( - FILTERED_PAYLOAD, - BrokerValue, - Commit, - FilteredPayload, - Message, - Partition, - Value, -) -from sentry_kafka_schemas.codecs import Codec -from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent +from arroyo.types import Commit, Message, Partition -from sentry import options -from sentry.conf.types.kafka_definition import Topic, get_topic_codec -from sentry.spans.buffer.redis import ProcessSegmentsContext, RedisSpansBuffer, SegmentKey -from sentry.spans.consumers.process.strategy import CommitSpanOffsets, NoOp -from sentry.utils import metrics +from sentry.conf.types.kafka_definition import Topic +from sentry.spans.buffer import Span, SpansBuffer +from sentry.spans.consumers.process.flusher import SpanFlusher from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition +from sentry.utils.safe import get_path logger = logging.getLogger(__name__) -SPANS_CODEC: Codec[SpanEvent] = get_topic_codec(Topic.INGEST_SPANS) -MAX_PAYLOAD_SIZE = 10 * 1000 * 1000 # 10 MB - -BATCH_SIZE = 100 - - -def in_process_spans_rollout_group(project_id: int | None) -> bool: - if project_id and project_id in options.get( - "standalone-spans.process-spans-consumer.project-allowlist" - ): - return True - - if project_id and (project_id % 100000) / 100000 < options.get( - "standalone-spans.process-spans-consumer.project-rollout" - ): - return True - return False - - -@dataclasses.dataclass -class SpanMessageWithMetadata: - segment_id: str - project_id: int - timestamp: int - partition: int - span: bytes - - -def get_project_id(headers: Headers) -> int | None: - for k, v in headers: - if k == "project_id": - return int(v.decode("utf-8")) - - return None - - -def prepare_buffered_segment_payload(segments) -> bytes: - segment_str = b",".join(segments) - return b'{"spans": [' + segment_str + b"]}" - - -@metrics.wraps("spans.consumers.process.deserialize_span") -def _deserialize_span(value: bytes, use_orjson=False, use_rapidjson=False) -> Mapping[str, Any]: - if use_orjson: - sentry_sdk.set_tag("json_lib", "orjson") - return orjson.loads(value) - if use_rapidjson: - sentry_sdk.set_tag("json_lib", "rapidjson") - return rapidjson.loads(value) - - return SPANS_CODEC.decode(value) - - -def _process_message(message: Message[KafkaPayload]) -> SpanMessageWithMetadata | FilteredPayload: - """ - Deserializes span to get segment_id. Returns `SpanMessageWithMetadata` which contains the - original span payload value in bytes along with other segment_id, message timestamp and - partition data to ensure correct bucketing in redis. 
- """ - if not options.get("standalone-spans.process-spans-consumer.enable"): - return FILTERED_PAYLOAD - - try: - project_id = get_project_id(message.payload.headers) - except Exception: - logger.exception("Failed to parse span message header") - return FILTERED_PAYLOAD - - if not project_id or not in_process_spans_rollout_group(project_id=project_id): - return FILTERED_PAYLOAD - - assert isinstance(message.value, BrokerValue) - - with sentry_sdk.start_transaction(op="process", name="spans.process.process_message") as txn: - payload_value = message.payload.value - timestamp = int(message.value.timestamp.timestamp()) - partition = message.value.partition.index - - use_orjson = options.get("standalone-spans.deserialize-spans-orjson.enable") - use_rapidjson = options.get("standalone-spans.deserialize-spans-rapidjson.enable") - - with txn.start_child(op="deserialize"): - span = _deserialize_span( - payload_value, use_orjson=use_orjson, use_rapidjson=use_rapidjson - ) - - segment_id: str | None = span.get("segment_id", None) - if segment_id is None: - return FILTERED_PAYLOAD - - return SpanMessageWithMetadata( - segment_id=segment_id, - project_id=project_id, - timestamp=timestamp, - partition=partition, - span=payload_value, - ) - - -def process_message(message: Message[KafkaPayload]) -> SpanMessageWithMetadata | FilteredPayload: - try: - return _process_message(message) - except Exception: - sentry_sdk.capture_exception() - return FILTERED_PAYLOAD - - -def _batch_write_to_redis(message: Message[ValuesBatch[SpanMessageWithMetadata]]): - """ - Gets a batch of `SpanMessageWithMetadata` and creates a dictionary with - segment_id as key and a list of spans belonging to that segment_id as value. - Pushes the batch of spans to redis. - """ - with sentry_sdk.start_transaction(op="process", name="spans.process.expand_segments"): - batch = message.payload - latest_ts_by_partition: dict[int, int] = {} - spans_map: dict[SegmentKey, list[bytes]] = defaultdict(list) - segment_first_seen_ts: dict[SegmentKey, int] = {} - - for item in batch: - payload = item.payload - partition = payload.partition - segment_id = payload.segment_id - project_id = payload.project_id - span = payload.span - timestamp = payload.timestamp - - key = SegmentKey(segment_id, project_id, partition) - - # Collects spans for each segment_id - spans_map[key].append(span) - - # Collects "first_seen" timestamps for each segment in batch. - # Batch step doesn't guarantee order, so pick lowest ts. - if key not in segment_first_seen_ts or timestamp < segment_first_seen_ts[key]: - segment_first_seen_ts[key] = timestamp - - # Collects latest timestamps processed in each partition. It is - # important to keep track of this per partition because message - # timestamps are guaranteed to be monotonic per partition only. 
- if ( - partition not in latest_ts_by_partition - or timestamp > latest_ts_by_partition[partition] - ): - latest_ts_by_partition[partition] = timestamp - - client = RedisSpansBuffer() - - return client.batch_write_and_check_processing( - spans_map=spans_map, - segment_first_seen_ts=segment_first_seen_ts, - latest_ts_by_partition=latest_ts_by_partition, - ) - - -def batch_write_to_redis( - message: Message[ValuesBatch[SpanMessageWithMetadata]], -): - try: - return _batch_write_to_redis(message) - except Exception: - sentry_sdk.capture_exception() - return FILTERED_PAYLOAD - - -def _expand_segments(should_process_segments: list[ProcessSegmentsContext]): - with sentry_sdk.start_transaction(op="process", name="spans.process.expand_segments") as txn: - buffered_segments: list[Value] = [] - - for result in should_process_segments: - timestamp = result.timestamp - partition = result.partition - should_process = result.should_process_segments - - if not should_process: - continue - - client = RedisSpansBuffer() - payload_context = {} - - with txn.start_child(op="process", name="fetch_unprocessed_segments"): - keys = client.get_unprocessed_segments_and_prune_bucket(timestamp, partition) - - sentry_sdk.set_measurement("segments.count", len(keys)) - if len(keys) > 0: - payload_context["sample_key"] = keys[0] - - # With pipelining, redis server is forced to queue replies using - # up memory, so batching the keys we fetch. - with txn.start_child(op="process", name="read_and_expire_many_segments"): - for i in range(0, len(keys), BATCH_SIZE): - segments = client.read_and_expire_many_segments(keys[i : i + BATCH_SIZE]) - - for j, segment in enumerate(segments): - if not segment: - continue - - payload_data = prepare_buffered_segment_payload(segment) - if len(payload_data) > MAX_PAYLOAD_SIZE: - logger.warning( - "Failed to produce message: max payload size exceeded.", - extra={"segment_key": keys[i + j]}, - ) - metrics.incr("performance.buffered_segments.max_payload_size_exceeded") - continue - - buffered_segments.append( - Value( - KafkaPayload(None, payload_data, []), - {}, - datetime.fromtimestamp(timestamp), - ) - ) - - return buffered_segments - - -def expand_segments(should_process_segments: list[ProcessSegmentsContext]): - try: - return _expand_segments(should_process_segments) - except Exception: - sentry_sdk.capture_exception() - return [] - class ProcessSpansStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): """ @@ -278,12 +38,16 @@ def __init__( max_batch_size: int, max_batch_time: int, num_processes: int, + max_flush_segments: int, input_block_size: int | None, output_block_size: int | None, ): super().__init__() + + # config self.max_batch_size = max_batch_size self.max_batch_time = max_batch_time + self.max_flush_segments = max_flush_segments self.input_block_size = input_block_size self.output_block_size = output_block_size self.__pool = MultiprocessingPool(num_processes) @@ -301,38 +65,84 @@ def create_with_partitions( commit: Commit, partitions: Mapping[Partition, int], ) -> ProcessingStrategy[KafkaPayload]: + committer = CommitOffsets(commit) - produce_step = Produce( - producer=self.producer, - topic=self.output_topic, - next_step=NoOp(), - ) - - unfold_step = Unfold(generator=expand_segments, next_step=produce_step) + buffer = SpansBuffer(assigned_shards=[p.index for p in partitions]) - commit_step = CommitSpanOffsets(commit=commit, next_step=unfold_step) - - batch_processor = RunTask( - function=batch_write_to_redis, - next_step=commit_step, + # patch onto self just for testing 
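+        # (tests reach into self._flusher, e.g. to fake the flush clock by
+        # setting current_drift; see test_consumer.py)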
+        flusher = self._flusher = SpanFlusher(
+            buffer,
+            self.producer,
+            self.output_topic,
+            self.max_flush_segments,
+            next_step=committer,
         )
 
-        batch_step = BatchStep(
+        run_task = run_task_with_multiprocessing(
+            function=partial(process_batch, buffer),
+            next_step=flusher,
             max_batch_size=self.max_batch_size,
             max_batch_time=self.max_batch_time,
-            next_step=batch_processor,
+            pool=self.__pool,
+            input_block_size=self.input_block_size,
+            output_block_size=self.output_block_size,
         )
 
-        return run_task_with_multiprocessing(
-            function=process_message,
-            next_step=batch_step,
+        batch = BatchStep(
             max_batch_size=self.max_batch_size,
             max_batch_time=self.max_batch_time,
-            pool=self.__pool,
-            input_block_size=self.input_block_size,
-            output_block_size=self.output_block_size,
+            next_step=run_task,
         )
 
+        # We use the produce timestamp to drive the clock for flushing, so that
+        # consumer backlogs do not cause segments to be flushed prematurely.
+        # The received timestamp in the span is too old for this purpose if
+        # Relay starts buffering, and we don't want that effect to propagate
+        # into this system.
+        def add_produce_timestamp_cb(message: Message[KafkaPayload]) -> tuple[int, KafkaPayload]:
+            return (
+                int(message.timestamp.timestamp() if message.timestamp else time.time()),
+                message.payload,
+            )
+
+        add_timestamp = RunTask(
+            function=add_produce_timestamp_cb,
+            next_step=batch,
+        )
+
+        return add_timestamp
+
     def shutdown(self) -> None:
         self.producer.close()
         self.__pool.close()
+
+
+def process_batch(
+    buffer: SpansBuffer, values: Message[ValuesBatch[tuple[int, KafkaPayload]]]
+) -> int:
+    min_timestamp = None
+    spans = []
+    for value in values.payload:
+        timestamp, payload = value.payload
+        if min_timestamp is None or timestamp < min_timestamp:
+            min_timestamp = timestamp
+
+        val = rapidjson.loads(payload.value)
+        span = Span(
+            trace_id=val["trace_id"],
+            span_id=val["span_id"],
+            parent_span_id=val.get("parent_span_id"),
+            project_id=val["project_id"],
+            payload=payload.value,
+            # TODO: validate, this logic may not be complete.
+            is_segment_span=(
+                val.get("parent_span_id") is None
+                or get_path(val, "sentry_tags", "op") == "http.server"
+                or val.get("is_remote")
+            ),
+        )
+        spans.append(span)
+
+    assert min_timestamp is not None
+    buffer.process_spans(spans, now=min_timestamp)
+    return min_timestamp
diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py
new file mode 100644
index 00000000000000..1811197649f056
--- /dev/null
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -0,0 +1,111 @@
+import threading
+import time
+from concurrent import futures
+
+import rapidjson
+from arroyo import Topic as ArroyoTopic
+from arroyo.backends.kafka import KafkaPayload, KafkaProducer
+from arroyo.processing.strategies.abstract import ProcessingStrategy
+from arroyo.types import Message
+
+from sentry.spans.buffer import SpansBuffer
+from sentry.utils import metrics
+
+
+class SpanFlusher(ProcessingStrategy[int]):
+    """
+    A background thread that polls Redis for segments that are ready to be
+    flushed, and produces them to Kafka.
+
+    This is a processing step to be embedded into the consumer that writes to
+    Redis. It takes and forwards integer messages that represent recently
+    processed timestamps (from the producer timestamp of the incoming span
+    message), which are then used as a clock to determine whether segments
+    have expired.
+
+    :param producer: The Kafka producer to use for producing flushed segments.
+    :param topic: The topic to send segments to.
+ :param max_flush_segments: How many segments to flush at once in a single Redis call. + """ + + def __init__( + self, + buffer: SpansBuffer, + producer: KafkaProducer, + topic: ArroyoTopic, + max_flush_segments: int, + next_step: ProcessingStrategy[int], + ): + self.buffer = buffer + self.producer = producer + self.topic = topic + self.max_flush_segments = max_flush_segments + self.next_step = next_step + + self.stopped = False + self.current_drift = 0 + + self.thread = threading.Thread(target=self.main, daemon=True) + self.thread.start() + + def main(self): + while not self.stopped: + now = int(time.time()) + self.current_drift + + producer_futures = [] + + queue_size, flushed_segments = self.buffer.flush_segments( + max_segments=self.max_flush_segments, now=now + ) + metrics.timing("sentry.spans.buffer.inflight_segments", queue_size) + + if not flushed_segments: + time.sleep(1) + continue + + for _, spans_set in flushed_segments.items(): + if not spans_set: + # This is a bug, most likely the input topic is not + # partitioned by trace_id so multiple consumers are writing + # over each other. The consequence is duplicated segments, + # worst-case. + metrics.incr("sentry.spans.buffer.empty_segments") + continue + + spans = [span.payload for span in spans_set] + + kafka_payload = KafkaPayload( + None, rapidjson.dumps({"spans": spans}).encode("utf8"), [] + ) + + producer_futures.append(self.producer.produce(self.topic, kafka_payload)) + + futures.wait(producer_futures) + + self.buffer.done_flush_segments(flushed_segments) + + def poll(self) -> None: + self.next_step.poll() + + def submit(self, message: Message[int]) -> None: + self.current_drift = message.payload - int(time.time()) + self.next_step.submit(message) + + def terminate(self) -> None: + self.stopped = True + self.next_step.terminate() + + def close(self) -> None: + self.stopped = True + self.next_step.close() + + def join(self, timeout: float | None = None): + # set stopped flag first so we can "flush" the background thread while + # next_step is also shutting down. we can do two things at once! + self.stopped = True + deadline = time.time() + timeout if timeout else None + + self.next_step.join(timeout) + + while self.thread.is_alive() and (deadline is None or deadline > time.time()): + time.sleep(0.1) diff --git a/src/sentry/spans/consumers/process/strategy.py b/src/sentry/spans/consumers/process/strategy.py deleted file mode 100644 index 9c24e69fde04e5..00000000000000 --- a/src/sentry/spans/consumers/process/strategy.py +++ /dev/null @@ -1,58 +0,0 @@ -from typing import Any, Generic, TypeVar - -from arroyo.processing.strategies.abstract import ProcessingStrategy -from arroyo.processing.strategies.commit import CommitOffsets -from arroyo.types import Commit, Message - -TPayload = TypeVar("TPayload") - - -class CommitSpanOffsets(CommitOffsets, Generic[TPayload]): - """ - Inherits from CommitOffsets so we can add a next step. We'd like to commit offsets for - processed spans before carrying on the work to build segments and produce them since - the processing messages and producing segments are two distinct operations. Span messages - should be committed once they are processed and put into redis. 
- """ - - def __init__(self, commit: Commit, next_step: ProcessingStrategy[TPayload]) -> None: - super().__init__(commit=commit) - self.__next_step = next_step - - def poll(self) -> None: - super().poll() - self.__next_step.poll() - - def submit(self, message: Message[TPayload]) -> None: - super().submit(message) - self.__next_step.submit(message) - - def close(self) -> None: - self.__next_step.close() - - def terminate(self) -> None: - self.__next_step.terminate() - - def join(self, timeout: float | None = None) -> None: - super().join(timeout) - self.__next_step.join(timeout=timeout) - - -class NoOp(ProcessingStrategy[Any]): - def __init__(self) -> None: - return - - def poll(self) -> None: - pass - - def submit(self, message: Message[Any]) -> None: - pass - - def close(self) -> None: - pass - - def terminate(self) -> None: - pass - - def join(self, timeout: float | None = None) -> None: - pass diff --git a/src/sentry/testutils/helpers/redis.py b/src/sentry/testutils/helpers/redis.py index 9381d5f9131aa4..6bdf79eba09ad6 100644 --- a/src/sentry/testutils/helpers/redis.py +++ b/src/sentry/testutils/helpers/redis.py @@ -21,6 +21,7 @@ def use_redis_cluster( with_options: dict[str, Any] | None = None, ) -> Generator[None]: # Cluster id needs to be different than "default" to distinguish redis instance with redis cluster. + # In order to run tests that use this helper, run 'devservices up --mode backend-ci' or '--mode full' options = { "backpressure.high_watermarks.redis": high_watermark, diff --git a/tests/sentry/spans/buffer/__init__.py b/tests/sentry/spans/buffer/__init__.py deleted file mode 100644 index e69de29bb2d1d6..00000000000000 diff --git a/tests/sentry/spans/buffer/test_redis.py b/tests/sentry/spans/buffer/test_redis.py deleted file mode 100644 index 808218b8822fb2..00000000000000 --- a/tests/sentry/spans/buffer/test_redis.py +++ /dev/null @@ -1,150 +0,0 @@ -from sentry.spans.buffer.redis import ProcessSegmentsContext, RedisSpansBuffer, SegmentKey -from sentry.testutils.pytest.fixtures import django_db_all - - -class TestRedisSpansBuffer: - @django_db_all - def test_batch_write(self): - buffer = RedisSpansBuffer() - spans_map = { - SegmentKey("segment_1", 1, 1): [b"span data", b"span data 2", b"span data 3"], - SegmentKey("segment_2", 1, 1): [b"span data"], - } - timestamp_map = { - SegmentKey("segment_1", 1, 1): 1710280889, - SegmentKey("segment_2", 1, 1): 1710280889, - } - last_seen_map = { - 1: 1710280889, - } - result = buffer.batch_write_and_check_processing( - spans_map=spans_map, - segment_first_seen_ts=timestamp_map, - latest_ts_by_partition=last_seen_map, - ) - assert result == [ - ProcessSegmentsContext(timestamp=1710280889, partition=1, should_process_segments=True) - ] - assert buffer.client.ttl("segment:segment_1:1:process-segment") == 300 - assert buffer.client.lrange( - "performance-issues:unprocessed-segments:partition-2:1", 0, -1 - ) == [ - b"1710280889", - b"segment:segment_1:1:process-segment", - b"1710280889", - b"segment:segment_2:1:process-segment", - ] - - assert buffer.read_and_expire_many_segments( - ["segment:segment_1:1:process-segment", "segment:segment_2:1:process-segment"] - ) == [[b"span data", b"span data 2", b"span data 3"], [b"span data"]] - - @django_db_all - def test_multiple_batch_write(self): - buffer = RedisSpansBuffer() - spans_map = { - SegmentKey("segment_1", 1, 1): [b"span data", b"span data 2", b"span data 3"], - SegmentKey("segment_2", 1, 2): [b"span data"], - } - timestamp_map = { - SegmentKey("segment_1", 1, 1): 1710280889, - 
SegmentKey("segment_2", 1, 2): 1710280889, - } - last_seen_map = { - 1: 1710280889, - 2: 1710280889, - } - buffer.batch_write_and_check_processing( - spans_map=spans_map, - segment_first_seen_ts=timestamp_map, - latest_ts_by_partition=last_seen_map, - ) - - spans_map_2 = { - SegmentKey("segment_1", 1, 1): [b"span data 4", b"span data 5"], - SegmentKey("segment_2", 1, 2): [b"span data 2"], - SegmentKey("segment_3", 1, 1): [b"span data"], - } - timestamp_map_2 = { - SegmentKey("segment_1", 1, 1): 1710280890, - SegmentKey("segment_2", 1, 2): 1710280889, - SegmentKey("segment_3", 1, 1): 1710280891, - } - last_seen_map_2 = { - 1: 1710280891, - 2: 1710280889, - } - - result = buffer.batch_write_and_check_processing( - spans_map=spans_map_2, - segment_first_seen_ts=timestamp_map_2, - latest_ts_by_partition=last_seen_map_2, - ) - - assert result == [ - ProcessSegmentsContext(timestamp=1710280891, partition=1, should_process_segments=True), - ProcessSegmentsContext( - timestamp=1710280889, partition=2, should_process_segments=False - ), - ] - - assert buffer.client.ttl("segment:segment_1:1:process-segment") == 300 - assert buffer.client.lrange( - "performance-issues:unprocessed-segments:partition-2:1", 0, -1 - ) == [ - b"1710280889", - b"segment:segment_1:1:process-segment", - b"1710280891", - b"segment:segment_3:1:process-segment", - ] - assert buffer.read_and_expire_many_segments(["segment:segment_1:1:process-segment"]) == [ - [b"span data", b"span data 2", b"span data 3", b"span data 4", b"span data 5"] - ] - - @django_db_all - def test_get_unprocessed_segments_and_prune_bucket(self): - buffer = RedisSpansBuffer() - spans_map = { - SegmentKey("segment_1", 1, 1): [b"span data"], - SegmentKey("segment_2", 1, 1): [b"span data"], - SegmentKey("segment_3", 1, 1): [b"span data"], - SegmentKey("segment_4", 1, 2): [b"span data"], - } - timestamp_map = { - SegmentKey("segment_1", 1, 1): 1710280890, - SegmentKey("segment_2", 1, 1): 1710280891, - SegmentKey("segment_3", 1, 1): 1710280892, - SegmentKey("segment_4", 1, 2): 1710280893, - } - last_seen_map = { - 1: 1710280893, - } - buffer.batch_write_and_check_processing( - spans_map=spans_map, - segment_first_seen_ts=timestamp_map, - latest_ts_by_partition=last_seen_map, - ) - - assert buffer.client.lrange( - "performance-issues:unprocessed-segments:partition-2:1", 0, -1 - ) == [ - b"1710280890", - b"segment:segment_1:1:process-segment", - b"1710280891", - b"segment:segment_2:1:process-segment", - b"1710280892", - b"segment:segment_3:1:process-segment", - ] - - segment_keys = buffer.get_unprocessed_segments_and_prune_bucket(1710281011, 1) - assert segment_keys == [ - "segment:segment_1:1:process-segment", - "segment:segment_2:1:process-segment", - ] - - assert buffer.client.lrange( - "performance-issues:unprocessed-segments:partition-2:1", 0, -1 - ) == [ - b"1710280892", - b"segment:segment_3:1:process-segment", - ] diff --git a/tests/sentry/spans/consumers/process/__init__.py b/tests/sentry/spans/consumers/process/__init__.py index e69de29bb2d1d6..6e4ac685cc351f 100644 --- a/tests/sentry/spans/consumers/process/__init__.py +++ b/tests/sentry/spans/consumers/process/__init__.py @@ -0,0 +1,34 @@ +def build_mock_span(project_id, span_op=None, **kwargs): + span = { + "description": "OrganizationNPlusOne", + "duration_ms": 107, + "event_id": "61ccae71d70f45bb9b1f2ccb7f7a49ec", + "exclusive_time_ms": 107.359, + "is_segment": True, + "parent_span_id": "b35b839c02985f33", + "profile_id": "dbae2b82559649a1a34a2878134a007b", + "project_id": project_id, + 
"organization_id": 1, + "received": 1707953019.044972, + "retention_days": 90, + "segment_id": "a49b42af9fb69da0", + "sentry_tags": { + "browser.name": "Google Chrome", + "environment": "development", + "op": span_op or "base.dispatch.sleep", + "release": "backend@24.2.0.dev0+699ce0cd1281cc3c7275d0a474a595375c769ae8", + "transaction": "/api/0/organizations/{organization_id_or_slug}/n-plus-one/", + "transaction.method": "GET", + "transaction.op": "http.server", + "user": "id:1", + "platform": "python", + }, + "span_id": "a49b42af9fb69da0", + "start_timestamp_ms": 1707953018865, + "start_timestamp_precise": 1707953018.865, + "end_timestamp_precise": 1707953018.972, + "trace_id": "94576097f3a64b68b85a59c7d4e3ee2a", + } + + span.update(**kwargs) + return span diff --git a/tests/sentry/spans/consumers/process/test_consumer.py b/tests/sentry/spans/consumers/process/test_consumer.py new file mode 100644 index 00000000000000..06a52f5ea96da2 --- /dev/null +++ b/tests/sentry/spans/consumers/process/test_consumer.py @@ -0,0 +1,83 @@ +from concurrent import futures +from datetime import datetime + +import rapidjson +from arroyo.backends.kafka import KafkaPayload +from arroyo.types import Message, Partition, Topic, Value + +from sentry.spans.consumers.process.factory import ProcessSpansStrategyFactory + + +def test_basic(monkeypatch, request): + monkeypatch.setattr(futures, "wait", lambda _: None) + monkeypatch.setattr("time.sleep", lambda _: None) + + topic = Topic("test") + fac = ProcessSpansStrategyFactory( + max_batch_size=10, + max_batch_time=10, + num_processes=1, + max_flush_segments=10, + input_block_size=None, + output_block_size=None, + ) + + produced_messages = [] + + def produce(produce_topic, message): + produced_messages.append(message) + # The real produce would return a future here, but it doesn't matter + # because we also patched futures.wait + return None + + fac.producer.produce = produce # type:ignore[method-assign] + + commits = [] + + def add_commit(offsets, force=False): + commits.append(offsets) + + step = fac.create_with_partitions(add_commit, {Partition(topic, 0): 0}) + + step.submit( + Message( + Value( + KafkaPayload( + None, + rapidjson.dumps( + { + "project_id": 12, + "span_id": "a" * 16, + "trace_id": "b" * 32, + } + ).encode("ascii"), + [], + ), + {}, + datetime.now(), + ) + ) + ) + + @request.addfinalizer + def _(): + step.join() + fac.shutdown() + + step.poll() + fac._flusher.current_drift = 9000 # "advance" our "clock" + + step.join() + + (msg,) = produced_messages + assert rapidjson.loads(msg.value) == { + "spans": [ + { + "is_segment": True, + "project_id": 12, + "segment_id": "aaaaaaaaaaaaaaaa", + "span_id": "aaaaaaaaaaaaaaaa", + "trace_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + }, + ], + } diff --git a/tests/sentry/spans/consumers/process/test_factory.py b/tests/sentry/spans/consumers/process/test_factory.py deleted file mode 100644 index 0ca47c78b92333..00000000000000 --- a/tests/sentry/spans/consumers/process/test_factory.py +++ /dev/null @@ -1,506 +0,0 @@ -from datetime import datetime, timedelta -from unittest import mock - -from arroyo.backends.kafka import KafkaPayload -from arroyo.types import BrokerValue, Message, Partition -from arroyo.types import Topic as ArroyoTopic - -from sentry.conf.types.kafka_definition import Topic -from sentry.spans.buffer.redis import get_redis_client -from sentry.spans.consumers.process.factory import ( - ProcessSpansStrategyFactory, - batch_write_to_redis, - expand_segments, -) -from 
sentry.spans.consumers.process_segments.factory import BUFFERED_SEGMENT_SCHEMA -from sentry.testutils.helpers.options import override_options -from sentry.testutils.pytest.fixtures import django_db_all -from sentry.utils import json -from sentry.utils.kafka_config import get_topic_definition - - -def build_mock_span(project_id, span_op=None, **kwargs): - span = { - "description": "OrganizationNPlusOne", - "duration_ms": 107, - "event_id": "61ccae71d70f45bb9b1f2ccb7f7a49ec", - "exclusive_time_ms": 107.359, - "is_segment": True, - "parent_span_id": "b35b839c02985f33", - "profile_id": "dbae2b82559649a1a34a2878134a007b", - "project_id": project_id, - "organization_id": 1, - "received": 1707953019.044972, - "retention_days": 90, - "segment_id": "a49b42af9fb69da0", - "sentry_tags": { - "browser.name": "Google Chrome", - "environment": "development", - "op": span_op or "base.dispatch.sleep", - "release": "backend@24.2.0.dev0+699ce0cd1281cc3c7275d0a474a595375c769ae8", - "transaction": "/api/0/organizations/{organization_id_or_slug}/n-plus-one/", - "transaction.method": "GET", - "transaction.op": "http.server", - "user": "id:1", - "platform": "python", - }, - "span_id": "a49b42af9fb69da0", - "start_timestamp_ms": 1707953018865, - "start_timestamp_precise": 1707953018.865, - "end_timestamp_precise": 1707953018.972, - "trace_id": "94576097f3a64b68b85a59c7d4e3ee2a", - } - - span.update(**kwargs) - return span - - -def build_mock_message(data, topic=None): - message = mock.Mock() - message.value.return_value = json.dumps(data) - if topic: - message.topic.return_value = topic - return message - - -def make_payload(message, partition, offset=1, timestamp=None): - timestamp = timestamp or datetime.now() - return Message( - BrokerValue( - KafkaPayload( - b"key", - message.value().encode("utf-8"), - [ - ("project_id", b"1"), - ], - ), - partition, - offset, - timestamp, - ) - ) - - -def process_spans_strategy(): - return ProcessSpansStrategyFactory( - num_processes=2, - input_block_size=1, - max_batch_size=2, - max_batch_time=1, - output_block_size=1, - ) - - -@django_db_all -@override_options( - { - "standalone-spans.process-spans-consumer.enable": True, - "standalone-spans.process-spans-consumer.project-allowlist": [1], - } -) -def test_consumer_pushes_to_redis(): - redis_client = get_redis_client() - - topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]) - partition = Partition(topic, 0) - strategy = process_spans_strategy().create_with_partitions( - commit=mock.Mock(), - partitions={}, - ) - - span_data = build_mock_span(project_id=1, is_segment=True) - message1 = build_mock_message(span_data, topic) - strategy.submit(make_payload(message1, partition)) - - span_data = build_mock_span(project_id=1) - message2 = build_mock_message(span_data, topic) - strategy.submit(make_payload(message2, partition)) - - strategy.poll() - strategy.join(1) - strategy.terminate() - - assert redis_client.lrange("segment:a49b42af9fb69da0:1:process-segment", 0, -1) == [ - message1.value().encode("utf-8"), - message2.value().encode("utf-8"), - ] - - -@django_db_all -@override_options( - { - "standalone-spans.process-spans-consumer.enable": True, - "standalone-spans.process-spans-consumer.project-allowlist": [1], - } -) -def test_produces_valid_segment_to_kafka(): - topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]) - partition = Partition(topic, 0) - factory = process_spans_strategy() - with mock.patch.object( - factory, - "producer", - new=mock.Mock(), - ) as 
mock_producer: - strategy = factory.create_with_partitions( - commit=mock.Mock(), - partitions={}, - ) - - span_data = build_mock_span(project_id=1, is_segment=True) - message1 = build_mock_message(span_data, topic) - strategy.submit(make_payload(message1, partition, 1, datetime.now() - timedelta(minutes=3))) - - span_data = build_mock_span(project_id=1) - message2 = build_mock_message(span_data, topic) - strategy.submit(make_payload(message2, partition)) - - strategy.poll() - strategy.join(1) - strategy.terminate() - - mock_producer.produce.assert_called_once() - decoded_segment = BUFFERED_SEGMENT_SCHEMA.decode( - mock_producer.produce.call_args.args[1].value - ) - assert len(decoded_segment["spans"]) == 2 - assert mock_producer.produce.call_args.args[0] == ArroyoTopic("buffered-segments") - - -@django_db_all -@override_options( - { - "standalone-spans.process-spans-consumer.enable": True, - "standalone-spans.process-spans-consumer.project-allowlist": [1], - } -) -def test_rejects_large_message_size_to_kafka(): - topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]) - partition = Partition(topic, 0) - factory = process_spans_strategy() - with mock.patch.object( - factory, - "producer", - new=mock.Mock(), - ) as mock_producer: - strategy = factory.create_with_partitions( - commit=mock.Mock(), - partitions={}, - ) - - span_data = build_mock_span( - project_id=1, is_segment=True, description="a" * 1000 * 1000 * 10 - ) - message1 = build_mock_message(span_data, topic) - strategy.submit(make_payload(message1, partition, 1, datetime.now() - timedelta(minutes=3))) - - span_data = build_mock_span(project_id=1) - message2 = build_mock_message(span_data, topic) - strategy.submit(make_payload(message2, partition)) - - strategy.poll() - strategy.join(1) - strategy.terminate() - - mock_producer.produce.assert_not_called() - - -@override_options( - { - "standalone-spans.process-spans-consumer.enable": False, - "standalone-spans.process-spans-consumer.project-allowlist": [1], - } -) -@mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer") -def test_option_disabled(mock_buffer): - topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]) - partition = Partition(topic, 0) - mock_commit = mock.Mock() - strategy = process_spans_strategy().create_with_partitions( - commit=mock_commit, - partitions={}, - ) - - span_data = build_mock_span(project_id=1) - message = build_mock_message(span_data, topic) - strategy.submit(make_payload(message, partition)) - - strategy.poll() - strategy.join(1) - strategy.terminate() - mock_buffer.assert_not_called() - - calls = [ - mock.call({partition: 2}), - mock.call({}, force=True), - ] - - mock_commit.assert_has_calls(calls=calls, any_order=True) - - -@django_db_all -@override_options( - { - "standalone-spans.process-spans-consumer.enable": True, - "standalone-spans.process-spans-consumer.project-rollout": 1.0, - } -) -@mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer") -def test_option_project_rollout_rate_discard(mock_buffer): - topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]) - partition = Partition(topic, 0) - strategy = process_spans_strategy().create_with_partitions( - commit=mock.Mock(), - partitions={}, - ) - - span_data = build_mock_span(project_id=1) - message = build_mock_message(span_data, topic) - strategy.submit(make_payload(message, partition)) - - strategy.poll() - strategy.join(1) - strategy.terminate() - mock_buffer.assert_called() - - 
-@override_options( - { - "standalone-spans.process-spans-consumer.enable": True, - "standalone-spans.process-spans-consumer.project-allowlist": [2], - } -) -@mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer") -def test_option_project_rollout(mock_buffer): - topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]) - partition = Partition(topic, 0) - strategy = process_spans_strategy().create_with_partitions( - commit=mock.Mock(), - partitions={}, - ) - - span_data = build_mock_span(project_id=1) - message = build_mock_message(span_data, topic) - strategy.submit(make_payload(message, partition)) - - strategy.poll() - strategy.join(1) - strategy.terminate() - mock_buffer.assert_not_called() - - -@django_db_all -@override_options( - { - "standalone-spans.process-spans-consumer.enable": True, - "standalone-spans.process-spans-consumer.project-allowlist": [1], - } -) -def test_commit_and_produce_with_multiple_partitions(): - topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]) - partition_1 = Partition(topic, 0) - partition_2 = Partition(topic, 1) - factory = process_spans_strategy() - mock_commit = mock.Mock() - with mock.patch.object( - factory, - "producer", - new=mock.Mock(), - ) as mock_producer: - strategy = factory.create_with_partitions( - commit=mock_commit, - partitions={}, - ) - - span_data = build_mock_span(project_id=1, is_segment=True) - message1 = build_mock_message(span_data, topic) - - span_data = build_mock_span(project_id=1) - message2 = build_mock_message(span_data, topic) - - offsets = {partition_1: 0, partition_2: 0} - for _ in range(2): - for partition in [partition_1, partition_2]: - offset = offsets[partition] - strategy.submit(make_payload(message1, partition, offset + 1, datetime.now())) - strategy.submit( - make_payload( - message2, partition, offset + 2, datetime.now() + timedelta(minutes=1) - ) - ) - strategy.submit( - make_payload( - message2, partition, offset + 3, datetime.now() + timedelta(minutes=1) - ) - ) - strategy.submit( - make_payload( - message2, partition, offset + 4, datetime.now() + timedelta(minutes=3) - ) - ) - offsets[partition] = offset + 4 - - strategy.poll() - strategy.join(1) - strategy.terminate() - - # max batch size is 2, one commit per batch, 1 during join - calls = [ - mock.call({partition_1: 3}), - mock.call({partition_1: 5}), - mock.call({partition_1: 7}), - mock.call({partition_1: 9}), - mock.call({partition_2: 3}), - mock.call({partition_2: 5}), - mock.call({partition_2: 7}), - mock.call({partition_2: 9}), - mock.call({}, force=True), - ] - - mock_commit.assert_has_calls(calls=calls, any_order=True) - - assert mock_producer.produce.call_count == 4 - BUFFERED_SEGMENT_SCHEMA.decode(mock_producer.produce.call_args.args[1].value) - assert mock_producer.produce.call_args.args[0] == ArroyoTopic("buffered-segments") - - -@django_db_all -@override_options( - { - "standalone-spans.process-spans-consumer.enable": True, - "standalone-spans.process-spans-consumer.project-allowlist": [1], - } -) -def test_with_multiple_partitions(): - redis_client = get_redis_client() - topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]) - partition_1 = Partition(topic, 0) - partition_2 = Partition(topic, 1) - - factory = process_spans_strategy() - mock_commit = mock.Mock() - with mock.patch( - "sentry.spans.consumers.process.factory.batch_write_to_redis", - wraps=batch_write_to_redis, - ) as mock_batch_write_and_check_processing: - with mock.patch.object( - 
factory, - "producer", - new=mock.Mock(), - ) as mock_producer: - strategy = factory.create_with_partitions( - commit=mock_commit, - partitions={}, - ) - - segment_1 = "89225fa064375ee5" - span_data = build_mock_span(project_id=1, segment_id=segment_1) - message1 = build_mock_message(span_data, topic) - - segment_2 = "a96c2bcd49de0c43" - span_data = build_mock_span(project_id=1, segment_id=segment_2) - message2 = build_mock_message(span_data, topic) - - now = datetime.now() - now_plus_one_second = now + timedelta(seconds=1) - - strategy.submit(make_payload(message1, partition_1, 1, now)) - strategy.submit(make_payload(message1, partition_1, 2, now)) - strategy.submit(make_payload(message2, partition_2, 1, now)) - strategy.submit(make_payload(message2, partition_2, 2, now)) - strategy.submit(make_payload(message2, partition_2, 3, now_plus_one_second)) - strategy.poll() - strategy.join(1) - strategy.terminate() - - calls = [ - mock.call({partition_1: 3}), - mock.call({partition_2: 3}), - mock.call({partition_2: 4}), - ] - mock_commit.assert_has_calls(calls=calls, any_order=True) - - context_calls = [ - {partition_1: 3}, - {partition_2: 3}, - {partition_2: 4}, - ] - assert all( - [ - c.args[0].committable in context_calls - for c in mock_batch_write_and_check_processing.mock_calls - ] - ) - assert mock_batch_write_and_check_processing.call_count == 3 - - assert redis_client.lrange("segment:89225fa064375ee5:1:process-segment", 0, -1) == [ - message1.value().encode("utf-8"), - message1.value().encode("utf-8"), - ] - - assert redis_client.lrange("segment:a96c2bcd49de0c43:1:process-segment", 0, -1) == [ - message2.value().encode("utf-8"), - message2.value().encode("utf-8"), - message2.value().encode("utf-8"), - ] - - mock_producer.assert_not_called() - - -@django_db_all -@override_options( - { - "standalone-spans.process-spans-consumer.enable": True, - "standalone-spans.process-spans-consumer.project-allowlist": [1], - } -) -def test_with_expand_segment(): - redis_client = get_redis_client() - topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]) - partition_1 = Partition(topic, 0) - partition_2 = Partition(topic, 1) - - factory = process_spans_strategy() - mock_commit = mock.Mock() - with mock.patch( - "sentry.spans.consumers.process.factory.expand_segments", wraps=expand_segments - ) as mock_expand_segments: - strategy = factory.create_with_partitions( - commit=mock_commit, - partitions={}, - ) - - segment_1 = "89225fa064375ee5" - span_data = build_mock_span(project_id=1, segment_id=segment_1) - message1 = build_mock_message(span_data, topic) - - segment_2 = "a96c2bcd49de0c43" - span_data = build_mock_span(project_id=1, segment_id=segment_2) - message2 = build_mock_message(span_data, topic) - - now = datetime.now() - now_plus_120_seconds = now + timedelta(seconds=120) - - strategy.submit(make_payload(message1, partition_1, 1, now)) - strategy.submit(make_payload(message1, partition_1, 2, now)) - strategy.submit(make_payload(message2, partition_2, 1, now)) - strategy.submit(make_payload(message2, partition_2, 2, now)) - strategy.submit(make_payload(message2, partition_2, 3, now_plus_120_seconds)) - strategy.poll() - strategy.join() - strategy.terminate() - - calls = [ - mock.call({partition_1: 3}), - mock.call({partition_2: 3}), - mock.call({partition_2: 4}), - ] - mock_commit.assert_has_calls(calls=calls, any_order=True) - - assert mock_expand_segments.call_count == 3 - assert redis_client.lrange("segment:89225fa064375ee5:1:process-segment", 0, -1) == [ - 
message1.value().encode("utf-8"), - message1.value().encode("utf-8"), - ] - - assert redis_client.ttl("segment:a96c2bcd49de0c43:1:process-segment") == -2 diff --git a/tests/sentry/spans/consumers/process_segments/test_factory.py b/tests/sentry/spans/consumers/process_segments/test_factory.py index 8e2202780fe8f6..d8b42b2630093e 100644 --- a/tests/sentry/spans/consumers/process_segments/test_factory.py +++ b/tests/sentry/spans/consumers/process_segments/test_factory.py @@ -12,7 +12,7 @@ from sentry.testutils.helpers.options import override_options from sentry.utils import json from sentry.utils.kafka_config import get_topic_definition -from tests.sentry.spans.consumers.process.test_factory import build_mock_span +from tests.sentry.spans.consumers.process import build_mock_span SNUBA_SPANS_CODEC: Codec[SpanEvent] = get_topic_codec(Topic.SNUBA_SPANS) diff --git a/tests/sentry/spans/consumers/process_segments/test_message.py b/tests/sentry/spans/consumers/process_segments/test_message.py index e394307d8a1bf1..6c3a66ddf2e9a3 100644 --- a/tests/sentry/spans/consumers/process_segments/test_message.py +++ b/tests/sentry/spans/consumers/process_segments/test_message.py @@ -8,7 +8,7 @@ from sentry.spans.consumers.process_segments.message import process_segment from sentry.testutils.cases import TestCase from sentry.testutils.helpers.options import override_options -from tests.sentry.spans.consumers.process.test_factory import build_mock_span +from tests.sentry.spans.consumers.process import build_mock_span class TestSpansTask(TestCase): diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py new file mode 100644 index 00000000000000..aa7d863a412f78 --- /dev/null +++ b/tests/sentry/spans/test_buffer.py @@ -0,0 +1,401 @@ +from __future__ import annotations + +import itertools + +import pytest +import rapidjson +from sentry_redis_tools.clients import StrictRedis + +from sentry.spans.buffer import OutputSpan, SegmentId, Span, SpansBuffer + + +def shallow_permutations(spans: list[Span]) -> list[list[Span]]: + return [ + spans, + list(reversed(spans)), + ] + + +def _segment_id(project_id: int, trace_id: str, span_id: str) -> SegmentId: + return f"span-buf:s:{{{project_id}:{trace_id}}}:{span_id}".encode("ascii") + + +def _payload(span_id: bytes) -> bytes: + return rapidjson.dumps({"span_id": span_id}).encode("ascii") + + +def _output_segment(span_id: bytes, segment_id: bytes, is_segment: bool) -> OutputSpan: + return OutputSpan( + payload={ + "span_id": span_id.decode("ascii"), + "segment_id": segment_id.decode("ascii"), + "is_segment": is_segment, + } + ) + + +def _normalize_output(output: dict[SegmentId, list[OutputSpan]]): + for segment in output.values(): + segment.sort(key=lambda span: span.payload["span_id"]) + + +@pytest.fixture(params=["cluster", "single"]) +def buffer(request): + if request.param == "cluster": + from sentry.testutils.helpers.redis import use_redis_cluster + + with use_redis_cluster("default"): + buf = SpansBuffer(assigned_shards=list(range(32))) + # since we patch the default redis cluster only temporarily, we + # need to clean it up ourselves. + buf.client.flushall() + yield buf + else: + yield SpansBuffer(assigned_shards=list(range(32))) + + +def assert_ttls(client: StrictRedis[bytes]): + """ + Check that all keys have a TTL, because if the consumer dies before + flushing, we should not leak memory. 
+ """ + + for k in client.keys("*"): + assert client.ttl(k) > -1, k + + +def assert_clean(client: StrictRedis[bytes]): + """ + Check that there's no leakage. + + Note: CANNOT be done in pytest fixture as that one runs _after_ redis gets + wiped by the test harness. + """ + assert not [x for x in client.keys("*") if b":hrs:" not in x] + + +@pytest.mark.parametrize( + "spans", + list( + itertools.permutations( + [ + Span( + payload=_payload(b"a" * 16), + trace_id="a" * 32, + span_id="a" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"d" * 16), + trace_id="a" * 32, + span_id="d" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"b" * 16), + trace_id="a" * 32, + span_id="b" * 16, + parent_span_id=None, + is_segment_span=True, + project_id=1, + ), + ] + ) + ), +) +def test_basic(buffer: SpansBuffer, spans): + buffer.process_spans(spans, now=0) + + assert_ttls(buffer.client) + + assert buffer.flush_segments(now=5) == (1, {}) + _, rv = buffer.flush_segments(now=11) + _normalize_output(rv) + assert rv == { + _segment_id(1, "a" * 32, "b" * 16): [ + _output_segment(b"a" * 16, b"b" * 16, False), + _output_segment(b"b" * 16, b"b" * 16, True), + _output_segment(b"c" * 16, b"b" * 16, False), + _output_segment(b"d" * 16, b"b" * 16, False), + ] + } + buffer.done_flush_segments(rv) + assert buffer.flush_segments(now=30) == (0, {}) + + assert_clean(buffer.client) + + +@pytest.mark.parametrize( + "spans", + list( + itertools.permutations( + [ + Span( + payload=_payload(b"d" * 16), + trace_id="a" * 32, + span_id="d" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"b" * 16), + trace_id="a" * 32, + span_id="b" * 16, + parent_span_id="a" * 16, + project_id=1, + ), + Span( + payload=_payload(b"a" * 16), + trace_id="a" * 32, + span_id="a" * 16, + parent_span_id=None, + is_segment_span=True, + project_id=1, + ), + Span( + payload=_payload(b"c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="a" * 16, + project_id=1, + ), + ] + ) + ), +) +def test_deep(buffer: SpansBuffer, spans): + buffer.process_spans(spans, now=0) + + assert_ttls(buffer.client) + + _, rv = buffer.flush_segments(now=10) + _normalize_output(rv) + assert rv == { + _segment_id(1, "a" * 32, "a" * 16): [ + _output_segment(b"a" * 16, b"a" * 16, True), + _output_segment(b"b" * 16, b"a" * 16, False), + _output_segment(b"c" * 16, b"a" * 16, False), + _output_segment(b"d" * 16, b"a" * 16, False), + ] + } + + buffer.done_flush_segments(rv) + + _, rv = buffer.flush_segments(now=60) + assert rv == {} + + assert_clean(buffer.client) + + +@pytest.mark.parametrize( + "spans", + list( + itertools.permutations( + [ + Span( + payload=_payload(b"e" * 16), + trace_id="a" * 32, + span_id="e" * 16, + parent_span_id="d" * 16, + project_id=1, + ), + Span( + payload=_payload(b"d" * 16), + trace_id="a" * 32, + span_id="d" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"b" * 16), + trace_id="a" * 32, + span_id="b" * 16, + parent_span_id="c" * 16, + project_id=1, + ), + Span( + payload=_payload(b"c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="a" * 16, + project_id=1, + ), + Span( + payload=_payload(b"a" * 16), + trace_id="a" * 32, + span_id="a" * 16, + parent_span_id=None, + is_segment_span=True, + project_id=1, + ), + ] + ) + ), +) +def test_deep2(buffer: SpansBuffer, spans): + 
buffer.process_spans(spans, now=0) + + assert_ttls(buffer.client) + + _, rv = buffer.flush_segments(now=10) + _normalize_output(rv) + assert rv == { + _segment_id(1, "a" * 32, "a" * 16): [ + _output_segment(b"a" * 16, b"a" * 16, True), + _output_segment(b"b" * 16, b"a" * 16, False), + _output_segment(b"c" * 16, b"a" * 16, False), + _output_segment(b"d" * 16, b"a" * 16, False), + _output_segment(b"e" * 16, b"a" * 16, False), + ] + } + + buffer.done_flush_segments(rv) + + _, rv = buffer.flush_segments(now=60) + assert rv == {} + + assert_clean(buffer.client) + + +@pytest.mark.parametrize( + "spans", + list( + itertools.permutations( + [ + Span( + payload=_payload(b"c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"d" * 16), + trace_id="a" * 32, + span_id="d" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"e" * 16), + trace_id="a" * 32, + span_id="e" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"b" * 16), + trace_id="a" * 32, + span_id="b" * 16, + parent_span_id=None, + is_segment_span=True, + project_id=2, + ), + ] + ) + ), +) +def test_parent_in_other_project(buffer: SpansBuffer, spans): + buffer.process_spans(spans, now=0) + + assert_ttls(buffer.client) + + assert buffer.flush_segments(now=5) == (2, {}) + _, rv = buffer.flush_segments(now=11) + assert rv == {_segment_id(2, "a" * 32, "b" * 16): [_output_segment(b"b" * 16, b"b" * 16, True)]} + buffer.done_flush_segments(rv) + + # TODO: flush faster, since we already saw parent in other project + assert buffer.flush_segments(now=30) == (1, {}) + _, rv = buffer.flush_segments(now=60) + _normalize_output(rv) + assert rv == { + _segment_id(1, "a" * 32, "b" * 16): [ + _output_segment(b"c" * 16, b"b" * 16, False), + _output_segment(b"d" * 16, b"b" * 16, False), + _output_segment(b"e" * 16, b"b" * 16, False), + ] + } + buffer.done_flush_segments(rv) + + assert buffer.flush_segments(now=90) == (0, {}) + + assert_clean(buffer.client) + + +@pytest.mark.parametrize( + "spans", + shallow_permutations( + [ + Span( + payload=_payload(b"c" * 16), + trace_id="a" * 32, + span_id="c" * 16, + parent_span_id="d" * 16, + project_id=1, + is_segment_span=True, + ), + Span( + payload=_payload(b"d" * 16), + trace_id="a" * 32, + span_id="d" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"e" * 16), + trace_id="a" * 32, + span_id="e" * 16, + parent_span_id="b" * 16, + project_id=1, + ), + Span( + payload=_payload(b"b" * 16), + trace_id="a" * 32, + span_id="b" * 16, + parent_span_id=None, + is_segment_span=True, + project_id=2, + ), + ] + ), +) +def test_parent_in_other_project_and_nested_is_segment_span(buffer: SpansBuffer, spans): + buffer.process_spans(spans, now=0) + + assert_ttls(buffer.client) + + assert buffer.flush_segments(now=5) == (3, {}) + _, rv = buffer.flush_segments(now=11) + assert rv == { + _segment_id(2, "a" * 32, "b" * 16): [_output_segment(b"b" * 16, b"b" * 16, True)], + _segment_id(1, "a" * 32, "c" * 16): [ + _output_segment(b"c" * 16, b"c" * 16, True), + ], + } + buffer.done_flush_segments(rv) + + # TODO: flush faster, since we already saw parent in other project + assert buffer.flush_segments(now=30) == (1, {}) + _, rv = buffer.flush_segments(now=60) + _normalize_output(rv) + assert rv == { + _segment_id(1, "a" * 32, "b" * 16): [ + _output_segment(b"d" * 16, b"b" * 16, False), + _output_segment(b"e" * 16, b"b" * 16, False), + ], + } + buffer.done_flush_segments(rv) 
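+    # Both segments have been flushed and acknowledged; a final sweep must find
+    # nothing left, and assert_clean tolerates only the short-lived
+    # "has root span" marker keys (span-buf:hrs:...).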
+ + assert buffer.flush_segments(now=90) == (0, {}) + + assert_clean(buffer.client)
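
For orientation, the lifecycle these tests drive looks roughly as follows. This is a minimal sketch, not part of the diff: `produce_segment` is a hypothetical stand-in for the real producer wiring, and the flush cadence is left to the caller.

    import time

    from sentry.spans.buffer import Span, SpansBuffer

    def run_once(buffer: SpansBuffer, incoming: list[Span], produce_segment) -> None:
        now = int(time.time())
        # Fan the incoming spans into per-trace Redis sets (add-buffer.lua) and
        # enqueue their keys, sorted by timeout, in the sharded flush queues.
        buffer.process_spans(incoming, now=now)
        # Pop every segment whose inactivity timeout has elapsed. The first tuple
        # element is the number of segments still pending, as asserted in the
        # tests above.
        _, segments = buffer.flush_segments(now=now)
        for segment_id, spans in segments.items():
            produce_segment(segment_id, [span.payload for span in spans])
        # Delete the flushed keys only after the segments were produced: if the
        # process dies in between, the segments stay in Redis and are flushed
        # again rather than lost.
        buffer.done_flush_segments(segments)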