Skip to content

Commit 93d6ffc

Browse files
authored
fix: do not rename files on mmap failure (#25356)
If NewTSMReader() fails because mmap fails, do not rename the file, because the error is probably caused by vm.max_map_count being too low. Closes: #25351 (cherry picked from commit 5aff511)
1 parent 07df053 commit 93d6ffc

File tree

5 files changed

+209
-24
lines changed

5 files changed

+209
-24
lines changed

tsdb/engine/tsm1/engine.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
172172
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
173173
}
174174

175-
fs := NewFileStore(path, etags)
175+
fs := NewFileStore(path, etags, WithMadviseWillNeed(opt.Config.TSMWillNeed))
176176
fs.openLimiter = opt.OpenLimiter
177177
if opt.FileStoreObserver != nil {
178178
fs.WithObserver(opt.FileStoreObserver)
179179
}
180-
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed
181180

182181
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), etags)
183182

tsdb/engine/tsm1/file_store.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,8 @@ type FileStore struct {
176176
currentGeneration int
177177
dir string
178178

179-
files []TSMFile
180-
tsmMMAPWillNeed bool // If true then the kernel will be advised MMAP_WILLNEED for TSM files.
181-
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
179+
files []TSMFile
180+
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
182181

183182
logger *zap.Logger // Logger to be used for important messages
184183
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
@@ -198,6 +197,8 @@ type FileStore struct {
198197
// newReaderBlockCount keeps track of the current new reader block requests.
199198
// If non-zero, no new TSMReader objects may be created.
200199
newReaderBlockCount int
200+
201+
readerOptions []tsmReaderOption
201202
}
202203

203204
// FileStat holds information about a TSM file on disk.
@@ -234,7 +235,7 @@ func (f FileStat) ContainsKey(key []byte) bool {
234235
}
235236

236237
// NewFileStore returns a new instance of FileStore based on the given directory.
237-
func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
238+
func NewFileStore(dir string, tags tsdb.EngineTags, options ...tsmReaderOption) *FileStore {
238239
logger := zap.NewNop()
239240
fs := &FileStore{
240241
dir: dir,
@@ -250,6 +251,7 @@ func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
250251
obs: noFileStoreObserver{},
251252
parseFileName: DefaultParseFileName,
252253
copyFiles: runtime.GOOS == "windows",
254+
readerOptions: options,
253255
}
254256
fs.purger.fileStore = fs
255257
return fs
@@ -616,28 +618,37 @@ func (f *FileStore) Open(ctx context.Context) error {
616618
defer f.openLimiter.Release()
617619

618620
start := time.Now()
619-
df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed))
621+
df, err := NewTSMReader(file, f.readerOptions...)
620622
f.logger.Info("Opened file",
621623
zap.String("path", file.Name()),
622624
zap.Int("id", idx),
623625
zap.Duration("duration", time.Since(start)))
624626

625-
// If we are unable to read a TSM file then log the error, rename
626-
// the file, and continue loading the shard without it.
627+
// If we are unable to read a TSM file then log the error.
627628
if err != nil {
628629
if cerr := file.Close(); cerr != nil {
629630
f.logger.Error("Error closing TSM file after error", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(cerr))
630631
}
631-
// If the file is corrupt, rename it and
632-
// continue loading the shard without it.
633-
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
634-
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
635-
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
636-
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %w", file.Name(), e)}
632+
if errors.Is(err, MmapError{}) {
633+
// An MmapError may indicate we have insufficient
634+
// handles for the mmap call, in which case the file should
635+
// be left untouched, and the vm.max_map_count be raised.
636+
f.logger.Error("Cannot read TSM file, system limit for vm.max_map_count may be too low",
637+
zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
638+
readerC <- &res{r: df, err: fmt.Errorf("cannot read file %s, system limit for vm.max_map_count may be too low: %v", file.Name(), err)}
639+
return
640+
} else {
641+
// If the file is corrupt, rename it and
642+
// continue loading the shard without it.
643+
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
644+
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
645+
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
646+
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
647+
return
648+
}
649+
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
637650
return
638651
}
639-
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %w", file.Name(), err)}
640-
return
641652
}
642653
df.WithObserver(f.obs)
643654
readerC <- &res{r: df}
@@ -920,7 +931,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
920931
}
921932
}
922933

923-
tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed))
934+
tsm, err := NewTSMReader(fd, f.readerOptions...)
924935
if err != nil {
925936
if newName != oldName {
926937
if err1 := os.Rename(newName, oldName); err1 != nil {
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package tsm1
2+
3+
import (
4+
"github.com/influxdata/influxdb/v2/tsdb"
5+
)
6+
7+
var TestMmapInitFailOption = func(err error) tsmReaderOption {
8+
return func(r *TSMReader) {
9+
r.accessor = &badBlockAccessor{error: err}
10+
}
11+
}
12+
13+
type badBlockAccessor struct {
14+
error
15+
initCalled bool
16+
}
17+
18+
func (b *badBlockAccessor) init() (*indirectIndex, error) {
19+
b.initCalled = true
20+
return nil, b.error
21+
}
22+
23+
func (b *badBlockAccessor) read(key []byte, timestamp int64) ([]Value, error) {
24+
//TODO implement me
25+
panic("implement me")
26+
}
27+
28+
func (b *badBlockAccessor) readAll(key []byte) ([]Value, error) {
29+
//TODO implement me
30+
panic("implement me")
31+
}
32+
33+
func (b *badBlockAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
34+
//TODO implement me
35+
panic("implement me")
36+
}
37+
38+
func (b *badBlockAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
39+
//TODO implement me
40+
panic("implement me")
41+
}
42+
43+
func (b *badBlockAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
44+
//TODO implement me
45+
panic("implement me")
46+
}
47+
48+
func (b *badBlockAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
49+
//TODO implement me
50+
panic("implement me")
51+
}
52+
53+
func (b *badBlockAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
54+
//TODO implement me
55+
panic("implement me")
56+
}
57+
58+
func (b *badBlockAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
59+
//TODO implement me
60+
panic("implement me")
61+
}
62+
63+
func (b *badBlockAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
64+
//TODO implement me
65+
panic("implement me")
66+
}
67+
68+
func (b *badBlockAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
69+
//TODO implement me
70+
panic("implement me")
71+
}
72+
73+
func (b *badBlockAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
74+
//TODO implement me
75+
panic("implement me")
76+
}
77+
78+
func (b *badBlockAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
79+
//TODO implement me
80+
panic("implement me")
81+
}
82+
83+
func (b *badBlockAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
84+
//TODO implement me
85+
panic("implement me")
86+
}
87+
88+
func (b *badBlockAccessor) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) {
89+
//TODO implement me
90+
panic("implement me")
91+
}
92+
93+
func (b *badBlockAccessor) rename(path string) error {
94+
//TODO implement me
95+
panic("implement me")
96+
}
97+
98+
func (b *badBlockAccessor) path() string {
99+
//TODO implement me
100+
panic("implement me")
101+
}
102+
103+
func (b *badBlockAccessor) close() error {
104+
if !b.initCalled {
105+
panic("close called without an init call")
106+
}
107+
b.initCalled = false
108+
return nil
109+
}
110+
111+
func (b *badBlockAccessor) free() error {
112+
//TODO implement me
113+
panic("implement me")
114+
}

tsdb/engine/tsm1/file_store_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/influxdata/influxdb/v2/tsdb"
1515
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
16+
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/require"
1718
"go.uber.org/zap/zaptest"
1819
)
@@ -2412,6 +2413,42 @@ func TestFileStore_Open(t *testing.T) {
24122413
}
24132414
}
24142415

2416+
func TestFileStore_OpenFail(t *testing.T) {
2417+
var err error
2418+
dir := t.TempDir()
2419+
2420+
// Create a TSM file...
2421+
data := keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}}
2422+
2423+
files, err := newFileDir(t, dir, data)
2424+
if err != nil {
2425+
fatal(t, "creating test files", err)
2426+
}
2427+
assert.Equal(t, 1, len(files))
2428+
f := files[0]
2429+
2430+
const mmapErrMsg = "mmap failure in test"
2431+
const fullMmapErrMsg = "system limit for vm.max_map_count may be too low: " + mmapErrMsg
2432+
// With an mmap failure, the files should all be left where they are, because they are not corrupt
2433+
openFail(t, dir, fullMmapErrMsg, tsm1.NewMmapError(fmt.Errorf(mmapErrMsg)))
2434+
assert.FileExistsf(t, f, "file not found, but should not have been moved for mmap failure")
2435+
2436+
// With a non-mmap failure, the file failing to open should be moved aside
2437+
const otherErrMsg = "some Random Init Failure"
2438+
openFail(t, dir, otherErrMsg, fmt.Errorf(otherErrMsg))
2439+
assert.NoFileExistsf(t, f, "file found, but should have been moved for open failure")
2440+
assert.FileExistsf(t, f+"."+tsm1.BadTSMFileExtension, "file not found, but should have been moved here for open failure")
2441+
}
2442+
2443+
func openFail(t *testing.T, dir string, fullErrMsg string, initErr error) {
2444+
fs := tsm1.NewFileStore(dir, tsdb.EngineTags{}, tsm1.TestMmapInitFailOption(initErr))
2445+
err := fs.Open(context.Background())
2446+
assert.Error(t, err)
2447+
assert.Contains(t, err.Error(), fullErrMsg)
2448+
defer func() { assert.NoError(t, fs.Close(), "unexpected error on FileStore.Close") }()
2449+
assert.Equal(t, 0, fs.Count(), "file count mismatch")
2450+
}
2451+
24152452
func TestFileStore_Remove(t *testing.T) {
24162453
dir := t.TempDir()
24172454

tsdb/engine/tsm1/reader.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tsm1
33
import (
44
"bytes"
55
"encoding/binary"
6+
"errors"
67
"fmt"
78
"io"
89
"math"
@@ -218,6 +219,7 @@ var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
218219
}
219220
}
220221

222+
// TODO(DSB) - add a tsmReaderOption in a test call that has the mmapAccessor mock a failure
221223
// NewTSMReader returns a new TSMReader from the given file.
222224
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
223225
t := &TSMReader{}
@@ -231,15 +233,17 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
231233
}
232234
t.size = stat.Size()
233235
t.lastModified = stat.ModTime().UnixNano()
234-
t.accessor = &mmapAccessor{
235-
f: f,
236-
mmapWillNeed: t.madviseWillNeed,
236+
if t.accessor == nil {
237+
t.accessor = &mmapAccessor{
238+
f: f,
239+
mmapWillNeed: t.madviseWillNeed,
240+
}
237241
}
238242

239243
index, err := t.accessor.init()
240244
if err != nil {
241-
_ = t.accessor.close()
242-
return nil, err
245+
cerr := t.accessor.close()
246+
return nil, errors.Join(err, cerr)
243247
}
244248

245249
t.index = index
@@ -1314,6 +1318,24 @@ type mmapAccessor struct {
13141318
index *indirectIndex
13151319
}
13161320

1321+
// MmapError wraps an error returned by a failed mmap call so that callers can
// distinguish mmap failures from other read errors, for example with
// errors.Is(err, MmapError{}).
type MmapError struct {
	error
}

// Unwrap returns the wrapped error.
//
// It has a value receiver so that it is in the method set of both MmapError
// and *MmapError. NewMmapError returns a value; with a pointer receiver that
// value would not expose Unwrap, and errors.Is / errors.As could not reach the
// underlying error.
func (m MmapError) Unwrap() error {
	return m.error
}

// Is reports whether e is an MmapError, held either by value or by pointer.
// The wrapped error is not compared, so any MmapError matches any other.
func (m MmapError) Is(e error) bool {
	switch e.(type) {
	case MmapError, *MmapError:
		return true
	default:
		return false
	}
}

// NewMmapError returns an MmapError wrapping e.
func NewMmapError(e error) MmapError {
	return MmapError{error: e}
}
1338+
13171339
func (m *mmapAccessor) init() (*indirectIndex, error) {
13181340
m.mu.Lock()
13191341
defer m.mu.Unlock()
@@ -1335,7 +1357,9 @@ func (m *mmapAccessor) init() (*indirectIndex, error) {
13351357

13361358
m.b, err = mmap(m.f, 0, int(stat.Size()))
13371359
if err != nil {
1338-
return nil, err
1360+
// Wrap the error to let callers know this was an error
1361+
// from mmap, and may indicate vm.max_map_count is too low
1362+
return nil, NewMmapError(err)
13391363
}
13401364
if len(m.b) < 8 {
13411365
return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")

0 commit comments

Comments
 (0)