From 2f64c49d487bd82b91041a32806a8dde2fc04083 Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Tue, 11 Mar 2025 10:21:36 -1000 Subject: [PATCH 1/9] ToAggregateMetricDouble function --- .../org/elasticsearch/TransportVersions.java | 1 + .../xpack/esql/core/type/DataType.java | 1 - .../AggregateMetricDoubleBlockBuilder.java | 39 +- .../compute/data/BlockUtils.java | 14 +- .../xpack/esql/EsqlTestUtils.java | 12 +- .../xpack/esql/action/EsqlCapabilities.java | 7 +- .../esql/expression/ExpressionWritables.java | 2 + .../function/EsqlFunctionRegistry.java | 2 + .../convert/ToAggregateMetricDouble.java | 371 ++++++++++++++++++ .../convert/ToAggregateMetricDoubleTests.java | 90 +++++ ...bstractPhysicalPlanSerializationTests.java | 2 + .../rest-api-spec/test/esql/46_downsample.yml | 105 +++++ .../rest-api-spec/test/esql/60_usage.yml | 4 +- 13 files changed, 642 insertions(+), 8 deletions(-) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java create mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 3ace93ece62f0..c9d8391bd0ee8 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -185,6 +185,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_THREAD_NAME_IN_DRIVER_PROFILE = def(9_027_0_00); public static final TransportVersion INFERENCE_CONTEXT = def(9_028_0_00); public static final TransportVersion ML_INFERENCE_DEEPSEEK = def(9_029_00_0); + public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_030_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java index 4fdc32ea41e12..febe76f3da33d 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java @@ -557,7 +557,6 @@ public static boolean isRepresentable(DataType t) { && t != SOURCE && t != HALF_FLOAT && t != PARTIAL_AGG - && t != AGGREGATE_METRIC_DOUBLE && t.isCounter() == false; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java index 4f1c6faf520be..ffb897854904d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java @@ -7,9 +7,17 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.io.stream.GenericNamedWriteable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.BlockLoader; +import java.io.IOException; + public class AggregateMetricDoubleBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder { private DoubleBlockBuilder minBuilder; @@ -161,11 +169,40 @@ public String getLabel() { } } - public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) { + public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) implements GenericNamedWriteable { public AggregateMetricDoubleLiteral { min = min.isNaN() ? null : min; max = max.isNaN() ? null : max; sum = sum.isNaN() ? null : sum; } + + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + GenericNamedWriteable.class, + "AggregateMetricDoubleLiteral", + AggregateMetricDoubleLiteral::new + ); + + @Override + public String getWriteableName() { + return "AggregateMetricDoubleLiteral"; + } + + public AggregateMetricDoubleLiteral(StreamInput input) throws IOException { + this(input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalInt()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalDouble(min); + out.writeOptionalDouble(max); + out.writeOptionalDouble(sum); + out.writeOptionalInt(count); + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL; + } + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java index 8773a3b9785e0..1d6012a8a73de 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java @@ -285,7 +285,19 @@ private static Object valueAtOffset(Block block, int offset) { DocVector v = ((DocBlock) block).asVector(); yield new Doc(v.shards().getInt(offset), v.segments().getInt(offset), v.docs().getInt(offset)); } - case COMPOSITE -> throw new IllegalArgumentException("can't read values from composite blocks"); + case COMPOSITE -> { + CompositeBlock compositeBlock = (CompositeBlock) block; + var minBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()); + var maxBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()); + var sumBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()); + var countBlock = (IntBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()); + yield new AggregateMetricDoubleLiteral( + minBlock.getDouble(offset), + maxBlock.getDouble(offset), + sumBlock.getDouble(offset), + countBlock.getInt(offset) + ); + } case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]"); }; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 0ddb19f8a8216..19e8f0ef36278 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.BytesRefBlock; @@ -788,6 +789,12 @@ public static Literal randomLiteral(DataType type) { case CARTESIAN_POINT -> CARTESIAN.asWkb(ShapeTestUtils.randomPoint()); case GEO_SHAPE -> GEO.asWkb(GeometryTestUtils.randomGeometry(randomBoolean())); case CARTESIAN_SHAPE -> CARTESIAN.asWkb(ShapeTestUtils.randomGeometry(randomBoolean())); + case AGGREGATE_METRIC_DOUBLE -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral( + randomDouble(), + randomDouble(), + randomDouble(), + randomInt() + ); case NULL -> null; case SOURCE -> { try { @@ -798,8 +805,9 @@ public static Literal randomLiteral(DataType type) { throw new UncheckedIOException(e); } } - case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE -> - throw new IllegalArgumentException("can't make random values for [" + type.typeName() + "]"); + case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG -> throw new IllegalArgumentException( + "can't make random values for [" + type.typeName() + "]" + ); }, type); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index c9d42f007ffc9..0954b0114fb93 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -875,7 +875,12 @@ public enum Cap { /** * Full text functions can be scored when being part of a disjunction */ - FULL_TEXT_FUNCTIONS_DISJUNCTIONS_SCORE; + FULL_TEXT_FUNCTIONS_DISJUNCTIONS_SCORE, + + /** + * Support for to_aggregate_metric_double function + */ + AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG); private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/ExpressionWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/ExpressionWritables.java index dba0ec799f312..b93028b3e5897 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/ExpressionWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/ExpressionWritables.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextWritables; import org.elasticsearch.xpack.esql.expression.function.scalar.ScalarFunctionWritables; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64; +import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint; @@ -180,6 +181,7 @@ public static List unaryScalars() { entries.add(StY.ENTRY); entries.add(Tan.ENTRY); entries.add(Tanh.ENTRY); + entries.add(ToAggregateMetricDouble.ENTRY); entries.add(ToBase64.ENTRY); entries.add(ToBoolean.ENTRY); entries.add(ToCartesianPoint.ENTRY); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index d88b3fe1a4ade..1d6c4d5344abb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Greatest; import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Least; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64; +import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint; @@ -376,6 +377,7 @@ private static FunctionDefinition[][] functions() { // conversion functions new FunctionDefinition[] { def(FromBase64.class, FromBase64::new, "from_base64"), + def(ToAggregateMetricDouble.class, ToAggregateMetricDouble::new, "to_aggregate_metric_double", "to_aggregatemetricdouble"), def(ToBase64.class, ToBase64::new, "to_base64"), def(ToBoolean.class, ToBoolean::new, "to_boolean", "to_bool"), def(ToCartesianPoint.class, ToCartesianPoint::new, "to_cartesianpoint"), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java new file mode 100644 index 0000000000000..14e4ba500a875 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java @@ -0,0 +1,371 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.scalar.convert; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; +import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE; +import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE; +import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER; +import static org.elasticsearch.xpack.esql.core.type.DataType.LONG; +import static org.elasticsearch.xpack.esql.core.type.DataType.UNSIGNED_LONG; + +public class ToAggregateMetricDouble extends AbstractConvertFunction { + + private static final Map EVALUATORS = Map.ofEntries( + Map.entry(AGGREGATE_METRIC_DOUBLE, (field, source) -> field), + Map.entry(DOUBLE, DoubleFactory::new), + Map.entry(INTEGER, IntFactory::new), + Map.entry(LONG, LongFactory::new), + Map.entry(UNSIGNED_LONG, UnsignedLongFactory::new) + ); + + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Expression.class, + "ToAggregateMetricDouble", + ToAggregateMetricDouble::new + ); + + @FunctionInfo(returnType = "aggregate_metric_double", description = "Encode a numeric to an aggregate_metric_double.") + public ToAggregateMetricDouble( + Source source, + @Param( + name = "number", + type = { "double", "long", "unsigned_long", "integer", "aggregate_metric_double" }, + description = "Input value. The input can be a single-valued column or an expression." + ) Expression field + ) { + super(source, field); + } + + private ToAggregateMetricDouble(StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + protected TypeResolution resolveType() { + if (childrenResolved() == false) { + return new TypeResolution("Unresolved children"); + } + return isType( + field, + dt -> dt == DataType.AGGREGATE_METRIC_DOUBLE || dt == DataType.DOUBLE || dt == LONG || dt == INTEGER || dt == UNSIGNED_LONG, + sourceText(), + DEFAULT, + "numeric or aggregate_metric_double" + ); + } + + @Override + public DataType dataType() { + return AGGREGATE_METRIC_DOUBLE; + } + + @Override + public Expression replaceChildren(List newChildren) { + return new ToAggregateMetricDouble(source(), newChildren.get(0)); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, ToAggregateMetricDouble::new, field); + } + + @Override + protected Map factories() { + return EVALUATORS; + } + + public static class DoubleFactory implements EvalOperator.ExpressionEvaluator.Factory { + private final Source source; + + private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; + + public DoubleFactory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) { + this.fieldEvaluator = fieldEvaluator; + this.source = source; + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromDoubleEvaluator[" + "field=" + fieldEvaluator + "]"; + } + + @Override + public EvalOperator.ExpressionEvaluator get(DriverContext context) { + final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); + + return new EvalOperator.ExpressionEvaluator() { + @Override + public Block eval(Page page) { + Block block = eval.eval(page); + int positionCount = page.getPositionCount(); + try { + DoubleBlock doubleBlock = (DoubleBlock) block; + try ( + AggregateMetricDoubleBlockBuilder result = context.blockFactory() + .newAggregateMetricDoubleBlockBuilder(positionCount) + ) { + for (int p = 0; p < positionCount; p++) { + try { + if (doubleBlock.isNull(p)) { + result.appendNull(); + continue; + } + var val = doubleBlock.getDouble(p); + result.min().appendDouble(val); + result.max().appendDouble(val); + result.sum().appendDouble(val); + result.count().appendInt(1); + } catch (Exception e) { + result.appendNull(); + } + } + return result.build(); + } + } finally { + block.close(); + } + } + + @Override + public void close() { + Releasables.closeExpectNoException(eval); + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromDoubleEvaluator[field=" + eval + "]"; + } + }; + } + } + + public static class LongFactory implements EvalOperator.ExpressionEvaluator.Factory { + private final Source source; + + private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; + + public LongFactory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) { + this.fieldEvaluator = fieldEvaluator; + this.source = source; + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromLongEvaluator[" + "field=" + fieldEvaluator + "]"; + } + + @Override + public EvalOperator.ExpressionEvaluator get(DriverContext context) { + final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); + + return new EvalOperator.ExpressionEvaluator() { + @Override + public Block eval(Page page) { + Block block = eval.eval(page); + int positionCount = page.getPositionCount(); + try { + LongBlock longBlock = (LongBlock) block; + try ( + AggregateMetricDoubleBlockBuilder result = context.blockFactory() + .newAggregateMetricDoubleBlockBuilder(positionCount) + ) { + for (int p = 0; p < positionCount; p++) { + try { + if (longBlock.isNull(p)) { + result.appendNull(); + continue; + } + var val = longBlock.getLong(p); + result.min().appendDouble(val); + result.max().appendDouble(val); + result.sum().appendDouble(val); + result.count().appendInt(1); + } catch (Exception e) { + result.appendNull(); + } + } + return result.build(); + } + } finally { + block.close(); + } + } + + @Override + public void close() { + Releasables.closeExpectNoException(eval); + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromLongEvaluator[field=" + eval + "]"; + } + }; + } + } + + public static class UnsignedLongFactory implements EvalOperator.ExpressionEvaluator.Factory { + private final Source source; + + private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; + + public UnsignedLongFactory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) { + this.fieldEvaluator = fieldEvaluator; + this.source = source; + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[" + "field=" + fieldEvaluator + "]"; + } + + @Override + public EvalOperator.ExpressionEvaluator get(DriverContext context) { + final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); + + return new EvalOperator.ExpressionEvaluator() { + @Override + public Block eval(Page page) { + Block block = eval.eval(page); + int positionCount = page.getPositionCount(); + try { + LongBlock longBlock = (LongBlock) block; + try ( + AggregateMetricDoubleBlockBuilder result = context.blockFactory() + .newAggregateMetricDoubleBlockBuilder(positionCount) + ) { + for (int p = 0; p < positionCount; p++) { + try { + if (longBlock.isNull(p)) { + result.appendNull(); + continue; + } + var val = EsqlDataTypeConverter.unsignedLongToDouble(longBlock.getLong(p)); + result.min().appendDouble(val); + result.max().appendDouble(val); + result.sum().appendDouble(val); + result.count().appendInt(1); + } catch (Exception e) { + result.appendNull(); + } + } + return result.build(); + } + } finally { + block.close(); + } + } + + @Override + public void close() { + Releasables.closeExpectNoException(eval); + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[field=" + eval + "]"; + } + }; + } + } + + public static class IntFactory implements EvalOperator.ExpressionEvaluator.Factory { + private final Source source; + + private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; + + public IntFactory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) { + this.fieldEvaluator = fieldEvaluator; + this.source = source; + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromIntEvaluator[" + "field=" + fieldEvaluator + "]"; + } + + @Override + public EvalOperator.ExpressionEvaluator get(DriverContext context) { + final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); + + return new EvalOperator.ExpressionEvaluator() { + @Override + public Block eval(Page page) { + Block block = eval.eval(page); + int positionCount = page.getPositionCount(); + try { + IntBlock intBlock = (IntBlock) block; + try ( + AggregateMetricDoubleBlockBuilder result = context.blockFactory() + .newAggregateMetricDoubleBlockBuilder(positionCount) + ) { + for (int p = 0; p < positionCount; p++) { + try { + if (intBlock.isNull(p)) { + result.appendNull(); + continue; + } + var val = intBlock.getInt(p); + result.min().appendDouble(val); + result.max().appendDouble(val); + result.sum().appendDouble(val); + result.count().appendInt(1); + } catch (Exception e) { + result.appendNull(); + } + } + return result.build(); + } + } finally { + block.close(); + } + } + + @Override + public void close() { + Releasables.closeExpectNoException(eval); + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromIntEvaluator[field=" + eval + "]"; + } + }; + } + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java new file mode 100644 index 0000000000000..39f3a3569c315 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.scalar.convert; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.AbstractScalarFunctionTestCase; +import org.elasticsearch.xpack.esql.expression.function.FunctionName; +import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +import static java.util.Collections.emptyList; + +@FunctionName("to_aggregate_metric_double") +public class ToAggregateMetricDoubleTests extends AbstractScalarFunctionTestCase { + public ToAggregateMetricDoubleTests(@Name("TestCase") Supplier testCaseSupplier) { + this.testCase = testCaseSupplier.get(); + } + + @Override + protected Expression build(Source source, List args) { + if (args.get(0).dataType() == DataType.AGGREGATE_METRIC_DOUBLE) { + assumeTrue("Test sometimes wraps literals as fields", args.get(0).foldable()); + } + return new ToAggregateMetricDouble(source, args.get(0)); + } + + @ParametersFactory + public static Iterable parameters() { + final String read = "Attribute[channel=0]"; + final List suppliers = new ArrayList<>(); + + TestCaseSupplier.forUnaryInt( + suppliers, + "ToAggregateMetricDoubleFromIntEvaluator[field=" + read + "]", + DataType.AGGREGATE_METRIC_DOUBLE, + i -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral((double) i, (double) i, (double) i, 1), + Integer.MIN_VALUE, + Integer.MAX_VALUE, + emptyList() + ); + TestCaseSupplier.forUnaryLong( + suppliers, + "ToAggregateMetricDoubleFromLongEvaluator[field=" + read + "]", + DataType.AGGREGATE_METRIC_DOUBLE, + l -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral((double) l, (double) l, (double) l, 1), + Long.MIN_VALUE, + Long.MAX_VALUE, + emptyList() + ); + TestCaseSupplier.forUnaryUnsignedLong( + suppliers, + "ToAggregateMetricDoubleFromUnsignedLongEvaluator[field=" + read + "]", + DataType.AGGREGATE_METRIC_DOUBLE, + ul -> { + var newVal = ul.doubleValue(); + return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(newVal, newVal, newVal, 1); + }, + BigInteger.ZERO, + UNSIGNED_LONG_MAX, + emptyList() + ); + TestCaseSupplier.forUnaryDouble( + suppliers, + "ToAggregateMetricDoubleFromDoubleEvaluator[field=" + read + "]", + DataType.AGGREGATE_METRIC_DOUBLE, + d -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(d, d, d, 1), + Double.NEGATIVE_INFINITY, + Double.POSITIVE_INFINITY, + emptyList() + ); + + return parameterSuppliersFromTypedDataWithDefaultChecksNoErrors(true, suppliers); + } + +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java index 11d62a5f8082c..ac831d36f1533 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.search.SearchModule; import org.elasticsearch.xpack.esql.core.tree.Node; import org.elasticsearch.xpack.esql.expression.ExpressionWritables; @@ -51,6 +52,7 @@ protected final NamedWriteableRegistry getNamedWriteableRegistry() { entries.addAll(ExpressionWritables.allExpressions()); entries.addAll(new SearchModule(Settings.EMPTY, List.of()).getNamedWriteables()); // Query builders entries.add(Add.ENTRY); // Used by the eval tests + entries.add(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY); return new NamedWriteableRegistry(entries); } diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml index 5624c532dc77b..ee1a381c6e589 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml @@ -146,3 +146,108 @@ setup: - match: {columns.0.name: "k8s.pod.network.rx"} - match: {columns.0.type: "aggregate_metric_double"} - match: {values.0.0: '{"min":530604.0,"max":530605.0,"sum":1061209.0,"value_count":2}'} + +--- +"Stats from downsampled and non-downsampled index simultaneously": + - requires: + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [aggregate_metric_double_convert_to] + reason: "Support for to_aggregate_metric_double function" + + - do: + indices.downsample: + index: test + target_index: test-downsample + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + indices.create: + index: test-2 + body: + settings: + number_of_shards: 1 + index: + mode: time_series + routing_path: [ metricset, k8s.pod.uid ] + time_series: + start_time: 2021-04-29T00:00:00Z + end_time: 2021-04-30T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + created_at: + type: date_nanos + running: + type: boolean + number_of_containers: + type: integer + ip: + type: ip + tags: + type: keyword + values: + type: integer + network: + properties: + tx: + type: long + time_series_metric: gauge + rx: + type: long + time_series_metric: gauge + + - do: + bulk: + refresh: true + index: test-2 + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001810, "rx": 802339}, "created_at": "2021-04-28T19:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-29T21:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.26", "network": {"tx": 2000177, "rx": 800479}, "created_at": "2021-04-28T19:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}' + - '{"index": {}}' + + - do: + esql.query: + body: + query: "FROM test-* | + WHERE k8s.pod.uid == \"947e4ced-1786-4e53-9e0c-5c447e959507\" | + EVAL rx = to_aggregate_metric_double(k8s.pod.network.rx) | + STATS max(rx), min(rx), sum(rx), count(rx) | + LIMIT 100" + + - length: {values: 1} + - length: {values.0: 4} + - match: {columns.0.name: "max(rx)"} + - match: {columns.0.type: "double"} + - match: {columns.1.name: "min(rx)"} + - match: {columns.1.type: "double"} + - match: {columns.2.name: "sum(rx)"} + - match: {columns.2.type: "double"} + - match: {columns.3.name: "count(rx)"} + - match: {columns.3.type: "long"} + - match: {values.0.0: 803685.0} + - match: {values.0.1: 800479.0} + - match: {values.0.2: 4812452.0} + - match: {values.0.3: 6} diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index 4e2215e0f9300..5c554fdac52f2 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -93,7 +93,7 @@ setup: - gt: {esql.functions.to_long: $functions_to_long} - match: {esql.functions.coalesce: $functions_coalesce} # Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation. - - length: {esql.functions: 133} # check the "sister" test below for a likely update to the same esql.functions length check + - length: {esql.functions: 134} # check the "sister" test below for a likely update to the same esql.functions length check --- "Basic ESQL usage output (telemetry) non-snapshot version": @@ -164,4 +164,4 @@ setup: - match: {esql.functions.cos: $functions_cos} - gt: {esql.functions.to_long: $functions_to_long} - match: {esql.functions.coalesce: $functions_coalesce} - - length: {esql.functions: 130} # check the "sister" test above for a likely update to the same esql.functions length check + - length: {esql.functions: 131} # check the "sister" test above for a likely update to the same esql.functions length check From efab11790f6d1cc584429731a7bdfc6ee0ee99b0 Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Thu, 13 Mar 2025 07:27:38 -1000 Subject: [PATCH 2/9] add aggmetricliteral to serialization tests named writable registery --- .../elasticsearch/xpack/esql/SerializationTestUtils.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java index 3e644f3e61b05..8e396e4753f09 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java @@ -10,9 +10,11 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.GenericNamedWriteable; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.ExistsQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; @@ -112,6 +114,13 @@ public static NamedWriteableRegistry writableRegistry() { entries.add(SingleValueQuery.ENTRY); entries.addAll(ExpressionWritables.getNamedWriteables()); entries.addAll(PlanWritables.getNamedWriteables()); + entries.add( + new NamedWriteableRegistry.Entry( + GenericNamedWriteable.class, + AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY.name, + AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral::new + ) + ); return new NamedWriteableRegistry(entries); } } From 58104fb112ea20eec58e841dc482ce458566e047 Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Thu, 13 Mar 2025 07:46:39 -1000 Subject: [PATCH 3/9] flip order of args of factory builders --- .../scalar/convert/ToAggregateMetricDouble.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java index 14e4ba500a875..b91ee6986432d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java @@ -41,7 +41,7 @@ public class ToAggregateMetricDouble extends AbstractConvertFunction { private static final Map EVALUATORS = Map.ofEntries( - Map.entry(AGGREGATE_METRIC_DOUBLE, (field, source) -> field), + Map.entry(AGGREGATE_METRIC_DOUBLE, (source, fieldEval) -> fieldEval), Map.entry(DOUBLE, DoubleFactory::new), Map.entry(INTEGER, IntFactory::new), Map.entry(LONG, LongFactory::new), @@ -114,7 +114,7 @@ public static class DoubleFactory implements EvalOperator.ExpressionEvaluator.Fa private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; - public DoubleFactory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) { + public DoubleFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { this.fieldEvaluator = fieldEvaluator; this.source = source; } @@ -179,7 +179,7 @@ public static class LongFactory implements EvalOperator.ExpressionEvaluator.Fact private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; - public LongFactory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) { + public LongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { this.fieldEvaluator = fieldEvaluator; this.source = source; } @@ -244,7 +244,7 @@ public static class UnsignedLongFactory implements EvalOperator.ExpressionEvalua private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; - public UnsignedLongFactory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) { + public UnsignedLongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { this.fieldEvaluator = fieldEvaluator; this.source = source; } @@ -309,7 +309,7 @@ public static class IntFactory implements EvalOperator.ExpressionEvaluator.Facto private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; - public IntFactory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) { + public IntFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { this.fieldEvaluator = fieldEvaluator; this.source = source; } From 61c3fef3dc1701fcd5ca112e2048adb5426b4fe3 Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Fri, 14 Mar 2025 16:10:46 -1000 Subject: [PATCH 4/9] agg metric docs --- .../description/to_aggregate_metric_double.md | 6 ++++++ .../layout/to_aggregate_metric_double.md | 20 +++++++++++++++++++ .../parameters/to_aggregate_metric_double.md | 7 +++++++ .../types/to_aggregate_metric_double.md | 8 ++++++++ .../functions/to_aggregate_metric_double.svg | 1 + .../functions/to_aggregate_metric_double.json | 9 +++++++++ .../functions/to_aggregate_metric_double.md | 7 +++++++ 7 files changed, 58 insertions(+) create mode 100644 docs/reference/query-languages/esql/_snippets/functions/description/to_aggregate_metric_double.md create mode 100644 docs/reference/query-languages/esql/_snippets/functions/layout/to_aggregate_metric_double.md create mode 100644 docs/reference/query-languages/esql/_snippets/functions/parameters/to_aggregate_metric_double.md create mode 100644 docs/reference/query-languages/esql/_snippets/functions/types/to_aggregate_metric_double.md create mode 100644 docs/reference/query-languages/esql/images/functions/to_aggregate_metric_double.svg create mode 100644 docs/reference/query-languages/esql/kibana/definition/functions/to_aggregate_metric_double.json create mode 100644 docs/reference/query-languages/esql/kibana/docs/functions/to_aggregate_metric_double.md diff --git a/docs/reference/query-languages/esql/_snippets/functions/description/to_aggregate_metric_double.md b/docs/reference/query-languages/esql/_snippets/functions/description/to_aggregate_metric_double.md new file mode 100644 index 0000000000000..144c427ff07cb --- /dev/null +++ b/docs/reference/query-languages/esql/_snippets/functions/description/to_aggregate_metric_double.md @@ -0,0 +1,6 @@ +% This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +**Description** + +Encode a numeric to an aggregate_metric_double. + diff --git a/docs/reference/query-languages/esql/_snippets/functions/layout/to_aggregate_metric_double.md b/docs/reference/query-languages/esql/_snippets/functions/layout/to_aggregate_metric_double.md new file mode 100644 index 0000000000000..52439c620383c --- /dev/null +++ b/docs/reference/query-languages/esql/_snippets/functions/layout/to_aggregate_metric_double.md @@ -0,0 +1,20 @@ +% This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +## `TO_AGGREGATE_METRIC_DOUBLE` [esql-to_aggregate_metric_double] + +**Syntax** + +:::{image} ../../../images/functions/to_aggregate_metric_double.svg +:alt: Embedded +:class: text-center +::: + + +:::{include} ../parameters/to_aggregate_metric_double.md +::: + +:::{include} ../description/to_aggregate_metric_double.md +::: + +:::{include} ../types/to_aggregate_metric_double.md +::: diff --git a/docs/reference/query-languages/esql/_snippets/functions/parameters/to_aggregate_metric_double.md b/docs/reference/query-languages/esql/_snippets/functions/parameters/to_aggregate_metric_double.md new file mode 100644 index 0000000000000..5204e46661d48 --- /dev/null +++ b/docs/reference/query-languages/esql/_snippets/functions/parameters/to_aggregate_metric_double.md @@ -0,0 +1,7 @@ +% This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +**Parameters** + +`number` +: Input value. The input can be a single-valued column or an expression. + diff --git a/docs/reference/query-languages/esql/_snippets/functions/types/to_aggregate_metric_double.md b/docs/reference/query-languages/esql/_snippets/functions/types/to_aggregate_metric_double.md new file mode 100644 index 0000000000000..cfe51aa1472c1 --- /dev/null +++ b/docs/reference/query-languages/esql/_snippets/functions/types/to_aggregate_metric_double.md @@ -0,0 +1,8 @@ +% This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +**Supported types** + +| number | result | +| --- | --- | +aggregate_metric_double + diff --git a/docs/reference/query-languages/esql/images/functions/to_aggregate_metric_double.svg b/docs/reference/query-languages/esql/images/functions/to_aggregate_metric_double.svg new file mode 100644 index 0000000000000..12550278d6e36 --- /dev/null +++ b/docs/reference/query-languages/esql/images/functions/to_aggregate_metric_double.svg @@ -0,0 +1 @@ +TO_AGGREGATE_METRIC_DOUBLE(number) \ No newline at end of file diff --git a/docs/reference/query-languages/esql/kibana/definition/functions/to_aggregate_metric_double.json b/docs/reference/query-languages/esql/kibana/definition/functions/to_aggregate_metric_double.json new file mode 100644 index 0000000000000..0336ca89e19e9 --- /dev/null +++ b/docs/reference/query-languages/esql/kibana/definition/functions/to_aggregate_metric_double.json @@ -0,0 +1,9 @@ +{ + "comment" : "This is generated by ESQL’s AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.", + "type" : "scalar", + "name" : "to_aggregate_metric_double", + "description" : "Encode a numeric to an aggregate_metric_double.", + "signatures" : [ ], + "preview" : false, + "snapshot_only" : false +} diff --git a/docs/reference/query-languages/esql/kibana/docs/functions/to_aggregate_metric_double.md b/docs/reference/query-languages/esql/kibana/docs/functions/to_aggregate_metric_double.md new file mode 100644 index 0000000000000..0dea481d1a773 --- /dev/null +++ b/docs/reference/query-languages/esql/kibana/docs/functions/to_aggregate_metric_double.md @@ -0,0 +1,7 @@ + + +### TO_AGGREGATE_METRIC_DOUBLE +Encode a numeric to an aggregate_metric_double. + From a65a1d3ce6643b61c5736b77e9af7486ea77ca3e Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Fri, 14 Mar 2025 18:06:54 -1000 Subject: [PATCH 5/9] multi-values and Cast.cast --- .../convert/ToAggregateMetricDouble.java | 245 ++---------------- .../convert/ToAggregateMetricDoubleTests.java | 18 +- 2 files changed, 39 insertions(+), 224 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java index b91ee6986432d..37bf8e97adc04 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java @@ -12,19 +12,18 @@ import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.DoubleBlock; -import org.elasticsearch.compute.data.IntBlock; -import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.core.Releasables; +import org.elasticsearch.search.aggregations.metrics.CompensatedSum; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.Param; -import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter; +import org.elasticsearch.xpack.esql.expression.function.scalar.math.Cast; import java.io.IOException; import java.util.List; @@ -43,9 +42,9 @@ public class ToAggregateMetricDouble extends AbstractConvertFunction { private static final Map EVALUATORS = Map.ofEntries( Map.entry(AGGREGATE_METRIC_DOUBLE, (source, fieldEval) -> fieldEval), Map.entry(DOUBLE, DoubleFactory::new), - Map.entry(INTEGER, IntFactory::new), - Map.entry(LONG, LongFactory::new), - Map.entry(UNSIGNED_LONG, UnsignedLongFactory::new) + Map.entry(INTEGER, ((source, fieldEval) -> new DoubleFactory(source, Cast.cast(source, INTEGER, DOUBLE, fieldEval)))), + Map.entry(LONG, ((source, fieldEval) -> new DoubleFactory(source, Cast.cast(source, LONG, DOUBLE, fieldEval)))), + Map.entry(UNSIGNED_LONG, ((source, fieldEval) -> new DoubleFactory(source, Cast.cast(source, UNSIGNED_LONG, DOUBLE, fieldEval)))) ); public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( @@ -121,7 +120,7 @@ public DoubleFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fie @Override public String toString() { - return "ToAggregateMetricDoubleFromDoubleEvaluator[" + "field=" + fieldEvaluator + "]"; + return "ToAggregateMetricDoubleEvaluator[" + "field=" + fieldEvaluator + "]"; } @Override @@ -131,228 +130,38 @@ public EvalOperator.ExpressionEvaluator get(DriverContext context) { return new EvalOperator.ExpressionEvaluator() { @Override public Block eval(Page page) { - Block block = eval.eval(page); - int positionCount = page.getPositionCount(); - try { + try (Block block = eval.eval(page)) { + int positionCount = block.getPositionCount(); DoubleBlock doubleBlock = (DoubleBlock) block; try ( AggregateMetricDoubleBlockBuilder result = context.blockFactory() .newAggregateMetricDoubleBlockBuilder(positionCount) ) { + CompensatedSum sum = new CompensatedSum(); for (int p = 0; p < positionCount; p++) { - try { - if (doubleBlock.isNull(p)) { - result.appendNull(); - continue; - } - var val = doubleBlock.getDouble(p); - result.min().appendDouble(val); - result.max().appendDouble(val); - result.sum().appendDouble(val); - result.count().appendInt(1); - } catch (Exception e) { + int valueCount = doubleBlock.getValueCount(p); + int start = doubleBlock.getFirstValueIndex(p); + int end = start + valueCount; + if (valueCount == 0) { result.appendNull(); + continue; } - } - return result.build(); - } - } finally { - block.close(); - } - } - - @Override - public void close() { - Releasables.closeExpectNoException(eval); - } - - @Override - public String toString() { - return "ToAggregateMetricDoubleFromDoubleEvaluator[field=" + eval + "]"; - } - }; - } - } - - public static class LongFactory implements EvalOperator.ExpressionEvaluator.Factory { - private final Source source; - - private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; - - public LongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { - this.fieldEvaluator = fieldEvaluator; - this.source = source; - } - - @Override - public String toString() { - return "ToAggregateMetricDoubleFromLongEvaluator[" + "field=" + fieldEvaluator + "]"; - } - - @Override - public EvalOperator.ExpressionEvaluator get(DriverContext context) { - final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); - - return new EvalOperator.ExpressionEvaluator() { - @Override - public Block eval(Page page) { - Block block = eval.eval(page); - int positionCount = page.getPositionCount(); - try { - LongBlock longBlock = (LongBlock) block; - try ( - AggregateMetricDoubleBlockBuilder result = context.blockFactory() - .newAggregateMetricDoubleBlockBuilder(positionCount) - ) { - for (int p = 0; p < positionCount; p++) { - try { - if (longBlock.isNull(p)) { - result.appendNull(); - continue; - } - var val = longBlock.getLong(p); - result.min().appendDouble(val); - result.max().appendDouble(val); - result.sum().appendDouble(val); - result.count().appendInt(1); - } catch (Exception e) { - result.appendNull(); - } - } - return result.build(); - } - } finally { - block.close(); - } - } - - @Override - public void close() { - Releasables.closeExpectNoException(eval); - } - - @Override - public String toString() { - return "ToAggregateMetricDoubleFromLongEvaluator[field=" + eval + "]"; - } - }; - } - } - - public static class UnsignedLongFactory implements EvalOperator.ExpressionEvaluator.Factory { - private final Source source; - - private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; - - public UnsignedLongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { - this.fieldEvaluator = fieldEvaluator; - this.source = source; - } - - @Override - public String toString() { - return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[" + "field=" + fieldEvaluator + "]"; - } - - @Override - public EvalOperator.ExpressionEvaluator get(DriverContext context) { - final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); - - return new EvalOperator.ExpressionEvaluator() { - @Override - public Block eval(Page page) { - Block block = eval.eval(page); - int positionCount = page.getPositionCount(); - try { - LongBlock longBlock = (LongBlock) block; - try ( - AggregateMetricDoubleBlockBuilder result = context.blockFactory() - .newAggregateMetricDoubleBlockBuilder(positionCount) - ) { - for (int p = 0; p < positionCount; p++) { - try { - if (longBlock.isNull(p)) { - result.appendNull(); - continue; - } - var val = EsqlDataTypeConverter.unsignedLongToDouble(longBlock.getLong(p)); - result.min().appendDouble(val); - result.max().appendDouble(val); - result.sum().appendDouble(val); - result.count().appendInt(1); - } catch (Exception e) { - result.appendNull(); - } - } - return result.build(); - } - } finally { - block.close(); - } - } - - @Override - public void close() { - Releasables.closeExpectNoException(eval); - } - - @Override - public String toString() { - return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[field=" + eval + "]"; - } - }; - } - } - - public static class IntFactory implements EvalOperator.ExpressionEvaluator.Factory { - private final Source source; - - private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; - - public IntFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { - this.fieldEvaluator = fieldEvaluator; - this.source = source; - } - - @Override - public String toString() { - return "ToAggregateMetricDoubleFromIntEvaluator[" + "field=" + fieldEvaluator + "]"; - } - - @Override - public EvalOperator.ExpressionEvaluator get(DriverContext context) { - final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); - - return new EvalOperator.ExpressionEvaluator() { - @Override - public Block eval(Page page) { - Block block = eval.eval(page); - int positionCount = page.getPositionCount(); - try { - IntBlock intBlock = (IntBlock) block; - try ( - AggregateMetricDoubleBlockBuilder result = context.blockFactory() - .newAggregateMetricDoubleBlockBuilder(positionCount) - ) { - for (int p = 0; p < positionCount; p++) { - try { - if (intBlock.isNull(p)) { - result.appendNull(); - continue; - } - var val = intBlock.getInt(p); - result.min().appendDouble(val); - result.max().appendDouble(val); - result.sum().appendDouble(val); - result.count().appendInt(1); - } catch (Exception e) { - result.appendNull(); + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int i = start; i < end; i++) { + double current = doubleBlock.getDouble(i); + min = Math.min(min, current); + max = Math.max(max, current); + sum.add(current); } + result.min().appendDouble(min); + result.max().appendDouble(max); + result.sum().appendDouble(sum.value()); + result.count().appendInt(valueCount); + sum.reset(0, 0); } return result.build(); } - } finally { - block.close(); } } @@ -363,7 +172,7 @@ public void close() { @Override public String toString() { - return "ToAggregateMetricDoubleFromIntEvaluator[field=" + eval + "]"; + return "ToAggregateMetricDoubleEvaluator[field=" + eval + "]"; } }; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java index 39f3a3569c315..89ef5592d2f65 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java @@ -41,12 +41,13 @@ protected Expression build(Source source, List args) { @ParametersFactory public static Iterable parameters() { - final String read = "Attribute[channel=0]"; + final String evaluatorStringLeft = "ToAggregateMetricDoubleEvaluator[field=Cast"; + final String evaluatorStringRight = "ToDoubleEvaluator[v=Attribute[channel=0]]]"; final List suppliers = new ArrayList<>(); TestCaseSupplier.forUnaryInt( suppliers, - "ToAggregateMetricDoubleFromIntEvaluator[field=" + read + "]", + evaluatorStringLeft + "Int" + evaluatorStringRight, DataType.AGGREGATE_METRIC_DOUBLE, i -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral((double) i, (double) i, (double) i, 1), Integer.MIN_VALUE, @@ -55,7 +56,7 @@ public static Iterable parameters() { ); TestCaseSupplier.forUnaryLong( suppliers, - "ToAggregateMetricDoubleFromLongEvaluator[field=" + read + "]", + evaluatorStringLeft + "Long" + evaluatorStringRight, DataType.AGGREGATE_METRIC_DOUBLE, l -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral((double) l, (double) l, (double) l, 1), Long.MIN_VALUE, @@ -64,7 +65,7 @@ public static Iterable parameters() { ); TestCaseSupplier.forUnaryUnsignedLong( suppliers, - "ToAggregateMetricDoubleFromUnsignedLongEvaluator[field=" + read + "]", + evaluatorStringLeft + "UnsignedLong" + evaluatorStringRight, DataType.AGGREGATE_METRIC_DOUBLE, ul -> { var newVal = ul.doubleValue(); @@ -76,9 +77,14 @@ public static Iterable parameters() { ); TestCaseSupplier.forUnaryDouble( suppliers, - "ToAggregateMetricDoubleFromDoubleEvaluator[field=" + read + "]", + "ToAggregateMetricDoubleEvaluator[field=Attribute[channel=0]]", DataType.AGGREGATE_METRIC_DOUBLE, - d -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(d, d, d, 1), + d -> { + if (d == -0.0) { + return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(d, d, 0.0, 1); + } + return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(d, d, d, 1); + }, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY, emptyList() From 227de7d640ea375016b3080319dcee65605f37c4 Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Fri, 14 Mar 2025 18:29:01 -1000 Subject: [PATCH 6/9] except undo cast because it doesn't cooperate with multivalue --- .../convert/ToAggregateMetricDouble.java | 224 +++++++++++++++++- .../rest-api-spec/test/esql/40_tsdb.yml | 50 ++++ 2 files changed, 268 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java index 37bf8e97adc04..423a3140c842a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java @@ -12,6 +12,8 @@ import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator; @@ -23,7 +25,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.Param; -import org.elasticsearch.xpack.esql.expression.function.scalar.math.Cast; +import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter; import java.io.IOException; import java.util.List; @@ -42,9 +44,9 @@ public class ToAggregateMetricDouble extends AbstractConvertFunction { private static final Map EVALUATORS = Map.ofEntries( Map.entry(AGGREGATE_METRIC_DOUBLE, (source, fieldEval) -> fieldEval), Map.entry(DOUBLE, DoubleFactory::new), - Map.entry(INTEGER, ((source, fieldEval) -> new DoubleFactory(source, Cast.cast(source, INTEGER, DOUBLE, fieldEval)))), - Map.entry(LONG, ((source, fieldEval) -> new DoubleFactory(source, Cast.cast(source, LONG, DOUBLE, fieldEval)))), - Map.entry(UNSIGNED_LONG, ((source, fieldEval) -> new DoubleFactory(source, Cast.cast(source, UNSIGNED_LONG, DOUBLE, fieldEval)))) + Map.entry(INTEGER, IntFactory::new), + Map.entry(LONG, LongFactory::new), + Map.entry(UNSIGNED_LONG, UnsignedLongFactory::new) ); public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( @@ -120,7 +122,7 @@ public DoubleFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fie @Override public String toString() { - return "ToAggregateMetricDoubleEvaluator[" + "field=" + fieldEvaluator + "]"; + return "ToAggregateMetricDoubleFromDoubleEvaluator[" + "field=" + fieldEvaluator + "]"; } @Override @@ -172,7 +174,217 @@ public void close() { @Override public String toString() { - return "ToAggregateMetricDoubleEvaluator[field=" + eval + "]"; + return "ToAggregateMetricDoubleFromDoubleEvaluator[field=" + eval + "]"; + } + }; + } + } + + public static class IntFactory implements EvalOperator.ExpressionEvaluator.Factory { + private final Source source; + + private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; + + public IntFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { + this.fieldEvaluator = fieldEvaluator; + this.source = source; + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromIntEvaluator[" + "field=" + fieldEvaluator + "]"; + } + + @Override + public EvalOperator.ExpressionEvaluator get(DriverContext context) { + final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); + + return new EvalOperator.ExpressionEvaluator() { + @Override + public Block eval(Page page) { + try (Block block = eval.eval(page)) { + int positionCount = block.getPositionCount(); + IntBlock intBlock = (IntBlock) block; + try ( + AggregateMetricDoubleBlockBuilder result = context.blockFactory() + .newAggregateMetricDoubleBlockBuilder(positionCount) + ) { + CompensatedSum sum = new CompensatedSum(); + for (int p = 0; p < positionCount; p++) { + int valueCount = intBlock.getValueCount(p); + int start = intBlock.getFirstValueIndex(p); + int end = start + valueCount; + if (valueCount == 0) { + result.appendNull(); + continue; + } + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int i = start; i < end; i++) { + double current = intBlock.getInt(i); + min = Math.min(min, current); + max = Math.max(max, current); + sum.add(current); + } + result.min().appendDouble(min); + result.max().appendDouble(max); + result.sum().appendDouble(sum.value()); + result.count().appendInt(valueCount); + sum.reset(0, 0); + } + return result.build(); + } + } + } + + @Override + public void close() { + Releasables.closeExpectNoException(eval); + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromIntEvaluator[field=" + eval + "]"; + } + }; + } + } + + public static class LongFactory implements EvalOperator.ExpressionEvaluator.Factory { + private final Source source; + + private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; + + public LongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { + this.fieldEvaluator = fieldEvaluator; + this.source = source; + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromLongEvaluator[" + "field=" + fieldEvaluator + "]"; + } + + @Override + public EvalOperator.ExpressionEvaluator get(DriverContext context) { + final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); + + return new EvalOperator.ExpressionEvaluator() { + @Override + public Block eval(Page page) { + try (Block block = eval.eval(page)) { + int positionCount = block.getPositionCount(); + LongBlock longBlock = (LongBlock) block; + try ( + AggregateMetricDoubleBlockBuilder result = context.blockFactory() + .newAggregateMetricDoubleBlockBuilder(positionCount) + ) { + CompensatedSum sum = new CompensatedSum(); + for (int p = 0; p < positionCount; p++) { + int valueCount = longBlock.getValueCount(p); + int start = longBlock.getFirstValueIndex(p); + int end = start + valueCount; + if (valueCount == 0) { + result.appendNull(); + continue; + } + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int i = start; i < end; i++) { + double current = longBlock.getLong(i); + min = Math.min(min, current); + max = Math.max(max, current); + sum.add(current); + } + result.min().appendDouble(min); + result.max().appendDouble(max); + result.sum().appendDouble(sum.value()); + result.count().appendInt(valueCount); + sum.reset(0, 0); + } + return result.build(); + } + } + } + + @Override + public void close() { + Releasables.closeExpectNoException(eval); + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromLongEvaluator[field=" + eval + "]"; + } + }; + } + } + + public static class UnsignedLongFactory implements EvalOperator.ExpressionEvaluator.Factory { + private final Source source; + + private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator; + + public UnsignedLongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) { + this.fieldEvaluator = fieldEvaluator; + this.source = source; + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[" + "field=" + fieldEvaluator + "]"; + } + + @Override + public EvalOperator.ExpressionEvaluator get(DriverContext context) { + final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); + + return new EvalOperator.ExpressionEvaluator() { + @Override + public Block eval(Page page) { + try (Block block = eval.eval(page)) { + int positionCount = block.getPositionCount(); + LongBlock longBlock = (LongBlock) block; + try ( + AggregateMetricDoubleBlockBuilder result = context.blockFactory() + .newAggregateMetricDoubleBlockBuilder(positionCount) + ) { + CompensatedSum sum = new CompensatedSum(); + for (int p = 0; p < positionCount; p++) { + int valueCount = longBlock.getValueCount(p); + int start = longBlock.getFirstValueIndex(p); + int end = start + valueCount; + if (valueCount == 0) { + result.appendNull(); + continue; + } + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int i = start; i < end; i++) { + var current = EsqlDataTypeConverter.unsignedLongToDouble(longBlock.getLong(p)); + min = Math.min(min, current); + max = Math.max(max, current); + sum.add(current); + } + result.min().appendDouble(min); + result.max().appendDouble(max); + result.sum().appendDouble(sum.value()); + result.count().appendInt(valueCount); + sum.reset(0, 0); + } + return result.build(); + } + } + } + + @Override + public void close() { + Releasables.closeExpectNoException(eval); + } + + @Override + public String toString() { + return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[field=" + eval + "]"; } }; } diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml index 3c952571c9008..89c9f179b5be6 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml @@ -609,3 +609,53 @@ _source: rx: 530600088 tx: 1434577921 uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 + +--- +to_aggregate_metric_double with multi_values: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: POST + path: /_query + parameters: [ ] + capabilities: [ aggregate_metric_double_convert_to ] + reason: "Support for to_aggregate_metric_double function" + + - do: + indices.create: + index: convert_test + body: + mappings: + properties: + "some_long_field": + type: long + "some_double_field": + type: double + "some_int_field": + type: integer + "some_unsigned_long_field": + type: unsigned_long + - do: + bulk: + refresh: true + index: new_test + body: + - {"index": {}} + - {"some_long_field": [20385, 182941, -10958], "some_double_field": [195.1, 102.444], "some_int_field": [64, 121, 498, 1456], "some_unsigned_long_field": [13985, 19418924, 123]} + - do: + esql.query: + body: + query: 'FROM new_test | EVAL from_long=TO_AGGREGATE_METRIC_DOUBLE(some_long_field), from_double=TO_AGGREGATE_METRIC_DOUBLE(some_double_field), from_int=TO_AGGREGATE_METRIC_DOUBLE(some_int_field), from_ulong=TO_AGGREGATE_METRIC_DOUBLE(some_unsigned_long_field) | KEEP from_long, from_double, from_int, from_ulong | LIMIT 1' + + - match: {columns.0.name: "from_long"} + - match: {columns.0.type: "aggregate_metric_double"} + - match: {columns.1.name: "from_double"} + - match: {columns.1.type: "aggregate_metric_double"} + - match: {columns.2.name: "from_int"} + - match: {columns.2.type: "aggregate_metric_double"} + - match: {columns.3.name: "from_ulong"} + - match: {columns.3.type: "aggregate_metric_double"} + - match: {values.0.0: '{"min":-10958.0,"max":182941.0,"sum":192368.0,"value_count":3}'} + - match: {values.0.1: '{"min":102.44400024414062,"max":195.10000610351562,"sum":297.54400634765625,"value_count":2}'} + - match: {values.0.2: '{"min":64.0,"max":1456.0,"sum":2139.0,"value_count":4}'} + - match: {values.0.3: '{"min":123.0,"max":1.9418924E7,"sum":1.9433032E7,"value_count":3}'} From 2ccfd46ba15766dd97b67b8ed52bc61af190bc17 Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Fri, 14 Mar 2025 18:30:24 -1000 Subject: [PATCH 7/9] introduce AggregateMetricDoubleVector? --- .../AggregateMetricDoubleBlockBuilder.java | 65 ++++++++++++++ .../compute/data/BlockFactory.java | 4 + .../convert/ToAggregateMetricDouble.java | 84 ++++++++++++------- 3 files changed, 122 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java index ffb897854904d..d94805cd61cd9 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java @@ -205,4 +205,69 @@ public TransportVersion getMinimalSupportedVersion() { } } + + public static class AggregateMetricDoubleVectorBuilder extends AbstractBlockBuilder { + private final DoubleBlockBuilder valuesBuilder; + + public AggregateMetricDoubleVectorBuilder(int estimatedSize, BlockFactory blockFactory) { + super(blockFactory); + valuesBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory); + } + + @Override + protected int valuesLength() { + throw new UnsupportedOperationException("Not available on aggregate_metric_double"); + } + + @Override + protected void growValuesArray(int newSize) { + throw new UnsupportedOperationException("Not available on aggregate_metric_double"); + } + + @Override + protected int elementSize() { + throw new UnsupportedOperationException("Not available on aggregate_metric_double"); + } + + @Override + public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) { + // TODO + return null; + } + + @Override + public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) { + // TODO + return null; + } + + public void appendValue(double value) { + valuesBuilder.appendDouble(value); + } + + @Override + public Block build() { + Block[] blocks = new Block[4]; + Block block = null; + ConstantIntVector countVector = null; + boolean success = false; + try { + finish(); + block = valuesBuilder.build(); + countVector = new ConstantIntVector(1, block.getPositionCount(), blockFactory); + blocks[Metric.MIN.getIndex()] = block; + blocks[Metric.MAX.getIndex()] = block; + blocks[Metric.SUM.getIndex()] = block; + blocks[Metric.COUNT.getIndex()] = countVector.asBlock(); + CompositeBlock compositeBlock = new CompositeBlock(blocks); + success = true; + return compositeBlock; + } finally { + if (success == false) { + Releasables.closeExpectNoException(block, countVector); + } + } + } + + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java index 55053f509591d..07939b8848fa0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java @@ -436,6 +436,10 @@ public AggregateMetricDoubleBlockBuilder newAggregateMetricDoubleBlockBuilder(in return new AggregateMetricDoubleBlockBuilder(estimatedSize, this); } + public AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleVectorBuilder newAggregateMetricDoubleVectorBuilder(int estimatedSize) { + return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleVectorBuilder(estimatedSize, this); + } + public final Block newConstantAggregateMetricDoubleBlock( AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral value, int positions diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java index 423a3140c842a..56d7852e3e9b5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java @@ -12,9 +12,11 @@ import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.Vector; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.core.Releasables; @@ -130,40 +132,60 @@ public EvalOperator.ExpressionEvaluator get(DriverContext context) { final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context); return new EvalOperator.ExpressionEvaluator() { + private Block evalBlock(Block block) { + int positionCount = block.getPositionCount(); + DoubleBlock doubleBlock = (DoubleBlock) block; + try ( + AggregateMetricDoubleBlockBuilder result = context.blockFactory() + .newAggregateMetricDoubleBlockBuilder(positionCount) + ) { + CompensatedSum sum = new CompensatedSum(); + for (int p = 0; p < positionCount; p++) { + int valueCount = doubleBlock.getValueCount(p); + int start = doubleBlock.getFirstValueIndex(p); + int end = start + valueCount; + if (valueCount == 0) { + result.appendNull(); + continue; + } + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + for (int i = start; i < end; i++) { + double current = doubleBlock.getDouble(i); + min = Math.min(min, current); + max = Math.max(max, current); + sum.add(current); + } + result.min().appendDouble(min); + result.max().appendDouble(max); + result.sum().appendDouble(sum.value()); + result.count().appendInt(valueCount); + sum.reset(0, 0); + } + return result.build(); + } + } + + private Block evalVector(Vector vector) { + int positionCount = vector.getPositionCount(); + DoubleVector doubleVector = (DoubleVector) vector; + try ( + AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleVectorBuilder builder = context.blockFactory() + .newAggregateMetricDoubleVectorBuilder(positionCount) + ) { + for (int p = 0; p < positionCount; p++) { + double value = doubleVector.getDouble(p); + builder.appendValue(value); + } + return builder.build(); + } + } + @Override public Block eval(Page page) { try (Block block = eval.eval(page)) { - int positionCount = block.getPositionCount(); - DoubleBlock doubleBlock = (DoubleBlock) block; - try ( - AggregateMetricDoubleBlockBuilder result = context.blockFactory() - .newAggregateMetricDoubleBlockBuilder(positionCount) - ) { - CompensatedSum sum = new CompensatedSum(); - for (int p = 0; p < positionCount; p++) { - int valueCount = doubleBlock.getValueCount(p); - int start = doubleBlock.getFirstValueIndex(p); - int end = start + valueCount; - if (valueCount == 0) { - result.appendNull(); - continue; - } - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - for (int i = start; i < end; i++) { - double current = doubleBlock.getDouble(i); - min = Math.min(min, current); - max = Math.max(max, current); - sum.add(current); - } - result.min().appendDouble(min); - result.max().appendDouble(max); - result.sum().appendDouble(sum.value()); - result.count().appendInt(valueCount); - sum.reset(0, 0); - } - return result.build(); - } + Vector vector = block.asVector(); + return vector == null ? evalBlock(block) : evalVector(vector); } } From 5c4d9653e980bb2025ce7fe13feeae98ebfe313b Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Fri, 14 Mar 2025 18:57:00 -1000 Subject: [PATCH 8/9] fix tests --- .../data/AggregateMetricDoubleBlockBuilder.java | 10 ++++++---- .../convert/ToAggregateMetricDoubleTests.java | 13 ++++--------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java index d94805cd61cd9..af76ba6c96b49 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java @@ -249,22 +249,24 @@ public void appendValue(double value) { public Block build() { Block[] blocks = new Block[4]; Block block = null; - ConstantIntVector countVector = null; + IntBlock countVector = null; boolean success = false; try { finish(); block = valuesBuilder.build(); - countVector = new ConstantIntVector(1, block.getPositionCount(), blockFactory); + countVector = blockFactory.newConstantIntBlockWith(1, block.getPositionCount()); blocks[Metric.MIN.getIndex()] = block; blocks[Metric.MAX.getIndex()] = block; + block.incRef(); blocks[Metric.SUM.getIndex()] = block; - blocks[Metric.COUNT.getIndex()] = countVector.asBlock(); + block.incRef(); + blocks[Metric.COUNT.getIndex()] = countVector; CompositeBlock compositeBlock = new CompositeBlock(blocks); success = true; return compositeBlock; } finally { if (success == false) { - Releasables.closeExpectNoException(block, countVector); + Releasables.closeExpectNoException(blocks); } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java index 89ef5592d2f65..14910572d8c9d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java @@ -41,8 +41,8 @@ protected Expression build(Source source, List args) { @ParametersFactory public static Iterable parameters() { - final String evaluatorStringLeft = "ToAggregateMetricDoubleEvaluator[field=Cast"; - final String evaluatorStringRight = "ToDoubleEvaluator[v=Attribute[channel=0]]]"; + final String evaluatorStringLeft = "ToAggregateMetricDoubleFrom"; + final String evaluatorStringRight = "Evaluator[field=Attribute[channel=0]]"; final List suppliers = new ArrayList<>(); TestCaseSupplier.forUnaryInt( @@ -77,14 +77,9 @@ public static Iterable parameters() { ); TestCaseSupplier.forUnaryDouble( suppliers, - "ToAggregateMetricDoubleEvaluator[field=Attribute[channel=0]]", + evaluatorStringLeft + "Double" + evaluatorStringRight, DataType.AGGREGATE_METRIC_DOUBLE, - d -> { - if (d == -0.0) { - return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(d, d, 0.0, 1); - } - return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(d, d, d, 1); - }, + d -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(d, d, d, 1), Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY, emptyList() From 05949907bba1e647700039700a56347054ab71e3 Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Fri, 14 Mar 2025 23:38:17 -1000 Subject: [PATCH 9/9] fix last test --- .../convert/ToAggregateMetricDouble.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java index 56d7852e3e9b5..acd2324aafe98 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java @@ -139,7 +139,7 @@ private Block evalBlock(Block block) { AggregateMetricDoubleBlockBuilder result = context.blockFactory() .newAggregateMetricDoubleBlockBuilder(positionCount) ) { - CompensatedSum sum = new CompensatedSum(); + CompensatedSum compensatedSum = new CompensatedSum(); for (int p = 0; p < positionCount; p++) { int valueCount = doubleBlock.getValueCount(p); int start = doubleBlock.getFirstValueIndex(p); @@ -148,19 +148,21 @@ private Block evalBlock(Block block) { result.appendNull(); continue; } - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - for (int i = start; i < end; i++) { - double current = doubleBlock.getDouble(i); + // First iteration of the loop is manual to support having -0.0 as an input consistently + double current = doubleBlock.getDouble(start); + double min = current; + double max = current; + compensatedSum.reset(current, 0); + for (int i = start + 1; i < end; i++) { + current = doubleBlock.getDouble(i); min = Math.min(min, current); max = Math.max(max, current); - sum.add(current); + compensatedSum.add(current); } result.min().appendDouble(min); result.max().appendDouble(max); - result.sum().appendDouble(sum.value()); + result.sum().appendDouble(compensatedSum.value()); result.count().appendInt(valueCount); - sum.reset(0, 0); } return result.build(); }