Skip to content

Commit 19e2a2d

Browse files
DSET-3998 feat: export Logs resource info based on export_resource_info_on_event configuration (open-telemetry#23250)
* DSET-3998 - export Logs resource info based on export_resource_info_on_event configuration * DSET-3998 - simplify * DSET-3998 - improve docs * Fix log exporter to set AddEvents Event timestamp (ts) field to event observed timetamp in case LogRecord doesn't contain timestamp. Even though ObservedTimestamp should always be present, we fall back to current time in case it's not. In addition to that, remove duplicate and redundant "timestamp" attribute from the AddEvents event. * Add additional assertions. * Remove dummy debug logs. * Create export-logs-resource-info-based-configuration * address PR notes - fix changelog gen * fix docs typo * fix changelog file suffix --------- Co-authored-by: Tomaz Muraus <[email protected]> Co-authored-by: Tomaz Muraus <[email protected]>
1 parent 5a582d9 commit 19e2a2d

10 files changed

+224
-40
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use this changelog template to create an entry for release notes.
2+
# If your change doesn't affect end users, such as a test fix or a tooling change,
3+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
4+
5+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
6+
change_type: 'enhancement'
7+
8+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
9+
component: datasetexporter
10+
11+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
12+
note: "Allow include Logs resource info export to DataSet based on new export_resource_info_on_event configuration. Fix timestamp handling."
13+
14+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
15+
issues: [20660]
16+
17+
# (Optional) One or more lines of additional information to render under the primary note.
18+
# These lines will be padded with 2 spaces and then inserted directly into the document.
19+
# Use pipe (|) for multiline entries.
20+
subtext:

exporter/datasetexporter/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ If you do not want to specify `api_key` in the file, you can use the [builtin fu
3333
- `traces`:
3434
- `aggregate` (default = false): Count the number of spans and errors belonging to a trace.
3535
- `max_wait` (default = 5s): The maximum waiting for all spans from single trace to arrive; ignored if `aggregate` is false.
36+
- `logs`:
37+
- `export_resource_info_on_event` (default = false): Include resource info to DataSet Event while exporting Logs. This is especially useful when reducing DataSet billable log volume.
3638
- `retry_on_failure`: See [retry_on_failure](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
3739
- `sending_queue`: See [sending_queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
3840
- `timeout`: See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)

exporter/datasetexporter/config.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,22 @@ func newDefaultTracesSettings() TracesSettings {
3232
}
3333
}
3434

35+
const logsExportResourceInfoDefault = false
36+
37+
type LogsSettings struct {
38+
// ExportResourceInfo is optional flag to signal that the resource info is being exported to DataSet while exporting Logs.
39+
// This is especially useful when reducing DataSet billable log volume.
40+
// Default value: false.
41+
ExportResourceInfo bool `mapstructure:"export_resource_info_on_event"`
42+
}
43+
44+
// newDefaultLogsSettings returns the default settings for LogsSettings.
45+
func newDefaultLogsSettings() LogsSettings {
46+
return LogsSettings{
47+
ExportResourceInfo: logsExportResourceInfoDefault,
48+
}
49+
}
50+
3551
const bufferMaxLifetime = 5 * time.Second
3652
const bufferRetryInitialInterval = 5 * time.Second
3753
const bufferRetryMaxInterval = 30 * time.Second
@@ -61,6 +77,7 @@ type Config struct {
6177
APIKey configopaque.String `mapstructure:"api_key"`
6278
BufferSettings `mapstructure:"buffer"`
6379
TracesSettings `mapstructure:"traces"`
80+
LogsSettings `mapstructure:"logs"`
6481
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
6582
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
6683
exporterhelper.TimeoutSettings `mapstructure:"timeout"`
@@ -96,7 +113,8 @@ func (c *Config) String() string {
96113
s += fmt.Sprintf("%s: %+v; ", "TracesSettings", c.TracesSettings)
97114
s += fmt.Sprintf("%s: %+v; ", "RetrySettings", c.RetrySettings)
98115
s += fmt.Sprintf("%s: %+v; ", "QueueSettings", c.QueueSettings)
99-
s += fmt.Sprintf("%s: %+v", "TimeoutSettings", c.TimeoutSettings)
116+
s += fmt.Sprintf("%s: %+v; ", "TimeoutSettings", c.TimeoutSettings)
117+
s += fmt.Sprintf("%s: %+v", "LogsSettings", c.LogsSettings)
100118

101119
return s
102120
}
@@ -123,11 +141,13 @@ func (c *Config) convert() (*ExporterConfig, error) {
123141
},
124142
},
125143
tracesSettings: c.TracesSettings,
144+
logsSettings: c.LogsSettings,
126145
},
127146
nil
128147
}
129148

130149
type ExporterConfig struct {
131150
datasetConfig *datasetConfig.DataSetConfig
132151
tracesSettings TracesSettings
152+
logsSettings LogsSettings
133153
}

exporter/datasetexporter/config_test.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func TestConfigUseDefaults(t *testing.T) {
4343
assert.Equal(t, "secret", string(config.APIKey))
4444
assert.Equal(t, bufferMaxLifetime, config.MaxLifetime)
4545
assert.Equal(t, tracesMaxWait, config.TracesSettings.MaxWait)
46+
assert.Equal(t, logsExportResourceInfoDefault, config.LogsSettings.ExportResourceInfo)
4647
}
4748

4849
func TestConfigValidate(t *testing.T) {
@@ -114,7 +115,22 @@ func TestConfigString(t *testing.T) {
114115
}
115116

116117
assert.Equal(t,
117-
"DatasetURL: https://example.com; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:true MaxWait:45s}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
118+
"DatasetURL: https://example.com; BufferSettings: {MaxLifetime:123ns GroupBy:[field1 field2] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:true MaxWait:45s}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}; LogsSettings: {ExportResourceInfo:false}",
118119
config.String(),
119120
)
120121
}
122+
123+
func TestConfigUseProvidedExportResourceInfoValue(t *testing.T) {
124+
f := NewFactory()
125+
config := f.CreateDefaultConfig().(*Config)
126+
configMap := confmap.NewFromStringMap(map[string]interface{}{
127+
"dataset_url": "https://example.com",
128+
"api_key": "secret",
129+
"logs": map[string]any{
130+
"export_resource_info_on_event": true,
131+
},
132+
})
133+
err := config.Unmarshal(configMap)
134+
assert.Nil(t, err)
135+
assert.Equal(t, true, config.LogsSettings.ExportResourceInfo)
136+
}

exporter/datasetexporter/datasetexporter.go

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type DatasetExporter struct {
2424
logger *zap.Logger
2525
session string
2626
spanTracker *spanTracker
27+
exporterCfg *ExporterConfig
2728
}
2829

2930
func newDatasetExporter(entity string, config *Config, logger *zap.Logger) (*DatasetExporter, error) {
@@ -60,6 +61,7 @@ func newDatasetExporter(entity string, config *Config, logger *zap.Logger) (*Dat
6061
session: uuid.New().String(),
6162
logger: logger,
6263
spanTracker: tracker,
64+
exporterCfg: exporterCfg,
6365
}, nil
6466
}
6567

exporter/datasetexporter/factory.go

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func createDefaultConfig() component.Config {
2727
return &Config{
2828
BufferSettings: newDefaultBufferSettings(),
2929
TracesSettings: newDefaultTracesSettings(),
30+
LogsSettings: newDefaultLogsSettings(),
3031
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
3132
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
3233
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),

exporter/datasetexporter/factory_test.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func TestLoadConfig(t *testing.T) {
4949
APIKey: "key-minimal",
5050
BufferSettings: newDefaultBufferSettings(),
5151
TracesSettings: newDefaultTracesSettings(),
52+
LogsSettings: newDefaultLogsSettings(),
5253
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
5354
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
5455
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
@@ -67,6 +68,7 @@ func TestLoadConfig(t *testing.T) {
6768
RetryMaxElapsedTime: bufferRetryMaxElapsedTime,
6869
},
6970
TracesSettings: newDefaultTracesSettings(),
71+
LogsSettings: newDefaultLogsSettings(),
7072
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
7173
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
7274
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
@@ -87,6 +89,9 @@ func TestLoadConfig(t *testing.T) {
8789
TracesSettings: TracesSettings{
8890
MaxWait: 3 * time.Second,
8991
},
92+
LogsSettings: LogsSettings{
93+
ExportResourceInfo: true,
94+
},
9095
RetrySettings: exporterhelper.RetrySettings{
9196
Enabled: true,
9297
InitialInterval: 11 * time.Nanosecond,
@@ -133,7 +138,7 @@ func createExporterTests() []CreateTest {
133138
{
134139
name: "broken",
135140
config: &Config{},
136-
expectedError: fmt.Errorf("cannot get DataSetExpoter: cannot convert config: DatasetURL: ; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:false MaxWait:0s}; RetrySettings: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:<nil>}; TimeoutSettings: {Timeout:0s}; config is not valid: api_key is required"),
141+
expectedError: fmt.Errorf("cannot get DataSetExpoter: cannot convert config: DatasetURL: ; BufferSettings: {MaxLifetime:0s GroupBy:[] RetryInitialInterval:0s RetryMaxInterval:0s RetryMaxElapsedTime:0s}; TracesSettings: {Aggregate:false MaxWait:0s}; RetrySettings: {Enabled:false InitialInterval:0s RandomizationFactor:0 Multiplier:0 MaxInterval:0s MaxElapsedTime:0s}; QueueSettings: {Enabled:false NumConsumers:0 QueueSize:0 StorageID:<nil>}; TimeoutSettings: {Timeout:0s}; LogsSettings: {ExportResourceInfo:false}; config is not valid: api_key is required"),
137142
},
138143
{
139144
name: "valid",

exporter/datasetexporter/logs_exporter.go

+29-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"go.opentelemetry.io/collector/pdata/plog"
1818
)
1919

20+
var now = time.Now
21+
2022
func createLogsExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Logs, error) {
2123
cfg := castConfig(config)
2224
e, err := newDatasetExporter("logs", cfg, set.Logger)
@@ -63,17 +65,22 @@ func buildBody(attrs map[string]interface{}, value pcommon.Value) string {
6365
return message
6466
}
6567

66-
func buildEventFromLog(log plog.LogRecord, resource pcommon.Resource, scope pcommon.InstrumentationScope) *add_events.EventBundle {
68+
func buildEventFromLog(
69+
log plog.LogRecord,
70+
resource pcommon.Resource,
71+
scope pcommon.InstrumentationScope,
72+
settings LogsSettings,
73+
) *add_events.EventBundle {
6774
attrs := make(map[string]interface{})
6875
event := add_events.Event{}
6976

77+
observedTs := log.ObservedTimestamp().AsTime()
7078
if sevNum := log.SeverityNumber(); sevNum > 0 {
7179
attrs["severity.number"] = sevNum
7280
event.Sev = int(sevNum)
7381
}
7482

7583
if timestamp := log.Timestamp().AsTime(); !timestamp.Equal(time.Unix(0, 0)) {
76-
attrs["timestamp"] = timestamp.String()
7784
event.Ts = strconv.FormatInt(timestamp.UnixNano(), 10)
7885
}
7986

@@ -86,8 +93,8 @@ func buildEventFromLog(log plog.LogRecord, resource pcommon.Resource, scope pcom
8693
if dropped := log.DroppedAttributesCount(); dropped > 0 {
8794
attrs["dropped_attributes_count"] = dropped
8895
}
89-
if observed := log.ObservedTimestamp().AsTime(); !observed.Equal(time.Unix(0, 0)) {
90-
attrs["observed.timestamp"] = observed.String()
96+
if !observedTs.Equal(time.Unix(0, 0)) {
97+
attrs["observed.timestamp"] = observedTs.String()
9198
}
9299
if sevText := log.SeverityText(); sevText != "" {
93100
attrs["severity.text"] = sevText
@@ -100,11 +107,27 @@ func buildEventFromLog(log plog.LogRecord, resource pcommon.Resource, scope pcom
100107
attrs["trace_id"] = trace
101108
}
102109

110+
// Event needs to always have timestamp set otherwise it will get set to unix epoch start time
111+
if event.Ts == "" {
112+
// ObservedTimestamp should always be set, but in case if it's not, we fall back to
113+
// current time
114+
// TODO: We should probably also do a rate limited log message here since this
115+
// could indicate an issue with the current setup in case most events don't contain
116+
// a timestamp.
117+
if !observedTs.Equal(time.Unix(0, 0)) {
118+
event.Ts = strconv.FormatInt(observedTs.UnixNano(), 10)
119+
} else {
120+
event.Ts = strconv.FormatInt(now().UnixNano(), 10)
121+
}
122+
}
123+
103124
updateWithPrefixedValues(attrs, "attributes.", ".", log.Attributes().AsRaw(), 0)
104125
attrs["flags"] = log.Flags()
105126
attrs["flag.is_sampled"] = log.Flags().IsSampled()
106127

107-
updateWithPrefixedValues(attrs, "resource.attributes.", ".", resource.Attributes().AsRaw(), 0)
128+
if settings.ExportResourceInfo {
129+
updateWithPrefixedValues(attrs, "resource.attributes.", ".", resource.Attributes().AsRaw(), 0)
130+
}
108131
attrs["scope.name"] = scope.Name()
109132
updateWithPrefixedValues(attrs, "scope.attributes.", ".", scope.Attributes().AsRaw(), 0)
110133

@@ -130,7 +153,7 @@ func (e *DatasetExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
130153
logRecords := scopeLogs.At(j).LogRecords()
131154
for k := 0; k < logRecords.Len(); k++ {
132155
logRecord := logRecords.At(k)
133-
events = append(events, buildEventFromLog(logRecord, resource, scope))
156+
events = append(events, buildEventFromLog(logRecord, resource, scope, e.exporterCfg.logsSettings))
134157
}
135158
}
136159
}

0 commit comments

Comments
 (0)