diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index b104ebd1f00..98a555737fb 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -69,8 +69,9 @@ DEFINE_OPTION(FLB_IN_EBPF "Enable Linux eBPF input plugin" DEFINE_OPTION(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" ON) DEFINE_OPTION(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" ON) DEFINE_OPTION(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" ON) +DEFINE_OPTION(FLB_PROCESSOR_OPENTELEMETRY_ENVELOPE "Enable OpenTelemetry envelope processor" ON) DEFINE_OPTION(FLB_PROCESSOR_SQL "Enable SQL processor" ON) -DEFINE_OPTION(FLB_PROCESSOR_OPENTELEMETRY_ENVELOPE "Enable OpenTelemetry envelope processor" ON) +DEFINE_OPTION(FLB_PROCESSOR_SAMPLING "Enable sampling processor" ON) # Filters # ======= diff --git a/include/fluent-bit/flb_network.h b/include/fluent-bit/flb_network.h index 9569ba22ed3..b002d3862c9 100644 --- a/include/fluent-bit/flb_network.h +++ b/include/fluent-bit/flb_network.h @@ -228,4 +228,6 @@ int flb_net_socket_peer_info(flb_sockfd_t fd, size_t flb_network_address_size(struct sockaddr_storage *address); +uint64_t flb_net_htonll(uint64_t value); + #endif diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index 5781d41e47e..6ff7d746060 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -155,6 +155,7 @@ struct flb_processor_plugin { int (*cb_process_traces) (struct flb_processor_instance *, struct ctrace *, + struct ctrace **, const char *, int); @@ -179,6 +180,7 @@ struct flb_processor_instance { char *alias; /* alias name */ void *context; /* Instance local context */ void *data; + struct flb_processor_unit *pu; /* processor unit linked to */ struct flb_processor_plugin *p; /* original plugin */ struct mk_list properties; /* config properties */ struct mk_list *config_map; /* configuration map */ @@ -228,12 +230,10 @@ int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k int flb_processors_load_from_config_format_group(struct flb_processor *proc, struct flb_cf_group *g); /* Processor plugin instance */ - -struct flb_processor_instance *flb_processor_instance_create( - struct flb_config *config, - int event_type, - const char *name, - void *data); +struct flb_processor_instance *flb_processor_instance_create(struct flb_config *config, + struct flb_processor_unit *pu, + int event_type, + const char *name, void *data); void flb_processor_instance_destroy( struct flb_processor_instance *ins); @@ -274,4 +274,16 @@ static inline int flb_processor_instance_config_map_set( return flb_config_map_set(&ins->properties, ins->config_map, context); } +static inline +struct flb_input_instance *flb_processor_get_input_instance(struct flb_processor_unit *pu) +{ + struct flb_processor *processor; + struct flb_input_instance *ins; + + processor = (struct flb_processor *) pu->parent; + ins = (struct flb_input_instance *) processor->data; + + return ins; +} + #endif diff --git a/lib/ctraces/CMakeLists.txt b/lib/ctraces/CMakeLists.txt index ffe0589f05d..e692e386b53 100644 --- a/lib/ctraces/CMakeLists.txt +++ b/lib/ctraces/CMakeLists.txt @@ -27,7 +27,7 @@ endif() # CTraces Version set(CTR_VERSION_MAJOR 0) set(CTR_VERSION_MINOR 6) -set(CTR_VERSION_PATCH 1) +set(CTR_VERSION_PATCH 2) set(CTR_VERSION_STR "${CTR_VERSION_MAJOR}.${CTR_VERSION_MINOR}.${CTR_VERSION_PATCH}") # Define __FILENAME__ consistently across Operating Systems diff --git a/lib/ctraces/include/ctraces/ctr_scope.h b/lib/ctraces/include/ctraces/ctr_scope.h index 98faca943d3..7d6ae240f84 100644 --- a/lib/ctraces/include/ctraces/ctr_scope.h +++ b/lib/ctraces/include/ctraces/ctr_scope.h @@ -34,7 +34,12 @@ struct ctrace_scope_span { struct ctrace_instrumentation_scope *instrumentation_scope; struct cfl_list spans; cfl_sds_t schema_url; - struct cfl_list _head; /* link to ctrace_resource_span->scope_spans list */ + + /* parent resource span */ + struct ctrace_resource_span *resource_span; + + /* link to ctrace_resource_span->scope_spans list */ + struct cfl_list _head; }; /* scope span */ diff --git a/lib/ctraces/src/ctr_scope.c b/lib/ctraces/src/ctr_scope.c index fcb83435a95..2470eae41c8 100644 --- a/lib/ctraces/src/ctr_scope.c +++ b/lib/ctraces/src/ctr_scope.c @@ -30,6 +30,7 @@ struct ctrace_scope_span *ctr_scope_span_create(struct ctrace_resource_span *res } cfl_list_init(&scope_span->spans); cfl_list_add(&scope_span->_head, &resource_span->scope_spans); + scope_span->resource_span = resource_span; return scope_span; } diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index deb24958038..cb7e848f1a0 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -288,8 +288,9 @@ REGISTER_IN_PLUGIN("in_random") REGISTER_PROCESSOR_PLUGIN("processor_content_modifier") REGISTER_PROCESSOR_PLUGIN("processor_labels") REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector") -REGISTER_PROCESSOR_PLUGIN("processor_sql") REGISTER_PROCESSOR_PLUGIN("processor_opentelemetry_envelope") +REGISTER_PROCESSOR_PLUGIN("processor_sampling") +REGISTER_PROCESSOR_PLUGIN("processor_sql") # OUTPUTS # ======= diff --git a/plugins/in_event_type/event_type.c b/plugins/in_event_type/event_type.c index aad5acccb11..915ab82af68 100644 --- a/plugins/in_event_type/event_type.c +++ b/plugins/in_event_type/event_type.c @@ -358,9 +358,10 @@ static int send_traces(struct flb_input_instance *ins) ctr_id_destroy(trace_id); ret = flb_input_trace_append(ins, NULL, 0, ctx); - - /* destroy the context */ - ctr_destroy(ctx); + if (ret == -1) { + /* destroy the context */ + ctr_destroy(ctx); + } /* exit options (it release resources allocated) */ ctr_opts_exit(&opts); diff --git a/plugins/in_opentelemetry/opentelemetry_traces.c b/plugins/in_opentelemetry/opentelemetry_traces.c index a9f5b2863a1..c3fca1dc6a3 100644 --- a/plugins/in_opentelemetry/opentelemetry_traces.c +++ b/plugins/in_opentelemetry/opentelemetry_traces.c @@ -43,7 +43,9 @@ int opentelemetry_traces_process_protobuf(struct flb_opentelemetry *ctx, &offset); if (result == 0) { result = flb_input_trace_append(ctx->ins, tag, tag_len, decoded_context); - ctr_decode_opentelemetry_destroy(decoded_context); + if (result == -1) { + ctr_destroy(decoded_context); + } } return result; @@ -1106,7 +1108,9 @@ static int process_json(struct flb_opentelemetry *ctx, ctr = process_root_msgpack(ctx, &unpacked_root.data); if (ctr) { result = flb_input_trace_append(ctx->ins, tag, tag_len, ctr); - ctr_destroy(ctr); + if (result == -1) { + ctr_destroy(ctr); + } } msgpack_unpacked_destroy(&unpacked_root); diff --git a/plugins/processor_content_modifier/cm.c b/plugins/processor_content_modifier/cm.c index 4c56d659ff6..097fd2dc0c7 100644 --- a/plugins/processor_content_modifier/cm.c +++ b/plugins/processor_content_modifier/cm.c @@ -81,7 +81,8 @@ static int cb_process_logs(struct flb_processor_instance *ins, } static int cb_process_traces(struct flb_processor_instance *ins, - struct ctrace *traces_context, + struct ctrace *in_ctr, + struct ctrace **out_ctr, const char *tag, int tag_len) { @@ -93,7 +94,7 @@ static int cb_process_traces(struct flb_processor_instance *ins, } ctx = ins->context; - ret = cm_traces_process(ins, ctx, traces_context, tag, tag_len); + ret = cm_traces_process(ins, ctx, in_ctr, out_ctr, tag, tag_len); return ret; } diff --git a/plugins/processor_content_modifier/cm.h b/plugins/processor_content_modifier/cm.h index 181872b0754..915528e65f2 100644 --- a/plugins/processor_content_modifier/cm.h +++ b/plugins/processor_content_modifier/cm.h @@ -114,6 +114,7 @@ int cm_logs_process(struct flb_processor_instance *ins, int cm_traces_process(struct flb_processor_instance *ins, struct content_modifier_ctx *ctx, struct ctrace *traces_context, + struct ctrace **out_traces_context, const char *tag, int tag_len); int cm_metrics_process(struct flb_processor_instance *ins, diff --git a/plugins/processor_content_modifier/cm_traces.c b/plugins/processor_content_modifier/cm_traces.c index 54d4469bcb6..fb58a18909d 100644 --- a/plugins/processor_content_modifier/cm_traces.c +++ b/plugins/processor_content_modifier/cm_traces.c @@ -560,6 +560,7 @@ static int traces_hash_attributes(struct content_modifier_ctx *ctx, struct ctrac int cm_traces_process(struct flb_processor_instance *ins, struct content_modifier_ctx *ctx, struct ctrace *traces_context, + struct ctrace **out_traces_context, const char *tag, int tag_len) { int ret = -1; @@ -587,6 +588,8 @@ int cm_traces_process(struct flb_processor_instance *ins, ret = traces_convert_attributes(ctx, traces_context, ctx->key, ctx->converted_type); } + *out_traces_context = traces_context; + if (ret != 0) { return FLB_PROCESSOR_FAILURE; } diff --git a/plugins/processor_sampling/CMakeLists.txt b/plugins/processor_sampling/CMakeLists.txt new file mode 100644 index 00000000000..8bdd3a7298c --- /dev/null +++ b/plugins/processor_sampling/CMakeLists.txt @@ -0,0 +1,23 @@ +set(src + sampling.c + sampling_conf.c + sampling_span_registry.c + + # conditions + sampling_conditions.c + sampling_cond_status_codes.c + sampling_cond_latency.c + sampling_cond_attribute.c + sampling_cond_string_attribute.c + sampling_cond_numeric_attribute.c + sampling_cond_boolean_attribute.c + sampling_cond_span_count.c + sampling_cond_trace_state.c + + # types of sampling + #sampling_test.c + sampling_tail.c + sampling_probabilistic.c +) + +FLB_PLUGIN(processor_sampling "${src}" "") diff --git a/plugins/processor_sampling/sampling.c b/plugins/processor_sampling/sampling.c new file mode 100644 index 00000000000..a1897b7a2bd --- /dev/null +++ b/plugins/processor_sampling/sampling.c @@ -0,0 +1,208 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "sampling.h" +#include "sampling_span_registry.h" + +static int clean_empty_resource_spans(struct ctrace *ctr) +{ + int count = 0; + struct cfl_list *head; + struct cfl_list *head_scope_span; + struct cfl_list *tmp; + struct cfl_list *tmp_scope_span; + struct ctrace_resource_span *resource_span; + struct ctrace_scope_span *scope_span; + + cfl_list_foreach_safe(head, tmp, &ctr->resource_spans) { + resource_span = cfl_list_entry(head, struct ctrace_resource_span, _head); + + /* iterate scope spans */ + cfl_list_foreach_safe(head_scope_span, tmp_scope_span, &resource_span->scope_spans) { + scope_span = cfl_list_entry(head_scope_span, struct ctrace_scope_span, _head); + if (cfl_list_is_empty(&scope_span->spans)) { + ctr_scope_span_destroy(scope_span); + } + } + + /* check if resource span is now empty */ + if (cfl_list_is_empty(&resource_span->scope_spans)) { + cfl_list_del(&resource_span->_head); + ctr_resource_span_destroy(resource_span); + count++; + } + } + + return count; +} + +static void debug_trace(struct sampling *ctx, struct ctrace *ctr, int is_before) +{ + char tmp[128]; + struct sampling_span_registry *reg = NULL; + + reg = sampling_span_registry_create(100); + if (!reg) { + return; + } + + sampling_span_registry_add_trace(ctx, reg, ctr); + if (is_before) { + snprintf(tmp, sizeof(tmp) - 1, "Debug sampling '%s' (%p): before", ctx->type_str, ctr); + sampling_span_registry_print(ctx, reg, tmp); + } + else { + snprintf(tmp, sizeof(tmp) - 1, "Debug sampling '%s' (%p): after", ctx->type_str, ctr); + sampling_span_registry_print(ctx, reg, tmp); + } + + sampling_span_registry_destroy(reg); +} + +static int cb_process_traces(struct flb_processor_instance *ins, + struct ctrace *in_ctr, + struct ctrace **out_ctr, + const char *tag, + int tag_len) +{ + int ret; + int count; + struct sampling *ctx = ins->context; + + /* just a quick check for developers */ + if (!ctx->plugin->cb_do_sampling) { + flb_plg_error(ins, "unimplemented sampling callback for type '%s'", ctx->type_str); + return -1; + } + + if (ctx->debug_mode) { + debug_trace(ctx, in_ctr, FLB_TRUE); + } + + /* do sampling: the callback will modify the ctrace context */ + ret = ctx->plugin->cb_do_sampling(ctx, ctx->plugin_context, in_ctr, out_ctr); + + if (ctx->debug_mode && *out_ctr) { + debug_trace(ctx, *out_ctr, FLB_FALSE); + } + + /* check if the ctrace context has empty resource spans */ + if (*out_ctr) { + count = clean_empty_resource_spans(*out_ctr); + flb_plg_trace(ins, "cleaned %i empty resource spans", count); + } + + return ret; +} + +/* register the sampling plugins available */ +static void sampling_plugin_register(struct sampling *ctx) +{ + cfl_list_add(&sampling_probabilistic_plugin._head, &ctx->plugins); +} + +static int cb_init(struct flb_processor_instance *processor_instance, + void *source_plugin_instance, + int source_plugin_type, + struct flb_config *config) +{ + int ret; + struct sampling *ctx; + struct flb_sched *sched; + + /* create main plugin context */ + ctx = sampling_config_create(processor_instance, config); + if (!ctx) { + return FLB_PROCESSOR_FAILURE; + } + processor_instance->context = (void *) ctx; + + /* register plugins */ + sampling_plugin_register(ctx); + + ret = sampling_config_process_rules(config, ctx); + if (ret == -1) { + flb_plg_error(processor_instance, "failed to parse sampling rules"); + flb_free(ctx); + return -1; + } + + /* get the scheduler context */ + sched = flb_sched_ctx_get(); + if (!sched) { + flb_plg_error(ctx->ins, "could not get scheduler context"); + return -1; + } + + /* initialize the backend plugin */ + ret = ctx->plugin->cb_init(config, ctx); + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_exit(struct flb_processor_instance *processor_instance, void *data) +{ + if (processor_instance != NULL && data != NULL) { + sampling_config_destroy(processor_instance->config, data); + } + + return FLB_PROCESSOR_SUCCESS; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "type", NULL, + 0, FLB_TRUE, offsetof(struct sampling, type_str), + "Type of the sampling processor" + }, + { + FLB_CONFIG_MAP_BOOL, "debug", "false", + 0, FLB_TRUE, offsetof(struct sampling, debug_mode), + "Enable debug mode where it prints the trace and it spans" + }, + { + FLB_CONFIG_MAP_VARIANT, "sampling_settings", NULL, + 0, FLB_TRUE, offsetof(struct sampling, sampling_settings), + "Sampling rules, these are defined by the sampling processor/type" + }, + { + FLB_CONFIG_MAP_VARIANT, "conditions", NULL, + 0, FLB_TRUE, offsetof(struct sampling, conditions), + "Sampling conditions" + }, + + /* EOF */ + {0} +}; + +struct flb_processor_plugin processor_sampling_plugin = { + .name = "sampling", + .description = "Sampling", + .cb_init = cb_init, + .cb_process_logs = NULL, + .cb_process_metrics = NULL, + .cb_process_traces = cb_process_traces, + .cb_exit = cb_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/plugins/processor_sampling/sampling.h b/plugins/processor_sampling/sampling.h new file mode 100644 index 00000000000..8333405fb1a --- /dev/null +++ b/plugins/processor_sampling/sampling.h @@ -0,0 +1,208 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_PROCESSOR_SAMPLING_H +#define FLB_PROCESSOR_SAMPLING_H + +#include +#include + +enum { + SAMPLING_TYPE_PROBABILISTIC = 0, + SAMPLING_TYPE_TAIL, + + /* unused: for dev/test purposes only */ + SAMPLING_TYPE_TEST, +}; + +struct trace_span { + struct ctrace_span *span; + + /* link to struct trace_entry->span_list */ + struct cfl_list _head; +}; + +struct trace_entry { + /* binary trace_id (this wraps a cfl_sds_t) */ + struct ctrace_id *trace_id; + + /* trace_id in hex format */ + cfl_sds_t hex_trace_id; + + /* describe if the root span has been received */ + int is_trace_complete; + + /* Linked list of spans */ + struct cfl_list span_list; + + uint64_t ts_created; + uint64_t ts_last_updated; + + /* link to struct sampling->trace_list */ + struct cfl_list _head; + + /* link to struct sampling->trace_list_complete or trace_list_incomplete */ + struct cfl_list _head_complete; +}; + +enum { + SAMPLING_COND_STATUS_CODE = 0, + SAMPLING_COND_LATENCY, + SAMPLING_COND_STRING_ATTRIBUTE, + SAMPLING_COND_NUMERIC_ATTRIBUTE, + SAMPLING_COND_BOOLEAN_ATTRIBUTE, + SAMPLING_COND_SPAN_COUNT, + SAMPLING_COND_TRACE_STATE, +}; + +struct sampling_condition { + int type; + void *type_context; + struct cfl_list _head; +}; + +struct sampling_conditions { + struct cfl_list list; +}; + +struct sampling { + /* config map properties */ + flb_sds_t type_str; + bool debug_mode; + struct cfl_variant *sampling_settings; + struct cfl_variant *conditions; + + /* + * Internal + * -------- + */ + int type; /* sampling type */ + + struct cfl_list plugins; + + struct sampling_conditions *sampling_conditions; + + /* plugin registration structure */ + struct sampling_plugin *plugin; + + /* Lists for config map and rule properties: this list is created dinamically */ + void *plugin_context; + struct mk_list plugin_settings_properties; + struct mk_list *plugin_config_map; + + /* Processor instance */ + struct flb_processor_instance *ins; + + /* Parent input plugin instance */ + struct flb_input_instance *input_ins; +}; + +/* Common structure for all sampling mechanisms */ +struct sampling_plugin { + char *name; + int type; + struct flb_config_map *config_map; + int (*cb_init) (struct flb_config *config, struct sampling *ctx); + int (*cb_do_sampling) (struct sampling *ctx, void *context, + struct ctrace *in_trace, struct ctrace **out_trace); + int (*cb_exit) (struct flb_config *config, void *context); + struct cfl_list _head; +}; + +/* Plugins registration */ +extern struct sampling_plugin sampling_test_plugin; +extern struct sampling_plugin sampling_probabilistic_plugin; +extern struct sampling_plugin sampling_tail_plugin; + +static inline void sampling_set_context(struct sampling *ctx, void *plugin_context) +{ + ctx->plugin_context = plugin_context; +} + +/* sampling_conf */ +int sampling_config_process_rules(struct flb_config *config, struct sampling *ctx); + +int sampling_config_map_set(struct flb_config *config, struct sampling *ctx, void *plugin_ctx, struct flb_config_map *map); + +struct sampling *sampling_config_create(struct flb_processor_instance *processor_instance, + struct flb_config *config); +void sampling_config_destroy(struct flb_config *config, struct sampling *ctx); + +/* conditions */ +struct sampling_conditions *sampling_conditions_create(struct sampling *ctx, struct cfl_variant *conditions); +int sampling_conditions_check(struct sampling *ctx, struct sampling_conditions *sampling_conditions, + struct trace_entry *trace_entry, struct ctrace_span *span); + +void sampling_conditions_destroy(struct sampling_conditions *sampling_conditions); + +/* + * conditions types + * ---------------- + */ + +/* condition: status_codes_check */ +struct sampling_condition *cond_status_codes_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings); +int cond_status_codes_check(struct sampling_condition *sampling_condition, struct ctrace_span *span); +void cond_status_codes_destroy(struct sampling_condition *sampling_condition); + +/* condition: latency */ +struct sampling_condition *cond_latency_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings); +int cond_latency_check(struct sampling_condition *sampling_condition, struct ctrace_span *span); +void cond_latency_destroy(struct sampling_condition *sampling_condition); + +/* condition: string_attribute */ +struct sampling_condition *cond_string_attr_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings); +int cond_string_attr_check(struct sampling_condition *sampling_condition, struct ctrace_span *span); +void cond_string_attr_destroy(struct sampling_condition *sampling_condition); + +/* condition: numeric_attribute */ +struct sampling_condition *cond_numeric_attr_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings); +void cond_numeric_attr_destroy(struct sampling_condition *sampling_condition); + + +/* condition: boolean_attribute */ +struct sampling_condition *cond_boolean_attr_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings); +void cond_boolean_attr_destroy(struct sampling_condition *sampling_condition); + +/* condition: span_count */ +int cond_span_count_check(struct sampling_condition *sampling_condition, struct trace_entry *trace_entry, struct ctrace_span *span); + +struct sampling_condition *cond_span_count_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings); +void cond_span_count_destroy(struct sampling_condition *sampling_condition); + +/* condition: trace_state */ +int cond_trace_state_check(struct sampling_condition *sampling_condition, struct ctrace_span *span); +struct sampling_condition *cond_trace_state_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings); +void cond_trace_state_destroy(struct sampling_condition *sampling_condition); + +#endif \ No newline at end of file diff --git a/plugins/processor_sampling/sampling_cond_attribute.c b/plugins/processor_sampling/sampling_cond_attribute.c new file mode 100644 index 00000000000..8df667892a0 --- /dev/null +++ b/plugins/processor_sampling/sampling_cond_attribute.c @@ -0,0 +1,167 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "sampling.h" +#include "sampling_cond_attribute.h" + +static int cond_attr_check_kvlist(struct cond_attribute *ctx, struct cfl_kvlist *kvlist, int attribute_type) +{ + struct cfl_list *head; + struct cfl_variant *var; + struct attribute_value *str_val; + + /* retrieve the value of the key if found */ + var = cfl_kvlist_fetch_s(kvlist, ctx->key, cfl_sds_len(ctx->key)); + if (!var) { + return FLB_FALSE; + } + + /* validate the value type */ + if (attribute_type == ATTRIBUTE_TYPE_STRING) { + if (var->type != CFL_VARIANT_STRING) { + return FLB_FALSE; + } + } + else if (attribute_type == ATTRIBUTE_TYPE_NUMERIC) { + if (var->type != CFL_VARIANT_INT && var->type != CFL_VARIANT_DOUBLE && var->type != CFL_VARIANT_UINT) { + return FLB_FALSE; + } + } + else if (attribute_type == ATTRIBUTE_TYPE_BOOLEAN) { + if (var->type != CFL_VARIANT_BOOL) { + return FLB_FALSE; + } + } + + /* if the match type is exists, return right away */ + if (ctx->match_type == MATCH_TYPE_EXISTS) { + return FLB_TRUE; + } + + /* numeric_attribute */ + if (attribute_type == ATTRIBUTE_TYPE_NUMERIC) { + if (var->type == CFL_VARIANT_INT) { + if (var->data.as_int64 >= ctx->min_value && var->data.as_int64 <= ctx->max_value) { + return FLB_TRUE; + } + } + else if (var->type == CFL_VARIANT_UINT) { + if (var->data.as_uint64 >= ctx->min_value && var->data.as_uint64 <= ctx->max_value) { + return FLB_TRUE; + } + } + else if (var->type == CFL_VARIANT_DOUBLE) { + if (var->data.as_double >= ctx->min_value && var->data.as_double <= ctx->max_value) { + return FLB_TRUE; + } + } + + return FLB_FALSE; + } + + /* boolean_attribute */ + if (attribute_type == ATTRIBUTE_TYPE_BOOLEAN) { + if (var->data.as_bool == ctx->boolean_value) { + return FLB_TRUE; + } + + return FLB_FALSE; + } + + /* string_attribute: check if the value matches any of the expected values */ + cfl_list_foreach(head, &ctx->list_values) { + str_val = cfl_list_entry(head, struct attribute_value, _head); + if (ctx->match_type == MATCH_TYPE_STRICT) { + if (attribute_type == ATTRIBUTE_TYPE_STRING) { + if (cfl_sds_len(var->data.as_string) != cfl_sds_len(str_val->value)) { + continue; + } + + if (strncmp(var->data.as_string, str_val->value, cfl_sds_len(var->data.as_string)) == 0) { + return FLB_TRUE; + } + } + } + else if (ctx->match_type == MATCH_TYPE_REGEX && attribute_type == CFL_VARIANT_STRING) { + if (flb_regex_match(str_val->regex_value, + (unsigned char *) var->data.as_string, + cfl_sds_len(var->data.as_string))) { + + return FLB_TRUE; + } + } + } + + return FLB_FALSE; +} + +int cond_attr_check(struct sampling_condition *sampling_condition, struct ctrace_span *span, + int attribute_type) +{ + int ret; + struct cond_attribute *ctx = sampling_condition->type_context; + + if (span->scope_span->resource_span->resource->attr->kv) { + ret = cond_attr_check_kvlist(ctx, + span->scope_span->resource_span->resource->attr->kv, + attribute_type); + if (ret == FLB_TRUE) { + return FLB_TRUE; + } + } + + if (span->attr) { + ret = cond_attr_check_kvlist(ctx, span->attr->kv, attribute_type); + if (ret == FLB_TRUE) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +void cond_attr_destroy(struct sampling_condition *sampling_condition) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct attribute_value *str_val; + struct cond_attribute *cond = sampling_condition->type_context; + + cfl_sds_destroy(cond->key); + + cfl_list_foreach_safe(head, tmp, &cond->list_values) { + str_val = cfl_list_entry(head, struct attribute_value, _head); + + if (str_val->value) { + cfl_sds_destroy(str_val->value); + } + + if (str_val->regex_value) { + flb_regex_destroy(str_val->regex_value); + } + + cfl_list_del(&str_val->_head); + flb_free(str_val); + } + + flb_free(cond); +} diff --git a/plugins/processor_sampling/sampling_cond_attribute.h b/plugins/processor_sampling/sampling_cond_attribute.h new file mode 100644 index 00000000000..376df13fe06 --- /dev/null +++ b/plugins/processor_sampling/sampling_cond_attribute.h @@ -0,0 +1,65 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_SAMPLING_COND_ATTRIBUTE_H + +#include +#include +#include "sampling.h" + +enum attribute_type { + ATTRIBUTE_TYPE_STRING = 0, + ATTRIBUTE_TYPE_NUMERIC, + ATTRIBUTE_TYPE_BOOLEAN, +}; + +enum match_type { + MATCH_TYPE_STRICT = 0, + MATCH_TYPE_EXISTS, + MATCH_TYPE_REGEX, +}; + +struct attribute_value { + cfl_sds_t value; + struct flb_regex *regex_value; + struct cfl_list _head; +}; + +struct cond_attribute { + int attribute_type; /* string_attribute, numeric_attribute or boolean_attribute */ + + /* config options */ + cfl_sds_t key; + int match_type; + + /* numeric_attribute config options */ + int invert_match; + int64_t min_value; + int64_t max_value; + + /* boolean_attribute */ + bool boolean_value; + + struct cfl_list list_values; +}; + +int cond_attr_check(struct sampling_condition *sampling_condition, struct ctrace_span *span, int attribute_type); +void cond_attr_destroy(struct sampling_condition *sampling_condition); + +#endif \ No newline at end of file diff --git a/plugins/processor_sampling/sampling_cond_boolean_attribute.c b/plugins/processor_sampling/sampling_cond_boolean_attribute.c new file mode 100644 index 00000000000..27a5340cab6 --- /dev/null +++ b/plugins/processor_sampling/sampling_cond_boolean_attribute.c @@ -0,0 +1,120 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + #include + #include + + #include "sampling.h" + #include "sampling_cond_attribute.h" + +struct sampling_condition *cond_boolean_attr_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings) +{ + struct cfl_variant *var; + struct cond_attribute *cond; + struct sampling_condition *sampling_condition; + + cond = flb_calloc(1, sizeof(struct cond_attribute)); + if (!cond) { + flb_errno(); + return NULL; + } + cond->attribute_type = ATTRIBUTE_TYPE_NUMERIC; + cond->match_type = MATCH_TYPE_STRICT; + cfl_list_init(&cond->list_values); + + /* key */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "key"); + if (var) { + if (var->type != CFL_VARIANT_STRING) { + flb_plg_error(ctx->ins, "key must be a string"); + flb_free(cond); + return NULL; + } + cond->key = cfl_sds_create_len(var->data.as_string, + cfl_sds_len(var->data.as_string)); + if (!cond->key) { + flb_free(cond); + return NULL; + } + } + else { + flb_plg_error(ctx->ins, "missing 'key' in condition"); + flb_free(cond); + return NULL; + } + + /* match_type */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "match_type"); + if (var) { + if (var->type != CFL_VARIANT_STRING) { + flb_plg_error(ctx->ins, "match_type must be a string"); + flb_free(cond); + return NULL; + } + + if (strcasecmp(var->data.as_string, "strict") == 0) { + cond->match_type = MATCH_TYPE_STRICT; + } + else if (strcasecmp(var->data.as_string, "exists") == 0) { + cond->match_type = MATCH_TYPE_EXISTS; + } + else { + flb_plg_error(ctx->ins, "invalid match_type '%s'", var->data.as_string); + flb_free(cond); + return NULL; + } + } + + /* values */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "value"); + if (var) { + if (var->type != CFL_VARIANT_BOOL) { + flb_plg_error(ctx->ins, "values must be an array"); + flb_free(cond); + return NULL; + } + cond->boolean_value = var->data.as_bool; + } + else { + flb_plg_error(ctx->ins, "missing boolean 'value' in condition"); + flb_free(cond); + return NULL; + } + + sampling_condition = flb_calloc(1, sizeof(struct sampling_condition)); + if (!sampling_condition) { + flb_errno(); + flb_free(cond); + return NULL; + } + + sampling_condition->type = SAMPLING_COND_BOOLEAN_ATTRIBUTE; + sampling_condition->type_context = cond; + cfl_list_add(&sampling_condition->_head, &sampling_conditions->list); + + return sampling_condition; + +} + +void cond_boolean_attr_destroy(struct sampling_condition *sampling_condition) +{ + cond_attr_destroy(sampling_condition); +} diff --git a/plugins/processor_sampling/sampling_cond_latency.c b/plugins/processor_sampling/sampling_cond_latency.c new file mode 100644 index 00000000000..eb57ac19eb3 --- /dev/null +++ b/plugins/processor_sampling/sampling_cond_latency.c @@ -0,0 +1,113 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + #include + #include "sampling.h" + +struct cond_latency { + uint64_t threshold_ms_low; + uint64_t threshold_ms_high; +}; + +int cond_latency_check(struct sampling_condition *sampling_condition, struct ctrace_span *span) +{ + uint64_t latency; + struct cond_latency *ctx = sampling_condition->type_context; + + + if (span->start_time_unix_nano > span->end_time_unix_nano) { + return FLB_FALSE; + } + + /* get the latency in milliseconds */ + latency = (span->end_time_unix_nano - span->start_time_unix_nano) / 1000000; + + /* check if the latency is within either of the thresholds */ + if ((ctx->threshold_ms_low != 0 && latency <= ctx->threshold_ms_low) || + (ctx->threshold_ms_high != 0 && latency >= ctx->threshold_ms_high)) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +struct sampling_condition *cond_latency_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings) +{ + struct cond_latency *cond; + struct cfl_variant *var; + struct sampling_condition *sampling_condition; + + cond = flb_calloc(1, sizeof(struct cond_latency)); + if (!cond) { + flb_errno(); + return NULL; + } + cond->threshold_ms_low = 0; + cond->threshold_ms_high = 0; + + /* threshold_ms_low */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "threshold_ms_low"); + if (var) { + if (var->type != CFL_VARIANT_UINT) { + flb_plg_error(ctx->ins, "threshold_ms_low must be an unsigned integer"); + flb_free(cond); + return NULL; + } + + cond->threshold_ms_low = var->data.as_uint64; + } + + /* threshold_ms_high */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "threshold_ms_high"); + if (var) { + if (var->type != CFL_VARIANT_UINT) { + flb_plg_error(ctx->ins, "threshold_ms_high must be an unsigned integer"); + flb_free(cond); + return NULL; + } + cond->threshold_ms_high = var->data.as_uint64; + } + + if (cond->threshold_ms_low == 0 && cond->threshold_ms_high == 0) { + flb_plg_error(ctx->ins, "either threshold_ms_low or threshold_ms_high must be set"); + flb_free(cond); + return NULL; + } + + sampling_condition = flb_calloc(1, sizeof(struct sampling_condition)); + if (!sampling_condition) { + flb_errno(); + flb_free(cond); + return NULL; + } + sampling_condition->type = SAMPLING_COND_LATENCY; + sampling_condition->type_context = cond; + cfl_list_add(&sampling_condition->_head, &sampling_conditions->list); + + return sampling_condition; + +} + +void cond_latency_destroy(struct sampling_condition *sampling_condition) +{ + struct cond_latency *cond = sampling_condition->type_context; + flb_free(cond); +} diff --git a/plugins/processor_sampling/sampling_cond_numeric_attribute.c b/plugins/processor_sampling/sampling_cond_numeric_attribute.c new file mode 100644 index 00000000000..7e6129178ad --- /dev/null +++ b/plugins/processor_sampling/sampling_cond_numeric_attribute.c @@ -0,0 +1,154 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "sampling.h" +#include "sampling_cond_attribute.h" + +struct sampling_condition *cond_numeric_attr_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings) +{ + struct cfl_variant *var; + struct cond_attribute *cond; + struct sampling_condition *sampling_condition; + + cond = flb_calloc(1, sizeof(struct cond_attribute)); + if (!cond) { + flb_errno(); + return NULL; + } + cond->attribute_type = ATTRIBUTE_TYPE_NUMERIC; + cond->match_type = MATCH_TYPE_STRICT; + cfl_list_init(&cond->list_values); + + /* key */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "key"); + if (var) { + if (var->type != CFL_VARIANT_STRING) { + flb_plg_error(ctx->ins, "key must be a string"); + flb_free(cond); + return NULL; + } + cond->key = cfl_sds_create_len(var->data.as_string, + cfl_sds_len(var->data.as_string)); + if (!cond->key) { + flb_free(cond); + return NULL; + } + } + else { + flb_plg_error(ctx->ins, "missing 'key' in condition"); + flb_free(cond); + return NULL; + } + + /* match_type */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "match_type"); + if (var) { + if (var->type != CFL_VARIANT_STRING) { + flb_plg_error(ctx->ins, "match_type must be a string"); + flb_free(cond); + return NULL; + } + + if (strcasecmp(var->data.as_string, "strict") == 0) { + cond->match_type = MATCH_TYPE_STRICT; + } + else if (strcasecmp(var->data.as_string, "exists") == 0) { + cond->match_type = MATCH_TYPE_EXISTS; + } + else { + flb_plg_error(ctx->ins, "invalid match_type '%s'", var->data.as_string); + flb_free(cond); + return NULL; + } + } + + /* min_value */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "min_value"); + if (var) { + if (var->type != CFL_VARIANT_INT && var->type != CFL_VARIANT_UINT) { + flb_plg_error(ctx->ins, "min_value must be an integer"); + flb_free(cond); + return NULL; + } + + if (var->type == CFL_VARIANT_INT) { + cond->min_value = var->data.as_int64; + } + else { + cond->min_value = (int64_t) var->data.as_uint64; + } + } + else { + flb_plg_error(ctx->ins, "missing 'min_value' in condition"); + flb_free(cond); + return NULL; + } + + /* max_value */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "max_value"); + if (var) { + if (var->type != CFL_VARIANT_INT && var->type != CFL_VARIANT_UINT) { + flb_plg_error(ctx->ins, "max_value must be an integer"); + flb_free(cond); + return NULL; + } + + if (var->type == CFL_VARIANT_INT) { + cond->max_value = var->data.as_int64; + } + else { + cond->max_value = (int64_t) var->data.as_uint64; + } + } + else { + flb_plg_error(ctx->ins, "missing 'max_value' in condition"); + flb_free(cond); + return NULL; + } + + /* check min_value < max_value */ + if (cond->min_value > cond->max_value) { + flb_plg_error(ctx->ins, "'min_value' must be less than 'max_value'"); + flb_free(cond); + return NULL; + } + + sampling_condition = flb_calloc(1, sizeof(struct sampling_condition)); + if (!sampling_condition) { + flb_errno(); + flb_free(cond); + return NULL; + } + sampling_condition->type = SAMPLING_COND_NUMERIC_ATTRIBUTE; + sampling_condition->type_context = cond; + cfl_list_add(&sampling_condition->_head, &sampling_conditions->list); + + return sampling_condition; + +} + +void cond_numeric_attr_destroy(struct sampling_condition *sampling_condition) +{ + cond_attr_destroy(sampling_condition); +} diff --git a/plugins/processor_sampling/sampling_cond_span_count.c b/plugins/processor_sampling/sampling_cond_span_count.c new file mode 100644 index 00000000000..9cf33feec66 --- /dev/null +++ b/plugins/processor_sampling/sampling_cond_span_count.c @@ -0,0 +1,130 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * +* http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "sampling.h" +#include "sampling_cond_attribute.h" + +struct cond_span_count { + int min_spans; + int max_spans; +}; + +int cond_span_count_check(struct sampling_condition *sampling_condition, + struct trace_entry *trace_entry, struct ctrace_span *span) +{ + int span_count = 0; + struct cond_span_count *ctx; + + ctx = sampling_condition->type_context; + span_count = cfl_list_size(&trace_entry->span_list); + + if (span_count >= ctx->min_spans && span_count <= ctx->max_spans) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +struct sampling_condition *cond_span_count_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings) +{ + struct cfl_variant *var; + struct cond_span_count *cond; + struct sampling_condition *sampling_condition; + + cond = flb_calloc(1, sizeof(struct cond_span_count)); + if (!cond) { + flb_errno(); + return NULL; + } + + /* min_spans */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "min_spans"); + if (var) { + if (var->type != CFL_VARIANT_INT && var->type != CFL_VARIANT_UINT) { + flb_plg_error(ctx->ins, "'min_spans' must be an integer"); + flb_free(cond); + return NULL; + } + + if (var->type == CFL_VARIANT_INT) { + cond->min_spans = var->data.as_int64; + } + else { + cond->min_spans = (int64_t) var->data.as_uint64; + } + } + else { + flb_plg_error(ctx->ins, "missing 'min_spans' in condition"); + flb_free(cond); + return NULL; + } + + + /* max_spans */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "max_spans"); + if (var) { + if (var->type != CFL_VARIANT_INT && var->type != CFL_VARIANT_UINT) { + flb_plg_error(ctx->ins, "'max_spans' must be an integer"); + flb_free(cond); + return NULL; + } + + if (var->type == CFL_VARIANT_INT) { + cond->max_spans = var->data.as_int64; + } + else { + cond->max_spans = (int64_t) var->data.as_uint64; + } + } + else { + flb_plg_error(ctx->ins, "missing 'max_spans' in condition"); + flb_free(cond); + return NULL; + } + + if (cond->min_spans > cond->max_spans) { + flb_plg_error(ctx->ins, "'min_spans' must be less than 'max_spans'"); + flb_free(cond); + return NULL; + } + + sampling_condition = flb_calloc(1, sizeof(struct sampling_condition)); + if (!sampling_condition) { + flb_errno(); + flb_free(cond); + return NULL; + } + sampling_condition->type = SAMPLING_COND_SPAN_COUNT; + sampling_condition->type_context = cond; + cfl_list_add(&sampling_condition->_head, &sampling_conditions->list); + + return sampling_condition; + +} + +void cond_span_count_destroy(struct sampling_condition *sampling_condition) +{ + struct cond_span_count *ctx = sampling_condition->type_context; + flb_free(ctx); +} diff --git a/plugins/processor_sampling/sampling_cond_status_codes.c b/plugins/processor_sampling/sampling_cond_status_codes.c new file mode 100644 index 00000000000..13ef71aee04 --- /dev/null +++ b/plugins/processor_sampling/sampling_cond_status_codes.c @@ -0,0 +1,125 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "sampling.h" + +/* condition: status_code */ +struct cond_status_code { + int status_unset; + int status_ok; + int status_error; +}; + +int cond_status_codes_check(struct sampling_condition *sampling_condition, struct ctrace_span *span) +{ + struct cond_status_code *ctx = sampling_condition->type_context; + + if (span->status.code == CTRACE_SPAN_STATUS_CODE_UNSET) { + if (ctx->status_unset == FLB_TRUE) { + return FLB_TRUE; + } + } + else if (span->status.code == CTRACE_SPAN_STATUS_CODE_OK) { + if (ctx->status_ok == FLB_TRUE) { + return FLB_TRUE; + } + } + else if (span->status.code == CTRACE_SPAN_STATUS_CODE_ERROR) { + if (ctx->status_error == FLB_TRUE) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +struct sampling_condition *cond_status_codes_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings) +{ + int i; + struct cond_status_code *cond; + struct cfl_variant *var; + struct cfl_variant *status_code; + struct sampling_condition *sampling_condition; + + cond = flb_calloc(1, sizeof(struct cond_status_code)); + if (!cond) { + flb_errno(); + return NULL; + } + + /* get option status_codes */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "status_codes"); + if (!var) { + flb_plg_error(ctx->ins, "missing 'status_codes' in condition"); + flb_free(cond); + return NULL; + } + + if (var->type != CFL_VARIANT_ARRAY) { + flb_plg_error(ctx->ins, "status_codes must be an array"); + flb_free(cond); + return NULL; + } + + /* iterate status codes */ + for (i = 0; i < var->data.as_array->entry_count; i++) { + status_code = var->data.as_array->entries[i]; + if (status_code->type != CFL_VARIANT_STRING) { + flb_plg_error(ctx->ins, "status code must be an string"); + flb_free(cond); + return NULL; + } + + if (strcasecmp(status_code->data.as_string, "UNSET") == 0) { + cond->status_unset = FLB_TRUE; + } + else if (strcasecmp(status_code->data.as_string, "OK") == 0) { + cond->status_ok = FLB_TRUE; + } + else if (strcasecmp(status_code->data.as_string, "ERROR") == 0) { + cond->status_error = FLB_TRUE; + } + else { + flb_plg_error(ctx->ins, "invalid status code '%s'", status_code->data.as_string); + flb_free(cond); + return NULL; + } + } + + sampling_condition = flb_calloc(1, sizeof(struct sampling_condition)); + if (!sampling_condition) { + flb_errno(); + flb_free(cond); + return NULL; + } + sampling_condition->type = SAMPLING_COND_STATUS_CODE; + sampling_condition->type_context = cond; + cfl_list_add(&sampling_condition->_head, &sampling_conditions->list); + + return sampling_condition; +} + +void cond_status_codes_destroy(struct sampling_condition *sampling_condition) +{ + struct cond_status_code *cond = sampling_condition->type_context; + flb_free(cond); +} diff --git a/plugins/processor_sampling/sampling_cond_string_attribute.c b/plugins/processor_sampling/sampling_cond_string_attribute.c new file mode 100644 index 00000000000..0f14f6bd508 --- /dev/null +++ b/plugins/processor_sampling/sampling_cond_string_attribute.c @@ -0,0 +1,163 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "sampling.h" +#include "sampling_cond_attribute.h" + +struct sampling_condition *cond_string_attr_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings) +{ + int i; + struct cfl_variant *var; + struct cfl_variant *var_value; + struct attribute_value *str_val; + struct cond_attribute *cond; + struct sampling_condition *sampling_condition; + + cond = flb_calloc(1, sizeof(struct cond_attribute)); + if (!cond) { + flb_errno(); + return NULL; + } + cond->attribute_type = ATTRIBUTE_TYPE_STRING; + cond->match_type = MATCH_TYPE_STRICT; + cfl_list_init(&cond->list_values); + + /* key */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "key"); + if (var) { + if (var->type != CFL_VARIANT_STRING) { + flb_plg_error(ctx->ins, "key must be a string"); + flb_free(cond); + return NULL; + } + cond->key = cfl_sds_create_len(var->data.as_string, + cfl_sds_len(var->data.as_string)); + if (!cond->key) { + flb_free(cond); + return NULL; + } + } + else { + flb_plg_error(ctx->ins, "missing 'key' in condition"); + flb_free(cond); + return NULL; + } + + /* match_type */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "match_type"); + if (var) { + if (var->type != CFL_VARIANT_STRING) { + flb_plg_error(ctx->ins, "match_type must be a string"); + flb_free(cond); + return NULL; + } + + if (strcasecmp(var->data.as_string, "strict") == 0) { + cond->match_type = MATCH_TYPE_STRICT; + } + else if (strcasecmp(var->data.as_string, "exists") == 0) { + cond->match_type = MATCH_TYPE_EXISTS; + } + else if (strcasecmp(var->data.as_string, "regex") == 0) { + cond->match_type = MATCH_TYPE_REGEX; + } + else { + flb_plg_error(ctx->ins, "invalid match_type '%s'", var->data.as_string); + flb_free(cond); + return NULL; + } + } + + /* values */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "values"); + if (var) { + if (var->type != CFL_VARIANT_ARRAY) { + flb_plg_error(ctx->ins, "values must be an array"); + flb_free(cond); + return NULL; + } + + /* iterate values */ + for (i = 0; i < var->data.as_array->entry_count; i++) { + var_value = var->data.as_array->entries[i]; + if (var_value->type != CFL_VARIANT_STRING) { + flb_plg_error(ctx->ins, "value must be an string"); + flb_free(cond); + return NULL; + } + + str_val = flb_calloc(1, sizeof(struct attribute_value)); + if (!str_val) { + flb_errno(); + flb_free(cond); + return NULL; + } + + if (cond->match_type == MATCH_TYPE_REGEX) { + str_val->regex_value = flb_regex_create(var_value->data.as_string); + if (!str_val->regex_value) { + flb_free(str_val); + flb_free(cond); + return NULL; + } + } + else { + str_val->value = cfl_sds_create_len(var_value->data.as_string, + cfl_sds_len(var_value->data.as_string)); + if (!str_val->value) { + flb_free(str_val); + flb_free(cond); + return NULL; + } + } + + cfl_list_add(&str_val->_head, &cond->list_values); + } + } + else { + if (cond->match_type != MATCH_TYPE_EXISTS) { + flb_plg_error(ctx->ins, "missing 'values' in condition"); + flb_free(cond); + return NULL; + } + } + + sampling_condition = flb_calloc(1, sizeof(struct sampling_condition)); + if (!sampling_condition) { + flb_errno(); + flb_free(cond); + return NULL; + } + sampling_condition->type = SAMPLING_COND_STRING_ATTRIBUTE; + sampling_condition->type_context = cond; + cfl_list_add(&sampling_condition->_head, &sampling_conditions->list); + + return sampling_condition; + +} + +void cond_string_attr_destroy(struct sampling_condition *sampling_condition) +{ + cond_attr_destroy(sampling_condition); +} diff --git a/plugins/processor_sampling/sampling_cond_trace_state.c b/plugins/processor_sampling/sampling_cond_trace_state.c new file mode 100644 index 00000000000..6db96b80ddc --- /dev/null +++ b/plugins/processor_sampling/sampling_cond_trace_state.c @@ -0,0 +1,199 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * +* http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "sampling.h" + +struct cond_state_entry { + cfl_sds_t kv; + struct cfl_list _head; +}; + +struct cond_trace_state { + struct cfl_list list_states; +}; + +static inline int slist_entry_compare(struct flb_slist_entry *entry, cfl_sds_t kv) +{ + if (flb_sds_len(entry->str) != cfl_sds_len(kv)) { + return FLB_FALSE; + } + + if (strncmp(entry->str, kv, flb_sds_len(kv)) == 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static inline int slist_check(struct mk_list *list, cfl_sds_t kv) +{ + struct mk_list *head; + struct flb_slist_entry *entry; + + mk_list_foreach(head, list) { + entry = mk_list_entry(head, struct flb_slist_entry, _head); + if (slist_entry_compare(entry, kv) == FLB_TRUE) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +int cond_trace_state_check(struct sampling_condition *sampling_condition, struct ctrace_span *span) +{ + int ret; + struct mk_list list; + struct cfl_list *head; + struct cond_trace_state *ctx; + struct cond_state_entry *entry; + + ctx = sampling_condition->type_context; + + if (!span->trace_state) { + return FLB_FALSE; + } + + if (cfl_sds_len(span->trace_state) == 0) { + return FLB_FALSE; + } + + flb_slist_create(&list); + ret = flb_slist_split_string(&list, span->trace_state, ',', 0); + if (ret == -1) { + return FLB_FALSE; + } + + cfl_list_foreach(head, &ctx->list_states) { + entry = cfl_list_entry(head, struct cond_state_entry, _head); + + ret = slist_check(&list, entry->kv); + if (ret == FLB_TRUE) { + flb_slist_destroy(&list); + return FLB_TRUE; + } + } + flb_slist_destroy(&list); + + /* no matches */ + return FLB_FALSE; +} + +static int read_values_from_variant(struct sampling *ctx, struct cond_trace_state *cond, struct cfl_variant *var) +{ + int i; + struct cfl_variant *value; + struct cond_state_entry *entry; + + for (i = 0; i < var->data.as_array->entry_count; i++) { + value = var->data.as_array->entries[i]; + if (value->type != CFL_VARIANT_STRING) { + return -1; + } + + entry = flb_calloc(1, sizeof(struct cond_state_entry)); + if (!entry) { + flb_errno(); + return -1; + } + + entry->kv = cfl_sds_create_len(value->data.as_string, flb_sds_len(value->data.as_string)); + if (!entry->kv) { + flb_free(entry); + return -1; + } + cfl_list_add(&entry->_head, &cond->list_states); + } + + return 0; +} + +struct sampling_condition *cond_trace_state_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings) +{ + int ret; + struct cfl_variant *var = NULL; + struct cond_trace_state *cond; + struct sampling_condition *sampling_condition; + + cond = flb_calloc(1, sizeof(struct cond_trace_state)); + if (!cond) { + flb_errno(); + return NULL; + } + cfl_list_init(&cond->list_states); + + /* values */ + var = cfl_kvlist_fetch(settings->data.as_kvlist, "values"); + if (var) { + if (var->type != CFL_VARIANT_ARRAY) { + flb_plg_error(ctx->ins, "'values' must be an array"); + flb_free(cond); + return NULL; + } + } + else { + flb_plg_error(ctx->ins, "missing 'values' in condition"); + flb_free(cond); + return NULL; + } + + ret = read_values_from_variant(ctx, cond, var); + if (ret == -1) { + flb_plg_error(ctx->ins, "failed to read values from variant"); + + return NULL; + } + + sampling_condition = flb_calloc(1, sizeof(struct sampling_condition)); + if (!sampling_condition) { + flb_errno(); + flb_free(cond); + return NULL; + } + sampling_condition->type = SAMPLING_COND_TRACE_STATE; + sampling_condition->type_context = cond; + cfl_list_add(&sampling_condition->_head, &sampling_conditions->list); + + return sampling_condition; + +} + +void cond_trace_state_destroy(struct sampling_condition *sampling_condition) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct cond_state_entry *entry; + struct cond_trace_state *ctx = sampling_condition->type_context; + + /* destroy states */ + cfl_list_foreach_safe(head, tmp, &ctx->list_states) { + entry = cfl_list_entry(head, struct cond_state_entry, _head); + cfl_sds_destroy(entry->kv); + cfl_list_del(&entry->_head); + flb_free(entry); + } + + flb_free(ctx); +} diff --git a/plugins/processor_sampling/sampling_conditions.c b/plugins/processor_sampling/sampling_conditions.c new file mode 100644 index 00000000000..93028fc9ec7 --- /dev/null +++ b/plugins/processor_sampling/sampling_conditions.c @@ -0,0 +1,238 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "sampling.h" +#include "sampling_cond_attribute.h" + +struct sampling_condition *cond_status_codes_create(struct sampling *ctx, + struct sampling_conditions *sampling_conditions, + struct cfl_variant *settings); + +static int condition_type_str_to_int(char *type_str) +{ + if (strcasecmp(type_str, "status_code") == 0) { + return SAMPLING_COND_STATUS_CODE; + } + else if (strcasecmp(type_str, "latency") == 0) { + return SAMPLING_COND_LATENCY; + } + else if (strcasecmp(type_str, "string_attribute") == 0) { + return SAMPLING_COND_STRING_ATTRIBUTE; + } + else if (strcasecmp(type_str, "numeric_attribute") == 0) { + return SAMPLING_COND_NUMERIC_ATTRIBUTE; + } + else if (strcasecmp(type_str, "boolean_attribute") == 0) { + return SAMPLING_COND_BOOLEAN_ATTRIBUTE; + } + else if (strcasecmp(type_str, "span_count") == 0) { + return SAMPLING_COND_SPAN_COUNT; + } + else if (strcasecmp(type_str, "trace_state") == 0) { + return SAMPLING_COND_TRACE_STATE; + } + + return -1; +} + +void sampling_conditions_destroy(struct sampling_conditions *sampling_conditions) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct sampling_condition *sampling_condition; + + if (!sampling_conditions) { + return; + } + + cfl_list_foreach_safe(head, tmp, &sampling_conditions->list) { + sampling_condition = cfl_list_entry(head, struct sampling_condition, _head); + if (sampling_condition->type == SAMPLING_COND_STATUS_CODE) { + cond_status_codes_destroy(sampling_condition); + } + else if (sampling_condition->type == SAMPLING_COND_LATENCY) { + cond_latency_destroy(sampling_condition); + } + else if (sampling_condition->type == SAMPLING_COND_STRING_ATTRIBUTE) { + cond_string_attr_destroy(sampling_condition); + } + else if (sampling_condition->type == SAMPLING_COND_NUMERIC_ATTRIBUTE) { + cond_numeric_attr_destroy(sampling_condition); + } + else if (sampling_condition->type == SAMPLING_COND_BOOLEAN_ATTRIBUTE) { + cond_boolean_attr_destroy(sampling_condition); + } + else if (sampling_condition->type == SAMPLING_COND_SPAN_COUNT) { + cond_span_count_destroy(sampling_condition); + } + else if (sampling_condition->type == SAMPLING_COND_TRACE_STATE) { + cond_trace_state_destroy(sampling_condition); + } + + cfl_list_del(&sampling_condition->_head); + flb_free(sampling_condition); + } + + flb_free(sampling_conditions); +} + +int sampling_conditions_check(struct sampling *ctx, struct sampling_conditions *sampling_conditions, + struct trace_entry *trace_entry, struct ctrace_span *span) +{ + int ret; + struct cfl_list *tmp; + struct cfl_list *head; + struct sampling_condition *sampling_condition; + + if (!sampling_conditions) { + return FLB_TRUE; + } + + cfl_list_foreach_safe(head, tmp, &sampling_conditions->list) { + sampling_condition = cfl_list_entry(head, struct sampling_condition, _head); + + ret = FLB_FALSE; + + if (sampling_condition->type == SAMPLING_COND_STATUS_CODE) { + ret = cond_status_codes_check(sampling_condition, span); + } + else if (sampling_condition->type == SAMPLING_COND_LATENCY) { + ret = cond_latency_check(sampling_condition, span); + } + else if (sampling_condition->type == SAMPLING_COND_STRING_ATTRIBUTE) { + ret = cond_attr_check(sampling_condition, span, ATTRIBUTE_TYPE_STRING); + } + else if (sampling_condition->type == SAMPLING_COND_NUMERIC_ATTRIBUTE) { + ret = cond_attr_check(sampling_condition, span, ATTRIBUTE_TYPE_NUMERIC); + } + else if (sampling_condition->type == SAMPLING_COND_BOOLEAN_ATTRIBUTE) { + ret = cond_attr_check(sampling_condition, span, ATTRIBUTE_TYPE_BOOLEAN); + } + else if (sampling_condition->type == SAMPLING_COND_SPAN_COUNT) { + ret = cond_span_count_check(sampling_condition, trace_entry, span); + } + else if (sampling_condition->type == SAMPLING_COND_TRACE_STATE) { + ret = cond_trace_state_check(sampling_condition, span); + } + + if (ret == FLB_TRUE) { + return FLB_TRUE; + } + } + + /* no matches, trace will be dropped */ + return FLB_FALSE; +} + +struct sampling_conditions *sampling_conditions_create(struct sampling *ctx, struct cfl_variant *conditions) +{ + int i; + int type; + char *type_str; + void *cond_ptr = NULL; + struct sampling_conditions *sampling_cond; + struct cfl_variant *type_settings; + struct cfl_variant *condition_settings; + + if (!conditions) { + return NULL; + } + + if (conditions->type != CFL_VARIANT_ARRAY) { + flb_plg_error(ctx->ins, "conditions must be an array"); + return NULL; + } + + sampling_cond = flb_calloc(1, sizeof(struct sampling_conditions)); + if (!sampling_cond) { + flb_errno(); + return NULL; + } + cfl_list_init(&sampling_cond->list); + + /* iterate conditions */ + for (i = 0; i < conditions->data.as_array->entry_count; i++) { + condition_settings = conditions->data.as_array->entries[i]; + if (condition_settings->type != CFL_VARIANT_KVLIST) { + flb_plg_error(ctx->ins, "condition must be a map"); + sampling_conditions_destroy(sampling_cond); + return NULL; + } + + type_settings = cfl_kvlist_fetch(condition_settings->data.as_kvlist, "type"); + if (!type_settings) { + flb_plg_error(ctx->ins, "condition must have a 'type' key"); + sampling_conditions_destroy(sampling_cond); + return NULL; + } + + if (type_settings->type != CFL_VARIANT_STRING) { + flb_plg_error(ctx->ins, "condition 'type' must be a string"); + sampling_conditions_destroy(sampling_cond); + return NULL; + } + + type_str = type_settings->data.as_string; + type = condition_type_str_to_int(type_str); + if (type == -1) { + flb_plg_error(ctx->ins, "unknown condition type '%s'", type_str); + sampling_conditions_destroy(sampling_cond); + return NULL; + } + + cond_ptr = NULL; + switch (type) { + case SAMPLING_COND_STATUS_CODE: + cond_ptr = cond_status_codes_create(ctx, sampling_cond, condition_settings); + break; + case SAMPLING_COND_LATENCY: + cond_ptr = cond_latency_create(ctx, sampling_cond, condition_settings); + break; + case SAMPLING_COND_STRING_ATTRIBUTE: + cond_ptr = cond_string_attr_create(ctx, sampling_cond, condition_settings); + break; + case SAMPLING_COND_NUMERIC_ATTRIBUTE: + cond_ptr = cond_numeric_attr_create(ctx, sampling_cond, condition_settings); + break; + case SAMPLING_COND_BOOLEAN_ATTRIBUTE: + cond_ptr = cond_boolean_attr_create(ctx, sampling_cond, condition_settings); + break; + case SAMPLING_COND_SPAN_COUNT: + cond_ptr = cond_span_count_create(ctx, sampling_cond, condition_settings); + break; + case SAMPLING_COND_TRACE_STATE: + cond_ptr = cond_trace_state_create(ctx, sampling_cond, condition_settings); + break; + default: + sampling_conditions_destroy(sampling_cond); + return NULL; + } + + if (!cond_ptr) { + flb_plg_error(ctx->ins, "failed to create condition type '%s'", type_str); + sampling_conditions_destroy(sampling_cond); + return NULL; + } + } + + return sampling_cond; +} + diff --git a/plugins/processor_sampling/sampling_conf.c b/plugins/processor_sampling/sampling_conf.c new file mode 100644 index 00000000000..1128d94ca40 --- /dev/null +++ b/plugins/processor_sampling/sampling_conf.c @@ -0,0 +1,255 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "sampling.h" + +static int sampling_type_lookup(const char *type_str) +{ + if (strcasecmp(type_str, "test") == 0) { + return SAMPLING_TYPE_TEST; + } + else if (strcasecmp(type_str, "probabilistic") == 0) { + return SAMPLING_TYPE_PROBABILISTIC; + } + else if (strcasecmp(type_str, "tail") == 0) { + return SAMPLING_TYPE_TAIL; + } + + return -1; +} + +static char *sampling_config_type_str(int type) +{ + switch (type) { + case SAMPLING_TYPE_TEST: + return "test"; + case SAMPLING_TYPE_PROBABILISTIC: + return "probabilistic"; + case SAMPLING_TYPE_TAIL: + return "tail"; + default: + return "unknown"; + } +} + +static struct sampling_plugin *sampling_config_get_plugin(int type) +{ + struct sampling_plugin *plugin = NULL; + + switch (type) { + /* + case SAMPLING_TYPE_TEST: + plugin = &sampling_test_plugin; + break; + */ + case SAMPLING_TYPE_PROBABILISTIC: + plugin = &sampling_probabilistic_plugin; + break; + case SAMPLING_TYPE_TAIL: + plugin = &sampling_tail_plugin; + break; + default: + plugin = NULL; + } + + return plugin; +} + +/* Register properties inside 'rules' into the ctx->plugin_rules_properties list */ + int sampling_config_process_rules(struct flb_config *config, struct sampling *ctx) +{ + int ret; + char val[1024]; + struct cfl_list *head; + struct cfl_variant *var; + struct cfl_kvlist *kv; + struct cfl_kvpair *pair; + struct mk_list *map; + struct flb_kv *kv_entry; + + if (!ctx->sampling_settings) { + /* no rules have been defined */ + return 0; + } + + var = ctx->sampling_settings; + if (var->type != CFL_VARIANT_KVLIST) { + flb_plg_error(ctx->ins, "rules must be a map"); + return -1; + } + + kv = var->data.as_kvlist; + cfl_list_foreach(head, &kv->list) { + pair = cfl_list_entry(head, struct cfl_kvpair, _head); + + if (pair->val->type != CFL_VARIANT_INT && + pair->val->type != CFL_VARIANT_UINT && + pair->val->type != CFL_VARIANT_STRING && + pair->val->type != CFL_VARIANT_BOOL && + pair->val->type != CFL_VARIANT_DOUBLE) { + flb_plg_error(ctx->ins, "invalid value type for key '%s'", pair->key); + return -1; + } + + /* + * Internal kvlist expects the value to be in string format, convert them from native + * to it string version. We might need a better interface for this. + */ + ret = -1; + + if (pair->val->type == CFL_VARIANT_INT) { + ret = snprintf(val, sizeof(val) - 1, "%ld", pair->val->data.as_int64); + } + else if (pair->val->type == CFL_VARIANT_UINT) { + ret = snprintf(val, sizeof(val) - 1, "%ld", pair->val->data.as_uint64); + } + else if (pair->val->type == CFL_VARIANT_DOUBLE) { + ret = snprintf(val, sizeof(val) - 1, "%f", pair->val->data.as_double); + } + else if (pair->val->type == CFL_VARIANT_BOOL) { + ret = snprintf(val, sizeof(val) - 1, "%s", pair->val->data.as_bool ? "true" : "false"); + } + else if (pair->val->type == CFL_VARIANT_STRING) { + ret = snprintf(val, sizeof(val) - 1, "%s", pair->val->data.as_string); + } + else { + flb_plg_error(ctx->ins, "invalid value type for key '%s'", pair->key); + return -1; + } + + if (ret <= 0) { + flb_plg_error(ctx->ins, "failed to convert value to string"); + return -1; + } + + kv_entry = flb_kv_item_create_len(&ctx->plugin_settings_properties, pair->key, strlen(pair->key), val, ret); + if (!kv_entry) { + flb_plg_error(ctx->ins, "failed to create kv entry for rule key '%s'", pair->key); + return -1; + } + } + + map = flb_config_map_create(config, ctx->plugin->config_map); + if (!map) { + flb_plg_error(ctx->ins, "failed to create map for plugin rules"); + return -1; + } + ctx->plugin_config_map = map; + + ret = flb_config_map_properties_check(ctx->type_str, &ctx->plugin_settings_properties, map); + if (ret == -1) { + flb_plg_error(ctx->ins, "failed to validate plugin rules properties"); + return -1; + } + + return 0; +} + +struct sampling *sampling_config_create(struct flb_processor_instance *processor_instance, + struct flb_config *config) +{ + int ret; + struct sampling *ctx; + struct sampling_plugin *plugin_context; + struct sampling_conditions *sampling_conditions; + + ctx = flb_calloc(1, sizeof(struct sampling)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = processor_instance; + ctx->input_ins = flb_processor_get_input_instance(ctx->ins->pu); + + /* config map */ + ret = flb_processor_instance_config_map_set(processor_instance, ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* sampling type: this is mandatory */ + if (!ctx->type_str) { + flb_plg_error(processor_instance, "no sampling 'type' defined"); + flb_free(ctx); + return NULL; + } + + /* type (int) */ + ret = sampling_type_lookup(ctx->type_str); + if (ret == -1) { + flb_plg_error(processor_instance, "unknown sampling type '%s'", ctx->type_str); + flb_free(ctx); + return NULL; + } + ctx->type = ret; + + plugin_context = sampling_config_get_plugin(ctx->type); + if (!plugin_context) { + flb_plg_error(processor_instance, "no plugin context found for sampling type '%s'", + sampling_config_type_str(ctx->type)); + flb_free(ctx); + return NULL; + } + ctx->plugin = plugin_context; + + cfl_list_init(&ctx->plugins); + flb_kv_init(&ctx->plugin_settings_properties); + + /* load conditions */ + if (ctx->conditions) { + sampling_conditions = sampling_conditions_create(ctx, ctx->conditions); + if (!sampling_conditions) { + flb_plg_error(processor_instance, "failed to create conditions"); + flb_free(ctx); + return NULL; + } + ctx->sampling_conditions = sampling_conditions; + } + + return ctx; +} + +void sampling_config_destroy(struct flb_config *config, struct sampling *ctx) +{ + if (!ctx) { + return; + } + + if (ctx->sampling_conditions) { + sampling_conditions_destroy(ctx->sampling_conditions); + } + + if (ctx->plugin) { + if (ctx->plugin->cb_exit) { + ctx->plugin->cb_exit(config, ctx->plugin_context); + } + } + + flb_kv_release(&ctx->plugin_settings_properties); + + if (ctx->plugin_config_map) { + flb_config_map_destroy(ctx->plugin_config_map); + } + + flb_free(ctx); +} diff --git a/plugins/processor_sampling/sampling_probabilistic.c b/plugins/processor_sampling/sampling_probabilistic.c new file mode 100644 index 00000000000..1e84119a1d2 --- /dev/null +++ b/plugins/processor_sampling/sampling_probabilistic.c @@ -0,0 +1,137 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "sampling.h" + + +struct sampling_settings { + int sampling_percentage; +}; + +static struct flb_config_map settings_config_map[] = { + { + FLB_CONFIG_MAP_INT, "sampling_percentage", "10", + 0, FLB_TRUE, offsetof(struct sampling_settings, sampling_percentage), + }, + + /* EOF */ + {0} +}; + +static int cb_init(struct flb_config *config, struct sampling *ctx) +{ + int ret; + struct sampling_rule *settings; + + flb_plg_info(ctx->ins, "initializing 'probabilistic' sampling processor"); + + settings = flb_calloc(1, sizeof(struct sampling_settings)); + if (!settings) { + flb_errno(); + return -1; + } + + ret = flb_config_map_set(&ctx->plugin_settings_properties, ctx->plugin_config_map, (void *) settings); + if (ret == -1) { + flb_free(settings); + return -1; + } + + sampling_set_context(ctx, settings); + return 0; +} + +/* Extract the first 8 bytes of trace_id */ +static uint64_t extract_trace_id(cfl_sds_t trace_id) { + uint64_t trace_number = 0; + + if (cfl_sds_len(trace_id) < 16) { + /* invalid trace_id */ + return 0; + } + + memcpy(&trace_number, trace_id, 8); + + /* convert to big-endian (if needed) */ + trace_number = flb_net_htonll(trace_number); + return trace_number; +} + +static int check_sampling(cfl_sds_t trace_id, double sampling_percentage) +{ + uint64_t trace_number; + double hash_value; + + trace_number = extract_trace_id(trace_id); + + /* normalize hash value */ + hash_value = (trace_number % 1000000) / 10000.0; + + /* compare with the sampling percentage */ + return hash_value < sampling_percentage; +} + +static int cb_do_sampling(struct sampling *ctx, void *plugin_context, + struct ctrace *in_ctr, struct ctrace **out_ctr) +{ + int ret; + struct cfl_list *head; + struct cfl_list *tmp; + struct ctrace_span *span; + struct sampling_settings *settings = (struct sampling_settings *) plugin_context; + + cfl_list_foreach_safe(head, tmp, &in_ctr->span_list) { + span = cfl_list_entry(head, struct ctrace_span, _head_global); + ret = check_sampling(span->trace_id->buf, settings->sampling_percentage); + if (ret == 1) { + /* we keep the span, all good */ + } + else { + /* remove the span */ + ctr_span_destroy(span); + } + } + + /* do not override the context */ + *out_ctr = in_ctr; + return 0; +} + +static int cb_exit(struct flb_config *config, void *data) +{ + struct sampling_rule *rule = data; + + if (rule) { + flb_free(rule); + } + + return 0; +} + +struct sampling_plugin sampling_probabilistic_plugin = { + .type = SAMPLING_TYPE_PROBABILISTIC, + .name = "probabilistic", + .config_map = settings_config_map, + .cb_init = cb_init, + .cb_do_sampling = cb_do_sampling, + .cb_exit = cb_exit, +}; diff --git a/plugins/processor_sampling/sampling_span_registry.c b/plugins/processor_sampling/sampling_span_registry.c new file mode 100644 index 00000000000..bddb4497d04 --- /dev/null +++ b/plugins/processor_sampling/sampling_span_registry.c @@ -0,0 +1,287 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "sampling.h" +#include "sampling_span_registry.h" + +struct sampling_span_registry *sampling_span_registry_create(uint64_t max_traces) +{ + struct sampling_span_registry *reg; + + reg = flb_calloc(1, sizeof(struct sampling_span_registry)); + if (!reg) { + flb_errno(); + return NULL; + } + + reg->ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1024, 0); + if (!reg->ht) { + flb_free(reg); + return NULL; + } + cfl_list_init(®->trace_list); + cfl_list_init(®->trace_list_complete); + cfl_list_init(®->trace_list_incomplete); + + reg->max_traces = max_traces; + + return reg; +} + +static void sampling_span_registry_delete_traces(struct sampling *ctx, struct sampling_span_registry *reg) +{ + struct cfl_list *head; + struct cfl_list *tmp; + struct trace_entry *t_entry; + + cfl_list_foreach_safe(head, tmp, ®->trace_list) { + t_entry = cfl_list_entry(head, struct trace_entry, _head); + cfl_list_del(&t_entry->_head); + cfl_list_del(&t_entry->_head_complete); + + /* free the trace_entry */ + cfl_sds_destroy(t_entry->hex_trace_id); + + ctr_id_destroy(t_entry->trace_id); + flb_free(t_entry); + } +} + +void sampling_span_registry_destroy(struct sampling_span_registry *reg) +{ + if (!reg) { + return; + } + + sampling_span_registry_delete_traces(NULL, reg); + + if (reg->ht) { + flb_hash_table_destroy(reg->ht); + } + flb_free(reg); +} + +int sampling_span_registry_delete_entry(struct sampling *ctx, struct sampling_span_registry *reg, + struct trace_entry *t_entry, int delete_spans) +{ + int ret; + struct cfl_list *head_span; + struct cfl_list *tmp_span; + struct trace_span *t_span; + + /* remove from the hash table */ + ret = flb_hash_table_del_ptr(reg->ht, ctr_id_get_buf(t_entry->trace_id), ctr_id_get_len(t_entry->trace_id), t_entry); + if (ret == -1) { + flb_plg_error(ctx->ins, "failed to delete trace entry from buffer"); + return -1; + } + + /* remove from the linked list */ + cfl_list_del(&t_entry->_head); + cfl_list_del(&t_entry->_head_complete); + + /* free the trace_entry */ + cfl_sds_destroy(t_entry->hex_trace_id); + + ctr_id_destroy(t_entry->trace_id); + + /* delete trace spans (this don't delete spans!) */ + cfl_list_foreach_safe(head_span, tmp_span, &t_entry->span_list) { + t_span = cfl_list_entry(head_span, struct trace_span, _head); + if (delete_spans) { + ctr_span_destroy(t_span->span); + } + + cfl_list_del(&t_span->_head); + flb_free(t_span); + } + + flb_free(t_entry); + + reg->count_traces--; + + return 0; +} + +int sampling_span_registry_add_span(struct sampling *ctx, struct sampling_span_registry *reg, struct ctrace_span *span) +{ + int ret; + size_t out_size = 0; + cfl_sds_t hex_trace_id; + struct cfl_list *head; + struct trace_entry *t_entry; + struct trace_entry *t_entry_delete; + struct trace_span *t_span; + + /* convert trace_id to readable format */ + if (!span->trace_id) { + flb_plg_error(ctx->ins, "trace_id is missing in span %s", span->name); + return -1; + } + + if (!span->span_id) { + flb_plg_error(ctx->ins, "span_id is missing in span %s", span->name); + return -1; + } + + /* check if the trace_id exists or not in the trace_buffer hash table */ + ret = flb_hash_table_get(reg->ht, + ctr_id_get_buf(span->trace_id), + ctr_id_get_len(span->trace_id), + (void **) &t_entry, &out_size); + if (ret == -1) { + /* create a new trace_entry for the trace_id in question */ + t_entry = flb_calloc(1, sizeof(struct trace_entry)); + if (!t_entry) { + flb_errno(); + return -1; + } + t_entry->ts_created = time(NULL); + t_entry->ts_last_updated = t_entry->ts_created; + cfl_list_init(&t_entry->span_list); + + /* trace_id */ + t_entry->trace_id = ctr_id_create(ctr_id_get_buf(span->trace_id), ctr_id_get_len(span->trace_id)); + if (!t_entry->trace_id) { + flb_plg_error(ctx->ins, "failed to create trace_id"); + flb_free(t_entry); + return -1; + } + + /* hex trace id (for test/dev purposes mostly) */ + hex_trace_id = ctr_id_to_lower_base16(span->trace_id); + if (!hex_trace_id) { + flb_plg_error(ctx->ins, "failed to convert trace_id to readable format"); + flb_free(t_entry); + return -1; + } + t_entry->hex_trace_id = hex_trace_id; + cfl_list_add(&t_entry->_head, ®->trace_list); + + /* always add a new trace into the incomplete list */ + cfl_list_add(&t_entry->_head_complete, ®->trace_list_incomplete); + + /* add to the hash table */ + ret = flb_hash_table_add(reg->ht, + ctr_id_get_buf(span->trace_id), + ctr_id_get_len(span->trace_id), + t_entry, 0); + if (ret == -1) { + flb_plg_error(ctx->ins, "failed to add trace entry to buffer"); + cfl_list_del(&t_entry->_head); + cfl_list_del(&t_entry->_head_complete); + flb_free(t_entry); + return -1; + } + + reg->count_traces++; + } + + /* update if the trace is completed */ + if (!span->parent_span_id) { + t_entry->is_trace_complete = FLB_TRUE; + + /* move entry to the complete list */ + cfl_list_del(&t_entry->_head_complete); + cfl_list_add(&t_entry->_head_complete, ®->trace_list_complete); + } + + /* add the span to the trace_entry */ + t_span = flb_calloc(1, sizeof(struct trace_span)); + if (!t_span) { + flb_errno(); + return -1; + } + t_span->span = span; + cfl_list_add(&t_span->_head, &t_entry->span_list); + + /* update timestamp */ + t_entry->ts_last_updated = cfl_time_now(); + + /* if the new number of traces exceeds max_traces, delete the oldest one */ + if (reg->count_traces > reg->max_traces) { + cfl_list_foreach(head, ®->trace_list) { + t_entry_delete = cfl_list_entry(head, struct trace_entry, _head); + + /* delete the first entry from the list */ + sampling_span_registry_delete_entry(ctx, reg, t_entry_delete, FLB_TRUE); + break; + } + } + return 0; +} + +int sampling_span_registry_add_trace(struct sampling *ctx, struct sampling_span_registry *reg, struct ctrace *ctr) +{ + int ret; + struct cfl_list *tmp; + struct cfl_list *head; + struct ctrace_span *span; + + /* iterate spans */ + cfl_list_foreach_safe(head, tmp, &ctr->span_list) { + span = cfl_list_entry(head, struct ctrace_span, _head_global); + ret = sampling_span_registry_add_span(ctx, reg, span); + if (ret != 0) { + flb_plg_error(ctx->ins, "failed to process span: %s", span->name); + return -1; + } + } + + return 0; +} + +int sampling_span_registry_print(struct sampling *ctx, struct sampling_span_registry *reg, char *title) +{ + struct cfl_list *head; + struct cfl_list *head_span; + struct trace_entry *t_entry; + struct trace_span *t_span; + cfl_sds_t span_id; + + printf("\n"); + printf("🔍 %s\n", title); + cfl_list_foreach(head, ®->trace_list) { + t_entry = cfl_list_entry(head, struct trace_entry, _head); + printf(" ┌─────────────────────────────────────────────────────────────────┐\n"); + printf(" │ trace_id=%s │\n", t_entry->hex_trace_id); + printf(" ├─────────────────────────────────────────────────────────────────┤\n"); + printf(" │ spans: │\n"); + + /* iterate spans */ + cfl_list_foreach(head_span, &t_entry->span_list) { + t_span = cfl_list_entry(head_span, struct trace_span, _head); + + span_id = ctr_id_to_lower_base16(t_span->span->span_id); + if (!span_id) { + flb_plg_error(ctx->ins, "failed to convert span_id to readable format"); + return -1; + } + printf(" │ ├── id=%s name=%-32s │\n", span_id, t_span->span->name); + + cfl_sds_destroy(span_id); + } + printf(" └─────────────────────────────────────────────────────────────────┘\n\n"); + } + + return 0; +} + diff --git a/plugins/processor_sampling/sampling_span_registry.h b/plugins/processor_sampling/sampling_span_registry.h new file mode 100644 index 00000000000..97a73b4da10 --- /dev/null +++ b/plugins/processor_sampling/sampling_span_registry.h @@ -0,0 +1,46 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_PROCESSOR_SAMPLING_SPAN_REGISTRY_H +#define FLB_PROCESSOR_SAMPLING_SPAN_REGISTRY_H + +#include + +struct sampling_span_registry { + struct flb_hash_table *ht; + + uint64_t count_traces; + uint64_t max_traces; + + /* linked lists of trace_entries inside the hash table (ht) using the _head node */ + struct cfl_list trace_list; + + /* the following two lists puts the trace into a complete or incomplete status */ + struct cfl_list trace_list_complete; + struct cfl_list trace_list_incomplete; +}; + +struct sampling_span_registry *sampling_span_registry_create(uint64_t max_traces); +void sampling_span_registry_destroy(struct sampling_span_registry *reg); +int sampling_span_registry_delete_entry(struct sampling *ctx, struct sampling_span_registry *reg, + struct trace_entry *t_entry, int delete_spans); +int sampling_span_registry_add_trace(struct sampling *ctx, struct sampling_span_registry *reg, struct ctrace *ctr); +int sampling_span_registry_print(struct sampling *ctx, struct sampling_span_registry *reg, char *title); + +#endif diff --git a/plugins/processor_sampling/sampling_tail.c b/plugins/processor_sampling/sampling_tail.c new file mode 100644 index 00000000000..7c56934574b --- /dev/null +++ b/plugins/processor_sampling/sampling_tail.c @@ -0,0 +1,463 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "sampling.h" +#include "sampling_span_registry.h" + +struct sampling_ctrace_entry { + struct ctrace *ctr; + struct cfl_list _head; /* sampling_settings->list_ctraces */ +}; + +struct sampling_settings { + int decision_wait; + uint64_t max_traces; + + /* internal */ + void *parent; /* struct sampling *ctx */ + uint64_t decision_wait_ms; + + /* linked list with a reference to all the ctraces contexts */ + struct cfl_list list_ctraces; + + /* span registry */ + struct sampling_span_registry *span_reg; +}; + +static struct flb_config_map settings_config_map[] = { + { + FLB_CONFIG_MAP_TIME, "decision_wait", "30s", + 0, FLB_TRUE, offsetof(struct sampling_settings, decision_wait), + }, + + { + FLB_CONFIG_MAP_INT, "max_traces", "50000", + 0, FLB_TRUE, offsetof(struct sampling_settings, max_traces), + }, + + /* EOF */ + {0} +}; + +/* delete a list ctrace entry */ +static void list_ctrace_delete_entry(struct sampling *ctx, struct sampling_ctrace_entry *ctrace_entry) +{ + ctr_destroy(ctrace_entry->ctr); + cfl_list_del(&ctrace_entry->_head); + flb_free(ctrace_entry); +} + +/* delete ctrace entries with no spans */ +static void list_ctrace_delete_empty(struct sampling *ctx, struct sampling_settings *settings) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct sampling_ctrace_entry *ctrace_entry; + + cfl_list_foreach_safe(head, tmp, &settings->list_ctraces) { + ctrace_entry = cfl_list_entry(head, struct sampling_ctrace_entry, _head); + if (cfl_list_size(&ctrace_entry->ctr->span_list) == 0) { + list_ctrace_delete_entry(ctx, ctrace_entry); + } + } +} + +static void list_ctrace_delete_all(struct sampling *ctx, struct sampling_settings *settings) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct sampling_ctrace_entry *ctrace_entry; + + cfl_list_foreach_safe(head, tmp, &settings->list_ctraces) { + ctrace_entry = cfl_list_entry(head, struct sampling_ctrace_entry, _head); + list_ctrace_delete_entry(ctx, ctrace_entry); + } +} + +struct ctrace_attributes *copy_attributes(struct sampling *ctx, struct ctrace_attributes *attr) +{ + int ret = -1; + struct cfl_list *head; + struct cfl_kvpair *pair; + struct ctrace_attributes *attr_copy; + + attr_copy = ctr_attributes_create(); + if (!attr_copy) { + return NULL; + } + + cfl_list_foreach(head, &attr->kv->list) { + pair = cfl_list_entry(head, struct cfl_kvpair, _head); + + if (pair->val->type == CFL_VARIANT_STRING) { + ret = ctr_attributes_set_string(attr_copy, pair->key, pair->val->data.as_string); + } + else if (pair->val->type == CFL_VARIANT_BOOL) { + ret = ctr_attributes_set_bool(attr_copy, pair->key, pair->val->data.as_bool); + } + else if (pair->val->type == CFL_VARIANT_INT) { + ret = ctr_attributes_set_int64(attr_copy, pair->key, pair->val->data.as_int64); + } + else if (pair->val->type == CFL_VARIANT_DOUBLE) { + ret = ctr_attributes_set_double(attr_copy, pair->key, pair->val->data.as_double); + } + else if (pair->val->type == CFL_VARIANT_ARRAY) { + ret = ctr_attributes_set_array(attr_copy, pair->key, pair->val->data.as_array); + } + else if (pair->val->type == CFL_VARIANT_KVLIST) { + ret = ctr_attributes_set_kvlist(attr_copy, pair->key, pair->val->data.as_kvlist); + } + else { + flb_plg_error(ctx->ins, "unsupported attribute type %i", pair->val->type); + ctr_attributes_destroy(attr_copy); + return NULL; + } + } + + if (ret != 0) { + ctr_attributes_destroy(attr_copy); + return NULL; + } + + return attr_copy; +}; + +static struct ctrace *reconcile_and_create_ctrace(struct sampling *ctx, struct sampling_settings *settings, struct trace_entry *t_entry) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct trace_span *t_span; + struct ctrace *ctr = NULL; + struct ctrace_resource_span *resource_span = NULL; + struct ctrace_resource *resource = NULL; + struct ctrace_scope_span *scope_span = NULL; + struct ctrace_instrumentation_scope *instrumentation_scope = NULL; + struct ctrace_span *span; + struct ctrace_attributes *attr; + + /* for each complete trace, reconcile, convert to ctrace context and enqueue it */ + cfl_list_foreach_safe(head, tmp, &t_entry->span_list) { + t_span = cfl_list_entry(head, struct trace_span, _head); + span = t_span->span; + + /* create a new ctraces context if does not exists */ + if (!ctr) { + ctr = ctr_create(NULL); + if (!ctr) { + flb_plg_error(ctx->ins, "could not create ctrace context"); + return NULL; + } + } + + /* create a resource span */ + if (!resource_span) { + resource_span = ctr_resource_span_create(ctr); + if (!resource_span) { + flb_plg_error(ctx->ins, "could not create resource span"); + ctr_destroy(ctr); + return NULL; + } + } + + if (!resource) { + resource = ctr_resource_span_get_resource(resource_span); + if (!resource) { + flb_plg_error(ctx->ins, "could not get resource"); + ctr_destroy(ctr); + return NULL; + } + + /* resource attributes */ + if (span->scope_span->resource_span->resource->attr) { + attr = copy_attributes(ctx, span->scope_span->resource_span->resource->attr); + if (attr) { + ctr_resource_set_attributes(resource, attr); + } + } + + /* resource dropped attributes count */ + if (span->scope_span->resource_span->resource->dropped_attr_count) { + ctr_resource_set_dropped_attr_count(resource, span->scope_span->resource_span->resource->dropped_attr_count); + } + + /* resource schema url */ + if (span->scope_span->resource_span->schema_url) { + ctr_resource_span_set_schema_url(resource_span, span->scope_span->resource_span->schema_url); + } + } + + if (!scope_span) { + scope_span = ctr_scope_span_create(resource_span); + if (!scope_span) { + flb_plg_error(ctx->ins, "could not create scope span"); + ctr_destroy(ctr); + return NULL; + } + } + + if (!instrumentation_scope) { + /* this is optional, check in the original span context if we have some instrumentation associated */ + if (span->scope_span->instrumentation_scope) { + attr = NULL; + if (span->scope_span->instrumentation_scope->attr) { + attr = copy_attributes(ctx, span->scope_span->instrumentation_scope->attr); + } + + instrumentation_scope = ctr_instrumentation_scope_create(span->scope_span->instrumentation_scope->name, + span->scope_span->instrumentation_scope->version, + span->scope_span->instrumentation_scope->dropped_attr_count, + attr); + if (instrumentation_scope) { + ctr_scope_span_set_instrumentation_scope(scope_span, instrumentation_scope); + } + } + } + + /* unlink active span from it original ctrace context and link it to the active scope_span list */ + cfl_list_del(&span->_head); + cfl_list_add(&span->_head, &scope_span->spans); + + /* reset all the contexts */ + resource_span = NULL; + resource = NULL; + scope_span = NULL; + instrumentation_scope = NULL; + + /* remote t_span entry */ + cfl_list_del(&t_span->_head); + flb_free(t_span); + } + + sampling_span_registry_delete_entry(ctx, settings->span_reg, t_entry, FLB_FALSE); + + return ctr; +} + +static int check_conditions(struct sampling *ctx, struct trace_entry *t_entry) +{ + int ret; + struct cfl_list *head; + struct trace_span *t_span; + + cfl_list_foreach(head, &t_entry->span_list) { + t_span = cfl_list_entry(head, struct trace_span, _head); + ret = sampling_conditions_check(ctx, ctx->sampling_conditions, t_entry, t_span->span); + if (ret == FLB_TRUE) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static void trace_entry_delete_spans(struct trace_entry *t_entry) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct trace_span *t_span; + + cfl_list_foreach_safe(head, tmp, &t_entry->span_list) { + t_span = cfl_list_entry(head, struct trace_span, _head); + cfl_list_del(&t_span->_head); + ctr_span_destroy(t_span->span); + flb_free(t_span); + } +} + +static int reconcile_and_dispatch_traces(struct sampling *ctx, struct sampling_settings *settings) +{ + int ret; + time_t now; + struct cfl_list *tmp; + struct cfl_list *head; + struct trace_entry *t_entry; + struct ctrace *ctr = NULL; + + now = time(NULL); + + /* for each complete trace, reconcile, convert to ctraces contexts (plural) and enqueue them */ + cfl_list_foreach_safe(head, tmp, &settings->span_reg->trace_list) { + t_entry = cfl_list_entry(head, struct trace_entry, _head); + + /* check if this trace still need to wait */ + if (t_entry->ts_created + settings->decision_wait > now) { + continue; + } + + /* + * check if the spans registered to this trace entry matches the conditions: if only one span + * matches, we keep the trace entry, otherwise we discard it + */ + ret = check_conditions(ctx, t_entry); + if (ret == FLB_FALSE) { + /* t_entry has many t_spans, since the spans will be discarded is safe to remove it original ctr_span reference */ + trace_entry_delete_spans(t_entry); + + /* remove the trace entry */ + sampling_span_registry_delete_entry(ctx, settings->span_reg, t_entry, FLB_FALSE); + continue; + } + + /* Compose a new ctrace context using the spans associated to the same trace_id */ + ctr = reconcile_and_create_ctrace(ctx, settings, t_entry); + if (!ctr) { + flb_plg_error(ctx->ins, "could not reconcile and create ctrace context"); + return -1; + } + + /* add the new ctrace contex to the pipeline */ + ret = flb_input_trace_append_skip_processor_stages(ctx->input_ins, ctx->ins->pu->stage + 1, NULL, 0, ctr); + if (ret != 0) { + flb_plg_error(ctx->ins, "could not enqueue ctrace context"); + ctr_destroy(ctr); + return -1; + } + } + + return 0; +} + +static void cb_timer_flush(struct flb_config *config, void *data) +{ + int ret; + struct sampling_settings *settings; + struct sampling *ctx; + + settings = (struct sampling_settings *) data; + ctx = settings->parent; + + ret = reconcile_and_dispatch_traces(ctx, settings); + if (ret != 0) { + flb_plg_error(ctx->ins, "could not reconcile and dispatch traces"); + } + + /* delete empty ctraces contexts */ + list_ctrace_delete_empty(ctx, settings); +} + +static int cb_init(struct flb_config *config, struct sampling *ctx) +{ + int ret; + struct sampling_settings *settings; + struct flb_sched *sched; + + flb_plg_info(ctx->ins, "initializing 'tail' sampling processor"); + + settings = flb_calloc(1, sizeof(struct sampling_settings)); + if (!settings) { + flb_errno(); + return -1; + } + settings->parent = ctx; + cfl_list_init(&settings->list_ctraces); + + /* get the scheduler context */ + sched = flb_sched_ctx_get(); + if (!sched) { + flb_plg_error(ctx->ins, "could not get scheduler context"); + return -1; + } + + ret = flb_config_map_set(&ctx->plugin_settings_properties, ctx->plugin_config_map, (void *) settings); + if (ret == -1) { + flb_free(settings); + return -1; + } + + /* convert decision wait to milliseconds*/ + settings->decision_wait_ms = settings->decision_wait * 1000; + + /* set a timer callback */ + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + settings->decision_wait_ms, cb_timer_flush, + settings, NULL); + if (ret != 0) { + flb_plg_error(ctx->ins, "could not create timer"); + flb_free(settings); + return -1; + } + + settings->span_reg = sampling_span_registry_create(settings->max_traces); + if (!settings->span_reg) { + flb_plg_error(ctx->ins, "could not span registry"); + flb_free(settings); + return -1; + } + + sampling_set_context(ctx, settings); + return 0; +} + +static int cb_do_sampling(struct sampling *ctx, void *plugin_context, + struct ctrace *in_ctr, struct ctrace **out_ctr) +{ + int ret; + struct sampling_ctrace_entry *ctrace_entry; + struct sampling_settings *settings = plugin_context; + + ret = sampling_span_registry_add_trace(ctx, settings->span_reg, in_ctr); + if (ret == -1) { + flb_plg_error(ctx->ins, "failed to add trace to span registry"); + return FLB_PROCESSOR_FAILURE; + } + + /* register the ctrace context */ + ctrace_entry = flb_malloc(sizeof(struct sampling_ctrace_entry)); + if (!ctrace_entry) { + flb_errno(); + return FLB_PROCESSOR_FAILURE; + } + ctrace_entry->ctr = in_ctr; + cfl_list_add(&ctrace_entry->_head, &settings->list_ctraces); + + /* caller must not destroy the ctrace reference */ + *out_ctr = NULL; + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_exit(struct flb_config *config, void *data) +{ + struct sampling_settings *settings = data; + + if (!settings) { + return 0; + } + + if (settings->span_reg) { + sampling_span_registry_destroy(settings->span_reg); + } + + list_ctrace_delete_all(settings->parent, settings); + + + flb_free(settings); + return 0; +} + +struct sampling_plugin sampling_tail_plugin = { + .type = SAMPLING_TYPE_TAIL, + .name = "tail", + .config_map = settings_config_map, + .cb_init = cb_init, + .cb_do_sampling = cb_do_sampling, + .cb_exit = cb_exit, +}; diff --git a/plugins/processor_sampling/sampling_test.c b/plugins/processor_sampling/sampling_test.c new file mode 100644 index 00000000000..5cab4ebf2fa --- /dev/null +++ b/plugins/processor_sampling/sampling_test.c @@ -0,0 +1,85 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "sampling.h" +#include "sampling_span_registry.h" + +/* + * You can use this plugin as a base for the next sampling processor... + */ + +struct sampling_rule { + int empty; +}; + +/* we don't have any options (yet) */ +static struct flb_config_map rules_config_map[] = { + /* EOF */ + {0} +}; + +static int cb_init(struct flb_config *config, struct sampling *ctx) +{ + return 0; +} + +static int cb_do_sampling(struct sampling *ctx, void *plugin_context, + struct ctrace *in_ctr, struct ctrace **out_ctr) +{ + int ret; + struct sampling_span_registry *span_reg; + + span_reg = sampling_span_registry_create(); + if (!span_reg) { + return -1; + } + + ret = sampling_span_registry_add_trace(ctx, span_reg, in_ctr); + if (ret == -1) { + sampling_span_registry_destroy(span_reg); + flb_plg_error(ctx->ins, "failed to add trace to span registry"); + return -1; + } + + sampling_span_registry_print(ctx, span_reg, "test"); + sampling_span_registry_destroy(span_reg); + + return 0; +} + +static int cb_exit(struct flb_config *config, void *data) +{ + struct sampling_rule *rule = data; + + if (rule) { + flb_free(rule); + } + return 0; + } + +struct sampling_plugin sampling_test_plugin = { + .type = SAMPLING_TYPE_TEST, + .name = "test", + .config_map = rules_config_map, + .cb_init = cb_init, + .cb_do_sampling = cb_do_sampling, + .cb_exit = cb_exit, +}; diff --git a/src/flb_input_trace.c b/src/flb_input_trace.c index 169e9213296..61db87f1f74 100644 --- a/src/flb_input_trace.c +++ b/src/flb_input_trace.c @@ -26,15 +26,25 @@ #include #include +/* + * Append a CTrace context into the pipeline. On success, this function returns 0 and -1 + * on error. + * + * Note that the memory pointed by the CTrace context will be handled automatically inside + * this function if the return value is 0, otherwise if is -1, the caller is responsible + * to destroy the context. + */ + static int input_trace_append(struct flb_input_instance *ins, size_t processor_starting_stage, const char *tag, size_t tag_len, struct ctrace *ctr) { int ret; - char *out_buf; - size_t out_size; + char *out_buf = NULL; + size_t out_size = 0; int processor_is_active; + struct ctrace *out_context = NULL; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -53,19 +63,38 @@ static int input_trace_append(struct flb_input_instance *ins, processor_starting_stage, FLB_PROCESSOR_TRACES, tag, tag_len, - (char *) ctr, - 0, NULL, NULL); - + (char *) ctr, 0, + (void **) &out_context, NULL); if (ret == -1) { return -1; } + + if (out_context == NULL) { + /* + * nothing to do: no output context was set (out_context) that means that likely + * the original CTrace context is being handled by the processor itself. We don't + * need to destroy it. + */ + return 0; + } } - /* Convert trace context to msgpack */ - ret = ctr_encode_msgpack_create(ctr, &out_buf, &out_size); - if (ret != 0) { - flb_plg_error(ins, "could not encode traces"); - return -1; + if (out_context) { + ret = ctr_encode_msgpack_create(out_context, &out_buf, &out_size); + if (out_context != ctr) { + ctr_destroy(out_context); + } + if (ret != 0) { + flb_plg_error(ins, "could not encode traces"); + return -1; + } + } + else { + ret = ctr_encode_msgpack_create(ctr, &out_buf, &out_size); + if (ret != 0) { + flb_plg_error(ins, "could not encode traces"); + return -1; + } } /* Append packed metrics */ @@ -74,6 +103,19 @@ static int input_trace_append(struct flb_input_instance *ins, ctr_encode_msgpack_destroy(out_buf); + if (ret == 0) { + /* + * the CTrace context was processed properly, we need to destroy the contexts: the original + * and the output one + */ + if (out_context != NULL && out_context != ctr) { + ctr_destroy(out_context); + } + + /* destroy the original context */ + ctr_destroy(ctr); + } + return ret; } diff --git a/src/flb_network.c b/src/flb_network.c index e53030ba742..67016a00c14 100644 --- a/src/flb_network.c +++ b/src/flb_network.c @@ -29,6 +29,7 @@ #ifdef FLB_SYSTEM_WINDOWS #define poll WSAPoll +#include #else #include #endif @@ -1810,7 +1811,7 @@ flb_sockfd_t flb_net_accept(flb_sockfd_t server_fd) struct sockaddr_storage sock_addr = { 0 }; socklen_t socket_size = sizeof(sock_addr); - /* + /* * sock_addr used to be a sockaddr struct, but this was too * small of a structure to handle IPV6 addresses (#9053). * This would cause accept() to not accept the connection (with no error), @@ -2291,3 +2292,15 @@ int flb_net_socket_peer_info(flb_sockfd_t fd, str_output_buffer_size, str_output_data_size); } + +uint64_t flb_net_htonll(uint64_t value) +{ +#if defined(_WIN32) + /* use windows system provided htonll */ + return htonll(value); +#elif __BYTE_ORDER == __LITTLE_ENDIAN + return ((uint64_t) htonl(value & 0xFFFFFFFF) << 32) | htonl(value >> 32); +#else + return value; +#endif +} \ No newline at end of file diff --git a/src/flb_processor.c b/src/flb_processor.c index d01ab3a49d3..bd899cc3cfc 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -242,7 +242,10 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, pu->unit_type = FLB_PROCESSOR_UNIT_NATIVE; /* create an instance of the processor */ - processor_instance = flb_processor_instance_create(config, pu->event_type, unit_name, NULL); + processor_instance = flb_processor_instance_create(config, + pu, + pu->event_type, + unit_name, NULL); if (processor_instance == NULL) { flb_error("[processor] error creating native processor instance %s", pu->name); @@ -745,20 +748,36 @@ int flb_processor_run(struct flb_processor *proc, } } else if (type == FLB_PROCESSOR_TRACES) { - if (p_ins->p->cb_process_traces != NULL) { + tmp_buf = NULL; + out_size = NULL; ret = p_ins->p->cb_process_traces(p_ins, (struct ctrace *) cur_buf, + (struct ctrace **) &tmp_buf, tag, tag_len); - - if (ret != FLB_PROCESSOR_SUCCESS) { + if (ret == FLB_PROCESSOR_FAILURE) { release_lock(&pu->lock, FLB_PROCESSOR_LOCK_RETRY_LIMIT, FLB_PROCESSOR_LOCK_RETRY_DELAY); return -1; } + else if (ret == FLB_PROCESSOR_SUCCESS) { + if (tmp_buf == NULL) { + /* + * the processsor ran successfuly but there is no + * trace output, that means that the invoked processor + * will enqueue the trace through a different mechanism, + * we just return saying nothing else is needed. + */ + release_lock(&pu->lock, + FLB_PROCESSOR_LOCK_RETRY_LIMIT, + FLB_PROCESSOR_LOCK_RETRY_DELAY); + return 0; + } + } + } } else if (type == FLB_PROCESSOR_PROFILES) { @@ -1053,6 +1072,7 @@ const char *flb_processor_instance_get_property( } struct flb_processor_instance *flb_processor_instance_create(struct flb_config *config, + struct flb_processor_unit *pu, int event_type, const char *name, void *data) { @@ -1098,6 +1118,7 @@ struct flb_processor_instance *flb_processor_instance_create(struct flb_config * instance->p = plugin; instance->data = data; instance->log_level = -1; + instance->pu = pu; mk_list_init(&instance->properties); @@ -1206,7 +1227,6 @@ int flb_processor_instance_init( if (!ins->cmt) { flb_error("[processor] could not create cmetrics context: %s", name); - return -1; } @@ -1226,8 +1246,7 @@ int flb_processor_instance_init( config); if (ret != 0) { - flb_error("[processor] failed initialize filter %s", ins->name); - + flb_error("[processor] failed initialize processor %s", ins->name); return -1; } }