 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import org.apache.hadoop.hdds.HddsUtils;
@@ -187,13 +192,38 @@ long getStartTime() {
     }
   }

+  static class WriteFutures {
+    private final Future<ContainerCommandResponseProto> writeChunkFuture;
+    private final CompletableFuture<Message> raftFuture;
+    private final long startTime;
+
+    WriteFutures(Future<ContainerCommandResponseProto> writeChunkFuture,
+        CompletableFuture<Message> raftFuture, long startTime) {
+      this.writeChunkFuture = writeChunkFuture;
+      this.raftFuture = raftFuture;
+      this.startTime = startTime;
+    }
+
+    public Future<ContainerCommandResponseProto> getWriteChunkFuture() {
+      return writeChunkFuture;
+    }
+
+    public CompletableFuture<Message> getRaftFuture() {
+      return raftFuture;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+  }
+
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private final ContainerController containerController;
   private final XceiverServerRatis ratisServer;
-  private final ConcurrentHashMap<Long,
-      CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
+  private final NavigableMap<Long, WriteFutures> writeChunkFutureMap;
+  private final long writeChunkWaitMaxNs;

   // keeps track of the containers created per pipeline
   private final Map<Long, Long> container2BCSIDMap;
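Review note: swapping the plain ConcurrentHashMap for a NavigableMap (instantiated as a ConcurrentSkipListMap below) keeps pending writes sorted by Raft log index, which is what lets the new code find the longest-waiting write cheaply. A minimal sketch of the ordered lookup this relies on, with strings standing in for the WriteFutures holder:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class OrderedPendingWrites {
  public static void main(String[] args) {
    // Keyed by Raft log index, as writeChunkFutureMap is after this change.
    NavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();
    pending.put(12L, "write@12");
    pending.put(10L, "write@10");
    pending.put(15L, "write@15");

    // firstEntry() returns the lowest index, i.e. the longest-waiting write;
    // validateLongRunningWrite() below uses this to detect a stuck chunk write.
    Map.Entry<Long, String> oldest = pending.firstEntry();
    System.out.println(oldest.getKey() + " -> " + oldest.getValue()); // 10 -> write@10
  }
}
```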
@@ -229,7 +259,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
     this.containerController = containerController;
     this.ratisServer = ratisServer;
     metrics = CSMMetrics.create(gid);
-    this.writeChunkFutureMap = new ConcurrentHashMap<>();
+    this.writeChunkFutureMap = new ConcurrentSkipListMap<>();
     applyTransactionCompletionMap = new ConcurrentHashMap<>();
     this.unhealthyContainers = ConcurrentHashMap.newKeySet();
     long pendingRequestsBytesLimit = (long)conf.getStorageSize(
@@ -273,6 +303,8 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
     this.waitOnBothFollowers = conf.getObject(
         DatanodeConfiguration.class).waitOnAllFollowers();

+    this.writeChunkWaitMaxNs = conf.getTimeDuration(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL,
+        ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT, TimeUnit.NANOSECONDS);
   }

   private void validatePeers() throws IOException {
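Review note: the two ScmConfigKeys constants read here are defined elsewhere in the patch. As a rough sketch of their likely shape (only the constant names appear in this diff; the property string and the concrete default below are assumptions for illustration):

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the real ScmConfigKeys entries; the values are assumed.
public final class ScmConfigKeysSketch {
  public static final String HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL =
      "hdds.container.ratis.statemachine.write.wait.interval"; // assumed key string
  public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT =
      TimeUnit.MINUTES.toNanos(10); // assumed default; the patch reads it as nanoseconds

  private ScmConfigKeysSketch() { }
}
```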
@@ -542,6 +574,16 @@ private ContainerCommandResponseProto dispatchCommand(
   private CompletableFuture<Message> writeStateMachineData(
       ContainerCommandRequestProto requestProto, long entryIndex, long term,
       long startTime) {
+    final WriteFutures previous = writeChunkFutureMap.get(entryIndex);
+    if (previous != null) {
+      // The state machine generally waits forever; as a precaution, return the existing future if a retry arrives.
+      return previous.getRaftFuture();
+    }
+    try {
+      validateLongRunningWrite();
+    } catch (StorageContainerException e) {
+      return completeExceptionally(e);
+    }
     final WriteChunkRequestProto write = requestProto.getWriteChunk();
     RaftServer server = ratisServer.getServer();
     Preconditions.checkArgument(!write.getData().isEmpty());
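Review note: the early return above makes a Raft retry of the same log entry idempotent — the second call observes the map entry left by the first attempt and reuses its raftFuture instead of submitting the chunk write twice. A condensed illustration of the pattern (types simplified, names hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;

public class RetrySketch {
  private final ConcurrentSkipListMap<Long, CompletableFuture<String>> inFlight =
      new ConcurrentSkipListMap<>();

  CompletableFuture<String> write(long index) {
    // A retry of an index that is still in flight gets the original future back.
    CompletableFuture<String> previous = inFlight.get(index);
    if (previous != null) {
      return previous;
    }
    CompletableFuture<String> fresh = new CompletableFuture<>();
    inFlight.put(index, fresh);
    // ... submit the real work here, completing `fresh` when it finishes ...
    return fresh;
  }
}
```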
@@ -564,19 +606,22 @@ private CompletableFuture<Message> writeStateMachineData(
         .setContainer2BCSIDMap(container2BCSIDMap)
         .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
+    // ensure the write chunk happens asynchronously in writeChunkExecutor pool thread.
+    Future<ContainerCommandResponseProto> future = getChunkExecutor(
+        requestProto.getWriteChunk()).submit(() -> {
       try {
         try {
           checkContainerHealthy(write.getBlockID().getContainerID(), true);
         } catch (StorageContainerException e) {
-          return ContainerUtils.logAndReturnError(LOG, e, requestProto);
+          ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto);
+          handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
+          return result;
         }
         metrics.recordWriteStateMachineQueueingLatencyNs(
             Time.monotonicNowNanos() - startTime);
-        return dispatchCommand(requestProto, context);
+        ContainerCommandResponseProto result = dispatchCommand(requestProto, context);
+        handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
+        return result;
       } catch (Exception e) {
         LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
             "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
@@ -588,55 +633,87 @@ private CompletableFuture<Message> writeStateMachineData(
         stateMachineHealthy.set(false);
         raftFuture.completeExceptionally(e);
         throw e;
+      } finally {
+        // Remove the future from the writeChunkFutureMap once it finishes execution.
+        writeChunkFutureMap.remove(entryIndex);
       }
-    }, getChunkExecutor(requestProto.getWriteChunk()));
+    });

-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, startTime));
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
           "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
-    // Remove the future once it finishes execution from the
-    // writeChunkFutureMap.
-    writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS
-          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
-          // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
-          // that should not crash the pipeline.
-          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
-        StorageContainerException sce =
-            new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
+    return raftFuture;
+  }
+
+  private void handleCommandResult(ContainerCommandRequestProto requestProto, long entryIndex, long startTime,
+      ContainerCommandResponseProto r, WriteChunkRequestProto write,
+      CompletableFuture<Message> raftFuture) {
+    if (r.getResult() != ContainerProtos.Result.SUCCESS
+        && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+        && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+        // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
+        // that should not crash the pipeline.
+        && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+      StorageContainerException sce =
+          new StorageContainerException(r.getMessage(), r.getResult());
+      LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
+          write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+          write.getChunkData().getChunkName() + " Error message: " +
+          r.getMessage() + " Container Result: " + r.getResult());
+      metrics.incNumWriteDataFails();
+      // If the write fails currently we mark the stateMachine as unhealthy.
+      // This leads to pipeline close. Any change in that behavior requires
+      // handling the entry for the write chunk in cache.
+      stateMachineHealthy.set(false);
+      unhealthyContainers.add(write.getBlockID().getContainerID());
+      raftFuture.completeExceptionally(sce);
+    } else {
+      metrics.incNumBytesWrittenCount(
+          requestProto.getWriteChunk().getChunkData().getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getGroupId() +
+            ": writeChunk writeStateMachineData completed: blockId" +
            write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-            write.getChunkData().getChunkName() + " Error message: " +
-            r.getMessage() + " Container Result: " + r.getResult());
-        metrics.incNumWriteDataFails();
-        // If the write fails currently we mark the stateMachine as unhealthy.
-        // This leads to pipeline close. Any change in that behavior requires
-        // handling the entry for the write chunk in cache.
-        stateMachineHealthy.set(false);
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        raftFuture.completeExceptionally(sce);
-      } else {
-        metrics.incNumBytesWrittenCount(
-            requestProto.getWriteChunk().getChunkData().getLen());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getGroupId() +
-              ": writeChunk writeStateMachineData completed: blockId" +
-              write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-              write.getChunkData().getChunkName());
-        }
-        raftFuture.complete(r::toByteString);
-        metrics.recordWriteStateMachineCompletionNs(
-            Time.monotonicNowNanos() - startTime);
+            write.getChunkData().getChunkName());
       }
+      raftFuture.complete(r::toByteString);
+      metrics.recordWriteStateMachineCompletionNs(
+          Time.monotonicNowNanos() - startTime);
+    }
+  }

-      writeChunkFutureMap.remove(entryIndex);
-      return r;
-    });
-    return raftFuture;
+  private void validateLongRunningWrite() throws StorageContainerException {
+    // get the oldest pending write chunk operation's future context
+    Map.Entry<Long, WriteFutures> writeFutureContextEntry = null;
+    for (boolean found = false; !found;) {
+      writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+      if (null == writeFutureContextEntry) {
+        return;
+      }
+      if (writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+        // writeChunkFutureMap may hold a dangling entry: the remove in the task's
+        // finally block can run before the put adds the future to the map
+        writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
+      } else {
+        found = true;
+      }
+    }
+    // validate for timeout in nanoseconds
+    long waitTime = Time.monotonicNowNanos() - writeFutureContextEntry.getValue().getStartTime();
+    if (waitTime > writeChunkWaitMaxNs) {
+      LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index {} groupId {}, " +
+          "cancelling pending write chunk for this group", waitTime, writeChunkWaitMaxNs,
+          writeFutureContextEntry.getKey(), getGroupId());
+      stateMachineHealthy.set(false);
+      writeChunkFutureMap.forEach((key, value) -> {
+        value.getWriteChunkFuture().cancel(true);
+      });
+      throw new StorageContainerException("Write chunk has taken " + waitTime + "ns crossing threshold "
+          + writeChunkWaitMaxNs + "ns for index " + writeFutureContextEntry.getKey() + " groupId " + getGroupId(),
+          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+    }
   }

   private StateMachine.DataChannel getStreamDataChannel(
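Review note: the isDone() eviction loop in validateLongRunningWrite() handles a benign race — the task's finally block removes the map entry, and a fast write can finish before the put runs, leaving a completed "dangling" entry behind. A condensed sketch of the scan-evict-timeout logic, under illustrative types and an assumed threshold:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

public class WatchdogSketch {
  // Illustrative stand-in for WriteFutures: a task future plus its start time.
  static final class Pending {
    final Future<?> task;
    final long startNs;
    Pending(Future<?> task, long startNs) { this.task = task; this.startNs = startNs; }
  }

  private final ConcurrentSkipListMap<Long, Pending> pending = new ConcurrentSkipListMap<>();
  private final long maxWaitNs = 600_000_000_000L; // assumed 10-minute threshold

  void validate() throws TimeoutException {
    Map.Entry<Long, Pending> oldest;
    // Evict "dangling" entries whose task already finished.
    while ((oldest = pending.firstEntry()) != null && oldest.getValue().task.isDone()) {
      pending.remove(oldest.getKey());
    }
    if (oldest == null) {
      return; // nothing in flight
    }
    long waited = System.nanoTime() - oldest.getValue().startNs;
    if (waited > maxWaitNs) {
      // Fail fast and interrupt everything still queued or running.
      pending.forEach((k, v) -> v.task.cancel(true));
      throw new TimeoutException("write at index " + oldest.getKey() + " stuck for " + waited + "ns");
    }
  }
}
```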
@@ -819,9 +896,13 @@ private ByteString readStateMachineData(
    */
   @Override
   public CompletableFuture<Void> flush(long index) {
-    return CompletableFuture.allOf(
-        writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
-            .map(Map.Entry::getValue).toArray(CompletableFuture[]::new));
+    final SortedMap<Long, WriteFutures> head = writeChunkFutureMap.headMap(index, true);
+    if (head.isEmpty()) {
+      return CompletableFuture.completedFuture(null);
+    }
+    return CompletableFuture.allOf(head.values().stream()
+        .map(WriteFutures::getRaftFuture)
+        .toArray(CompletableFuture[]::new));
   }

   /**
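Review note: the reworked flush(index) short-circuits when nothing at or below the index is pending, and otherwise waits on the raftFutures in the inclusive headMap view instead of streaming and filtering the whole map. A minimal sketch of that composition (types simplified):

```java
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;

public class FlushSketch {
  private final NavigableMap<Long, CompletableFuture<String>> pending = new ConcurrentSkipListMap<>();

  // Completes once every pending future with key <= index is done;
  // completes immediately when nothing at or below the index is in flight.
  CompletableFuture<Void> flush(long index) {
    SortedMap<Long, CompletableFuture<String>> head = pending.headMap(index, true);
    if (head.isEmpty()) {
      return CompletableFuture.completedFuture(null);
    }
    return CompletableFuture.allOf(head.values().stream()
        .toArray(CompletableFuture[]::new));
  }
}
```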