Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zstd: Reduce per decoder allocations significantly #252

Merged
merged 2 commits into from
Apr 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions zstd/blockdec.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,25 @@ func (b *blockDec) reset(br byteBuffer, windowSize uint64) error {
b.Type = blockType((bh >> 1) & 3)
// find size.
cSize := int(bh >> 3)
maxSize := maxBlockSize
switch b.Type {
case blockTypeReserved:
return ErrReservedBlockType
case blockTypeRLE:
b.RLESize = uint32(cSize)
if b.lowMem {
maxSize = cSize
}
cSize = 1
case blockTypeCompressed:
if debug {
println("Data size on stream:", cSize)
}
b.RLESize = 0
maxSize = maxCompressedBlockSize
if windowSize < maxCompressedBlockSize && b.lowMem {
maxSize = int(windowSize)
}
if cSize > maxCompressedBlockSize || uint64(cSize) > b.WindowSize {
if debug {
printf("compressed block too big: csize:%d block: %+v\n", uint64(cSize), b)
Expand All @@ -160,8 +168,8 @@ func (b *blockDec) reset(br byteBuffer, windowSize uint64) error {
b.dataStorage = make([]byte, 0, maxBlockSize)
}
}
if cap(b.dst) <= maxBlockSize {
b.dst = make([]byte, 0, maxBlockSize+1)
if cap(b.dst) <= maxSize {
b.dst = make([]byte, 0, maxSize+1)
}
var err error
b.data, err = br.readBig(cSize, b.dataStorage)
Expand Down Expand Up @@ -679,8 +687,11 @@ func (b *blockDec) decodeCompressed(hist *history) error {
println("initializing sequences:", err)
return err
}

err = seqs.decode(nSeqs, br, hist.b)
hbytes := hist.b
if len(hbytes) > hist.windowSize {
hbytes = hbytes[len(hbytes)-hist.windowSize:]
}
err = seqs.decode(nSeqs, br, hbytes)
if err != nil {
return err
}
Expand Down
62 changes: 62 additions & 0 deletions zstd/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,68 @@ func TestNewDecoder(t *testing.T) {
testDecoderDecodeAll(t, "testdata/decoder.zip", dec)
}

// TestNewDecoderMemory measures the steady-state heap footprint per decoder
// in low-memory mode: it creates many single-goroutine decoders over the same
// compressed stream, reads enough from each to force their working buffers to
// be allocated, and compares runtime HeapInuse before and after.
func TestNewDecoderMemory(t *testing.T) {
	defer timeout(60 * time.Second)()
	var testdata bytes.Buffer
	enc, err := NewWriter(&testdata, WithWindowSize(64<<10), WithSingleSegment(false))
	if err != nil {
		t.Fatal(err)
	}
	// Write 256KB (256 runs of 1KB, one per byte value).
	for i := 0; i < 256; i++ {
		tmp := strings.Repeat(string([]byte{byte(i)}), 1024)
		_, err := enc.Write([]byte(tmp))
		if err != nil {
			t.Fatal(err)
		}
	}
	err = enc.Close()
	if err != nil {
		t.Fatal(err)
	}

	var n = 5000
	if testing.Short() {
		n = 200
	}

	var before, after runtime.MemStats
	runtime.GC()
	runtime.ReadMemStats(&before)

	var decs = make([]*Decoder, n)
	for i := range decs {
		// Wrap in NopCloser to avoid shortcut.
		input := ioutil.NopCloser(bytes.NewBuffer(testdata.Bytes()))
		decs[i], err = NewReader(input, WithDecoderConcurrency(1), WithDecoderLowmem(true))
		if err != nil {
			t.Fatal(err)
		}
	}

	// 128 KiB scratch buffer (was mislabeled "32K"); reading this much from
	// each decoder forces its internal buffers to be allocated.
	var tmp [128 << 10]byte
	for i := range decs {
		_, err := io.ReadFull(decs[i], tmp[:])
		if err != nil {
			t.Fatal(err)
		}
	}

	runtime.GC()
	runtime.ReadMemStats(&after)
	size := (after.HeapInuse - before.HeapInuse) / uint64(n) / 1024
	t.Log(size, "KiB per decoder")
	// This is not exact science, but fail if we suddenly get more than 2x what
	// we expect. Keep the threshold in one place so condition and message agree.
	const expectKB = 221
	if size > expectKB*2 && !testing.Short() {
		t.Errorf("expected < %d KiB per decoder, got %d", expectKB*2, size)
	}

	for _, dec := range decs {
		dec.Close()
	}
}

func TestNewDecoderGood(t *testing.T) {
defer timeout(30 * time.Second)()
testDecoderFile(t, "testdata/good.zip")
Expand Down
10 changes: 7 additions & 3 deletions zstd/framedec.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,11 @@ func (d *frameDec) reset(br byteBuffer) error {
return ErrWindowSizeTooSmall
}
d.history.windowSize = int(d.WindowSize)
d.history.maxSize = d.history.windowSize + maxBlockSize
if d.o.lowMem && d.history.windowSize < maxBlockSize {
d.history.maxSize = d.history.windowSize * 2
} else {
d.history.maxSize = d.history.windowSize + maxBlockSize
}
// history contains input - maybe we do something
d.rawInput = br
return nil
Expand Down Expand Up @@ -320,8 +324,8 @@ func (d *frameDec) checkCRC() error {

func (d *frameDec) initAsync() {
if !d.o.lowMem && !d.SingleSegment {
// set max extra size history to 20MB.
d.history.maxSize = d.history.windowSize + maxBlockSize*10
// set max extra size history to 10MB.
d.history.maxSize = d.history.windowSize + maxBlockSize*5
}
// re-alloc if more than one extra block size.
if d.o.lowMem && cap(d.history.b) > d.history.maxSize+maxBlockSize {
Expand Down