HDDS-10338. Implement a Client Datanode API to stream a block #6613

Open
wants to merge 92 commits into base: master
Changes from 65 commits
Commits
d6d7b2a
implement data stream api
chungen0126 Mar 11, 2024
d8a519d
implement XceiverClientGrpc#sendCommandOnlyRead
chungen0126 Mar 11, 2024
ffebd66
read data to buffers
chungen0126 Mar 12, 2024
a6ed056
create NewBlockInputStream to support Streaming data
chungen0126 Mar 17, 2024
fbd20eb
fix checkstyle
chungen0126 Mar 18, 2024
a663604
fix checkstyle
chungen0126 Mar 18, 2024
0f35371
fix synchronized
chungen0126 Mar 18, 2024
80329dc
fix synchronized
chungen0126 Mar 18, 2024
e398d17
fix synchronized
chungen0126 Mar 18, 2024
a404266
fix synchronized
chungen0126 Mar 18, 2024
c57c2e6
fix synchronized
chungen0126 Mar 18, 2024
4dc9082
fix synchronized
chungen0126 Mar 18, 2024
af4a25b
ignore find bugs in TestNewBlockInputStream
chungen0126 Mar 18, 2024
79ae3eb
clean up
chungen0126 Mar 18, 2024
b1b301e
implement server side stream data
chungen0126 Apr 9, 2024
ad0fd8d
fix bug
chungen0126 Apr 9, 2024
e86aee9
fix bug
chungen0126 Apr 9, 2024
5442761
Merge branch 'master' into HDDS-10338
chungen0126 Apr 10, 2024
8d97d47
fix bug
chungen0126 Apr 12, 2024
b694448
fix bug
chungen0126 Apr 12, 2024
d5dc908
fix bug
chungen0126 Apr 12, 2024
74eac1e
fix bug
chungen0126 Apr 15, 2024
29c8f80
fix bug
chungen0126 Apr 15, 2024
741effb
fix bug
chungen0126 Apr 15, 2024
f1d4d7f
fix checkstyle
chungen0126 Apr 15, 2024
6a67375
fix bug
chungen0126 Apr 16, 2024
b0c64d7
Merge branch 'master' into HDDS-10338
chungen0126 Apr 16, 2024
b4cfd3f
fix checkstyle
chungen0126 Apr 16, 2024
91631ac
fix bug
chungen0126 Apr 16, 2024
6e36ec1
fix bug
chungen0126 Apr 17, 2024
e0b1d2b
fix bug
chungen0126 Apr 17, 2024
0e8576e
fix bug
chungen0126 Apr 17, 2024
f243c46
fix bug
chungen0126 Apr 17, 2024
1d8500f
Merge branch 'master' into HDDS-10338
chungen0126 Apr 17, 2024
80eb936
fix bug
chungen0126 Apr 17, 2024
7afa66b
Merge branch 'master' into HDDS-10338
chungen0126 May 2, 2024
b26b51d
fix protobuf
chungen0126 May 2, 2024
c4e8b2b
rename NewBlockInputStream
chungen0126 May 27, 2024
56d8ca9
revert sendCommandWithRetry
chungen0126 Jul 18, 2024
7807741
Merge branch 'master' into HDDS-10338
chungen0126 Jul 18, 2024
a8ea3ec
Merge branch 'master' into HDDS-10338
chungen0126 Jul 18, 2024
57b874c
make strea block configurable
chungen0126 Jul 18, 2024
7b98108
fix checkstyle
chungen0126 Jul 18, 2024
db82e3b
remove TestStreamBlockInput
chungen0126 Jul 18, 2024
158d2e4
fix checkstyle
chungen0126 Jul 18, 2024
4935668
Merge branch 'master' into HDDS-10338
chungen0126 Aug 14, 2024
d87d2c4
Merge branch 'master' into HDDS-10338
chungen0126 Sep 4, 2024
4b5e14c
Merge branch 'master' into HDDS-10338
chungen0126 Sep 4, 2024
0d0e31c
Merge branch 'master' into HDDS-10338
chungen0126 Sep 5, 2024
d7e9c92
Merge branch 'master' into HDDS-10338
chungen0126 Sep 13, 2024
3cefe36
update for compabatbilities and add some tests
chungen0126 Sep 25, 2024
61b79ae
default streamReadBlock as false
chungen0126 Sep 25, 2024
5fb1478
log BlockInputStream Initializing
chungen0126 Sep 30, 2024
b857fba
remove ReadBlockResponse
chungen0126 Oct 3, 2024
92505bc
remove ReadBlockResponse
chungen0126 Oct 3, 2024
8202468
remove V0
chungen0126 Oct 3, 2024
a1925c4
fix checkstyle
chungen0126 Oct 3, 2024
ef23455
remove V0
chungen0126 Oct 3, 2024
7a7388f
fix checkstyle
chungen0126 Oct 4, 2024
7bbe3d9
adress comments
chungen0126 Oct 5, 2024
71094bd
add testReadBlock in TestKeyValueHandler and rename variable
chungen0126 Oct 8, 2024
9e5f77c
fix checkstyle and fix bug
chungen0126 Oct 8, 2024
281e91e
revert StreamObserver<ContainerCommandResponseProto>.onComplete
chungen0126 Oct 8, 2024
397ef40
create functions to handle exception
chungen0126 Oct 9, 2024
fe5f8ec
address comments
chungen0126 Oct 9, 2024
9566b89
address comments
chungen0126 Oct 16, 2024
200da6d
address comments
chungen0126 Oct 16, 2024
0814a22
address comments
chungen0126 Oct 18, 2024
e040768
address comment
chungen0126 Oct 18, 2024
ece66fc
Merge branch 'master' into HDDS-10338
chungen0126 Nov 8, 2024
398692e
fix DummyStreamBlockInput
chungen0126 Nov 15, 2024
fead0b7
rmove StreamData type
chungen0126 Nov 15, 2024
2fb2851
fix checkstyle
chungen0126 Nov 15, 2024
dec005d
Merge branch 'master' into HDDS-10338
chungen0126 Nov 25, 2024
1968756
Merge branch 'master' into HDDS-10338
chungen0126 Nov 25, 2024
5c70fd4
fix verify checksum
chungen0126 Nov 27, 2024
bc804ff
no need to compute startByteIndex
chungen0126 Nov 28, 2024
0e4e41e
address comments
chungen0126 Dec 3, 2024
f8e5f28
address comments
chungen0126 Dec 3, 2024
228de8d
add read empty block in TestStreamBlockInputStream.java
chungen0126 Dec 3, 2024
c24da9b
address comments
chungen0126 Dec 6, 2024
f457ac2
fix checkstyle
chungen0126 Dec 6, 2024
c1fbad2
address comments
chungen0126 Dec 12, 2024
5e2b3cf
Merge branch 'master' into HDDS-10338
chungen0126 Jan 30, 2025
ba100c8
fix conflict
chungen0126 Jan 30, 2025
1fec95f
Merge remote-tracking branch 'origin/master' into HDDS-10338
adoroszlai Feb 15, 2025
7daa875
Merge branch 'master' into HDDS-10338
chungen0126 Mar 16, 2025
5db133a
fix checks
chungen0126 Mar 16, 2025
2bf716d
fix checks
chungen0126 Mar 17, 2025
d53bdcb
fix checkstyle
chungen0126 Mar 17, 2025
aa51cbb
fix test
chungen0126 Mar 19, 2025
03986e4
Merge branch 'master' into HDDS-10338
jojochuang Mar 25, 2025
4 changes: 4 additions & 0 deletions hadoop-hdds/client/dev-support/findbugsExcludeFile.xml
@@ -32,4 +32,8 @@
<Class name="org.apache.hadoop.hdds.scm.storage.TestBlockInputStream"></Class>
<Bug pattern="RR_NOT_CHECKED" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdds.scm.storage.TestStreamBlockInput"></Class>
<Bug pattern="RR_NOT_CHECKED" />
</Match>
</FindBugsFilter>
@@ -128,6 +128,13 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private long streamBufferMaxSize = 32 * 1024 * 1024;

@Config(key = "stream.readblock.enable",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Allow ReadBlock to stream all the readChunk in one request.",
tags = ConfigTag.CLIENT)
private boolean streamReadBlock = false;

@Config(key = "max.retries",
defaultValue = "5",
description = "Maximum number of retries by Ozone Client on "
@@ -166,7 +173,7 @@ public enum ChecksumCombineMode {
description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
+ "determines which algorithm would be used to compute checksum for "
+ "chunk data. Default checksum type is CRC32.",
tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
private String checksumType = ChecksumType.CRC32.name();

@Config(key = "bytes.per.checksum",
@@ -175,7 +182,7 @@ public enum ChecksumCombineMode {
description = "Checksum will be computed for every bytes per checksum "
+ "number of bytes and stored sequentially. The minimum value for "
+ "this config is 8KB.",
tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
private int bytesPerChecksum = 16 * 1024;

@Config(key = "verify.checksum",
@@ -558,4 +565,12 @@ public void setMaxConcurrentWritePerKey(int maxConcurrentWritePerKey) {
public int getMaxConcurrentWritePerKey() {
return this.maxConcurrentWritePerKey;
}

public boolean isStreamReadBlock() {
return streamReadBlock;
}

public void setStreamReadBlock(boolean streamReadBlock) {
this.streamReadBlock = streamReadBlock;
}
}
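A config field like the one above carries its default twice: once in the annotation's `defaultValue` and once in the Java field initializer, and the two can drift apart. The following is a minimal sketch of a reflection check for that, not part of this PR. The `Config` annotation here is a simplified, hypothetical stand-in (the annotation in the diff also has `type`, `description`, and `tags`), and `ConsistentConfig`, `DriftedConfig`, and `ConfigDefaultCheck` are illustrative names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the @Config annotation seen in the diff. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Config {
  String key();
  String defaultValue();
}

/** Hypothetical config class whose initializer agrees with the annotation. */
class ConsistentConfig {
  @Config(key = "stream.readblock.enable", defaultValue = "false")
  private boolean streamReadBlock = false;
}

/** Hypothetical config class whose initializer disagrees with the annotation. */
class DriftedConfig {
  @Config(key = "stream.readblock.enable", defaultValue = "false")
  private boolean streamReadBlock = true;
}

class ConfigDefaultCheck {
  /** Returns the keys of boolean fields whose initializer differs from the annotated default. */
  static List<String> mismatchedBooleanDefaults(Object config) {
    List<String> mismatches = new ArrayList<>();
    for (Field field : config.getClass().getDeclaredFields()) {
      Config annotation = field.getAnnotation(Config.class);
      if (annotation == null || field.getType() != boolean.class) {
        continue; // only annotated boolean fields are checked in this sketch
      }
      field.setAccessible(true);
      try {
        boolean declared = Boolean.parseBoolean(annotation.defaultValue());
        if (field.getBoolean(config) != declared) {
          mismatches.add(annotation.key());
        }
      } catch (IllegalAccessException e) {
        throw new IllegalStateException(e);
      }
    }
    return mismatches;
  }

  public static void main(String[] args) {
    System.out.println(mismatchedBooleanDefaults(new ConsistentConfig())); // prints []
    System.out.println(mismatchedBooleanDefaults(new DriftedConfig()));    // prints [stream.readblock.enable]
  }
}
```

A check of this kind could run as a unit test over a freshly constructed config object, so that a changed `defaultValue` without a matching initializer change is caught early.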
@@ -41,6 +41,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -108,12 +110,12 @@ public class XceiverClientGrpc extends XceiverClientSpi {
* Constructs a client that can communicate with the Container framework on
* data nodes via DatanodeClientProtocol.
*
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
* @param trustManager - a {@link ClientTrustManager} with proper CA handling.
*/
public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
ClientTrustManager trustManager) {
ClientTrustManager trustManager) {
super();
Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(config);
@@ -441,7 +443,11 @@ private XceiverClientReply sendCommandWithRetry(
// sendCommandAsyncCall will create a new channel and async stub
// in case these don't exist for the specific datanode.
reply.addDatanode(dn);
- responseProto = sendCommandAsync(request, dn).getResponse().get();
+ if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
+   responseProto = sendCommandAsyncReadOnly(request, dn).getResponse().get();
+ } else {
+   responseProto = sendCommandAsync(request, dn).getResponse().get();
+ }
if (validators != null && !validators.isEmpty()) {
for (Validator validator : validators) {
validator.accept(request, responseProto);
@@ -485,7 +491,7 @@ private XceiverClientReply sendCommandWithRetry(
String message = "Failed to execute command {}";
if (LOG.isDebugEnabled()) {
LOG.debug(message + " on the pipeline {}.",
processForDebug(request), pipeline);
processForDebug(request), pipeline);
} else {
LOG.warn(message + " on the pipeline {}.",
request.getCmdType(), pipeline);
@@ -618,6 +624,69 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
return new XceiverClientReply(replyFuture);
}

public XceiverClientReply sendCommandAsyncReadOnly(
ContainerCommandRequestProto request, DatanodeDetails dn)
throws IOException, InterruptedException {

CompletableFuture<ContainerCommandResponseProto> future =
new CompletableFuture<>();
ContainerCommandResponseProto.Builder response =
ContainerCommandResponseProto.newBuilder();
ContainerProtos.ReadBlockResponseProto.Builder readBlock =
ContainerProtos.ReadBlockResponseProto.newBuilder();
checkOpen(dn);
UUID dnID = dn.getUuid();
Type cmdType = request.getCmdType();
semaphore.acquire();
long requestTime = System.currentTimeMillis();
metrics.incrPendingContainerOpsMetrics(cmdType);

final StreamObserver<ContainerCommandRequestProto> requestObserver =
asyncStubs.get(dnID).withDeadlineAfter(timeout, TimeUnit.SECONDS)
.send(new StreamObserver<ContainerCommandResponseProto>() {
@Override
public void onNext(
ContainerCommandResponseProto responseProto) {
if (responseProto.getResult() == Result.SUCCESS) {
readBlock.addReadChunk(responseProto.getReadChunk());
} else {
future.complete(
ContainerCommandResponseProto.newBuilder(responseProto)
.setCmdType(Type.StreamRead).build());
}
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
metrics.decrPendingContainerOpsMetrics(cmdType);
metrics.addContainerOpsLatency(
cmdType, System.currentTimeMillis() - requestTime);

}

@Override
public void onCompleted() {
if (readBlock.getReadChunkCount() > 0) {
future.complete(response.setReadBlock(readBlock)
.setCmdType(Type.StreamRead).setResult(Result.SUCCESS).build());
}
if (!future.isDone()) {
future.completeExceptionally(new IOException(
"Stream completed but no reply for request " +
processForDebug(request)));
}
metrics.decrPendingContainerOpsMetrics(cmdType);
metrics.addContainerOpsLatency(
cmdType, System.currentTimeMillis() - requestTime);
}
});
requestObserver.onNext(request);
requestObserver.onCompleted();
semaphore.release();
Contributor:
We cannot release the semaphore here, directly after sending the request. Release the semaphore after the response is returned.

Contributor Author:
Changed to release the semaphore at the end of StreamObserver#onError and StreamObserver#onCompleted.

return new XceiverClientReply(future);
}
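The review thread above asks that the semaphore be released only once the streamed response has finished, not right after the request is sent. A minimal, self-contained sketch of that pattern follows. It is an illustration under assumptions, not the PR's code: `ResponseObserver` is a hypothetical stand-in for a gRPC-style `StreamObserver`, chunks are plain strings, and `SemaphoreReleaseSketch`, `startRead`, and `demo` are invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a gRPC-style StreamObserver; not the io.grpc interface. */
interface ResponseObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

/** Sketch: hold the permit for the whole streamed read, release it on a terminal callback. */
class SemaphoreReleaseSketch {
  private final Semaphore semaphore;

  SemaphoreReleaseSketch(int permits) {
    this.semaphore = new Semaphore(permits);
  }

  int availablePermits() {
    return semaphore.availablePermits();
  }

  /** Acquires a permit and returns an observer that releases it exactly once. */
  ResponseObserver<String> startRead(CompletableFuture<List<String>> future)
      throws InterruptedException {
    semaphore.acquire();
    final List<String> chunks = new ArrayList<>();
    final AtomicBoolean released = new AtomicBoolean(false);
    return new ResponseObserver<String>() {
      @Override
      public void onNext(String chunk) {
        chunks.add(chunk); // collect streamed chunks; the permit stays held
      }

      @Override
      public void onError(Throwable t) {
        try {
          future.completeExceptionally(t);
        } finally {
          releaseOnce();
        }
      }

      @Override
      public void onCompleted() {
        try {
          future.complete(chunks);
        } finally {
          releaseOnce();
        }
      }

      private void releaseOnce() {
        // Guard against a double release if both terminal callbacks fire.
        if (released.compareAndSet(false, true)) {
          semaphore.release();
        }
      }
    };
  }

  /** Returns {permits while in flight, permits after completion, chunks received}. */
  static int[] demo() {
    SemaphoreReleaseSketch client = new SemaphoreReleaseSketch(1);
    CompletableFuture<List<String>> future = new CompletableFuture<>();
    try {
      ResponseObserver<String> observer = client.startRead(future);
      int inFlight = client.availablePermits();
      observer.onNext("chunk-0");
      observer.onNext("chunk-1");
      observer.onCompleted();
      return new int[] {inFlight, client.availablePermits(), future.join().size()};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    int[] result = demo();
    System.out.println(result[0] + " " + result[1] + " " + result[2]); // prints "0 1 2"
  }
}
```

Releasing in a `finally` block inside both terminal callbacks keeps the permit count bounded by the number of requests that are actually in flight, which is the property the semaphore is meant to enforce.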

private synchronized void checkOpen(DatanodeDetails dn)
throws IOException {
if (closed) {
@@ -124,6 +124,7 @@ public BlockInputStream(
OzoneClientConfig config) throws IOException {
this.blockInfo = blockInfo;
this.blockID = blockInfo.getBlockID();
LOG.debug("Initializing BlockInputStream for block {}", blockID);
this.length = blockInfo.getLength();
setPipeline(pipeline);
tokenRef.set(token);