Commit 5a582d9

[exporter/splunkhec] apply multimetric metric merge for the whole batch (open-telemetry#23366)
Apply multi-metric merge at the level of the whole batch rather than within events emitted for one metric.
1 parent 16df49f commit 5a582d9
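
For context: Splunk's multi-metric ("multiple-measurement") event format allows a single HEC event to carry several metric values as metric_name:<name> fields. The standalone Go sketch below illustrates why the scope of the merge matters: events for different metrics that share the same dimensions can only collapse into one multi-metric event if they are merged together, which is what merging over the whole batch achieves. The event type and mergeAll helper are simplified, hypothetical stand-ins, not the exporter's splunk.Event or mergeEventsToMultiMetricFormat.

package main

import "fmt"

// event is a deliberately simplified stand-in for a Splunk HEC metric event.
type event struct {
    dims   string             // flattened dimensions: timestamp, host, resource attributes, ...
    fields map[string]float64 // "metric_name:<name>" -> value
}

// mergeAll folds events that share identical dimensions into one multi-metric event.
func mergeAll(events []event) []event {
    byDims := map[string]event{}
    var order []string // preserve first-seen order for deterministic output
    for _, e := range events {
        m, ok := byDims[e.dims]
        if !ok {
            m = event{dims: e.dims, fields: map[string]float64{}}
            order = append(order, e.dims)
        }
        for k, v := range e.fields {
            m.fields[k] = v
        }
        byDims[e.dims] = m
    }
    out := make([]event, 0, len(order))
    for _, d := range order {
        out = append(out, byDims[d])
    }
    return out
}

func main() {
    // Two different metrics emitted for the same resource and timestamp.
    batch := []event{
        {dims: "time=1,host=web-1", fields: map[string]float64{"metric_name:cpu.user": 0.30}},
        {dims: "time=1,host=web-1", fields: map[string]float64{"metric_name:cpu.system": 0.12}},
    }
    // Merging the whole batch yields one multi-metric event carrying both values.
    fmt.Println(len(mergeAll(batch))) // 1
}

Merging each metric's events in isolation, as the exporter did before this change, would leave the two events above separate, because each per-metric group contains only one metric name.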

5 files changed: +352 -57 lines changed
Changelog entry (new file): +20

@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: splunkhecexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Apply multi-metric merge at the level of the whole batch rather than within events emitted for one metric.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [23365]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:

exporter/splunkhecexporter/client.go: +86 -9
@@ -97,6 +97,9 @@ func (c *client) pushMetricsData(
         }
     }
 
+    if c.config.UseMultiMetricFormat {
+        return c.pushMultiMetricsDataInBatches(ctx, md, localHeaders)
+    }
     return c.pushMetricsDataInBatches(ctx, md, localHeaders)
 }
 
@@ -255,15 +258,6 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
                 // Parsing metric record to Splunk event.
                 events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)
                 tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
-                if c.config.UseMultiMetricFormat {
-                    merged, err := mergeEventsToMultiMetricFormat(events)
-                    if err != nil {
-                        permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
-                            "error merging events: %w", err)))
-                    } else {
-                        events = merged
-                    }
-                }
                 for _, event := range events {
                     // JSON encoding event and writing to buffer.
                     b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
@@ -298,6 +292,43 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
     return iterState{done: true}, permanentErrors
 }
 
+func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffer, is iterState) (iterState, []error) {
+    var permanentErrors []error
+    jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
+    defer jsonStreamPool.Put(jsonStream)
+
+    for i := is.record; i < len(events); i++ {
+        event := events[i]
+        // JSON encoding event and writing to buffer.
+        b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream)
+        if jsonErr != nil {
+            permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr)))
+            continue
+        }
+        _, err := buf.Write(b)
+        if errors.Is(err, errOverCapacity) {
+            if !buf.Empty() {
+                return iterState{
+                    record: i,
+                    done:   false,
+                }, permanentErrors
+            }
+            permanentErrors = append(permanentErrors, consumererror.NewPermanent(
+                fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
+                    " content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
+            return iterState{
+                record: i + 1,
+                done:   i+1 != len(events),
+            }, permanentErrors
+        } else if err != nil {
+            permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
+                "error writing the event: %w", err)))
+        }
+    }
+
+    return iterState{done: true}, permanentErrors
+}
+
 func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState) (iterState, []error) {
     var permanentErrors []error
     jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
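
The new fillMetricsBufferMultiMetrics follows the same fill-and-resume convention as the exporter's other buffer fillers: write serialized events into a capacity-bounded buffer, and when the next event would overflow it, return an iterState whose record field tells the caller where to resume after flushing. The sketch below shows that protocol in isolation; boundedBuf, state, and fill are hypothetical stand-ins for the exporter's buffer and iterState, an illustration of the pattern rather than the real implementation.

package main

import (
    "bytes"
    "errors"
    "fmt"
)

var errOverCap = errors.New("buffer over capacity")

// boundedBuf refuses writes that would push it past max bytes.
type boundedBuf struct {
    bytes.Buffer
    max int
}

func (b *boundedBuf) write(p []byte) error {
    if b.Len()+len(p) > b.max {
        return errOverCap
    }
    _, err := b.Buffer.Write(p)
    return err
}

// state mirrors the record/done bookkeeping of the exporter's iterState.
type state struct {
    record int
    done   bool
}

// fill appends items starting at st.record and stops when the buffer is full,
// reporting the index at which the next call should resume.
func fill(items [][]byte, buf *boundedBuf, st state) state {
    for i := st.record; i < len(items); i++ {
        if err := buf.write(items[i]); errors.Is(err, errOverCap) {
            if buf.Len() > 0 {
                return state{record: i, done: false} // flush what we have, resume at i
            }
            continue // a single oversized item: drop it, like the "dropped metric event" branch
        }
    }
    return state{done: true}
}

func main() {
    items := [][]byte{[]byte("aaaa"), []byte("bbbb"), []byte("cccc")}
    buf := &boundedBuf{max: 8}
    st := state{}
    for !st.done {
        buf.Reset()
        st = fill(items, buf, st)
        fmt.Printf("flush %q\n", buf.String()) // flush "aaaabbbb", then flush "cccc"
    }
}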
@@ -345,6 +376,51 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState
     return iterState{done: true}, permanentErrors
 }
 
+// pushMultiMetricsDataInBatches sends batches of Splunk multi-metric events in JSON format.
+// The batch content length is restricted to MaxContentLengthMetrics.
+// md metrics are parsed to Splunk events.
+func (c *client) pushMultiMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error {
+    buf := c.bufferPool.get()
+    defer c.bufferPool.put(buf)
+    is := iterState{}
+
+    var permanentErrors []error
+    var events []*splunk.Event
+    for i := 0; i < md.ResourceMetrics().Len(); i++ {
+        rm := md.ResourceMetrics().At(i)
+        for j := 0; j < rm.ScopeMetrics().Len(); j++ {
+            sm := rm.ScopeMetrics().At(j)
+            for k := 0; k < sm.Metrics().Len(); k++ {
+                metric := sm.Metrics().At(k)
+
+                // Parsing metric record to Splunk event.
+                events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...)
+            }
+        }
+    }
+
+    merged, err := mergeEventsToMultiMetricFormat(events)
+    if err != nil {
+        return consumererror.NewPermanent(fmt.Errorf("error merging events: %w", err))
+    }
+
+    for !is.done {
+        buf.Reset()
+
+        latestIterState, batchPermanentErrors := c.fillMetricsBufferMultiMetrics(merged, buf, is)
+        permanentErrors = append(permanentErrors, batchPermanentErrors...)
+        if !buf.Empty() {
+            if err := c.postEvents(ctx, buf, headers); err != nil {
+                return consumererror.NewMetrics(err, md)
+            }
+        }
+
+        is = latestIterState
+    }
+
+    return multierr.Combine(permanentErrors...)
+}
+
 // pushMetricsDataInBatches sends batches of Splunk events in JSON format.
 // The batch content length is restricted to MaxContentLengthMetrics.
 // md metrics are parsed to Splunk events.
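
pushMultiMetricsDataInBatches first walks the batch through pdata's standard three-level hierarchy (ResourceMetrics, then ScopeMetrics, then Metrics) to collect every event before merging. As a refresher on that traversal, here is a small standalone sketch that builds a two-metric batch with the collector's pdata API and walks it with the same loop shape; the metric names and attribute values are invented for illustration.

package main

import (
    "fmt"
    "time"

    "go.opentelemetry.io/collector/pdata/pcommon"
    "go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
    // Build a batch: one resource, one scope, two gauge metrics sharing the
    // same resource attributes and timestamp.
    md := pmetric.NewMetrics()
    rm := md.ResourceMetrics().AppendEmpty()
    rm.Resource().Attributes().PutStr("host.name", "web-1")
    sm := rm.ScopeMetrics().AppendEmpty()
    ts := pcommon.NewTimestampFromTime(time.Now())
    for name, value := range map[string]float64{"cpu.user": 0.30, "cpu.system": 0.12} {
        m := sm.Metrics().AppendEmpty()
        m.SetName(name)
        dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
        dp.SetTimestamp(ts)
        dp.SetDoubleValue(value)
    }

    // Walk it the same way the exporter does: resource -> scope -> metric.
    for i := 0; i < md.ResourceMetrics().Len(); i++ {
        rm := md.ResourceMetrics().At(i)
        for j := 0; j < rm.ScopeMetrics().Len(); j++ {
            sm := rm.ScopeMetrics().At(j)
            for k := 0; k < sm.Metrics().Len(); k++ {
                fmt.Println(sm.Metrics().At(k).Name())
            }
        }
    }
}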
@@ -363,6 +439,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
                 return consumererror.NewMetrics(err, subMetrics(md, is))
             }
         }
+
         is = latestIterState
     }
 