Skip to content

Commit 78e1d77

Browse files
authored
feat: check for uncommitted WRR segments during startup (#25560)
Check for uncommitted WRR segments during startup and abort startup if found. Closes: #25559 (cherry picked from commit 037c6af)
1 parent 941a41b commit 78e1d77

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

tsdb/store.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"runtime"
1313
"sort"
1414
"strconv"
15+
"strings"
1516
"sync"
1617
"time"
1718

@@ -32,6 +33,8 @@ import (
3233
)
3334

3435
var (
36+
// ErrIncompatibleWAL is returned if incompatible WAL files are detected.
37+
ErrIncompatibleWAL = errors.New("incompatible WAL format")
3538
// ErrShardNotFound is returned when trying to get a non existing shard.
3639
ErrShardNotFound = fmt.Errorf("shard not found")
3740
// ErrStoreClosed is returned when trying to use a closed Store.
@@ -308,6 +311,76 @@ func (s *Store) Open(ctx context.Context) error {
308311
return nil
309312
}
310313

314+
// Name components of WRR write-ahead-log files.
const (
	// wrrFileExtension is the extension (without the leading dot) of a WRR segment.
	wrrFileExtension = "wrr"
	// wrrPrefixVersioned is the prefix that versioned WRR file names start with.
	wrrPrefixVersioned = "_v"
	// wrrSnapshotExtension is the extension (without the leading dot) appended
	// after the WRR extension on snapshot files.
	wrrSnapshotExtension = "snapshot"
)

// generateWRRSegmentFileGlob returns the glob pattern that matches .wrr files and
// their related files (such as .wrr.snapshot) inside a WAL directory. The pattern
// is deliberately loose, so callers are expected to filter out false positives.
func generateWRRSegmentFileGlob() string {
	const anything = "*"
	return wrrPrefixVersioned + anything + "." + wrrFileExtension + anything
}
325+
326+
// checkUncommittedWRR determines if there are any uncommitted WRR files found in shardWALPath.
327+
// shardWALPath is the path to a single shard's WAL, not the overall WAL path.
328+
// If no uncommitted WRR files are found, then nil is returned. Otherwise, an error indicating
329+
// the names of uncommitted WRR files is returned. The error returned contains the full context
330+
// and does not require additional information.
331+
func checkUncommittedWRR(shardWALPath string) error {
332+
// It is OK if there are .wrr files as long as they are committed. Committed .wrr files will
333+
// have a .wrr.snapshot newer than the .wrr file. If there is no .wrr.snapshot file newer
334+
// than a given .wrr file, then that .wrr file is uncommitted and we should return an error
335+
// indicating possible data loss due to an in-place conversion of an incompatible WAL format.
336+
// Note that newness for .wrr and .wrr.snapshot files is determined lexically by the name,
337+
// and not the ctime or mtime of the files.
338+
339+
unfilteredNames, err := filepath.Glob(filepath.Join(shardWALPath, generateWRRSegmentFileGlob()))
340+
if err != nil {
341+
return fmt.Errorf("error finding WRR files in %q: %w", shardWALPath, err)
342+
}
343+
snapshotExt := fmt.Sprintf(".%s.%s", wrrFileExtension, wrrSnapshotExtension)
344+
345+
// Strip out files that are not .wal or .wal.snapshot, given the glob pattern
346+
// could include false positives, such as foo.wally or foo.wal.snapshotted
347+
names := make([]string, 0, len(unfilteredNames))
348+
for _, name := range unfilteredNames {
349+
if strings.HasSuffix(name, wrrFileExtension) || strings.HasSuffix(name, snapshotExt) {
350+
names = append(names, name)
351+
}
352+
}
353+
354+
sort.Strings(names)
355+
356+
// Find the last snapshot and collect the files after it
357+
for i := len(names) - 1; i >= 0; i-- {
358+
if strings.HasSuffix(names[i], snapshotExt) {
359+
names = names[i+1:]
360+
break
361+
}
362+
}
363+
364+
// names now contains a list of uncommitted WRR files.
365+
if len(names) > 0 {
366+
return fmt.Errorf("%w: uncommitted WRR files found: %v", ErrIncompatibleWAL, names)
367+
}
368+
369+
return nil
370+
}
371+
372+
// checkWALCompatibility ensures that an uncommitted WAL segments in an incompatible
373+
// format are not present. shardWALPath is the path to a single shard's WAL, not the
374+
// overall WAL path. A ErrIncompatibleWAL error with further details is returned if
375+
// an incompatible WAL with unflushed segments is found, The error returned contains
376+
// the full context and does not require additional information.
377+
func checkWALCompatibility(shardWALPath string) error {
378+
// There is one known incompatible WAL format, the .wrr format. Finding these is a problem
379+
// if they are uncommitted. OSS can not read .wrr WAL files, so any uncommitted data in them
380+
// will be lost.
381+
return checkUncommittedWRR(shardWALPath)
382+
}
383+
311384
// generateTrailingPath returns the last part of a shard path or WAL path
312385
// based on the shardID, db, and rp.
313386
func (s *Store) generateTrailingPath(shardID uint64, db, rp string) string {
@@ -507,6 +580,21 @@ func (s *Store) loadShards(ctx context.Context) error {
507580
}
508581
}
509582

583+
// Verify no incompatible WAL files. Do this before starting to load shards to fail early if found.
584+
// All shards are scanned instead of stopping at just the first one so that the admin will see
585+
// all the problematic shards.
586+
if s.EngineOptions.WALEnabled {
587+
var errs []error
588+
for _, sh := range shards {
589+
if err := checkWALCompatibility(s.generateWALPath(sh.id, sh.db, sh.rp)); err != nil {
590+
errs = append(errs, err)
591+
}
592+
}
593+
if len(errs) > 0 {
594+
return errors.Join(errs...)
595+
}
596+
}
597+
510598
// Do the actual work of loading shards.
511599
shardResC := make(chan *shardResponse, len(shards))
512600
pendingShardCount := 0

tsdb/store_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/influxdata/influxdb/v2/models"
2626
"github.com/influxdata/influxdb/v2/pkg/deep"
2727
"github.com/influxdata/influxdb/v2/pkg/slices"
28+
"github.com/influxdata/influxdb/v2/pkg/snowflake"
2829
"github.com/influxdata/influxdb/v2/predicate"
2930
"github.com/influxdata/influxdb/v2/tsdb"
3031
"github.com/influxdata/influxql"
@@ -886,6 +887,122 @@ func TestStore_FlushWALOnClose(t *testing.T) {
886887
}
887888
}
888889

890+
func TestStore_WRRSegments(t *testing.T) {
891+
// Check if uncommitted WRR segments are identified and cause opening the store to abort.
892+
for _, index := range tsdb.RegisteredIndexes() {
893+
t.Run("TestStore_WRRSegments_"+index, func(t *testing.T) {
894+
idGen := snowflake.New(0)
895+
generateWRRSegmentName := func() string {
896+
return fmt.Sprintf("_v01_%020d.wrr", idGen.Next())
897+
}
898+
createFile := func(t *testing.T, fn string) {
899+
t.Helper()
900+
require.NoError(t, os.WriteFile(fn, nil, 0666))
901+
}
902+
createWRR := func(t *testing.T, path string) string {
903+
t.Helper()
904+
fn := filepath.Join(path, generateWRRSegmentName())
905+
createFile(t, fn)
906+
return fn
907+
}
908+
generateWRRSnapshotName := func() string {
909+
return generateWRRSegmentName() + ".snapshot"
910+
}
911+
createWRRSnapshot := func(t *testing.T, path string) string {
912+
t.Helper()
913+
fn := filepath.Join(path, generateWRRSnapshotName())
914+
createFile(t, fn)
915+
return fn
916+
}
917+
checkWRRError := func(t *testing.T, err error, wrrs ...[]string) {
918+
t.Helper()
919+
require.ErrorIs(t, err, tsdb.ErrIncompatibleWAL)
920+
require.ErrorContains(t, err, "incompatible WAL format: uncommitted WRR files found")
921+
// We don't know the exact order of the errors if there are multiple shards with
922+
// uncommitted WRRs, but this will insure that all of them are included in the error
923+
// message.
924+
for _, w := range wrrs {
925+
if len(w) > 0 {
926+
require.ErrorContains(t, err, fmt.Sprintf("%v", w))
927+
}
928+
}
929+
}
930+
931+
s := MustOpenStore(t, index, WithWALFlushOnShutdown(true))
932+
defer s.Close()
933+
934+
// Create shard #0 with data.
935+
s.MustCreateShardWithData("db0", "rp0", 0,
936+
`cpu,host=serverA value=1 0`,
937+
`cpu,host=serverA value=2 10`,
938+
`cpu,host=serverB value=3 20`,
939+
)
940+
941+
// Create shard #1 with data.
942+
s.MustCreateShardWithData("db0", "rp0", 1,
943+
`cpu,host=serverA value=1 30`,
944+
`cpu,host=serverC value=3 60`,
945+
)
946+
947+
sh0WALPath := filepath.Join(s.walPath, "db0", "rp0", "0")
948+
require.DirExists(t, sh0WALPath)
949+
sh1WALPath := filepath.Join(s.walPath, "db0", "rp0", "1")
950+
require.DirExists(t, sh1WALPath)
951+
952+
// No WRR segments, no error
953+
require.NoError(t, s.Reopen(t))
954+
955+
// 1 uncommitted WRR segment in shard 0
956+
var sh0Uncommitted, sh1Uncommitted []string
957+
checkReopen := func(t *testing.T) {
958+
t.Helper()
959+
allUncommitted := [][]string{sh0Uncommitted, sh1Uncommitted}
960+
var hasUncommitted bool
961+
for _, u := range allUncommitted {
962+
if len(u) > 0 {
963+
hasUncommitted = true
964+
}
965+
}
966+
967+
if hasUncommitted {
968+
checkWRRError(t, s.Reopen(t), allUncommitted...)
969+
} else {
970+
require.NoError(t, s.Reopen(t))
971+
}
972+
}
973+
sh0Uncommitted = append(sh0Uncommitted, createWRR(t, sh0WALPath))
974+
checkReopen(t)
975+
976+
// 2 uncommitted WRR segments in shard 0
977+
sh0Uncommitted = append(sh0Uncommitted, createWRR(t, sh0WALPath))
978+
checkReopen(t)
979+
980+
// 2 uncommitted WR segments in shard 0, 1 in shard 1
981+
sh1Uncommitted = append(sh1Uncommitted, createWRR(t, sh1WALPath))
982+
checkReopen(t)
983+
984+
// No uncommitted WRR in shard 0, 1 in shard 1
985+
createWRRSnapshot(t, sh0WALPath)
986+
sh0Uncommitted = nil
987+
checkReopen(t)
988+
989+
// No uncommitted WRR segments
990+
createWRRSnapshot(t, sh1WALPath)
991+
sh1Uncommitted = nil
992+
checkReopen(t)
993+
994+
// Add 1 uncommitted to shard 1
995+
sh1Uncommitted = append(sh1Uncommitted, createWRR(t, sh1WALPath))
996+
checkReopen(t)
997+
998+
// No uncommitted WRR segments
999+
createWRRSnapshot(t, sh1WALPath)
1000+
sh1Uncommitted = nil
1001+
checkReopen(t)
1002+
})
1003+
}
1004+
}
1005+
8891006
// Test new reader blocking.
8901007
func TestStore_NewReadersBlocked(t *testing.T) {
8911008
//t.Parallel()

0 commit comments

Comments
 (0)