@@ -299,17 +299,55 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err
299
299
}
300
300
301
301
func openAtIndex (lg * zap.Logger , dirpath string , snap walpb.Snapshot , write bool ) (* WAL , error ) {
302
- names , err := readWALNames (lg , dirpath )
302
+ names , nameIndex , err := selectWALFiles (lg , dirpath , snap )
303
303
if err != nil {
304
304
return nil , err
305
305
}
306
306
307
+ rs , ls , closer , err := openWALFiles (lg , dirpath , names , nameIndex , write )
308
+ if err != nil {
309
+ return nil , err
310
+ }
311
+
312
+ // create a WAL ready for reading
313
+ w := & WAL {
314
+ dir : dirpath ,
315
+ start : snap ,
316
+ decoder : newDecoder (rs ... ),
317
+ readClose : closer ,
318
+ locks : ls ,
319
+ }
320
+
321
+ if write {
322
+ // write reuses the file descriptors from read; don't close so
323
+ // WAL can append without dropping the file lock
324
+ w .readClose = nil
325
+ if _ , _ , err := parseWALName (filepath .Base (w .tail ().Name ())); err != nil {
326
+ closer ()
327
+ return nil , err
328
+ }
329
+ w .fp = newFilePipeline (lg , w .dir , SegmentSizeBytes )
330
+ }
331
+
332
+ return w , nil
333
+ }
334
+
335
+ func selectWALFiles (lg * zap.Logger , dirpath string , snap walpb.Snapshot ) ([]string , int , error ) {
336
+ names , err := readWALNames (lg , dirpath )
337
+ if err != nil {
338
+ return nil , - 1 , err
339
+ }
340
+
307
341
nameIndex , ok := searchIndex (lg , names , snap .Index )
308
342
if ! ok || ! isValidSeq (lg , names [nameIndex :]) {
309
- return nil , ErrFileNotFound
343
+ err = ErrFileNotFound
344
+ return nil , - 1 , err
310
345
}
311
346
312
- // open the wal files
347
+ return names , nameIndex , nil
348
+ }
349
+
350
+ func openWALFiles (lg * zap.Logger , dirpath string , names []string , nameIndex int , write bool ) ([]io.Reader , []* fileutil.LockedFile , func () error , error ) {
313
351
rcs := make ([]io.ReadCloser , 0 )
314
352
rs := make ([]io.Reader , 0 )
315
353
ls := make ([]* fileutil.LockedFile , 0 )
@@ -319,15 +357,15 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
319
357
l , err := fileutil .TryLockFile (p , os .O_RDWR , fileutil .PrivateFileMode )
320
358
if err != nil {
321
359
closeAll (rcs ... )
322
- return nil , err
360
+ return nil , nil , nil , err
323
361
}
324
362
ls = append (ls , l )
325
363
rcs = append (rcs , l )
326
364
} else {
327
365
rf , err := os .OpenFile (p , os .O_RDONLY , fileutil .PrivateFileMode )
328
366
if err != nil {
329
367
closeAll (rcs ... )
330
- return nil , err
368
+ return nil , nil , nil , err
331
369
}
332
370
ls = append (ls , nil )
333
371
rcs = append (rcs , rf )
@@ -337,28 +375,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
337
375
338
376
closer := func () error { return closeAll (rcs ... ) }
339
377
340
- // create a WAL ready for reading
341
- w := & WAL {
342
- lg : lg ,
343
- dir : dirpath ,
344
- start : snap ,
345
- decoder : newDecoder (rs ... ),
346
- readClose : closer ,
347
- locks : ls ,
348
- }
349
-
350
- if write {
351
- // write reuses the file descriptors from read; don't close so
352
- // WAL can append without dropping the file lock
353
- w .readClose = nil
354
- if _ , _ , err := parseWALName (filepath .Base (w .tail ().Name ())); err != nil {
355
- closer ()
356
- return nil , err
357
- }
358
- w .fp = newFilePipeline (w .lg , w .dir , SegmentSizeBytes )
359
- }
360
-
361
- return w , nil
378
+ return rs , ls , closer , nil
362
379
}
363
380
364
381
// ReadAll reads out records of the current WAL.
@@ -480,6 +497,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
480
497
return metadata , state , ents , err
481
498
}
482
499
500
+ // Verify reads through the given WAL and verifies that it is not corrupted.
501
+ // It creates a new decoder to read through the records of the given WAL.
502
+ // It does not conflict with any open WAL, but it is recommended not to
503
+ // call this function after opening the WAL for writing.
504
+ // If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
505
+ // If the loaded snap doesn't match with the expected one, it will
506
+ // return error ErrSnapshotMismatch.
507
+ func Verify (lg * zap.Logger , walDir string , snap walpb.Snapshot ) error {
508
+ var metadata []byte
509
+ var err error
510
+ var match bool
511
+
512
+ rec := & walpb.Record {}
513
+
514
+ names , nameIndex , err := selectWALFiles (lg , walDir , snap )
515
+ if err != nil {
516
+ return err
517
+ }
518
+
519
+ // open wal files in read mode, so that there is no conflict
520
+ // when the same WAL is opened elsewhere in write mode
521
+ rs , _ , closer , err := openWALFiles (lg , walDir , names , nameIndex , false )
522
+ if err != nil {
523
+ return err
524
+ }
525
+
526
+ // create a new decoder from the readers on the WAL files
527
+ decoder := newDecoder (rs ... )
528
+
529
+ for err = decoder .decode (rec ); err == nil ; err = decoder .decode (rec ) {
530
+ switch rec .Type {
531
+ case metadataType :
532
+ if metadata != nil && ! bytes .Equal (metadata , rec .Data ) {
533
+ return ErrMetadataConflict
534
+ }
535
+ metadata = rec .Data
536
+ case crcType :
537
+ crc := decoder .crc .Sum32 ()
538
+ // Current crc of decoder must match the crc of the record.
539
+ // We need not match 0 crc, since the decoder is a new one at this point.
540
+ if crc != 0 && rec .Validate (crc ) != nil {
541
+ return ErrCRCMismatch
542
+ }
543
+ decoder .updateCRC (rec .Crc )
544
+ case snapshotType :
545
+ var loadedSnap walpb.Snapshot
546
+ pbutil .MustUnmarshal (& loadedSnap , rec .Data )
547
+ if loadedSnap .Index == snap .Index {
548
+ if loadedSnap .Term != snap .Term {
549
+ return ErrSnapshotMismatch
550
+ }
551
+ match = true
552
+ }
553
+ // We ignore all entry and state type records as these
554
+ // are not necessary for validating the WAL contents
555
+ case entryType :
556
+ case stateType :
557
+ default :
558
+ return fmt .Errorf ("unexpected block type %d" , rec .Type )
559
+ }
560
+ }
561
+
562
+ if closer != nil {
563
+ closer ()
564
+ }
565
+
566
+ // We do not have to read out all the WAL entries
567
+ // as the decoder is opened in read mode.
568
+ if err != io .EOF && err != io .ErrUnexpectedEOF {
569
+ return err
570
+ }
571
+
572
+ if ! match {
573
+ return ErrSnapshotNotFound
574
+ }
575
+
576
+ return nil
577
+ }
578
+
483
579
// cut closes current file written and creates a new one ready to append.
484
580
// cut first creates a temp wal file and writes necessary headers into it.
485
581
// Then cut atomically rename temp wal file to a wal file.
0 commit comments