Skip to content

Commit 3830aa5

Browse files
committed
WIP: remove ids buffering and add support for json
1 parent 7acdd67 commit 3830aa5

File tree

4 files changed

+116
-156
lines changed

4 files changed

+116
-156
lines changed

src/fastfield/readers.rs

+16
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,22 @@ impl FastFieldReaders {
234234
Ok(dynamic_column_handle_opt)
235235
}
236236

237+
/// Returning all `dynamic_column_handle`.
238+
pub fn dynamic_column_handles(
239+
&self,
240+
field_name: &str,
241+
) -> crate::Result<Vec<DynamicColumnHandle>> {
242+
let Some(resolved_field_name) = self.resolve_field(field_name)? else {
243+
return Ok(Vec::new());
244+
};
245+
let dynamic_column_handles = self
246+
.columnar
247+
.read_columns(&resolved_field_name)?
248+
.into_iter()
249+
.collect();
250+
Ok(dynamic_column_handles)
251+
}
252+
237253
#[doc(hidden)]
238254
pub async fn list_dynamic_column_handles(
239255
&self,

src/query/exist_query.rs

+93-148
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
11
use core::fmt::Debug;
22

3-
use columnar::{ColumnType, DynamicColumn};
3+
use columnar::{ColumnIndex, DynamicColumn};
44

5-
use super::range_query::VecCursor;
6-
use super::ConstScorer;
5+
use super::{ConstScorer, EmptyScorer};
76
use crate::core::SegmentReader;
87
use crate::docset::{DocSet, TERMINATED};
98
use crate::query::explanation::does_not_match;
109
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
11-
use crate::schema::FieldType;
1210
use crate::{DocId, Score, TantivyError};
1311

1412
/// Query that matches all of the documents.
1513
///
1614
/// All of the document get the score 1.0.
1715
#[derive(Clone, Debug)]
1816
pub struct ExistsQuery {
19-
field: String,
17+
field_name: String,
2018
}
2119

2220
impl ExistsQuery {
@@ -25,62 +23,55 @@ impl ExistsQuery {
2523
/// If the value type is not correct, something may go terribly wrong when
2624
/// the `Weight` object is created.
2725
pub fn new_exists_query(field: String) -> ExistsQuery {
28-
ExistsQuery { field }
26+
ExistsQuery { field_name: field }
2927
}
3028
}
3129

3230
impl Query for ExistsQuery {
3331
fn weight(&self, enable_scoring: EnableScoring) -> crate::Result<Box<dyn Weight>> {
3432
let schema = enable_scoring.schema();
35-
let field_type = schema
36-
.get_field_entry(schema.get_field(&self.field)?)
37-
.field_type();
33+
let Some((field, _path)) = schema.find_field(&self.field_name) else {
34+
return Err(TantivyError::FieldNotFound(self.field_name.clone()));
35+
};
36+
let field_type = schema.get_field_entry(field).field_type();
3837
if !field_type.is_fast() {
3938
return Err(TantivyError::SchemaError(format!(
40-
"Field {:?} is not a fast field.",
41-
self.field
39+
"Field {} is not a fast field.",
40+
self.field_name
4241
)));
4342
}
4443
Ok(Box::new(ExistsWeight {
45-
field: self.field.clone(),
46-
field_type: field_type.clone(),
44+
field_name: self.field_name.clone(),
4745
}))
4846
}
4947
}
5048

5149
/// Weight associated with the `ExistsQuery` query.
5250
pub struct ExistsWeight {
53-
field: String,
54-
field_type: FieldType,
51+
field_name: String,
5552
}
5653

5754
impl ExistsWeight {}
5855

5956
impl Weight for ExistsWeight {
6057
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
6158
let fast_field_reader = reader.fast_fields();
62-
let column_type = match self.field_type {
63-
FieldType::Str(_) => Some(ColumnType::Str),
64-
FieldType::U64(_) => Some(ColumnType::U64),
65-
FieldType::I64(_) => Some(ColumnType::I64),
66-
FieldType::F64(_) => Some(ColumnType::F64),
67-
FieldType::Bool(_) => Some(ColumnType::Bool),
68-
FieldType::Date(_) => Some(ColumnType::DateTime),
69-
FieldType::Bytes(_) => Some(ColumnType::Bytes),
70-
FieldType::IpAddr(_) => Some(ColumnType::IpAddr),
71-
_ => None,
59+
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = fast_field_reader
60+
.dynamic_column_handles(&self.field_name)?
61+
.into_iter()
62+
.map(|handle| handle.open().map_err(|io_error| io_error.into()))
63+
.collect();
64+
let mut non_empty_columns = Vec::new();
65+
for column in dynamic_columns? {
66+
if !matches!(column.column_index(), ColumnIndex::Empty { .. }) {
67+
non_empty_columns.push(column)
68+
}
7269
}
73-
.expect("Should be here");
74-
if let Some(dynamic_column) =
75-
fast_field_reader.dynamic_column_handle(&self.field, column_type)?
76-
{
77-
let docset = ExistsDocSet::new(dynamic_column.open()?);
70+
if !non_empty_columns.is_empty() {
71+
let docset = ExistsDocSet::new(non_empty_columns, reader.max_doc());
7872
return Ok(Box::new(ConstScorer::new(docset, boost)));
7973
} else {
80-
return Err(TantivyError::SchemaError(format!(
81-
"Field {:?} with type {:?} is not supported by exists query.",
82-
self.field, self.field_type,
83-
)));
74+
return Ok(Box::new(EmptyScorer));
8475
}
8576
}
8677

@@ -94,147 +85,64 @@ impl Weight for ExistsWeight {
9485
}
9586

9687
pub(crate) struct ExistsDocSet {
97-
column: DynamicColumn,
98-
/// The next docid start range to fetch (inclusive).
99-
next_fetch_start: u32,
100-
/// Number of docs range checked in a batch.
101-
///
102-
/// There are two patterns.
103-
/// - We do a full scan. => We can load large chunks. We don't know in advance if seek call
104-
/// will come, so we start with small chunks
105-
/// - We load docs, interspersed with seek calls. When there are big jumps in the seek, we
106-
/// should load small chunks. When the seeks are small, we can employ the same strategy as on a
107-
/// full scan.
108-
fetch_horizon: u32,
109-
/// Current batch of loaded docs.
110-
loaded_docs: VecCursor,
111-
last_seek_pos_opt: Option<u32>,
88+
columns: Vec<DynamicColumn>,
89+
doc: DocId,
90+
max_doc: DocId,
11291
}
11392

114-
const DEFAULT_FETCH_HORIZON: u32 = 128;
11593
impl ExistsDocSet {
116-
pub(crate) fn new(column: DynamicColumn) -> Self {
117-
let mut exists_doc_set = Self {
118-
column,
119-
loaded_docs: VecCursor::new(),
120-
next_fetch_start: 0,
121-
fetch_horizon: DEFAULT_FETCH_HORIZON,
122-
last_seek_pos_opt: None,
94+
pub(crate) fn new(columns: Vec<DynamicColumn>, max_doc: DocId) -> Self {
95+
let mut set = Self {
96+
columns,
97+
doc: 0u32,
98+
max_doc,
12399
};
124-
exists_doc_set.reset_fetch_range();
125-
exists_doc_set.fetch_block();
126-
exists_doc_set
100+
set.find_next();
101+
set
127102
}
128103

129-
/// Returns true if more data could be fetched
130-
fn fetch_block(&mut self) {
131-
const MAX_HORIZON: u32 = 100_000;
132-
while self.loaded_docs.is_empty() {
133-
let finished_to_end = self.fetch_horizon(self.fetch_horizon);
134-
if finished_to_end {
135-
break;
104+
fn find_next(&mut self) -> DocId {
105+
// TODO: can this be optimized?
106+
while self.doc < self.max_doc {
107+
if self
108+
.columns
109+
.iter()
110+
.find(|col| col.column_index().has_value(self.doc))
111+
.is_some()
112+
{
113+
return self.doc;
136114
}
137-
// Fetch more data, increase horizon. Horizon only gets reset when doing a seek.
138-
self.fetch_horizon = (self.fetch_horizon * 2).min(MAX_HORIZON);
115+
self.doc += 1;
139116
}
140-
}
141-
142-
/// Fetches a block for docid range [next_fetch_start .. next_fetch_start + HORIZON]
143-
fn fetch_horizon(&mut self, horizon: u32) -> bool {
144-
let mut finished_to_end = false;
145-
146-
let limit = self.column.num_docs();
147-
let mut end = self.next_fetch_start + horizon;
148-
if end >= limit {
149-
end = limit;
150-
finished_to_end = true;
151-
}
152-
153-
let last_value = self.loaded_docs.last_value();
154-
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
155-
self.column
156-
.column_index()
157-
.get_docids_with_existing_value(self.next_fetch_start..end, doc_buffer);
158-
if let Some(last_value) = last_value {
159-
while self.loaded_docs.current() == Some(last_value) {
160-
self.loaded_docs.next();
161-
}
162-
}
163-
self.next_fetch_start = end;
164-
165-
finished_to_end
166-
}
167-
168-
/// check if the distance between the seek calls is large
169-
fn is_last_seek_distance_large(&self, new_seek: DocId) -> bool {
170-
if let Some(last_seek_pos) = self.last_seek_pos_opt {
171-
(new_seek - last_seek_pos) >= 128
172-
} else {
173-
true
174-
}
175-
}
176-
177-
fn reset_fetch_range(&mut self) {
178-
self.fetch_horizon = DEFAULT_FETCH_HORIZON;
117+
self.doc = TERMINATED;
118+
return TERMINATED;
179119
}
180120
}
181121

182122
impl DocSet for ExistsDocSet {
183123
fn advance(&mut self) -> DocId {
184-
if let Some(docid) = self.loaded_docs.next() {
185-
return docid;
186-
}
187-
if self.next_fetch_start >= self.column.num_values() {
188-
return TERMINATED;
189-
}
190-
self.fetch_block();
191-
self.loaded_docs.current().unwrap_or(TERMINATED)
192-
}
193-
194-
#[inline]
195-
fn doc(&self) -> DocId {
196-
self.loaded_docs.current().unwrap_or(TERMINATED)
197-
}
198-
199-
/// Advances the `DocSet` forward until reaching the target, or going to the
200-
/// lowest [`DocId`] greater than the target.
201-
///
202-
/// If the end of the `DocSet` is reached, [`TERMINATED`] is returned.
203-
///
204-
/// Calling `.seek(target)` on a terminated `DocSet` is legal. Implementation
205-
/// of `DocSet` should support it.
206-
///
207-
/// Calling `seek(TERMINATED)` is also legal and is the normal way to consume a `DocSet`.
208-
fn seek(&mut self, target: DocId) -> DocId {
209-
if self.is_last_seek_distance_large(target) {
210-
self.reset_fetch_range();
211-
}
212-
if target > self.next_fetch_start {
213-
self.next_fetch_start = target;
214-
}
215-
let mut doc = self.doc();
216-
debug_assert!(doc <= target);
217-
while doc < target {
218-
doc = self.advance();
219-
}
220-
self.last_seek_pos_opt = Some(target);
221-
doc
124+
self.doc += 1;
125+
self.find_next()
222126
}
223127

224128
fn size_hint(&self) -> u32 {
225129
0 // heuristic possible by checking number of hits when fetching a block
226130
}
131+
132+
fn doc(&self) -> DocId {
133+
self.doc
134+
}
227135
}
228136

229137
#[cfg(test)]
230138
mod tests {
231139
use crate::collector::Count;
232140
use crate::query::exist_query::ExistsQuery;
233-
use crate::schema::{Schema, FAST, INDEXED, STRING};
141+
use crate::schema::{Schema, FAST, INDEXED, STRING, TEXT};
234142
use crate::{doc, Index};
235143

236144
#[test]
237-
fn test_range_query_simple() -> crate::Result<()> {
145+
fn test_exists_query_simple() -> crate::Result<()> {
238146
let mut schema_builder = Schema::builder();
239147
let all_field = schema_builder.add_u64_field("all", INDEXED | FAST);
240148
let even_field = schema_builder.add_u64_field("even", INDEXED | FAST);
@@ -284,4 +192,41 @@ mod tests {
284192

285193
Ok(())
286194
}
195+
196+
#[test]
197+
fn test_exists_query_json() -> crate::Result<()> {
198+
let mut schema_builder = Schema::builder();
199+
let json = schema_builder.add_json_field("json", TEXT | FAST);
200+
let schema = schema_builder.build();
201+
202+
let index = Index::create_in_ram(schema);
203+
{
204+
let mut index_writer = index.writer_for_tests()?;
205+
for i in 0u64..100u64 {
206+
if i % 2 == 0 {
207+
index_writer.add_document(doc!(json => json!({"all": i, "even": true})))?;
208+
} else {
209+
index_writer
210+
.add_document(doc!(json => json!({"all": i.to_string(), "odd": true})))?;
211+
}
212+
}
213+
index_writer.commit()?;
214+
}
215+
let reader = index.reader()?;
216+
let searcher = reader.searcher();
217+
218+
let alldocs = ExistsQuery::new_exists_query("json.all".to_string());
219+
let count = searcher.search(&alldocs, &Count)?;
220+
assert_eq!(count, 100);
221+
222+
let even_docs = ExistsQuery::new_exists_query("json.even".to_string());
223+
let count = searcher.search(&even_docs, &Count)?;
224+
assert_eq!(count, 50);
225+
226+
let odd_docs = ExistsQuery::new_exists_query("json.odd".to_string());
227+
let count = searcher.search(&odd_docs, &Count)?;
228+
assert_eq!(count, 50);
229+
230+
Ok(())
231+
}
287232
}

src/query/range_query/fast_field_range_query.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,34 @@ use crate::{DocId, DocSet, TERMINATED};
77

88
/// Helper to have a cursor over a vec of docids
99
#[derive(Debug)]
10-
pub(crate) struct VecCursor {
10+
struct VecCursor {
1111
docs: Vec<u32>,
1212
current_pos: usize,
1313
}
1414
impl VecCursor {
15-
pub fn new() -> Self {
15+
fn new() -> Self {
1616
Self {
1717
docs: Vec::with_capacity(32),
1818
current_pos: 0,
1919
}
2020
}
21-
pub fn next(&mut self) -> Option<u32> {
21+
fn next(&mut self) -> Option<u32> {
2222
self.current_pos += 1;
2323
self.current()
2424
}
2525
#[inline]
26-
pub fn current(&self) -> Option<u32> {
26+
fn current(&self) -> Option<u32> {
2727
self.docs.get(self.current_pos).copied()
2828
}
29-
pub fn get_cleared_data(&mut self) -> &mut Vec<u32> {
29+
fn get_cleared_data(&mut self) -> &mut Vec<u32> {
3030
self.docs.clear();
3131
self.current_pos = 0;
3232
&mut self.docs
3333
}
34-
pub fn last_value(&self) -> Option<u32> {
34+
fn last_value(&self) -> Option<u32> {
3535
self.docs.iter().last().cloned()
3636
}
37-
pub fn is_empty(&self) -> bool {
37+
fn is_empty(&self) -> bool {
3838
self.current().is_none()
3939
}
4040
}

0 commit comments

Comments
 (0)