
Commit 981f2fc

feat: add option to flush WAL on shutdown (#25452)
Add `--storage-wal-flush-on-shutdown` to flush the WAL on database shutdown. On a successful shutdown, all WAL data will be committed to TSM files and the WAL directories will not contain any .wal files. Clean cherry-pick of #25444 from main-2.x. Closes: #25422. (cherry picked from commit 96bade4)
1 parent 7483bea commit 981f2fc
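
For readers who wire the storage engine up programmatically rather than through the CLI flag, the sketch below shows roughly how the new setting is exposed on the `tsdb` config. It is not part of the commit; it assumes only the `tsdb.NewConfig`, `Config.Validate`, and `WALFlushOnShutdown` identifiers visible in the diffs below, and the directory paths are example values borrowed from the updated test.

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/tsdb"
)

func main() {
	// Start from the default tsdb configuration, point it at example
	// directories (as the updated test does), and opt in to flushing the
	// WAL when the engine is closed.
	cfg := tsdb.NewConfig()
	cfg.Dir = "/var/lib/influxdb/data"   // example path
	cfg.WALDir = "/var/lib/influxdb/wal" // example path
	cfg.WALFlushOnShutdown = true

	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("wal-flush-on-shutdown:", cfg.WALFlushOnShutdown)
}
```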

10 files changed: +307 −68 lines


cmd/influxd/launcher/cmd.go

Lines changed: 5 additions & 0 deletions
@@ -513,6 +513,11 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
 			Default: o.StorageConfig.Data.WALMaxWriteDelay,
 			Desc:    "The max amount of time a write will wait when the WAL already has `storage-wal-max-concurrent-writes` active writes. Set to 0 to disable the timeout.",
 		},
+		{
+			DestP: &o.StorageConfig.Data.WALFlushOnShutdown,
+			Flag:  "storage-wal-flush-on-shutdown",
+			Desc:  "Flushes and clears the WAL on shutdown",
+		},
 		{
 			DestP: &o.StorageConfig.Data.ValidateKeys,
 			Flag:  "storage-validate-keys",

tsdb/config.go

Lines changed: 4 additions & 0 deletions
@@ -95,6 +95,10 @@ type Config struct {
 	// disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL.
 	WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"`

+	// WALFlushOnShutdown determines if the WAL should be flushed when influxd is shutdown.
+	// This is useful in upgrade and downgrade scenarios to prevent WAL format compatibility issues.
+	WALFlushOnShutdown bool `toml:"wal-flush-on-shutdown"`
+
 	// Enables unicode validation on series keys on write.
 	ValidateKeys bool `toml:"validate-keys"`

tsdb/config_test.go

Lines changed: 13 additions & 21 deletions
@@ -5,37 +5,29 @@ import (
 	"time"

 	"github.com/BurntSushi/toml"
+	"github.com/stretchr/testify/require"
+
 	"github.com/influxdata/influxdb/v2/tsdb"
 )

 func TestConfig_Parse(t *testing.T) {
 	// Parse configuration.
 	c := tsdb.NewConfig()
-	if _, err := toml.Decode(`
+	_, err := toml.Decode(`
 dir = "/var/lib/influxdb/data"
 wal-dir = "/var/lib/influxdb/wal"
 wal-fsync-delay = "10s"
+wal-flush-on-shutdown = true
 tsm-use-madv-willneed = true
-`, &c); err != nil {
-		t.Fatal(err)
-	}
-
-	if err := c.Validate(); err != nil {
-		t.Errorf("unexpected validate error: %s", err)
-	}
-
-	if got, exp := c.Dir, "/var/lib/influxdb/data"; got != exp {
-		t.Errorf("unexpected dir:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
-	}
-	if got, exp := c.WALDir, "/var/lib/influxdb/wal"; got != exp {
-		t.Errorf("unexpected wal-dir:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
-	}
-	if got, exp := c.WALFsyncDelay, time.Duration(10*time.Second); time.Duration(got).Nanoseconds() != exp.Nanoseconds() {
-		t.Errorf("unexpected wal-fsync-delay:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
-	}
-	if got, exp := c.TSMWillNeed, true; got != exp {
-		t.Errorf("unexpected tsm-madv-willneed:\n\nexp=%v\n\ngot=%v\n\n", exp, got)
-	}
+`, &c)
+	require.NoError(t, err)
+
+	require.NoError(t, c.Validate())
+	require.Equal(t, "/var/lib/influxdb/data", c.Dir)
+	require.Equal(t, "/var/lib/influxdb/wal", c.WALDir)
+	require.Equal(t, time.Duration(10*time.Second), time.Duration(c.WALFsyncDelay))
+	require.True(t, c.WALFlushOnShutdown)
+	require.True(t, c.TSMWillNeed)
 }

 func TestConfig_Validate_Error(t *testing.T) {

tsdb/engine.go

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ var (
 // Engine represents a swappable storage engine for the shard.
 type Engine interface {
 	Open(ctx context.Context) error
-	Close() error
+	Close(flush bool) error
 	SetEnabled(enabled bool)
 	SetCompactionsEnabled(enabled bool)
 	ScheduleFullCompaction() error

tsdb/engine/tsm1/engine.go

Lines changed: 48 additions & 19 deletions
@@ -774,25 +774,31 @@ func (e *Engine) Open(ctx context.Context) error {
 }

 // Close closes the engine. Subsequent calls to Close are a nop.
-func (e *Engine) Close() error {
+// If flush is true, then the WAL is flushed and cleared before the
+// engine is closed.
+func (e *Engine) Close(flush bool) error {
+	// Flushing the WAL involves writing a snapshot, which has to be done before
+	// compactions are disabled.
+	var flushErr error
+	if e.WALEnabled && flush {
+		// Even if the snapshot fails, we still have to proceed and close the engine.
+		flushErr = e.flushWAL()
+	}
+
 	e.SetCompactionsEnabled(false)

 	// Lock now and close everything else down.
 	e.mu.Lock()
 	defer e.mu.Unlock()
 	e.done = nil // Ensures that the channel will not be closed again.

-	var err error = nil
-	err = e.fieldset.Close()
-	if err2 := e.FileStore.Close(); err2 != nil && err == nil {
-		err = err2
-	}
+	setCloseErr := e.fieldset.Close()
+	storeCloseErr := e.FileStore.Close()
+	var walCloseErr error
 	if e.WALEnabled {
-		if err2 := e.WAL.Close(); err2 != nil && err == nil {
-			err = err2
-		}
+		walCloseErr = e.WAL.Close()
 	}
-	return err
+	return errors.Join(flushErr, setCloseErr, storeCloseErr, walCloseErr)
 }

 // WithLogger sets the logger for the engine.
@@ -1835,7 +1841,11 @@ func (e *Engine) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) err
 func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }

 // WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
-func (e *Engine) WriteSnapshot() (err error) {
+func (e *Engine) WriteSnapshot() error {
+	return e.doWriteSnapshot(false)
+}
+
+func (e *Engine) doWriteSnapshot(flush bool) (err error) {
 	// Lock and grab the cache snapshot along with all the closed WAL
 	// filenames associated with the snapshot

@@ -1858,8 +1868,14 @@ func (e *Engine) WriteSnapshot() (err error) {
 	defer e.mu.Unlock()

 	if e.WALEnabled {
-		if err = e.WAL.CloseSegment(); err != nil {
-			return
+		if !flush {
+			if err = e.WAL.CloseSegment(); err != nil {
+				return
+			}
+		} else {
+			if err = e.WAL.CloseAllSegments(); err != nil {
+				return
+			}
 		}

 		segments, err = e.WAL.ClosedSegments()
@@ -1897,17 +1913,30 @@ func (e *Engine) WriteSnapshot() (err error) {
 	return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
 }

+// flushWAL flushes the WAL and empties the WAL directory.
+func (e *Engine) flushWAL() error {
+	return e.writeSnapshotWithRetries(true)
+}
+
+// writeSnapshotWithRetries calls WriteSnapshot and will retry with a backoff if WriteSnapshot
+// fails with ErrSnapshotInProgress. If flush is true then no new WAL segments are opened so
+// that the WAL has no segment files on success.
+func (e *Engine) writeSnapshotWithRetries(flush bool) error {
+	err := e.doWriteSnapshot(flush)
+	for i := 0; i < 3 && err == ErrSnapshotInProgress; i += 1 {
+		backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
+		time.Sleep(backoff)
+		err = e.doWriteSnapshot(flush)
+	}
+	return err
+}
+
 // CreateSnapshot will create a temp directory that holds
 // temporary hardlinks to the underlying shard files.
 // skipCacheOk controls whether it is permissible to fail writing out
 // in-memory cache data when a previous snapshot is in progress.
 func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) {
-	err := e.WriteSnapshot()
-	for i := 0; i < 3 && err == ErrSnapshotInProgress; i += 1 {
-		backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
-		time.Sleep(backoff)
-		err = e.WriteSnapshot()
-	}
+	err := e.writeSnapshotWithRetries(false)
 	if err == ErrSnapshotInProgress && skipCacheOk {
 		e.logger.Warn("Snapshotter busy: proceeding without cache contents")
 	} else if err != nil {
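
As an aside, the backoff in `writeSnapshotWithRetries` above grows by powers of 32 milliseconds between attempts while a snapshot is already in progress. The standalone sketch below is not part of the commit; it just prints the delays the three possible retries would use.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// After the initial doWriteSnapshot attempt, each of up to three retries
	// first sleeps 32^i milliseconds, i.e. 1ms, 32ms, then 1.024s.
	for i := 0; i < 3; i++ {
		backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
		fmt.Printf("retry %d sleeps %v\n", i, backoff)
	}
}
```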

tsdb/engine/tsm1/engine_test.go

Lines changed: 2 additions & 2 deletions
@@ -1825,7 +1825,7 @@ func TestEngine_SnapshotsDisabled(t *testing.T) {
 	t.Cleanup(func() { idx.Close() })

 	e := tsm1.NewEngine(1, idx, dir, walPath, sfile.SeriesFile, opt).(*tsm1.Engine)
-	t.Cleanup(func() { e.Close() })
+	t.Cleanup(func() { e.Close(false) })

 	// mock the planner so compactions don't run during the test
 	e.CompactionPlan = &mockPlanner{}
@@ -2644,7 +2644,7 @@ func (e *Engine) close(cleanup bool) error {
 			os.RemoveAll(e.root)
 		}
 	}()
-	return e.Engine.Close()
+	return e.Engine.Close(false)
 }

 // Reopen closes and reopens the engine.

tsdb/engine/tsm1/wal.go

Lines changed: 20 additions & 3 deletions
@@ -559,6 +559,14 @@ func (l *WAL) CloseSegment() error {
 	return nil
 }

+// CloseAllSegments closes the current segment regardless of whether it contains data
+// and does not open a new one.
+func (l *WAL) CloseAllSegments() error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	return l.closeCurrentSegmentFile()
+}
+
 // Delete deletes the given keys, returning the segment ID for the operation.
 func (l *WAL) Delete(ctx context.Context, keys [][]byte) (int, error) {
 	if len(keys) == 0 {
@@ -634,15 +642,24 @@ func segmentFileNames(dir string) ([]string, error) {
 	return names, nil
 }

-// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log.
-func (l *WAL) newSegmentFile() error {
-	l.currentSegmentID++
+// closeCurrentSegmentFile will close the current segment file. l.mu must be held before calling this method.
+func (l *WAL) closeCurrentSegmentFile() error {
 	if l.currentSegmentWriter != nil {
 		l.sync()

 		if err := l.currentSegmentWriter.close(); err != nil {
 			return err
 		}
+		l.currentSegmentWriter = nil
+	}
+	return nil
+}
+
+// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log.
+func (l *WAL) newSegmentFile() error {
+	l.currentSegmentID++
+	if err := l.closeCurrentSegmentFile(); err != nil {
+		return err
 	}

 	fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
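
For context, the format string at the end of this hunk is what produces the segment file names that a flush-on-shutdown close is expected to remove. The tiny standalone sketch below is not part of the commit; the `_` prefix and `wal` extension are assumed values for `WALFilePrefix` and `WALFileExtension`, which are not shown in this diff.

```go
package main

import "fmt"

func main() {
	// Hypothetical values; the diff above only shows the format string,
	// not the package constants themselves.
	const (
		walFilePrefix    = "_"
		walFileExtension = "wal"
	)
	for _, id := range []int{1, 12, 345} {
		fmt.Printf("%s%05d.%s\n", walFilePrefix, id, walFileExtension)
	}
	// Prints: _00001.wal, _00012.wal, _00345.wal
}
```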

tsdb/shard.go

Lines changed: 21 additions & 8 deletions
@@ -496,7 +496,7 @@ func (s *Shard) openNoLock(ctx context.Context) (bool, error) {

 		return nil
 	}(); err != nil {
-		s.closeNoLock()
+		s.closeNoLock(false)
 		return true, NewShardError(s.id, err)
 	}

@@ -508,12 +508,18 @@ func (s *Shard) openNoLock(ctx context.Context) (bool, error) {
 	return false, nil
 }

-// Close shuts down the shard's store.
-func (s *Shard) Close() error {
+// FlushAndClose flushes the shard's WAL and then closes down the shard's store.
+func (s *Shard) FlushAndClose() error {
+	return s.closeAndWait(true)
+}
+
+// closeAndWait shuts down the shard's store and waits for the close to complete.
+// If flush is true, the WAL is flushed and cleared before closing.
+func (s *Shard) closeAndWait(flush bool) error {
 	err := func() error {
 		s.mu.Lock()
 		defer s.mu.Unlock()
-		return s.closeNoLock()
+		return s.closeNoLock(flush)
 	}()
 	// make sure not to hold a lock while waiting for close to finish
 	werr := s.closeWait()
@@ -522,12 +528,19 @@ func (s *Shard) Close() error {
 		return err
 	}
 	return werr
+
+}
+
+// Close shuts down the shard's store.
+func (s *Shard) Close() error {
+	return s.closeAndWait(false)
 }

 // closeNoLock closes the shard an removes reference to the shard from associated
-// indexes. The s.mu mutex must be held before calling closeNoLock. closeWait should always
+// indexes. If flush is true, the WAL is flushed and cleared before closing.
+// The s.mu mutex must be held before calling closeNoLock. closeWait should always
 // be called after calling closeNoLock.
-func (s *Shard) closeNoLock() error {
+func (s *Shard) closeNoLock(flush bool) error {
 	if s._engine == nil {
 		return nil
 	}
@@ -536,7 +549,7 @@ func (s *Shard) closeNoLock() error {
 		close(s.metricUpdater.closing)
 	}

-	err := s._engine.Close()
+	err := s._engine.Close(flush)
 	if err == nil {
 		s._engine = nil
 	}
@@ -1271,7 +1284,7 @@ func (s *Shard) Restore(ctx context.Context, r io.Reader, basePath string) error

 	// Close shard.
 	closeWaitNeeded = true // about to call closeNoLock, closeWait will be needed
-	if err := s.closeNoLock(); err != nil {
+	if err := s.closeNoLock(false); err != nil {
 		return closeWaitNeeded, err
 	}
 	return closeWaitNeeded, nil

tsdb/store.go

Lines changed: 5 additions & 1 deletion
@@ -512,7 +512,11 @@ func (s *Store) Close() error {

 	// Close all the shards in parallel.
 	if err := s.walkShards(s.shardsSlice(), func(sh *Shard) error {
-		return sh.Close()
+		if s.EngineOptions.Config.WALFlushOnShutdown {
+			return sh.FlushAndClose()
+		} else {
+			return sh.Close()
+		}
 	}); err != nil {
 		return err
 	}
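
To observe the effect the commit message describes, one way to check after a clean shutdown is to scan the WAL directory for leftover segment files. The sketch below is not part of the commit; the directory path is only an example, and it simply walks the tree looking for the `.wal` extension.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

func main() {
	// Example path only; use the wal-dir configured for your instance.
	walDir := "/var/lib/influxdb/wal"

	var leftovers []string
	err := filepath.WalkDir(walDir, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if !d.IsDir() && filepath.Ext(path) == ".wal" {
			leftovers = append(leftovers, path)
		}
		return nil
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	if len(leftovers) == 0 {
		fmt.Println("no .wal segments remain; the WAL was flushed on shutdown")
	} else {
		fmt.Println("remaining WAL segments:", leftovers)
	}
}
```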
