Skip to content

Commit ede9bb5

Browse files
BulkBeingyhl25vigith
authored
feat: Custom Serve Store for integration with any user-defined store (#113)
Signed-off-by: Sreekanth <[email protected]> Signed-off-by: Yashash H L <[email protected]> Co-authored-by: Yashash H L <[email protected]> Co-authored-by: Vigith Maurice <[email protected]>
1 parent c9806a0 commit ede9bb5

15 files changed

+1137
-8
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ default-members = ["numaflow"]
55
resolver = "2"
66

77
[workspace.package]
8-
rust-version = "1.82"
8+
rust-version = "1.84"

examples/simple-source/src/main.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
///! An example for simple User Defined Source. It generates a continuous increasing sequence of offsets and some data for each call to [`numaflow::source::sourcer::read`].
1+
//! An example for simple User Defined Source. It generates a continuous increasing sequence of offsets
2+
//! and some data for each call to [`numaflow::source::sourcer::read`].
23
34
#[tokio::main]
45
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

numaflow/Cargo.toml

+2-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "numaflow"
33
version = "0.2.1"
44
edition = "2021"
5-
rust-version = "1.80"
5+
rust-version = "1.84"
66
description = "Rust SDK for Numaflow"
77
authors = ["Numaflow Developers"]
88
license = "Apache-2.0"
@@ -34,10 +34,9 @@ tokio-stream = { version = "0.1.16", features = ["net"] }
3434
serde = { version = "1.0.210", features = ["derive"] }
3535
chrono = "0.4.38"
3636
serde_json = "1.0.128"
37-
futures-util = "0.3.30"
3837
tracing = "0.1.40"
3938
uuid = { version = "1.10.0", features = ["v4"] }
40-
thiserror = "1.0"
39+
thiserror = "2.0.12"
4140
hyper-util = "0.1.7"
4241

4342
[build-dependencies]

numaflow/build.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ fn main() {
1515
"proto/reduce.proto",
1616
"proto/sink.proto",
1717
"proto/sideinput.proto",
18+
"proto/store.proto",
1819
],
1920
&["proto"],
2021
)

numaflow/proto/sink.proto

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ syntax = "proto3";
33
import "google/protobuf/empty.proto";
44
import "google/protobuf/timestamp.proto";
55

6+
67
package sink.v1;
78

89
service Sink {
@@ -63,6 +64,7 @@ enum Status {
6364
SUCCESS = 0;
6465
FAILURE = 1;
6566
FALLBACK = 2;
67+
SERVE = 3;
6668
}
6769

6870
/**
@@ -76,8 +78,9 @@ message SinkResponse {
7678
Status status = 2;
7779
// err_msg is the error message, set it if success is set to false.
7880
string err_msg = 3;
81+
optional bytes serve_response = 4;
7982
}
8083
repeated Result results = 1;
8184
optional Handshake handshake = 2;
8285
optional TransmissionStatus status = 3;
83-
}
86+
}

numaflow/proto/store.proto

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
Copyright 2022 The Numaproj Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
syntax = "proto3";
18+
19+
import "google/protobuf/empty.proto";
20+
import "google/protobuf/timestamp.proto";
21+
22+
package serving.v1;
23+
24+
// ServingStore defines a set of methods to interface with a user-defined Store.
25+
service ServingStore {
26+
// Put is to put the PutRequest into the Store.
27+
rpc Put(PutRequest) returns (PutResponse);
28+
29+
// Get gets the GetRequest from the Store.
30+
rpc Get(GetRequest) returns (GetResponse);
31+
32+
// IsReady checks the health of the container interfacing the Store.
33+
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
34+
}
35+
36+
// Payload that represent the output that is to be written into to the store.
37+
message Payload {
38+
// Origin is the Vertex that generated this result.
39+
string origin = 1;
40+
// Value is the result of the computation.
41+
bytes value = 2;
42+
}
43+
44+
// PutRequest is the request sent to the Store.
45+
message PutRequest {
46+
// ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated
47+
// uuid.
48+
string id = 1;
49+
// Payloads are one or more results generated (could be more than one due to flat-map).
50+
repeated Payload payloads = 2;
51+
}
52+
53+
// PutResponse is the result of the Put call.
54+
message PutResponse {
55+
bool success = 1;
56+
}
57+
58+
// GetRequest is the call to get the result stored in the Store.
59+
message GetRequest {
60+
// ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated
61+
// uuid.
62+
string id = 1;
63+
}
64+
65+
// GetResponse is the result stored in the Store.
66+
message GetResponse {
67+
string id = 1;
68+
// Payloads are one or more results generated (could be more than one due to flat-map).
69+
repeated Payload payloads = 2;
70+
}
71+
72+
/**
73+
* ReadyResponse is the health check result.
74+
*/
75+
message ReadyResponse {
76+
bool ready = 1;
77+
}

numaflow/src/batchmap.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ where
354354
};
355355

356356
// we are done with this batch because eot=true
357-
if message.status.map_or(false, |status| status.eot) {
357+
if message.status.is_some_and(|status| status.eot) {
358358
debug!("Batch Ended, received an EOT message");
359359
break;
360360
}

numaflow/src/error.rs

+3
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,7 @@ pub enum Error {
3333

3434
#[error("SideInput Error: {0}")]
3535
SideInputError(ErrorKind),
36+
37+
#[error("ServingStore Error: {0}")]
38+
ServingStoreError(ErrorKind),
3639
}

numaflow/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ pub mod batchmap;
4040
/// mapstream is for writing the map in [stream mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#streaming-mode) handlers.
4141
pub mod mapstream;
4242

43+
pub mod serving_store;
44+
4345
mod servers;
4446

4547
// Error handling on Numaflow SDKs!

numaflow/src/servers.rs

+4
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,7 @@ pub mod source;
2121
#[path = "servers/sourcetransformer.v1.rs"]
2222
#[rustfmt::skip]
2323
pub mod sourcetransformer;
24+
25+
#[path = "servers/serving.v1.rs"]
26+
#[rustfmt::skip]
27+
pub mod serving;

0 commit comments

Comments
 (0)