Skip to content

Commit a659206

Browse files
authored
optimization: join probe returns first item directly (hydro-project#874)
* optimization: join probe returns first item directly * update comments
1 parent fefa450 commit a659206

File tree

4 files changed

+44
-25
lines changed

4 files changed

+44
-25
lines changed

hydroflow/src/compiled/pull/half_join_state/mod.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,14 @@ pub use set::HalfSetJoinState;
1515
pub type SetJoinState<Key, V1, V2> = (HalfSetJoinState<Key, V1, V2>, HalfSetJoinState<Key, V2, V1>);
1616

1717
pub trait HalfJoinState<Key, ValBuild, ValProbe> {
18+
/// Insert a key value pair into the join state, currently this is always inserting into a hash table
19+
/// If the key-value pair exists then it is implementation defined what hapepns, usually either two copies are stored or only one copy is stored.
1820
fn build(&mut self, k: Key, v: &ValBuild) -> bool;
19-
fn probe(&mut self, k: &Key, v: &ValProbe);
21+
22+
/// This function does the actual joining part of the join. It looks up a key in the local join state and creates matches
23+
/// The first match is return directly to the caller, and any additional matches are stored internally to be retrieved later with `pop_match`
24+
fn probe(&mut self, k: &Key, v: &ValProbe) -> Option<(Key, ValProbe, ValBuild)>;
25+
26+
/// If there are any stored matches from previous calls to probe then this function will remove them one at a time and return it.
2027
fn pop_match(&mut self) -> Option<(Key, ValProbe, ValBuild)>;
2128
}

hydroflow/src/compiled/pull/half_join_state/multiset.rs

+15-11
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,21 @@ where
5757
true
5858
}
5959

60-
fn probe(&mut self, k: &Key, v: &ValProbe) {
61-
if let Some(entry) = self.table.get(k) {
62-
// TODO: We currently don't free/shrink the self.current_matches vecdeque to save time.
63-
// This mean it will grow to eventually become the largest number of matches in a single probe call.
64-
// Maybe we should clear this memory at the beginning of every tick/periodically?
65-
self.current_matches.extend(
66-
entry
67-
.iter()
68-
.map(|valbuild| (k.clone(), v.clone(), valbuild.clone())),
69-
);
70-
}
60+
fn probe(&mut self, k: &Key, v: &ValProbe) -> Option<(Key, ValProbe, ValBuild)> {
61+
// TODO: We currently don't free/shrink the self.current_matches vecdeque to save time.
62+
// This mean it will grow to eventually become the largest number of matches in a single probe call.
63+
// Maybe we should clear this memory at the beginning of every tick/periodically?
64+
let mut iter = self
65+
.table
66+
.get(k)?
67+
.iter()
68+
.map(|valbuild| (k.clone(), v.clone(), valbuild.clone()));
69+
70+
let first = iter.next();
71+
72+
self.current_matches.extend(iter);
73+
74+
first
7175
}
7276

7377
fn pop_match(&mut self) -> Option<(Key, ValProbe, ValBuild)> {

hydroflow/src/compiled/pull/half_join_state/set.rs

+15-11
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,21 @@ where
6262
false
6363
}
6464

65-
fn probe(&mut self, k: &Key, v: &ValProbe) {
66-
if let Some(entry) = self.table.get(k) {
67-
// TODO: We currently don't free/shrink the self.current_matches vecdeque to save time.
68-
// This mean it will grow to eventually become the largest number of matches in a single probe call.
69-
// Maybe we should clear this memory at the beginning of every tick/periodically?
70-
self.current_matches.extend(
71-
entry
72-
.iter()
73-
.map(|valbuild| (k.clone(), v.clone(), valbuild.clone())),
74-
);
75-
}
65+
fn probe(&mut self, k: &Key, v: &ValProbe) -> Option<(Key, ValProbe, ValBuild)> {
66+
// TODO: We currently don't free/shrink the self.current_matches vecdeque to save time.
67+
// This mean it will grow to eventually become the largest number of matches in a single probe call.
68+
// Maybe we should clear this memory at the beginning of every tick/periodically?
69+
let mut iter = self
70+
.table
71+
.get(k)?
72+
.iter()
73+
.map(|valbuild| (k.clone(), v.clone(), valbuild.clone()));
74+
75+
let first = iter.next();
76+
77+
self.current_matches.extend(iter);
78+
79+
first
7680
}
7781

7882
fn pop_match(&mut self) -> Option<(Key, ValProbe, ValBuild)> {

hydroflow/src/compiled/pull/symmetric_hash_join.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,17 @@ where
4040

4141
if let Some((k, v1)) = self.lhs.next() {
4242
if self.lhs_state.build(k.clone(), &v1) {
43-
self.rhs_state.probe(&k, &v1);
43+
if let Some((k, v1, v2)) = self.rhs_state.probe(&k, &v1) {
44+
return Some((k, (v1, v2)));
45+
}
4446
}
4547
continue;
4648
}
4749
if let Some((k, v2)) = self.rhs.next() {
4850
if self.rhs_state.build(k.clone(), &v2) {
49-
self.lhs_state.probe(&k, &v2);
51+
if let Some((k, v2, v1)) = self.lhs_state.probe(&k, &v2) {
52+
return Some((k, (v1, v2)));
53+
}
5054
}
5155
continue;
5256
}

0 commit comments

Comments
 (0)