Skip to content

DTable groupby #275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 41 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
dafc9cc
add proto1
krynju Aug 29, 2021
5ab554b
add merging
krynju Aug 31, 2021
f562e43
groupby discrete working
krynju Sep 1, 2021
f8e3cbb
separate file
krynju Sep 2, 2021
0b51a8a
single merge
krynju Sep 2, 2021
b3fab5d
fix the merging algo
krynju Sep 5, 2021
aeb00e2
add groupby with function input
krynju Sep 18, 2021
52ddb43
revert runtest edit
krynju Sep 18, 2021
5ffc4e2
fix merging algo, test adjustments and cleanup
krynju Sep 7, 2021
2ef2a8d
cleanup
krynju Sep 13, 2021
1d2b87f
add groupby on multiple cols
krynju Sep 14, 2021
4717c4f
rm temp fun
krynju Sep 14, 2021
0e0111e
add GDTable prototype
krynju Sep 16, 2021
95439db
add nicer reduce for grouped dtable
krynju Sep 16, 2021
fdf57f7
add map and filter for gdtable
krynju Sep 19, 2021
3a57d44
add test wip
krynju Sep 19, 2021
9da514a
add proto1
krynju Aug 29, 2021
b794710
add merging
krynju Aug 31, 2021
23a8c1e
fix the merging algo
krynju Sep 5, 2021
69207b6
add groupby with function input
krynju Sep 5, 2021
f7c0afd
revert runtest edit
krynju Sep 5, 2021
e24efa5
fix merging algo, test adjustments and cleanup
krynju Sep 7, 2021
86e7611
cleanup
krynju Sep 13, 2021
1c564ea
add groupby on multiple cols
krynju Sep 14, 2021
97aaed2
add GDTable prototype
krynju Sep 16, 2021
565282d
add adjustments
krynju Sep 23, 2021
452cd6f
add big groupby cleanup
krynju Sep 24, 2021
eb3d84e
fix branch
krynju Sep 24, 2021
59684eb
add docs & adjustments
krynju Sep 24, 2021
d4e46fc
add examples
krynju Sep 25, 2021
d64ede6
add docs
krynju Sep 25, 2021
8dffe30
add docs and getindex
krynju Sep 27, 2021
dbd2ca4
fix docstrings
krynju Sep 27, 2021
cefac65
add missing docs
krynju Sep 27, 2021
8066f34
fix docs
krynju Sep 28, 2021
bf543d6
switch to tochunk from spawn(identity
krynju Oct 11, 2021
7461bc5
fix ci?
krynju Oct 11, 2021
b5b10b7
Apply suggestions from code review
krynju Oct 14, 2021
eed646e
add review adjustments part 1
krynju Oct 14, 2021
9b05bc7
add the rest of adjustments
krynju Oct 14, 2021
680e700
adjust the custom function print
krynju Oct 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 132 additions & 4 deletions docs/src/dtable.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ julia> d = DTable(table, 2)
DTable with 3 partitions
Tabletype: NamedTuple


julia> fetch(d)
(a = [1, 2, 3, 4, 5], b = [6, 7, 8, 9, 10])
```
Expand All @@ -51,8 +50,7 @@ julia> files = ["1.csv", "2.csv", "3.csv"];

julia> d = DTable(CSV.File, files)
DTable with 3 partitions
Tabletype: unknown (use `tabletype(::DTable)`)

Tabletype: unknown (use `tabletype!(::DTable)`)

julia> tabletype(d)
NamedTuple
Expand All @@ -73,7 +71,6 @@ julia> d = DTable(table, 2)
DTable with 3 partitions
Tabletype: NamedTuple


julia> fetch(d)
(a = [1, 2, 3, 4, 5], b = [6, 7, 8, 9, 10])
```
Expand Down Expand Up @@ -165,12 +162,143 @@ julia> fetch(r)
(v = 5500,)
```

# Dagger.groupby interface

A `DTable` can be grouped which will result in creation of a `GDTable`.
A distinct set of values contained in a single or multiple columns can be used as grouping keys.
If a transformation of a row needs to be performed in order to obtain the grouping key there's
also an option to provide a custom function returning a key, which is applied per row.

The set of keys the `GDTable` is grouped by can be obtained using
the `keys(gd::GDTable)` function. To get a fragment of the `GDTable` containing
records belonging under a single keythe `getindex(gd::GDTable, key)` function can be used.

```julia
julia> d = DTable((a=shuffle(repeat('a':'d', inner=4, outer=4)),b=repeat(1:4, 16)), 4)
DTable with 16 partitions
Tabletype: NamedTuple

julia> Dagger.groupby(d, :a)
GDTable with 4 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]

julia> Dagger.groupby(d, [:a, :b])
GDTable with 16 partitions and 16 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]

julia> Dagger.groupby(d, row -> row.a + row.b)
GDTable with 7 partitions and 7 keys
Tabletype: NamedTuple
Grouped by: custom function

julia> g = Dagger.groupby(d, :a); keys(g)
KeySet for a Dict{Char, Vector{UInt64}} with 4 entries. Keys:
'c'
'd'
'a'
'b'

julia> g['c']
DTable with 1 partitions
Tabletype: NamedTuple
```

## GDTable operations

Operations such as `map`, `filter`, `reduce` can be performed on a `GDTable`

```julia
julia> g = Dagger.groupby(d, [:a, :b])
GDTable with 16 partitions and 16 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]

julia> f = filter(x -> x.a != 'd', g)
GDTable with 16 partitions and 16 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]

julia> trim!(f)
GDTable with 12 partitions and 12 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]

julia> m = map(r -> (a = r.a, b = r.b, c = r.b .- 3), f)
GDTable with 12 partitions and 12 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]

julia> r = reduce(*, m)
EagerThunk (running)

julia> DataFrame(fetch(r))
12×5 DataFrame
Row │ a b result_a result_b result_c
│ Char Int64 String Int64 Int64
─────┼───────────────────────────────────────────
1 │ a 1 aaaa 1 16
2 │ c 3 ccc 27 0
3 │ a 3 aa 9 0
4 │ b 4 bbbb 256 1
5 │ c 4 cccc 256 1
6 │ b 2 bbbb 16 1
7 │ b 1 bbbb 1 16
8 │ a 2 aaa 8 -1
9 │ a 4 aaaaaaa 16384 1
10 │ b 3 bbbb 81 0
11 │ c 2 ccccc 32 -1
12 │ c 1 cccc 1 16
```

## Iterating over a GDTable

`GDTable` can be iterated over and each element returned will be a pair of key
and a `DTable` containing all rows associated with that grouping key.

```julia
julia> d = DTable((a=repeat('a':'b', inner=2),b=1:4), 2)
DTable with 2 partitions
Tabletype: NamedTuple

julia> g = Dagger.groupby(d, :a)
GDTable with 2 partitions and 2 keys
Tabletype: NamedTuple
Grouped by: [:a]

julia> for (key, dt) in g
println("Key: $key")
println(fetch(dt, DataFrame))
end
Key: a
2×2 DataFrame
Row │ a b
│ Char Int64
─────┼─────────────
1 │ a 1
2 │ a 2
Key: b
2×2 DataFrame
Row │ a b
│ Char Int64
─────┼─────────────
1 │ b 3
2 │ b 4
```

# API

```@docs
DTable
tabletype
tabletype!
trim
trim!
map
filter
reduce
groupby
keys
getindex
```
2 changes: 2 additions & 0 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ include("ui/gantt-common.jl")
include("ui/gantt-text.jl")

include("table/dtable.jl")
include("table/gdtable.jl")
include("table/operations.jl")
include("table/groupby.jl")

include("lib/logging-extras.jl")

Expand Down
148 changes: 148 additions & 0 deletions src/table/gdtable.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import Base: keys, iterate, length, getindex

"""
GDTable

Structure representing a grouped `DTable`.
It wraps over a DTable object and provides additional information on how the `GDTable` is grouped.
To represent the grouping a `cols` field is used, which contains the column symbols used for
grouping and an `index`, which allows to effectively lookup the partitions grouped under a single key.
"""
mutable struct GDTable
dtable::DTable
cols::Union{Vector{Symbol}, Nothing}
index::Dict

GDTable(dtable, cols, index) = new(dtable, cols, deepcopy(index))
end

fetch(gd::GDTable) = fetch(gd.dtable)
fetch(gd::GDTable, sink) = fetch(gd.dtable, sink)

"""
grouped_cols(gd::GDTable) -> Vector{Symbol}

Returns the symbols of columns used in the grouping.
In case grouping on a function was performed a `:KEYS` symbol will be returned.
"""
grouped_cols(gd::GDTable) = gd.cols === nothing ? [:KEYS] : gd.cols

"""
keys(gd::GDTable)

Returns the keys by which the `gd` is grouped by.
"""
keys(gd::GDTable) = keys(gd.index)

partition(gd::GDTable, key) = partition(gd, gd.index[key])
partition(gd::GDTable, indices::Vector{UInt}) = DTable(getindex.(Ref(gd.dtable.chunks), indices), gd.dtable.tabletype)

length(gd::GDTable) = length(keys(gd.index))


function iterate(gd::GDTable)
it = iterate(gd.index)
if it !== nothing
((key, partition_indices), state) = it
return key => partition(gd, partition_indices), state
end
return nothing
end


function iterate(gd::GDTable, state)
it = iterate(gd.index, state)
if it !== nothing
((key, partition_indices), state) = it
return key => partition(gd, partition_indices), state
end
return nothing
end


"""
trim!(gd::GDTable) -> GDTable

Removes empty chunks from `gd` and empty keys from its index.
"""
function trim!(gd::GDTable)
d = gd.dtable
check_result = [Dagger.@spawn isnonempty(c) for c in d.chunks]
results = fetch.(check_result)

ok_indices = filter(x -> results[x], 1:length(results))
d.chunks = getindex.(Ref(d.chunks), sort(ok_indices))

offsets = zeros(UInt, length(results))

counter = zero(UInt)
for (i, r) in enumerate(results)
counter = r ? counter : counter + 1
offsets[i] = counter
end

for key in keys(gd.index)
ind = gd.index[key]
filter!(x -> results[x], ind)

if isempty(ind)
delete!(gd.index, key)
else
gd.index[key] = ind .- getindex.(Ref(offsets), ind)
end
end
gd
end


"""
trim(gd::GDTable) -> GDTable

Returns `gd` with empty chunks and keys removed.
"""
trim(gd::GDTable) = trim!(GDTable(DTable(gd.dtable.chunks, gd.dtable.tabletype), gd.cols, gd.index))


"""
tabletype!(gd::GDTable)

Provides the type of the underlying table partition and caches it in `gd`.

In case the tabletype cannot be obtained the default return value is `NamedTuple`.
"""
tabletype!(gd::GDTable) = gd.dtable.tabletype = resolve_tabletype(gd.dtable)


"""
tabletype(gd::GDTable)

Provides the type of the underlying table partition.
Uses the cached tabletype if available.

In case the tabletype cannot be obtained the default return value is `NamedTuple`.
"""
tabletype(gd::GDTable) = gd.dtable.tabletype === nothing ? resolve_tabletype(gd.dtable) : gd.dtable.tabletype


show(io::IO, gd::GDTable) = show(io, MIME"text/plain"(), gd)


function show(io::IO, ::MIME"text/plain", gd::GDTable)
tabletype = isnothing(gd.dtable.tabletype) ? "unknown (use `tabletype!(::GDTable)`)" : gd.dtable.tabletype
grouped_by_cols = isnothing(gd.cols) ? "custom function" : grouped_cols(gd)
println(io, "GDTable with $(length(gd.dtable.chunks)) partitions and $(length(keys(gd.index))) keys")
println(io, "Tabletype: $tabletype")
print(io, "Grouped by: $grouped_by_cols")
nothing
end

"""
getindex(gdt::GDTable, key) -> DTable

Retrieves a `DTable` from `gdt` with rows belonging to the provided grouping key.
"""
function getindex(gdt::GDTable, key)
key ∉ keys(gdt) && throw(KeyError("Key $key not present in the GDTable"))
# TODO: try to resolve more forms of key even if it doesn't exactly match the key in the dict
partition(gdt, key)
end
Loading