Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor_sampling: new trace sampling processor #10029

Merged
merged 24 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0c51815
lib: ctraces: upgrade to v0.6.2
edsiper Mar 23, 2025
cd34324
processor: extend processors trace callback to allow an output context
edsiper Mar 10, 2025
2967c0e
input_trace: adjust api usage for ctrace output context
edsiper Mar 10, 2025
83f302f
processor_content_modifier: traces: adjust API usage for new ctraces …
edsiper Mar 10, 2025
d057738
processor: remove unused code
edsiper Mar 14, 2025
2c92938
input_trace: own management of ctrace context on success
edsiper Mar 14, 2025
204760f
in_event_type: traces: do not remove context on success
edsiper Mar 14, 2025
ec8987d
in_opentelemetry: traces: destroy context only on error
edsiper Mar 14, 2025
4ad394c
input_trace: fix logic to encode processed context
edsiper Mar 19, 2025
bb2a3b3
processor_sampling: new trace sampling processor
edsiper Feb 28, 2025
7b6a5af
processor_sampling: add a dummy structure field (windows...)
edsiper Mar 1, 2025
298b722
processor_sampling: adjust to new trace processor API
edsiper Mar 10, 2025
0063f64
processor_sampling: add support for Tail Sampling and code conditionals
edsiper Mar 19, 2025
f75dcd6
processor_sampling: conditionals: add new 'latency' conditional
edsiper Mar 19, 2025
bdead40
processor_sampling: conditionals: add new 'string_attribute' conditional
edsiper Mar 19, 2025
5dc21d0
processor_sampling: conditionals: add 'match_type' property
edsiper Mar 20, 2025
12a5724
processor_sampling: conditionals: add 'regex' match_type support
edsiper Mar 20, 2025
37f60ed
processor_sampling: conditionals: add new 'numeric_attribute' conditi…
edsiper Mar 20, 2025
d153cd8
processor_sampling: conditionals: add 'boolean_attribute' conditional
edsiper Mar 20, 2025
9493698
processor_sampling: add new 'span_count' conditional
edsiper Mar 20, 2025
c8793d0
processor_sampling: add new conditional 'trace_state'
edsiper Mar 21, 2025
7eb0c13
processor_sampling: tail: add 'max_traces' configuration option
edsiper Mar 21, 2025
9f75018
network: add new flb_net_htonll wrapper
edsiper Mar 22, 2025
a964675
processor_sampling: use new flb_net_htonll() function
edsiper Mar 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmake/plugins_options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ DEFINE_OPTION(FLB_IN_EBPF "Enable Linux eBPF input plugin"
DEFINE_OPTION(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_OPENTELEMETRY_ENVELOPE "Enable OpenTelemetry envelope processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_SQL "Enable SQL processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_OPENTELEMETRY_ENVELOPE "Enable OpenTelemetry envelope processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_SAMPLING "Enable sampling processor" ON)

# Filters
# =======
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_network.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,6 @@ int flb_net_socket_peer_info(flb_sockfd_t fd,

size_t flb_network_address_size(struct sockaddr_storage *address);

uint64_t flb_net_htonll(uint64_t value);

#endif
24 changes: 18 additions & 6 deletions include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ struct flb_processor_plugin {

int (*cb_process_traces) (struct flb_processor_instance *,
struct ctrace *,
struct ctrace **,
const char *,
int);

Expand All @@ -179,6 +180,7 @@ struct flb_processor_instance {
char *alias; /* alias name */
void *context; /* Instance local context */
void *data;
struct flb_processor_unit *pu; /* processor unit linked to */
struct flb_processor_plugin *p; /* original plugin */
struct mk_list properties; /* config properties */
struct mk_list *config_map; /* configuration map */
Expand Down Expand Up @@ -228,12 +230,10 @@ int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k
int flb_processors_load_from_config_format_group(struct flb_processor *proc, struct flb_cf_group *g);

/* Processor plugin instance */

struct flb_processor_instance *flb_processor_instance_create(
struct flb_config *config,
int event_type,
const char *name,
void *data);
struct flb_processor_instance *flb_processor_instance_create(struct flb_config *config,
struct flb_processor_unit *pu,
int event_type,
const char *name, void *data);

void flb_processor_instance_destroy(
struct flb_processor_instance *ins);
Expand Down Expand Up @@ -274,4 +274,16 @@ static inline int flb_processor_instance_config_map_set(
return flb_config_map_set(&ins->properties, ins->config_map, context);
}

static inline
struct flb_input_instance *flb_processor_get_input_instance(struct flb_processor_unit *pu)
{
struct flb_processor *processor;
struct flb_input_instance *ins;

processor = (struct flb_processor *) pu->parent;
ins = (struct flb_input_instance *) processor->data;

return ins;
}

#endif
2 changes: 1 addition & 1 deletion lib/ctraces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ endif()
# CTraces Version
set(CTR_VERSION_MAJOR 0)
set(CTR_VERSION_MINOR 6)
set(CTR_VERSION_PATCH 1)
set(CTR_VERSION_PATCH 2)
set(CTR_VERSION_STR "${CTR_VERSION_MAJOR}.${CTR_VERSION_MINOR}.${CTR_VERSION_PATCH}")

# Define __FILENAME__ consistently across Operating Systems
Expand Down
7 changes: 6 additions & 1 deletion lib/ctraces/include/ctraces/ctr_scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ struct ctrace_scope_span {
struct ctrace_instrumentation_scope *instrumentation_scope;
struct cfl_list spans;
cfl_sds_t schema_url;
struct cfl_list _head; /* link to ctrace_resource_span->scope_spans list */

/* parent resource span */
struct ctrace_resource_span *resource_span;

/* link to ctrace_resource_span->scope_spans list */
struct cfl_list _head;
};

/* scope span */
Expand Down
1 change: 1 addition & 0 deletions lib/ctraces/src/ctr_scope.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct ctrace_scope_span *ctr_scope_span_create(struct ctrace_resource_span *res
}
cfl_list_init(&scope_span->spans);
cfl_list_add(&scope_span->_head, &resource_span->scope_spans);
scope_span->resource_span = resource_span;

return scope_span;
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,9 @@ REGISTER_IN_PLUGIN("in_random")
REGISTER_PROCESSOR_PLUGIN("processor_content_modifier")
REGISTER_PROCESSOR_PLUGIN("processor_labels")
REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector")
REGISTER_PROCESSOR_PLUGIN("processor_sql")
REGISTER_PROCESSOR_PLUGIN("processor_opentelemetry_envelope")
REGISTER_PROCESSOR_PLUGIN("processor_sampling")
REGISTER_PROCESSOR_PLUGIN("processor_sql")

# OUTPUTS
# =======
Expand Down
7 changes: 4 additions & 3 deletions plugins/in_event_type/event_type.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,10 @@ static int send_traces(struct flb_input_instance *ins)
ctr_id_destroy(trace_id);

ret = flb_input_trace_append(ins, NULL, 0, ctx);

/* destroy the context */
ctr_destroy(ctx);
if (ret == -1) {
/* destroy the context */
ctr_destroy(ctx);
}

/* exit options (it release resources allocated) */
ctr_opts_exit(&opts);
Expand Down
8 changes: 6 additions & 2 deletions plugins/in_opentelemetry/opentelemetry_traces.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ int opentelemetry_traces_process_protobuf(struct flb_opentelemetry *ctx,
&offset);
if (result == 0) {
result = flb_input_trace_append(ctx->ins, tag, tag_len, decoded_context);
ctr_decode_opentelemetry_destroy(decoded_context);
if (result == -1) {
ctr_destroy(decoded_context);
}
}

return result;
Expand Down Expand Up @@ -1106,7 +1108,9 @@ static int process_json(struct flb_opentelemetry *ctx,
ctr = process_root_msgpack(ctx, &unpacked_root.data);
if (ctr) {
result = flb_input_trace_append(ctx->ins, tag, tag_len, ctr);
ctr_destroy(ctr);
if (result == -1) {
ctr_destroy(ctr);
}
}

msgpack_unpacked_destroy(&unpacked_root);
Expand Down
5 changes: 3 additions & 2 deletions plugins/processor_content_modifier/cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ static int cb_process_logs(struct flb_processor_instance *ins,
}

static int cb_process_traces(struct flb_processor_instance *ins,
struct ctrace *traces_context,
struct ctrace *in_ctr,
struct ctrace **out_ctr,
const char *tag,
int tag_len)
{
Expand All @@ -93,7 +94,7 @@ static int cb_process_traces(struct flb_processor_instance *ins,
}
ctx = ins->context;

ret = cm_traces_process(ins, ctx, traces_context, tag, tag_len);
ret = cm_traces_process(ins, ctx, in_ctr, out_ctr, tag, tag_len);
return ret;

}
Expand Down
1 change: 1 addition & 0 deletions plugins/processor_content_modifier/cm.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ int cm_logs_process(struct flb_processor_instance *ins,
int cm_traces_process(struct flb_processor_instance *ins,
struct content_modifier_ctx *ctx,
struct ctrace *traces_context,
struct ctrace **out_traces_context,
const char *tag, int tag_len);

int cm_metrics_process(struct flb_processor_instance *ins,
Expand Down
3 changes: 3 additions & 0 deletions plugins/processor_content_modifier/cm_traces.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ static int traces_hash_attributes(struct content_modifier_ctx *ctx, struct ctrac
int cm_traces_process(struct flb_processor_instance *ins,
struct content_modifier_ctx *ctx,
struct ctrace *traces_context,
struct ctrace **out_traces_context,
const char *tag, int tag_len)
{
int ret = -1;
Expand Down Expand Up @@ -587,6 +588,8 @@ int cm_traces_process(struct flb_processor_instance *ins,
ret = traces_convert_attributes(ctx, traces_context, ctx->key, ctx->converted_type);
}

*out_traces_context = traces_context;

if (ret != 0) {
return FLB_PROCESSOR_FAILURE;
}
Expand Down
23 changes: 23 additions & 0 deletions plugins/processor_sampling/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
set(src
sampling.c
sampling_conf.c
sampling_span_registry.c

# conditions
sampling_conditions.c
sampling_cond_status_codes.c
sampling_cond_latency.c
sampling_cond_attribute.c
sampling_cond_string_attribute.c
sampling_cond_numeric_attribute.c
sampling_cond_boolean_attribute.c
sampling_cond_span_count.c
sampling_cond_trace_state.c

# types of sampling
#sampling_test.c
sampling_tail.c
sampling_probabilistic.c
)

FLB_PLUGIN(processor_sampling "${src}" "")
Loading
Loading