Skip to content

Commit bb68941

Browse files
committed
Add buffered control
1 parent 01d75aa commit bb68941

File tree

6 files changed

+189
-109
lines changed

6 files changed

+189
-109
lines changed

crates/mdbx-remote/src/any.rs

+34-22
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use async_stream::try_stream;
99
use tokio_stream::Stream;
1010

1111
use crate::{
12-
remote::{ClientError, RemoteCursor, RemoteDatabase, RemoteEnvironment, RemoteTransaction},
12+
remote::{
13+
BufferConfiguration, ClientError, RemoteCursor, RemoteDatabase, RemoteEnvironment,
14+
RemoteTransaction,
15+
},
1316
service::RemoteMDBXClient,
1417
CommitLatency, Cursor, Database, DatabaseFlags, Environment, EnvironmentBuilder,
1518
EnvironmentFlags, EnvironmentKind, Info, Mode, Stat, TableObject, Transaction, TransactionKind,
@@ -670,9 +673,9 @@ impl<K: TransactionKind> CursorAny<K> {
670673
}
671674
}
672675

673-
pub fn into_iter_cnt<'a, Key, Value>(
676+
pub fn into_iter_buffered<'a, Key, Value>(
674677
self,
675-
cnt: u64,
678+
buffer_config: BufferConfiguration,
676679
) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
677680
where
678681
Key: TableObject + Send + 'a,
@@ -685,7 +688,7 @@ impl<K: TransactionKind> CursorAny<K> {
685688
yield (k, v);
686689
}
687690
}),
688-
Self::Remote(cur) => cur.into_iter_cnt(cnt),
691+
Self::Remote(cur) => cur.into_iter_buffered(buffer_config),
689692
}
690693
}
691694

@@ -702,9 +705,9 @@ impl<K: TransactionKind> CursorAny<K> {
702705
}
703706
}
704707

705-
pub fn into_iter_start_cnt<'a, Key, Value>(
708+
pub fn into_iter_start_buffered<'a, Key, Value>(
706709
self,
707-
cnt: u64,
710+
buffer_config: BufferConfiguration,
708711
) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
709712
where
710713
Key: TableObject + Send + 'a,
@@ -717,7 +720,7 @@ impl<K: TransactionKind> CursorAny<K> {
717720
yield (k, v);
718721
}
719722
}),
720-
Self::Remote(cur) => cur.into_iter_start_cnt(cnt),
723+
Self::Remote(cur) => cur.into_iter_start_buffered(buffer_config),
721724
}
722725
}
723726

@@ -735,10 +738,10 @@ impl<K: TransactionKind> CursorAny<K> {
735738
})
736739
}
737740

738-
pub async fn into_iter_from_cnt<'a, Key, Value>(
741+
pub async fn into_iter_from_buffered<'a, Key, Value>(
739742
self,
740743
key: &'a [u8],
741-
cnt: u64,
744+
buffer_config: BufferConfiguration,
742745
) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
743746
where
744747
Key: TableObject + Send + 'a,
@@ -751,7 +754,10 @@ impl<K: TransactionKind> CursorAny<K> {
751754
yield (k, v);
752755
}
753756
}),
754-
Self::Remote(cur) => cur.into_iter_from_cnt(key.to_vec(), cnt).await?,
757+
Self::Remote(cur) => {
758+
cur.into_iter_from_buffered(key.to_vec(), buffer_config)
759+
.await?
760+
}
755761
})
756762
}
757763

@@ -774,9 +780,9 @@ impl<K: TransactionKind> CursorAny<K> {
774780
}
775781
}
776782

777-
pub fn into_iter_dup_cnt<'a, Key, Value>(
783+
pub fn into_iter_dup_buffered<'a, Key, Value>(
778784
self,
779-
cnt: u64,
785+
buffer_config: BufferConfiguration,
780786
) -> Pin<
781787
Box<
782788
dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
@@ -795,7 +801,7 @@ impl<K: TransactionKind> CursorAny<K> {
795801
yield st;
796802
}
797803
}),
798-
Self::Remote(cur) => cur.into_iter_dup_cnt(cnt),
804+
Self::Remote(cur) => cur.into_iter_dup_buffered(buffer_config),
799805
}
800806
}
801807

@@ -818,9 +824,9 @@ impl<K: TransactionKind> CursorAny<K> {
818824
}
819825
}
820826

821-
pub fn into_iter_dup_start_cnt<'a, Key, Value>(
827+
pub fn into_iter_dup_start_buffered<'a, Key, Value>(
822828
self,
823-
cnt: u64,
829+
buffer_config: BufferConfiguration,
824830
) -> Pin<
825831
Box<
826832
dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
@@ -839,7 +845,7 @@ impl<K: TransactionKind> CursorAny<K> {
839845
yield st;
840846
}
841847
}),
842-
Self::Remote(cur) => cur.into_iter_dup_start_cnt(cnt),
848+
Self::Remote(cur) => cur.into_iter_dup_start_buffered(buffer_config),
843849
}
844850
}
845851

@@ -868,10 +874,10 @@ impl<K: TransactionKind> CursorAny<K> {
868874
})
869875
}
870876

871-
pub async fn into_iter_dup_from_cnt<'a, Key, Value>(
877+
pub async fn into_iter_dup_from_buffered<'a, Key, Value>(
872878
self,
873879
key: &'a [u8],
874-
cnt: u64,
880+
buffer_config: BufferConfiguration,
875881
) -> Result<
876882
Pin<
877883
Box<
@@ -895,7 +901,10 @@ impl<K: TransactionKind> CursorAny<K> {
895901
yield st;
896902
}
897903
}),
898-
Self::Remote(cur) => cur.into_iter_dup_from_cnt(key.to_vec(), cnt).await?,
904+
Self::Remote(cur) => {
905+
cur.into_iter_dup_from_buffered(key.to_vec(), buffer_config)
906+
.await?
907+
}
899908
})
900909
}
901910

@@ -913,10 +922,10 @@ impl<K: TransactionKind> CursorAny<K> {
913922
})
914923
}
915924

916-
pub async fn into_iter_dup_of_cnt<'a, Key, Value>(
925+
pub async fn into_iter_dup_of_buffered<'a, Key, Value>(
917926
self,
918927
key: &'a [u8],
919-
cnt: u64,
928+
buffer_config: BufferConfiguration,
920929
) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
921930
where
922931
Key: TableObject + Send + 'a,
@@ -929,7 +938,10 @@ impl<K: TransactionKind> CursorAny<K> {
929938
yield (k, v);
930939
}
931940
}),
932-
Self::Remote(cur) => cur.into_iter_dup_of_cnt(key.to_vec(), cnt).await?,
941+
Self::Remote(cur) => {
942+
cur.into_iter_dup_of_buffered(key.to_vec(), buffer_config)
943+
.await?
944+
}
933945
})
934946
}
935947
}

crates/mdbx-remote/src/lib.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ pub use crate::{
2323
},
2424
error::{Error, Result},
2525
flags::*,
26-
remote::{ClientError, RemoteCursor, RemoteDatabase, RemoteEnvironment, RemoteTransaction},
26+
remote::{
27+
BufferConfiguration, ClientError, RemoteCursor, RemoteDatabase, RemoteEnvironment,
28+
RemoteTransaction,
29+
},
2730
service::{MDBXServerState, RemoteMDBX, RemoteMDBXClient, RemoteMDBXServer, ServerError},
2831
transaction::{CommitLatency, Transaction, TransactionKind, RO, RW},
2932
};

0 commit comments

Comments
 (0)