Skip to content

Commit 8d664b3

Browse files
Cache index shard limit to optimise ShardLimitsAllocationDecider (opensearch-project#14962) (opensearch-project#15000)
* Cache index shard limit per node (cherry picked from commit 122f3f0) Signed-off-by: Rishab Nahata <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent d4bdc7a commit 8d664b3

File tree

3 files changed

+150
-5
lines changed

3 files changed

+150
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.benchmark.routing.allocation;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.cluster.ClusterName;
13+
import org.opensearch.cluster.ClusterState;
14+
import org.opensearch.cluster.metadata.IndexMetadata;
15+
import org.opensearch.cluster.metadata.Metadata;
16+
import org.opensearch.cluster.node.DiscoveryNodes;
17+
import org.opensearch.cluster.routing.RoutingTable;
18+
import org.opensearch.cluster.routing.ShardRouting;
19+
import org.opensearch.cluster.routing.allocation.AllocationService;
20+
import org.opensearch.common.logging.LogConfigurator;
21+
import org.opensearch.common.settings.Settings;
22+
import org.openjdk.jmh.annotations.Benchmark;
23+
import org.openjdk.jmh.annotations.BenchmarkMode;
24+
import org.openjdk.jmh.annotations.Fork;
25+
import org.openjdk.jmh.annotations.Measurement;
26+
import org.openjdk.jmh.annotations.Mode;
27+
import org.openjdk.jmh.annotations.OutputTimeUnit;
28+
import org.openjdk.jmh.annotations.Param;
29+
import org.openjdk.jmh.annotations.Scope;
30+
import org.openjdk.jmh.annotations.Setup;
31+
import org.openjdk.jmh.annotations.State;
32+
import org.openjdk.jmh.annotations.Warmup;
33+
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.TimeUnit;
38+
39+
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
40+
41+
@Fork(1)
42+
@Warmup(iterations = 3)
43+
@Measurement(iterations = 3)
44+
@BenchmarkMode(Mode.AverageTime)
45+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
46+
@State(Scope.Benchmark)
47+
@SuppressWarnings("unused") // invoked by benchmarking framework
48+
public class RerouteBenchmark {
49+
@Param({
50+
// indices| nodes
51+
" 10000| 500|", })
52+
public String indicesNodes = "1|1";
53+
public int numIndices;
54+
public int numNodes;
55+
public int numShards = 10;
56+
public int numReplicas = 1;
57+
58+
private AllocationService allocationService;
59+
private ClusterState initialClusterState;
60+
61+
@Setup
62+
public void setUp() throws Exception {
63+
LogConfigurator.setNodeName("test");
64+
final String[] params = indicesNodes.split("\\|");
65+
numIndices = toInt(params[0]);
66+
numNodes = toInt(params[1]);
67+
68+
int totalShardCount = (numReplicas + 1) * numShards * numIndices;
69+
Metadata.Builder mb = Metadata.builder();
70+
for (int i = 1; i <= numIndices; i++) {
71+
mb.put(
72+
IndexMetadata.builder("test_" + i)
73+
.settings(Settings.builder().put("index.version.created", Version.CURRENT))
74+
.numberOfShards(numShards)
75+
.numberOfReplicas(numReplicas)
76+
);
77+
}
78+
79+
Metadata metadata = mb.build();
80+
RoutingTable.Builder rb = RoutingTable.builder();
81+
for (int i = 1; i <= numIndices; i++) {
82+
rb.addAsNew(metadata.index("test_" + i));
83+
}
84+
RoutingTable routingTable = rb.build();
85+
initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
86+
.metadata(metadata)
87+
.routingTable(routingTable)
88+
.nodes(setUpClusterNodes(numNodes))
89+
.build();
90+
}
91+
92+
@Benchmark
93+
public ClusterState measureShardAllocationEmptyCluster() throws Exception {
94+
ClusterState clusterState = initialClusterState;
95+
allocationService = Allocators.createAllocationService(
96+
Settings.builder()
97+
.put("cluster.routing.allocation.awareness.attributes", "zone")
98+
.put("cluster.routing.allocation.load_awareness.provisioned_capacity", numNodes)
99+
.put("cluster.routing.allocation.load_awareness.skew_factor", "50")
100+
.put("cluster.routing.allocation.node_concurrent_recoveries", "2")
101+
.build()
102+
);
103+
clusterState = allocationService.reroute(clusterState, "reroute");
104+
while (clusterState.getRoutingNodes().hasUnassignedShards()) {
105+
clusterState = startInitializingShardsAndReroute(allocationService, clusterState);
106+
}
107+
return clusterState;
108+
}
109+
110+
private int toInt(String v) {
111+
return Integer.valueOf(v.trim());
112+
}
113+
114+
private DiscoveryNodes.Builder setUpClusterNodes(int nodes) {
115+
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
116+
for (int i = 1; i <= nodes; i++) {
117+
Map<String, String> attributes = new HashMap<>();
118+
attributes.put("zone", "zone_" + (i % 3));
119+
nb.add(Allocators.newNode("node_0_" + i, attributes));
120+
}
121+
return nb;
122+
}
123+
124+
private static ClusterState startInitializingShardsAndReroute(AllocationService allocationService, ClusterState clusterState) {
125+
return startShardsAndReroute(allocationService, clusterState, clusterState.routingTable().shardsWithState(INITIALIZING));
126+
}
127+
128+
private static ClusterState startShardsAndReroute(
129+
AllocationService allocationService,
130+
ClusterState clusterState,
131+
List<ShardRouting> initializingShards
132+
) {
133+
return allocationService.reroute(allocationService.applyStartedShards(clusterState, initializingShards), "reroute after starting");
134+
}
135+
}

Diff for: server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.cluster.block.ClusterBlockLevel;
4444
import org.opensearch.cluster.node.DiscoveryNodeFilters;
4545
import org.opensearch.cluster.routing.allocation.IndexMetadataUpdater;
46+
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
4647
import org.opensearch.common.Nullable;
4748
import org.opensearch.common.annotation.PublicApi;
4849
import org.opensearch.common.collect.MapBuilder;
@@ -688,6 +689,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
688689
private final boolean isSystem;
689690
private final boolean isRemoteSnapshot;
690691

692+
private final int indexTotalShardsPerNodeLimit;
693+
691694
private IndexMetadata(
692695
final Index index,
693696
final long version,
@@ -713,7 +716,8 @@ private IndexMetadata(
713716
final int routingPartitionSize,
714717
final ActiveShardCount waitForActiveShards,
715718
final Map<String, RolloverInfo> rolloverInfos,
716-
final boolean isSystem
719+
final boolean isSystem,
720+
final int indexTotalShardsPerNodeLimit
717721
) {
718722

719723
this.index = index;
@@ -748,6 +752,7 @@ private IndexMetadata(
748752
this.rolloverInfos = Collections.unmodifiableMap(rolloverInfos);
749753
this.isSystem = isSystem;
750754
this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
755+
this.indexTotalShardsPerNodeLimit = indexTotalShardsPerNodeLimit;
751756
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
752757
}
753758

@@ -901,6 +906,10 @@ public Set<String> inSyncAllocationIds(int shardId) {
901906
return inSyncAllocationIds.get(shardId);
902907
}
903908

909+
public int getIndexTotalShardsPerNodeLimit() {
910+
return this.indexTotalShardsPerNodeLimit;
911+
}
912+
904913
@Nullable
905914
public DiscoveryNodeFilters requireFilters() {
906915
return requireFilters;
@@ -1607,6 +1616,8 @@ public IndexMetadata build() {
16071616
);
16081617
}
16091618

1619+
final int indexTotalShardsPerNodeLimit = ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(settings);
1620+
16101621
final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
16111622

16121623
return new IndexMetadata(
@@ -1634,7 +1645,8 @@ public IndexMetadata build() {
16341645
routingPartitionSize,
16351646
waitForActiveShards,
16361647
rolloverInfos,
1637-
isSystem
1648+
isSystem,
1649+
indexTotalShardsPerNodeLimit
16381650
);
16391651
}
16401652

Diff for: server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
package org.opensearch.cluster.routing.allocation.decider;
3434

35-
import org.opensearch.cluster.metadata.IndexMetadata;
3635
import org.opensearch.cluster.routing.RoutingNode;
3736
import org.opensearch.cluster.routing.ShardRouting;
3837
import org.opensearch.cluster.routing.ShardRoutingState;
@@ -125,8 +124,7 @@ private Decision doDecide(
125124
RoutingAllocation allocation,
126125
BiPredicate<Integer, Integer> decider
127126
) {
128-
IndexMetadata indexMd = allocation.metadata().getIndexSafe(shardRouting.index());
129-
final int indexShardLimit = INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(indexMd.getSettings(), settings);
127+
final int indexShardLimit = allocation.metadata().getIndexSafe(shardRouting.index()).getIndexTotalShardsPerNodeLimit();
130128
// Capture the limit here in case it changes during this method's
131129
// execution
132130
final int clusterShardLimit = this.clusterShardLimit;

0 commit comments

Comments
 (0)