diff --git a/wal/wal.go b/wal/wal.go index 7200ad088dd..73bef540a36 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -299,17 +299,55 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err } func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { - names, err := readWALNames(lg, dirpath) + names, nameIndex, err := selectWALFiles(lg, dirpath, snap) if err != nil { return nil, err } + rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write) + if err != nil { + return nil, err + } + + // create a WAL ready for reading + w := &WAL{ + dir: dirpath, + start: snap, + decoder: newDecoder(rs...), + readClose: closer, + locks: ls, + } + + if write { + // write reuses the file descriptors from read; don't close so + // WAL can append without dropping the file lock + w.readClose = nil + if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil { + closer() + return nil, err + } + w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes) + } + + return w, nil +} + +func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) { + names, err := readWALNames(lg, dirpath) + if err != nil { + return nil, -1, err + } + nameIndex, ok := searchIndex(lg, names, snap.Index) if !ok || !isValidSeq(lg, names[nameIndex:]) { - return nil, ErrFileNotFound + err = ErrFileNotFound + return nil, -1, err } - // open the wal files + return names, nameIndex, nil +} + +func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) { rcs := make([]io.ReadCloser, 0) rs := make([]io.Reader, 0) ls := make([]*fileutil.LockedFile, 0) @@ -319,7 +357,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode) if err != nil { closeAll(rcs...) - return nil, err + return nil, nil, nil, err } ls = append(ls, l) rcs = append(rcs, l) @@ -327,7 +365,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode) if err != nil { closeAll(rcs...) - return nil, err + return nil, nil, nil, err } ls = append(ls, nil) rcs = append(rcs, rf) @@ -337,28 +375,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool closer := func() error { return closeAll(rcs...) } - // create a WAL ready for reading - w := &WAL{ - lg: lg, - dir: dirpath, - start: snap, - decoder: newDecoder(rs...), - readClose: closer, - locks: ls, - } - - if write { - // write reuses the file descriptors from read; don't close so - // WAL can append without dropping the file lock - w.readClose = nil - if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil { - closer() - return nil, err - } - w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes) - } - - return w, nil + return rs, ls, closer, nil } // ReadAll reads out records of the current WAL. @@ -480,6 +497,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return metadata, state, ents, err } +// Verify reads through the given WAL and verifies that it is not corrupted. +// It creates a new decoder to read through the records of the given WAL. +// It does not conflict with any open WAL, but it is recommended not to +// call this function after opening the WAL for writing. +// If it cannot read out the expected snap, it will return ErrSnapshotNotFound. +// If the loaded snap doesn't match with the expected one, it will +// return error ErrSnapshotMismatch. +func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) error { + var metadata []byte + var err error + var match bool + + rec := &walpb.Record{} + + names, nameIndex, err := selectWALFiles(lg, walDir, snap) + if err != nil { + return err + } + + // open wal files in read mode, so that there is no conflict + // when the same WAL is opened elsewhere in write mode + rs, _, closer, err := openWALFiles(lg, walDir, names, nameIndex, false) + if err != nil { + return err + } + + // create a new decoder from the readers on the WAL files + decoder := newDecoder(rs...) + + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + switch rec.Type { + case metadataType: + if metadata != nil && !bytes.Equal(metadata, rec.Data) { + return ErrMetadataConflict + } + metadata = rec.Data + case crcType: + crc := decoder.crc.Sum32() + // Current crc of decoder must match the crc of the record. + // We need not match 0 crc, since the decoder is a new one at this point. + if crc != 0 && rec.Validate(crc) != nil { + return ErrCRCMismatch + } + decoder.updateCRC(rec.Crc) + case snapshotType: + var loadedSnap walpb.Snapshot + pbutil.MustUnmarshal(&loadedSnap, rec.Data) + if loadedSnap.Index == snap.Index { + if loadedSnap.Term != snap.Term { + return ErrSnapshotMismatch + } + match = true + } + // We ignore all entry and state type records as these + // are not necessary for validating the WAL contents + case entryType: + case stateType: + default: + return fmt.Errorf("unexpected block type %d", rec.Type) + } + } + + if closer != nil { + closer() + } + + // We do not have to read out all the WAL entries + // as the decoder is opened in read mode. + if err != io.EOF && err != io.ErrUnexpectedEOF { + return err + } + + if !match { + return ErrSnapshotNotFound + } + + return nil +} + // cut closes current file written and creates a new one ready to append. // cut first creates a temp wal file and writes necessary headers into it. // Then cut atomically rename temp wal file to a wal file. diff --git a/wal/wal_test.go b/wal/wal_test.go index 8477a0402e9..addd094cb00 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "math" "os" + "path" "path/filepath" "reflect" "testing" @@ -186,6 +187,57 @@ func TestOpenAtIndex(t *testing.T) { } } +// TestVerify tests that Verify throws a non-nil error when the WAL is corrupted. +// The test creates a WAL directory and cuts out multiple WAL files. Then +// it corrupts one of the files by completely truncating it. +func TestVerify(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(walDir) + + // create WAL + w, err := Create(zap.NewExample(), walDir, nil) + if err != nil { + t.Fatal(err) + } + defer w.Close() + + // make 5 separate files + for i := 0; i < 5; i++ { + es := []raftpb.Entry{{Index: uint64(i), Data: []byte("waldata" + string(i+1))}} + if err = w.Save(raftpb.HardState{}, es); err != nil { + t.Fatal(err) + } + if err = w.cut(); err != nil { + t.Fatal(err) + } + } + + // to verify the WAL is not corrupted at this point + err = Verify(zap.NewExample(), walDir, walpb.Snapshot{}) + if err != nil { + t.Errorf("expected a nil error, got %v", err) + } + + walFiles, err := ioutil.ReadDir(walDir) + if err != nil { + t.Fatal(err) + } + + // corrupt the WAL by truncating one of the WAL files completely + err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0) + if err != nil { + t.Fatal(err) + } + + err = Verify(zap.NewExample(), walDir, walpb.Snapshot{}) + if err == nil { + t.Error("expected a non-nil error, got nil") + } +} + // TODO: split it into smaller tests for better readability func TestCut(t *testing.T) { p, err := ioutil.TempDir(os.TempDir(), "waltest")