Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSHARP-5478: Add support for $rankFusion aggregation stage #1636

Merged
merged 9 commits into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/MongoDB.Driver/AggregateFluent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,28 @@ public override IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefin
return WithPipeline(_pipeline.Project(projection));
}

public override IAggregateFluent<TNewResult> RankFusion<TNewResult>(
Dictionary<string, PipelineDefinition<TResult, TNewResult>> pipelines,
Dictionary<string, double> weights = null,
RankFusionOptions<TNewResult> options = null)
{
return WithPipeline(_pipeline.RankFusion(pipelines, weights, options));
}

public override IAggregateFluent<TNewResult> RankFusion<TNewResult>(
PipelineDefinition<TResult, TNewResult>[] pipelines,
RankFusionOptions<TNewResult> options = null)
{
return WithPipeline(_pipeline.RankFusion(pipelines, options));
}

public override IAggregateFluent<TNewResult> RankFusion<TNewResult>(
(PipelineDefinition<TResult, TNewResult>, double?)[] pipelinesWithWeights,
RankFusionOptions<TNewResult> options = null)
{
return WithPipeline(_pipeline.RankFusion(pipelinesWithWeights, options));
}

public override IAggregateFluent<TNewResult> ReplaceRoot<TNewResult>(AggregateExpressionDefinition<TResult, TNewResult> newRoot)
{
return WithPipeline(_pipeline.ReplaceRoot(newRoot));
Expand Down
23 changes: 23 additions & 0 deletions src/MongoDB.Driver/AggregateFluentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,29 @@ public virtual Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> ou
/// <inheritdoc />
public abstract IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection);

/// <inheritdoc />
public virtual IAggregateFluent<TNewResult> RankFusion<TNewResult>(
Dictionary<string, PipelineDefinition<TResult, TNewResult>> pipelines,
Dictionary<string, double> weights = null,
RankFusionOptions<TNewResult> options = null)
{
throw new NotImplementedException();
}

/// <inheritdoc />
public virtual IAggregateFluent<TNewResult> RankFusion<TNewResult>(PipelineDefinition<TResult, TNewResult>[] pipelines, RankFusionOptions<TNewResult> options = null)
{
throw new NotImplementedException();
}

/// <inheritdoc />
public virtual IAggregateFluent<TNewResult> RankFusion<TNewResult>(
(PipelineDefinition<TResult, TNewResult>, double?)[] pipelinesWithWeights,
RankFusionOptions<TNewResult> options = null)
{
throw new NotImplementedException();
}

/// <inheritdoc />
public virtual IAggregateFluent<TNewResult> ReplaceRoot<TNewResult>(AggregateExpressionDefinition<TResult, TNewResult> newRoot)
{
Expand Down
6 changes: 6 additions & 0 deletions src/MongoDB.Driver/Core/Misc/Feature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class Feature
private static readonly Feature __loookupDocuments= new Feature("LoookupDocuments", WireVersion.Server60);
private static readonly Feature __mmapV1StorageEngine = new Feature("MmapV1StorageEngine", WireVersion.Zero, WireVersion.Server42);
private static readonly Feature __pickAccumulatorsNewIn52 = new Feature("PickAccumulatorsNewIn52", WireVersion.Server52);
private static readonly Feature __rankFusionStage = new Feature("RankFusionStage", WireVersion.Server81);
private static readonly Feature __regexMatch = new Feature("RegexMatch", WireVersion.Server42);
private static readonly Feature __round = new Feature("Round", WireVersion.Server42);
private static readonly Feature __scramSha256Authentication = new Feature("ScramSha256Authentication", WireVersion.Server40);
Expand Down Expand Up @@ -386,6 +387,11 @@ public class Feature
/// </summary>
public static Feature PickAccumulatorsNewIn52 => __pickAccumulatorsNewIn52;

/// <summary>
/// Gets the $rankFusion feature.
/// </summary>
public static Feature RankFusionStage => __rankFusionStage;

/// <summary>
/// Gets the regex match feature.
/// </summary>
Expand Down
35 changes: 35 additions & 0 deletions src/MongoDB.Driver/IAggregateFluent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,41 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
/// </returns>
IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection);

/// <summary>
/// Appends a $rankFusion stage to the pipeline.
/// </summary>
/// <typeparam name="TNewResult">The type of the new result.</typeparam>
/// <param name="pipelines">The map of named pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
/// <param name="weights">The map of pipeline names to non-negative numerical weights determining result importance during combination. Default weight is 1 when unspecified.</param>
/// <param name="options">The rankFusion options.</param>
/// <returns>The fluent aggregate interface.</returns>
IAggregateFluent<TNewResult> RankFusion<TNewResult>(
Dictionary<string, PipelineDefinition<TResult, TNewResult>> pipelines,
Dictionary<string, double> weights = null,
RankFusionOptions<TNewResult> options = null);

/// <summary>
/// Appends a $rankFusion stage to the pipeline. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
/// </summary>
/// <typeparam name="TNewResult">The type of the new result.</typeparam>
/// <param name="pipelines">The collection of pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
/// <param name="options">The rankFusion options.</param>
/// <returns>The fluent aggregate interface.</returns>
IAggregateFluent<TNewResult> RankFusion<TNewResult>(
PipelineDefinition<TResult, TNewResult>[] pipelines,
RankFusionOptions<TNewResult> options = null);

/// <summary>
/// Appends a $rankFusion stage to the pipeline. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
/// </summary>
/// <typeparam name="TNewResult">The type of the new result.</typeparam>
/// <param name="pipelinesWithWeights">The collection of tuples containing (pipeline, weight) pairs. The pipelines must operate on the same collection.</param>
/// <param name="options">The rankFusion options.</param>
/// <returns>The fluent aggregate interface.</returns>
IAggregateFluent<TNewResult> RankFusion<TNewResult>(
(PipelineDefinition<TResult, TNewResult>, double?)[] pipelinesWithWeights,
RankFusionOptions<TNewResult> options = null);

/// <summary>
/// Appends a $replaceRoot stage to the pipeline.
/// </summary>
Expand Down
59 changes: 59 additions & 0 deletions src/MongoDB.Driver/PipelineDefinitionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,65 @@ public static PipelineDefinition<TInput, TOutput> Project<TInput, TIntermediate,
return pipeline.AppendStage(PipelineStageDefinitionBuilder.Project(projection));
}

/// <summary>
/// Appends a $rankFusion stage to the pipeline.
/// </summary>
/// <typeparam name="TInput">The type of the documents.</typeparam>
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
/// <typeparam name="TIntermediate">The type of the intermediate documents.</typeparam>
/// <param name="pipeline">The pipeline.</param>
/// <param name="pipelines">The map of named pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
/// <param name="weights">The map of pipeline names to non-negative numerical weights determining result importance during combination. Default weight is 1 when unspecified.</param>
/// <param name="options">The rankFusion options.</param>
/// <returns>A new pipeline with an additional stage.</returns>
public static PipelineDefinition<TInput, TOutput> RankFusion<TInput, TIntermediate, TOutput>(
this PipelineDefinition<TInput, TIntermediate> pipeline,
Dictionary<string, PipelineDefinition<TIntermediate, TOutput>> pipelines,
Dictionary<string, double> weights = null,
RankFusionOptions<TOutput> options = null)
{
Ensure.IsNotNull(pipeline, nameof(pipeline));
return pipeline.AppendStage(PipelineStageDefinitionBuilder.RankFusion(pipelines, weights, options));
}

/// <summary>
/// Appends a $rankFusion stage to the pipeline. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
/// </summary>
/// <typeparam name="TInput">The type of the documents.</typeparam>
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
/// <typeparam name="TIntermediate">The type of the intermediate documents.</typeparam>
/// <param name="pipeline">The pipeline.</param>
/// <param name="pipelines">The collection of pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
/// <param name="options">The rankFusion options.</param>
/// <returns>A new pipeline with an additional stage.</returns>
public static PipelineDefinition<TInput, TOutput> RankFusion<TInput, TIntermediate, TOutput>(
this PipelineDefinition<TInput, TIntermediate> pipeline,
PipelineDefinition<TIntermediate, TOutput>[] pipelines,
RankFusionOptions<TOutput> options = null)
{
Ensure.IsNotNull(pipeline, nameof(pipeline));
return pipeline.AppendStage(PipelineStageDefinitionBuilder.RankFusion(pipelines, options));
}

/// <summary>
/// Appends a $rankFusion stage to the pipeline. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
/// </summary>
/// <typeparam name="TInput">The type of the documents.</typeparam>
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
/// <typeparam name="TIntermediate">The type of the intermediate documents.</typeparam>
/// <param name="pipeline">The pipeline.</param>
/// <param name="pipelinesWithWeights">The collection of tuples containing (pipeline, weight) pairs. The pipelines must operate on the same collection.</param>
/// <param name="options">The rankFusion options.</param>
/// <returns>A new pipeline with an additional stage.</returns>
public static PipelineDefinition<TInput, TOutput> RankFusion<TInput, TIntermediate, TOutput>(
this PipelineDefinition<TInput, TIntermediate> pipeline,
(PipelineDefinition<TIntermediate, TOutput>, double?)[] pipelinesWithWeights,
RankFusionOptions<TOutput> options = null)
{
Ensure.IsNotNull(pipeline, nameof(pipeline));
return pipeline.AppendStage(PipelineStageDefinitionBuilder.RankFusion(pipelinesWithWeights, options));
}

/// <summary>
/// Appends a $replaceRoot stage to the pipeline.
/// </summary>
Expand Down
105 changes: 105 additions & 0 deletions src/MongoDB.Driver/PipelineStageDefinitionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,111 @@ public static PipelineStageDefinition<TInput, SearchMetaResult> SearchMeta<TInpu
return stage;
}

/// <summary>
/// Creates a $rankFusion stage.
/// </summary>
/// <typeparam name="TInput">The type of the input documents.</typeparam>
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
/// <param name="pipelines">The map of named pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
/// <param name="weights">The map of pipeline names to non-negative numerical weights determining result importance during combination. Default weight is 1 when unspecified.</param>
/// <param name="options">The rankFusion options.</param>
/// <returns>The stage.</returns>
public static PipelineStageDefinition<TInput, TOutput> RankFusion<TInput, TOutput>(
Dictionary<string, PipelineDefinition<TInput, TOutput>> pipelines,
Dictionary<string, double> weights = null,
RankFusionOptions<TOutput> options = null)
{
Ensure.IsNotNull(pipelines, nameof(pipelines));
if (pipelines.Any(pipeline => pipeline.Value == null))
{
throw new ArgumentNullException(nameof(pipelines), "Pipelines cannot contain a null pipeline.");
}

const string operatorName = "$rankFusion";
var stage = new DelegatedPipelineStageDefinition<TInput, TOutput>(
operatorName,
args =>
{
ClientSideProjectionHelper.ThrowIfClientSideProjection(args.DocumentSerializer, operatorName);
var renderedPipelines = new BsonDocument();
foreach (var pipeline in pipelines)
{
renderedPipelines.Add(pipeline.Key, new BsonArray(pipeline.Value.Render(args).Documents));
}

var document = new BsonDocument
{
{ "input", new BsonDocument("pipelines", renderedPipelines) },
{
"combination", () => new BsonDocument
{
{ "weights", new BsonDocument(weights.Select(w => new BsonElement(w.Key, w.Value))) }
},
weights != null
},
{ "scoreDetails", true, options?.ScoreDetails == true }
};

return new RenderedPipelineStageDefinition<TOutput>(
operatorName,
new BsonDocument(operatorName, document),
options?.OutputSerializer ?? args.SerializerRegistry.GetSerializer<TOutput>());
});

return stage;
}

/// <summary>
/// Creates a $rankFusion stage. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
/// </summary>
/// <typeparam name="TInput">The type of the input documents.</typeparam>
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
/// <param name="pipelines">The collection of pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
/// <param name="options">The rankFusion options.</param>
/// <returns>The stage.</returns>
public static PipelineStageDefinition<TInput, TOutput> RankFusion<TInput, TOutput>(
PipelineDefinition<TInput, TOutput>[] pipelines,
RankFusionOptions<TOutput> options = null)
{
Ensure.IsNotNull(pipelines, nameof(pipelines));

var pipelinesMap = new Dictionary<string, PipelineDefinition<TInput, TOutput>>();
for (var i = 0; i < pipelines.Length; i++)
{
pipelinesMap[$"pipeline{i + 1}"] = pipelines[i];
}
return RankFusion(pipelinesMap, null, options);
}

/// <summary>
/// Creates a $rankFusion stage. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
/// </summary>
/// <typeparam name="TInput">The type of the input documents.</typeparam>
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
/// <param name="pipelinesWithWeights">The collection of tuples containing (pipeline, weight) pairs. The pipelines must operate on the same collection.</param>
/// <param name="options">The rankFusion options.</param>
/// <returns>The stage.</returns>
public static PipelineStageDefinition<TInput, TOutput> RankFusion<TInput, TOutput>(
(PipelineDefinition<TInput, TOutput> Pipeline, double? Weight)[] pipelinesWithWeights,
RankFusionOptions<TOutput> options = null)
{
Ensure.IsNotNull(pipelinesWithWeights, nameof(pipelinesWithWeights));

var pipelinesMap = new Dictionary<string, PipelineDefinition<TInput, TOutput>>();
var weightsMap = new Dictionary<string, double>();
for (var i = 0; i < pipelinesWithWeights.Length; i++)
{
var pipelineName = $"pipeline{i + 1}";
pipelinesMap[pipelineName] = pipelinesWithWeights[i].Pipeline;

if (pipelinesWithWeights[i].Weight.HasValue)
{
weightsMap[pipelineName] = pipelinesWithWeights[i].Weight.Value;
}
}
return RankFusion(pipelinesMap, weightsMap, options);
}

/// <summary>
/// Creates a $replaceRoot stage.
/// </summary>
Expand Down
Loading