Skip to content

Commit 3387606

Browse files
Make reroute iteration time-bound for large shard allocations (opensearch-project#14848) (opensearch-project#14954)
* Make reroute iteration time-bound for large shard allocations

Signed-off-by: Bukhtawar Khan <[email protected]>
Co-authored-by: Rishab Nahata <[email protected]>
1 parent 46bd06b commit 3387606

File tree

15 files changed

+645
-16
lines changed

15 files changed

+645
-16
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6060
- Allow @InternalApi annotation on classes not meant to be constructed outside of the OpenSearch core ([#14575](https://github.com/opensearch-project/OpenSearch/pull/14575))
6161
- Add @InternalApi annotation to japicmp exclusions ([#14597](https://github.com/opensearch-project/OpenSearch/pull/14597))
6262
- Allow system index warning in OpenSearchRestTestCase.refreshAllIndices ([#14635](https://github.com/opensearch-project/OpenSearch/pull/14635))
63+
- Make reroute iteration time-bound for large shard allocations ([#14848](https://github.com/opensearch-project/OpenSearch/pull/14848))
6364

6465
### Deprecated
6566
- Deprecate batch_size parameter on bulk API ([#14725](https://github.com/opensearch-project/OpenSearch/pull/14725))

server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java

+127-1
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,7 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
769769
ensureGreen("test");
770770
}
771771

772-
public void testBatchModeEnabled() throws Exception {
772+
public void testBatchModeEnabledWithoutTimeout() throws Exception {
773773
internalCluster().startClusterManagerOnlyNodes(
774774
1,
775775
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
@@ -810,6 +810,132 @@ public void testBatchModeEnabled() throws Exception {
810810
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
811811
}
812812

813+
public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Exception {
814+
internalCluster().startClusterManagerOnlyNodes(
815+
1,
816+
Settings.builder()
817+
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
818+
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
819+
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
820+
.build()
821+
);
822+
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
823+
createIndex(
824+
"test",
825+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
826+
);
827+
ensureGreen("test");
828+
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
829+
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
830+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
831+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
832+
ensureRed("test");
833+
ensureStableCluster(1);
834+
835+
logger.info("--> Now do a protective reroute");
836+
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
837+
assertTrue(clusterRerouteResponse.isAcknowledged());
838+
839+
ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
840+
ShardsBatchGatewayAllocator.class,
841+
internalCluster().getClusterManagerName()
842+
);
843+
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
844+
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
845+
assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());
846+
847+
// Now start both data nodes and ensure batch mode is working
848+
logger.info("--> restarting the stopped nodes");
849+
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
850+
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
851+
ensureStableCluster(3);
852+
ensureGreen("test");
853+
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
854+
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
855+
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
856+
}
857+
858+
public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws Exception {
859+
860+
internalCluster().startClusterManagerOnlyNodes(
861+
1,
862+
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
863+
);
864+
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
865+
createNIndices(50, "test"); // this will create 50p, 50r shards
866+
ensureStableCluster(3);
867+
IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().get();
868+
assertThat(indicesStats.getSuccessfulShards(), equalTo(100));
869+
ClusterHealthResponse health = client().admin()
870+
.cluster()
871+
.health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("1m"))
872+
.actionGet();
873+
assertFalse(health.isTimedOut());
874+
assertEquals(GREEN, health.getStatus());
875+
876+
String clusterManagerName = internalCluster().getClusterManagerName();
877+
Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(clusterManagerName);
878+
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
879+
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
880+
881+
internalCluster().stopCurrentClusterManagerNode();
882+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
883+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
884+
885+
// Now start cluster manager node and post that verify batches created
886+
internalCluster().startClusterManagerOnlyNodes(
887+
1,
888+
Settings.builder()
889+
.put("node.name", clusterManagerName)
890+
.put(clusterManagerDataPathSettings)
891+
.put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5)
892+
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
893+
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
894+
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
895+
.build()
896+
);
897+
ensureStableCluster(1);
898+
899+
logger.info("--> Now do a protective reroute"); // to avoid any race condition in test
900+
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
901+
assertTrue(clusterRerouteResponse.isAcknowledged());
902+
903+
ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
904+
ShardsBatchGatewayAllocator.class,
905+
internalCluster().getClusterManagerName()
906+
);
907+
908+
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
909+
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
910+
assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
911+
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
912+
assertFalse(health.isTimedOut());
913+
assertEquals(RED, health.getStatus());
914+
assertEquals(100, health.getUnassignedShards());
915+
assertEquals(0, health.getInitializingShards());
916+
assertEquals(0, health.getActiveShards());
917+
assertEquals(0, health.getRelocatingShards());
918+
assertEquals(0, health.getNumberOfDataNodes());
919+
920+
// Now start both data nodes and ensure batch mode is working
921+
logger.info("--> restarting the stopped nodes");
922+
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
923+
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
924+
ensureStableCluster(3);
925+
926+
// wait for cluster to turn green
927+
health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("5m")).actionGet();
928+
assertFalse(health.isTimedOut());
929+
assertEquals(GREEN, health.getStatus());
930+
assertEquals(0, health.getUnassignedShards());
931+
assertEquals(0, health.getInitializingShards());
932+
assertEquals(100, health.getActiveShards());
933+
assertEquals(0, health.getRelocatingShards());
934+
assertEquals(2, health.getNumberOfDataNodes());
935+
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
936+
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
937+
}
938+
813939
public void testBatchModeDisabled() throws Exception {
814940
internalCluster().startClusterManagerOnlyNodes(
815941
1,

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.Iterator;
7373
import java.util.List;
7474
import java.util.Map;
75+
import java.util.Optional;
7576
import java.util.Set;
7677
import java.util.function.Function;
7778
import java.util.stream.Collectors;
@@ -617,10 +618,10 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
617618

618619
private void allocateAllUnassignedShards(RoutingAllocation allocation) {
619620
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
620-
allocator.allocateAllUnassignedShards(allocation, true);
621+
Optional.ofNullable(allocator.allocateAllUnassignedShards(allocation, true)).ifPresent(Runnable::run);
621622
allocator.afterPrimariesBeforeReplicas(allocation);
622623
// Replicas Assignment
623-
allocator.allocateAllUnassignedShards(allocation, false);
624+
Optional.ofNullable(allocator.allocateAllUnassignedShards(allocation, false)).ifPresent(Runnable::run);
624625
}
625626

626627
private void disassociateDeadNodes(RoutingAllocation allocation) {

server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.gateway.GatewayAllocator;
4242
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
4343

44+
import java.util.ArrayList;
4445
import java.util.List;
4546

4647
/**
@@ -108,14 +109,16 @@ void allocateUnassigned(
108109
*
109110
* Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator}
110111
*/
111-
default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
112+
default Runnable allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) {
112113
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
114+
List<Runnable> runnables = new ArrayList<>();
113115
while (iterator.hasNext()) {
114116
ShardRouting shardRouting = iterator.next();
115117
if (shardRouting.primary() == primary) {
116-
allocateUnassigned(shardRouting, allocation, iterator);
118+
runnables.add(() -> allocateUnassigned(shardRouting, allocation, iterator));
117119
}
118120
}
121+
return () -> runnables.forEach(Runnable::run);
119122
}
120123

121124
/**

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+2
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,8 @@ public void apply(Settings value, Settings current, Settings previous) {
343343
GatewayService.RECOVER_AFTER_NODES_SETTING,
344344
GatewayService.RECOVER_AFTER_TIME_SETTING,
345345
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
346+
ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING,
347+
ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING,
346348
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
347349
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
348350
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.util;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.Randomness;
14+
import org.opensearch.common.unit.TimeValue;
15+
import org.opensearch.common.util.concurrent.TimeoutAwareRunnable;
16+
17+
import java.util.List;
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.function.Supplier;
20+
21+
/**
22+
* A {@link Runnable} that iteratively executes a batch of {@link TimeoutAwareRunnable}s. If the elapsed time exceeds the timeout defined by {@link TimeValue} timeout, then all subsequent {@link TimeoutAwareRunnable}s will have their {@link TimeoutAwareRunnable#onTimeout} method invoked and will not be run.
23+
*
24+
* @opensearch.internal
25+
*/
26+
public class BatchRunnableExecutor implements Runnable {
27+
28+
private final Supplier<TimeValue> timeoutSupplier;
29+
30+
private final List<TimeoutAwareRunnable> timeoutAwareRunnables;
31+
32+
private static final Logger logger = LogManager.getLogger(BatchRunnableExecutor.class);
33+
34+
public BatchRunnableExecutor(List<TimeoutAwareRunnable> timeoutAwareRunnables, Supplier<TimeValue> timeoutSupplier) {
35+
this.timeoutSupplier = timeoutSupplier;
36+
this.timeoutAwareRunnables = timeoutAwareRunnables;
37+
}
38+
39+
// for tests
40+
public List<TimeoutAwareRunnable> getTimeoutAwareRunnables() {
41+
return this.timeoutAwareRunnables;
42+
}
43+
44+
@Override
45+
public void run() {
46+
logger.debug("Starting execution of runnable of size [{}]", timeoutAwareRunnables.size());
47+
long startTime = System.nanoTime();
48+
if (timeoutAwareRunnables.isEmpty()) {
49+
return;
50+
}
51+
Randomness.shuffle(timeoutAwareRunnables);
52+
for (TimeoutAwareRunnable runnable : timeoutAwareRunnables) {
53+
if (timeoutSupplier.get().nanos() < 0 || System.nanoTime() - startTime < timeoutSupplier.get().nanos()) {
54+
runnable.run();
55+
} else {
56+
logger.debug("Executing timeout for runnable of size [{}]", timeoutAwareRunnables.size());
57+
runnable.onTimeout();
58+
}
59+
}
60+
logger.debug(
61+
"Time taken to execute timed runnables in this cycle:[{}ms]",
62+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
63+
);
64+
}
65+
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.util.concurrent;
10+
11+
/**
 * A {@link Runnable} that also exposes a callback for the case where it could not be
 * run within the time allowed by whoever executes it.
 *
 * @opensearch.internal
 */
public interface TimeoutAwareRunnable extends Runnable {

    /**
     * Callback for the timeout case; intended to be invoked by the executing code in place of
     * {@link #run()} when the time budget has been used up.
     */
    void onTimeout();
}

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

+21
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,20 @@
3636
import org.apache.logging.log4j.Logger;
3737
import org.opensearch.cluster.routing.RecoverySource;
3838
import org.opensearch.cluster.routing.RoutingNode;
39+
import org.opensearch.cluster.routing.RoutingNodes;
3940
import org.opensearch.cluster.routing.ShardRouting;
4041
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
4142
import org.opensearch.cluster.routing.allocation.AllocationDecision;
4243
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
4344
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
4445
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
4546
import org.opensearch.cluster.routing.allocation.decider.Decision;
47+
import org.opensearch.core.index.shard.ShardId;
4648

4749
import java.util.ArrayList;
50+
import java.util.HashSet;
4851
import java.util.List;
52+
import java.util.Set;
4953

5054
/**
5155
* An abstract class that implements basic functionality for allocating
@@ -78,6 +82,23 @@ public void allocateUnassigned(
7882
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
7983
}
8084

85+
protected void allocateUnassignedBatchOnTimeout(List<ShardRouting> shardRoutings, RoutingAllocation allocation, boolean primary) {
86+
Set<ShardId> shardIdsFromBatch = new HashSet<>();
87+
for (ShardRouting shardRouting : shardRoutings) {
88+
ShardId shardId = shardRouting.shardId();
89+
shardIdsFromBatch.add(shardId);
90+
}
91+
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
92+
while (iterator.hasNext()) {
93+
ShardRouting unassignedShard = iterator.next();
94+
AllocateUnassignedDecision allocationDecision;
95+
if (unassignedShard.primary() == primary && shardIdsFromBatch.contains(unassignedShard.shardId())) {
96+
allocationDecision = AllocateUnassignedDecision.throttle(null);
97+
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
98+
}
99+
}
100+
}
101+
81102
protected void executeDecision(
82103
ShardRouting shardRouting,
83104
AllocateUnassignedDecision allocateUnassignedDecision,

0 commit comments

Comments
 (0)