@@ -223,17 +223,55 @@ func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
223
223
}
224
224
225
225
func openAtIndex (dirpath string , snap walpb.Snapshot , write bool ) (* WAL , error ) {
226
- names , err := readWalNames (dirpath )
226
+ names , nameIndex , err := selectWALFiles (dirpath , snap )
227
227
if err != nil {
228
228
return nil , err
229
229
}
230
230
231
+ rs , ls , closer , err := openWALFiles (dirpath , names , nameIndex , write )
232
+ if err != nil {
233
+ return nil , err
234
+ }
235
+
236
+ // create a WAL ready for reading
237
+ w := & WAL {
238
+ dir : dirpath ,
239
+ start : snap ,
240
+ decoder : newDecoder (rs ... ),
241
+ readClose : closer ,
242
+ locks : ls ,
243
+ }
244
+
245
+ if write {
246
+ // write reuses the file descriptors from read; don't close so
247
+ // WAL can append without dropping the file lock
248
+ w .readClose = nil
249
+ if _ , _ , err := parseWALName (filepath .Base (w .tail ().Name ())); err != nil {
250
+ closer ()
251
+ return nil , err
252
+ }
253
+ w .fp = newFilePipeline (w .dir , SegmentSizeBytes )
254
+ }
255
+
256
+ return w , nil
257
+ }
258
+
259
+ func selectWALFiles (dirpath string , snap walpb.Snapshot ) ([]string , int , error ) {
260
+ names , err := readWALNames (dirpath )
261
+ if err != nil {
262
+ return nil , - 1 , err
263
+ }
264
+
231
265
nameIndex , ok := searchIndex (names , snap .Index )
232
266
if ! ok || ! isValidSeq (names [nameIndex :]) {
233
- return nil , ErrFileNotFound
267
+ err = ErrFileNotFound
268
+ return nil , - 1 , err
234
269
}
235
270
236
- // open the wal files
271
+ return names , nameIndex , nil
272
+ }
273
+
274
+ func openWALFiles (dirpath string , names []string , nameIndex int , write bool ) ([]io.Reader , []* fileutil.LockedFile , func () error , error ) {
237
275
rcs := make ([]io.ReadCloser , 0 )
238
276
rs := make ([]io.Reader , 0 )
239
277
ls := make ([]* fileutil.LockedFile , 0 )
@@ -243,15 +281,15 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
243
281
l , err := fileutil .TryLockFile (p , os .O_RDWR , fileutil .PrivateFileMode )
244
282
if err != nil {
245
283
closeAll (rcs ... )
246
- return nil , err
284
+ return nil , nil , nil , err
247
285
}
248
286
ls = append (ls , l )
249
287
rcs = append (rcs , l )
250
288
} else {
251
289
rf , err := os .OpenFile (p , os .O_RDONLY , fileutil .PrivateFileMode )
252
290
if err != nil {
253
291
closeAll (rcs ... )
254
- return nil , err
292
+ return nil , nil , nil , err
255
293
}
256
294
ls = append (ls , nil )
257
295
rcs = append (rcs , rf )
@@ -261,27 +299,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
261
299
262
300
closer := func () error { return closeAll (rcs ... ) }
263
301
264
- // create a WAL ready for reading
265
- w := & WAL {
266
- dir : dirpath ,
267
- start : snap ,
268
- decoder : newDecoder (rs ... ),
269
- readClose : closer ,
270
- locks : ls ,
271
- }
272
-
273
- if write {
274
- // write reuses the file descriptors from read; don't close so
275
- // WAL can append without dropping the file lock
276
- w .readClose = nil
277
- if _ , _ , err := parseWalName (filepath .Base (w .tail ().Name ())); err != nil {
278
- closer ()
279
- return nil , err
280
- }
281
- w .fp = newFilePipeline (w .dir , SegmentSizeBytes )
282
- }
283
-
284
- return w , nil
302
+ return rs , ls , closer , nil
285
303
}
286
304
287
305
// ReadAll reads out records of the current WAL.
@@ -398,6 +416,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
398
416
return metadata , state , ents , err
399
417
}
400
418
419
+ // Verify reads through the given WAL and verifies that it is not corrupted.
420
+ // It creates a new decoder to read through the records of the given WAL.
421
+ // It does not conflict with any open WAL, but it is recommended not to
422
+ // call this function after opening the WAL for writing.
423
+ // If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
424
+ // If the loaded snap doesn't match with the expected one, it will
425
+ // return error ErrSnapshotMismatch.
426
+ func Verify (walDir string , snap walpb.Snapshot ) error {
427
+ var metadata []byte
428
+ var err error
429
+ var match bool
430
+
431
+ rec := & walpb.Record {}
432
+
433
+ names , nameIndex , err := selectWALFiles (walDir , snap )
434
+ if err != nil {
435
+ return err
436
+ }
437
+
438
+ // open wal files in read mode, so that there is no conflict
439
+ // when the same WAL is opened elsewhere in write mode
440
+ rs , _ , closer , err := openWALFiles (walDir , names , nameIndex , false )
441
+ if err != nil {
442
+ return err
443
+ }
444
+
445
+ // create a new decoder from the readers on the WAL files
446
+ decoder := newDecoder (rs ... )
447
+
448
+ for err = decoder .decode (rec ); err == nil ; err = decoder .decode (rec ) {
449
+ switch rec .Type {
450
+ case metadataType :
451
+ if metadata != nil && ! bytes .Equal (metadata , rec .Data ) {
452
+ return ErrMetadataConflict
453
+ }
454
+ metadata = rec .Data
455
+ case crcType :
456
+ crc := decoder .crc .Sum32 ()
457
+ // Current crc of decoder must match the crc of the record.
458
+ // We need not match 0 crc, since the decoder is a new one at this point.
459
+ if crc != 0 && rec .Validate (crc ) != nil {
460
+ return ErrCRCMismatch
461
+ }
462
+ decoder .updateCRC (rec .Crc )
463
+ case snapshotType :
464
+ var loadedSnap walpb.Snapshot
465
+ pbutil .MustUnmarshal (& loadedSnap , rec .Data )
466
+ if loadedSnap .Index == snap .Index {
467
+ if loadedSnap .Term != snap .Term {
468
+ return ErrSnapshotMismatch
469
+ }
470
+ match = true
471
+ }
472
+ // We ignore all entry and state type records as these
473
+ // are not necessary for validating the WAL contents
474
+ case entryType :
475
+ case stateType :
476
+ default :
477
+ return fmt .Errorf ("unexpected block type %d" , rec .Type )
478
+ }
479
+ }
480
+
481
+ if closer != nil {
482
+ closer ()
483
+ }
484
+
485
+ // We do not have to read out all the WAL entries
486
+ // as the decoder is opened in read mode.
487
+ if err != io .EOF && err != io .ErrUnexpectedEOF {
488
+ return err
489
+ }
490
+
491
+ if ! match {
492
+ return ErrSnapshotNotFound
493
+ }
494
+
495
+ return nil
496
+ }
497
+
401
498
// cut closes current file written and creates a new one ready to append.
402
499
// cut first creates a temp wal file and writes necessary headers into it.
403
500
// Then cut atomically rename temp wal file to a wal file.
0 commit comments