Skip to content
This repository was archived by the owner on Aug 24, 2019. It is now read-only.

Commit 2a813dc

Browse files
committed
1 parent aeefb59 commit 2a813dc

File tree

5 files changed

+1803
-5
lines changed

5 files changed

+1803
-5
lines changed

src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,12 @@
124124
#![feature(rustc_private)]
125125
#![feature(specialization)]
126126
#![feature(try_from)]
127+
#![feature(existential_type)]
128+
#![feature(maybe_uninit)]
129+
#![feature(nll)]
127130
#![allow(dead_code)]
128131
#![allow(non_camel_case_types)]
132+
#![allow(private_in_public)]
129133

130134
#[macro_use]
131135
extern crate quick_error;

src/record/mod.rs

+23-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,28 @@
2020
mod api;
2121
pub mod reader;
2222
mod triplet;
23+
pub mod schemas;
24+
pub mod types;
2325

26+
use std::{fmt,fmt::Debug};
27+
use schema::types::ColumnDescPtr;
28+
use schema::types::ColumnPath;
29+
use std::collections::HashMap;
30+
use errors::ParquetError;
31+
use schema::types::Type;
32+
use record::reader::RRReader;
33+
use column::reader::ColumnReader;
2434
// pub use self::api::{List, ListAccessor, Map, MapAccessor, Row, RowAccessor};
25-
pub use self::triplet::TypedTripletIter;
35+
// pub use self::triplet::TypedTripletIter;
36+
37+
/// Type-level counterpart of [`fmt::Debug`]: the implementing type writes a
/// description of itself without needing an instance.
pub trait DebugType {
    /// Writes the type's debug description into `f`.
    ///
    /// This is an associated function (no `self`), so it is invoked as
    /// `T::fmt(f)`.
    fn fmt(f: &mut fmt::Formatter) -> fmt::Result;
}
40+
41+
pub trait Deserialize: Sized {
42+
type Schema: Debug + DebugType;
43+
type Reader: RRReader<Item = Self>;
44+
45+
fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError>;
46+
fn reader(schema: &Self::Schema, mut path: &mut Vec<String>, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap<ColumnPath, (ColumnDescPtr,ColumnReader)>) -> Self::Reader;
47+
}

src/record/reader.rs

+241-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
//! [`Row`](`::record::api::Row`)s.
2020
2121
use std::{collections::HashMap, fmt, rc::Rc};
22-
2322
use basic::{LogicalType, Repetition};
2423
use errors::{ParquetError, Result};
2524
use file::reader::{FileReader, RowGroupReader};
@@ -31,9 +30,10 @@ use schema::types::{ColumnPath, SchemaDescPtr, SchemaDescriptor, Type, TypePtr};
3130
use record::triplet::TypedTripletIter;
3231
use data_type::{BoolType, Int32Type, Int64Type, Int96Type, FloatType, DoubleType, ByteArrayType, FixedLenByteArrayType, Int96};
3332
use basic::Type as PhysicalType;
34-
35-
// /// Default batch size for a reader
36-
// const DEFAULT_BATCH_SIZE: usize = 1024;
33+
use super::{Deserialize,DebugType};
34+
use super::types::{Value,Group,Timestamp,List,Map,Root};
35+
use std::{convert::TryInto,marker::PhantomData};
36+
use std::error::Error;
3737

3838
// /// Tree builder for `Reader` enum.
3939
// /// Serves as a container of options for building a reader tree and a builder, and
@@ -860,6 +860,243 @@ impl<K: RRReader, V: RRReader> RRReader for KeyValueReader<K, V> {
860860
}
861861
}
862862

863+
pub struct GroupReader {
864+
pub(super) def_level: i16,
865+
pub(super) readers: Vec<ValueReader>,
866+
pub(super) fields: Rc<HashMap<String,usize>>,
867+
}
868+
impl RRReader for GroupReader {
869+
type Item = Group;
870+
871+
fn read_field(&mut self) -> Result<Self::Item> {
872+
let mut fields = Vec::new();
873+
for reader in self.readers.iter_mut() {
874+
fields.push(reader.read_field()?);
875+
}
876+
Ok(Group(fields, self.fields.clone()))
877+
}
878+
fn advance_columns(&mut self) {
879+
for reader in self.readers.iter_mut() {
880+
reader.advance_columns();
881+
}
882+
}
883+
fn has_next(&self) -> bool {
884+
self.readers.first().unwrap().has_next()
885+
}
886+
fn current_def_level(&self) -> i16 {
887+
match self.readers.first() {
888+
Some(reader) => reader.current_def_level(),
889+
None => panic!("Current definition level: empty group reader"),
890+
}
891+
}
892+
fn current_rep_level(&self) -> i16 {
893+
match self.readers.first() {
894+
Some(reader) => reader.current_rep_level(),
895+
None => panic!("Current repetition level: empty group reader"),
896+
}
897+
}
898+
}
899+
900+
pub enum ValueReader {
901+
Bool(<bool as Deserialize>::Reader),
902+
U8(<u8 as Deserialize>::Reader),
903+
I8(<i8 as Deserialize>::Reader),
904+
U16(<u16 as Deserialize>::Reader),
905+
I16(<i16 as Deserialize>::Reader),
906+
U32(<u32 as Deserialize>::Reader),
907+
I32(<i32 as Deserialize>::Reader),
908+
U64(<u64 as Deserialize>::Reader),
909+
I64(<i64 as Deserialize>::Reader),
910+
F32(<f32 as Deserialize>::Reader),
911+
F64(<f64 as Deserialize>::Reader),
912+
Timestamp(<Timestamp as Deserialize>::Reader),
913+
Array(<Vec<u8> as Deserialize>::Reader),
914+
String(<String as Deserialize>::Reader),
915+
List(Box<<List<Value> as Deserialize>::Reader>),
916+
Map(Box<<Map<Value,Value> as Deserialize>::Reader>),
917+
Group(<Group as Deserialize>::Reader),
918+
Option(Box<<Option<Value> as Deserialize>::Reader>),
919+
}
920+
impl RRReader for ValueReader {
921+
type Item = Value;
922+
923+
fn read_field(&mut self) -> Result<Self::Item> {
924+
match self {
925+
ValueReader::Bool(ref mut reader) => reader.read_field().map(Value::Bool),
926+
ValueReader::U8(ref mut reader) => reader.read_field().map(Value::U8),
927+
ValueReader::I8(ref mut reader) => reader.read_field().map(Value::I8),
928+
ValueReader::U16(ref mut reader) => reader.read_field().map(Value::U16),
929+
ValueReader::I16(ref mut reader) => reader.read_field().map(Value::I16),
930+
ValueReader::U32(ref mut reader) => reader.read_field().map(Value::U32),
931+
ValueReader::I32(ref mut reader) => reader.read_field().map(Value::I32),
932+
ValueReader::U64(ref mut reader) => reader.read_field().map(Value::U64),
933+
ValueReader::I64(ref mut reader) => reader.read_field().map(Value::I64),
934+
ValueReader::F32(ref mut reader) => reader.read_field().map(Value::F32),
935+
ValueReader::F64(ref mut reader) => reader.read_field().map(Value::F64),
936+
ValueReader::Timestamp(ref mut reader) => reader.read_field().map(Value::Timestamp),
937+
ValueReader::Array(ref mut reader) => reader.read_field().map(Value::Array),
938+
ValueReader::String(ref mut reader) => reader.read_field().map(Value::String),
939+
ValueReader::List(ref mut reader) => reader.read_field().map(Value::List),
940+
ValueReader::Map(ref mut reader) => reader.read_field().map(Value::Map),
941+
ValueReader::Group(ref mut reader) => reader.read_field().map(Value::Group),
942+
ValueReader::Option(ref mut reader) => reader.read_field().map(|x|Value::Option(Box::new(x))),
943+
}
944+
}
945+
fn advance_columns(&mut self) {
946+
match self {
947+
ValueReader::Bool(ref mut reader) => reader.advance_columns(),
948+
ValueReader::U8(ref mut reader) => reader.advance_columns(),
949+
ValueReader::I8(ref mut reader) => reader.advance_columns(),
950+
ValueReader::U16(ref mut reader) => reader.advance_columns(),
951+
ValueReader::I16(ref mut reader) => reader.advance_columns(),
952+
ValueReader::U32(ref mut reader) => reader.advance_columns(),
953+
ValueReader::I32(ref mut reader) => reader.advance_columns(),
954+
ValueReader::U64(ref mut reader) => reader.advance_columns(),
955+
ValueReader::I64(ref mut reader) => reader.advance_columns(),
956+
ValueReader::F32(ref mut reader) => reader.advance_columns(),
957+
ValueReader::F64(ref mut reader) => reader.advance_columns(),
958+
ValueReader::Timestamp(ref mut reader) => reader.advance_columns(),
959+
ValueReader::Array(ref mut reader) => reader.advance_columns(),
960+
ValueReader::String(ref mut reader) => reader.advance_columns(),
961+
ValueReader::List(ref mut reader) => reader.advance_columns(),
962+
ValueReader::Map(ref mut reader) => reader.advance_columns(),
963+
ValueReader::Group(ref mut reader) => reader.advance_columns(),
964+
ValueReader::Option(ref mut reader) => reader.advance_columns(),
965+
}
966+
}
967+
fn has_next(&self) -> bool {
968+
match self {
969+
ValueReader::Bool(ref reader) => reader.has_next(),
970+
ValueReader::U8(ref reader) => reader.has_next(),
971+
ValueReader::I8(ref reader) => reader.has_next(),
972+
ValueReader::U16(ref reader) => reader.has_next(),
973+
ValueReader::I16(ref reader) => reader.has_next(),
974+
ValueReader::U32(ref reader) => reader.has_next(),
975+
ValueReader::I32(ref reader) => reader.has_next(),
976+
ValueReader::U64(ref reader) => reader.has_next(),
977+
ValueReader::I64(ref reader) => reader.has_next(),
978+
ValueReader::F32(ref reader) => reader.has_next(),
979+
ValueReader::F64(ref reader) => reader.has_next(),
980+
ValueReader::Timestamp(ref reader) => reader.has_next(),
981+
ValueReader::Array(ref reader) => reader.has_next(),
982+
ValueReader::String(ref reader) => reader.has_next(),
983+
ValueReader::List(ref reader) => reader.has_next(),
984+
ValueReader::Map(ref reader) => reader.has_next(),
985+
ValueReader::Group(ref reader) => reader.has_next(),
986+
ValueReader::Option(ref reader) => reader.has_next(),
987+
}
988+
}
989+
fn current_def_level(&self) -> i16 {
990+
match self {
991+
ValueReader::Bool(ref reader) => reader.current_def_level(),
992+
ValueReader::U8(ref reader) => reader.current_def_level(),
993+
ValueReader::I8(ref reader) => reader.current_def_level(),
994+
ValueReader::U16(ref reader) => reader.current_def_level(),
995+
ValueReader::I16(ref reader) => reader.current_def_level(),
996+
ValueReader::U32(ref reader) => reader.current_def_level(),
997+
ValueReader::I32(ref reader) => reader.current_def_level(),
998+
ValueReader::U64(ref reader) => reader.current_def_level(),
999+
ValueReader::I64(ref reader) => reader.current_def_level(),
1000+
ValueReader::F32(ref reader) => reader.current_def_level(),
1001+
ValueReader::F64(ref reader) => reader.current_def_level(),
1002+
ValueReader::Timestamp(ref reader) => reader.current_def_level(),
1003+
ValueReader::Array(ref reader) => reader.current_def_level(),
1004+
ValueReader::String(ref reader) => reader.current_def_level(),
1005+
ValueReader::List(ref reader) => reader.current_def_level(),
1006+
ValueReader::Map(ref reader) => reader.current_def_level(),
1007+
ValueReader::Group(ref reader) => reader.current_def_level(),
1008+
ValueReader::Option(ref reader) => reader.current_def_level(),
1009+
}
1010+
}
1011+
fn current_rep_level(&self) -> i16 {
1012+
match self {
1013+
ValueReader::Bool(ref reader) => reader.current_rep_level(),
1014+
ValueReader::U8(ref reader) => reader.current_rep_level(),
1015+
ValueReader::I8(ref reader) => reader.current_rep_level(),
1016+
ValueReader::U16(ref reader) => reader.current_rep_level(),
1017+
ValueReader::I16(ref reader) => reader.current_rep_level(),
1018+
ValueReader::U32(ref reader) => reader.current_rep_level(),
1019+
ValueReader::I32(ref reader) => reader.current_rep_level(),
1020+
ValueReader::U64(ref reader) => reader.current_rep_level(),
1021+
ValueReader::I64(ref reader) => reader.current_rep_level(),
1022+
ValueReader::F32(ref reader) => reader.current_rep_level(),
1023+
ValueReader::F64(ref reader) => reader.current_rep_level(),
1024+
ValueReader::Timestamp(ref reader) => reader.current_rep_level(),
1025+
ValueReader::Array(ref reader) => reader.current_rep_level(),
1026+
ValueReader::String(ref reader) => reader.current_rep_level(),
1027+
ValueReader::List(ref reader) => reader.current_rep_level(),
1028+
ValueReader::Map(ref reader) => reader.current_rep_level(),
1029+
ValueReader::Group(ref reader) => reader.current_rep_level(),
1030+
ValueReader::Option(ref reader) => reader.current_rep_level(),
1031+
}
1032+
}
1033+
}
1034+
1035+
/// Newtype around `T`, which is unconstrained here (presumably a tuple of
/// field readers — confirm against the `RRReader` impls defined elsewhere).
pub struct TupleReader<T>(pub(super) T);
1036+
1037+
pub struct TryIntoReader<R: RRReader, T>(pub(super) R, pub(super) PhantomData<fn(T)>);
1038+
impl<R: RRReader, T> RRReader for TryIntoReader<R, T> where R::Item: TryInto<T>, <R::Item as TryInto<T>>::Error: Error {
1039+
type Item = T;
1040+
1041+
fn read_field(&mut self) -> Result<Self::Item> {
1042+
self.0.read_field().and_then(|x|x.try_into().map_err(|err|ParquetError::General(err.description().to_owned())))
1043+
}
1044+
fn advance_columns(&mut self) {
1045+
self.0.advance_columns()
1046+
}
1047+
fn has_next(&self) -> bool {
1048+
self.0.has_next()
1049+
}
1050+
fn current_def_level(&self) -> i16 {
1051+
self.0.current_def_level()
1052+
}
1053+
fn current_rep_level(&self) -> i16 {
1054+
self.0.current_rep_level()
1055+
}
1056+
}
1057+
1058+
pub struct MapReader<R: RRReader, F>(pub(super) R, pub(super) F);
1059+
impl<R: RRReader, F, T> RRReader for MapReader<R, F> where F: FnMut(R::Item) -> Result<T> {
1060+
type Item = T;
1061+
1062+
fn read_field(&mut self) -> Result<Self::Item> {
1063+
self.0.read_field().and_then(&mut self.1)
1064+
}
1065+
fn advance_columns(&mut self) {
1066+
self.0.advance_columns()
1067+
}
1068+
fn has_next(&self) -> bool {
1069+
self.0.has_next()
1070+
}
1071+
fn current_def_level(&self) -> i16 {
1072+
self.0.current_def_level()
1073+
}
1074+
fn current_rep_level(&self) -> i16 {
1075+
self.0.current_rep_level()
1076+
}
1077+
}
1078+
1079+
pub struct RootReader<R>(pub R);
1080+
impl<R> RRReader for RootReader<R> where R: RRReader {
1081+
type Item = Root<R::Item>;
1082+
1083+
fn read_field(&mut self) -> Result<Self::Item> {
1084+
self.0.read_field().map(Root)
1085+
}
1086+
fn advance_columns(&mut self) {
1087+
self.0.advance_columns();
1088+
}
1089+
fn has_next(&self) -> bool {
1090+
self.0.has_next()
1091+
}
1092+
fn current_def_level(&self) -> i16 {
1093+
self.0.current_def_level()
1094+
}
1095+
fn current_rep_level(&self) -> i16 {
1096+
self.0.current_rep_level()
1097+
}
1098+
}
1099+
8631100
// /// Reader tree for record assembly
8641101
// pub enum Reader {
8651102
// // Primitive reader with type information and triplet iterator

0 commit comments

Comments
 (0)