Skip to content

Commit eb1dd04

Browse files
fix: prevent differing field types in the same shard (#26025) (#26403)
Co-authored-by: davidby-influx <72418212+davidby-influx@users.noreply.github.com>
fix: lock MeasurementFields while validating (#25998); closes #23756
fix: switch MeasurementFields from atomic.Value to sync.Map (#26022); closes #26001
1 parent c6e7718 commit eb1dd04

File tree

5 files changed

+325
-168
lines changed

5 files changed

+325
-168
lines changed

pkg/data/gensyncmap/gensyncmap.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package gensyncmap
2+
3+
import "sync"
4+
5+
// Map is a statically typed wrapper around sync.Map. Keys are of type K and
// values of type V, so callers do not have to perform type assertions
// themselves. The zero Map is empty and ready for use; like sync.Map, it must
// not be copied after first use.
type Map[K comparable, V any] struct {
	m sync.Map
}

// castOrZero converts a value read back from the underlying sync.Map to T.
//
// When T is an interface type, a nil T is stored in sync.Map as a nil `any`,
// and a plain v.(T) assertion on a nil interface panics. A nil input is
// therefore mapped to the zero value of T. Any other input is asserted
// strictly, so a genuinely wrong dynamic type still panics.
func castOrZero[T any](v any) T {
	if v == nil {
		var zero T
		return zero
	}
	return v.(T)
}

// Delete removes the entry for key, if any.
func (m *Map[K, V]) Delete(key K) {
	m.m.Delete(key)
}

// Load returns the value stored for key. ok reports whether the key was
// present; when it is false, value is the zero value of V.
func (m *Map[K, V]) Load(key K) (value V, ok bool) {
	v, ok := m.m.Load(key)
	if !ok {
		return value, false
	}
	return castOrZero[V](v), true
}

// LoadAndDelete removes the entry for key and returns the value it held.
// loaded reports whether the key was present; when it is false, value is the
// zero value of V.
func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
	v, loaded := m.m.LoadAndDelete(key)
	if !loaded {
		return value, false
	}
	return castOrZero[V](v), true
}

// LoadOrStore returns the existing value for key if present. Otherwise it
// stores value and returns it. loaded is true if the value was loaded and
// false if it was stored.
func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
	a, loaded := m.m.LoadOrStore(key, value)
	return castOrZero[V](a), loaded
}

// Range calls f for each key/value pair in the map and stops early as soon as
// f returns false.
func (m *Map[K, V]) Range(f func(key K, value V) bool) {
	m.m.Range(func(key, value any) bool {
		return f(castOrZero[K](key), castOrZero[V](value))
	})
}

// Store sets the value for key, replacing any existing entry.
func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) }

// Len returns the number of entries by walking the whole map, so its cost is
// linear in the number of entries. If the map is modified concurrently, the
// result may not correspond to any single consistent snapshot.
func (m *Map[K, V]) Len() int {
	var n int
	m.m.Range(func(_, _ any) bool {
		n++
		return true
	})
	return n
}

tsdb/engine/tsm1/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1264,7 +1264,7 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
12641264
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
12651265
name := models.ParseName(keys[i])
12661266
mf := e.fieldset.CreateFieldsIfNotExists(name)
1267-
if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
1267+
if _, err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
12681268
return err
12691269
}
12701270

tsdb/field_validator.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,19 @@ const MaxFieldValueLength = 1048576
1313
// ValidateFields will return a PartialWriteError if:
1414
// - the point has inconsistent fields, or
1515
// - the point has fields that are too long
16-
func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) error {
16+
func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) ([]*FieldCreate, error) {
1717
pointSize := point.StringSize()
1818
iter := point.FieldIterator()
19+
var fieldsToCreate []*FieldCreate
20+
1921
for iter.Next() {
2022
if !skipSizeValidation {
2123
// Check for size of field too large. Note it is much cheaper to check the whole point size
2224
// than checking the StringValue size (StringValue potentially takes an allocation if it must
2325
// unescape the string, and must at least parse the string)
2426
if pointSize > MaxFieldValueLength && iter.Type() == models.String {
2527
if sz := len(iter.StringValue()); sz > MaxFieldValueLength {
26-
return PartialWriteError{
28+
return nil, PartialWriteError{
2729
Reason: fmt.Sprintf(
2830
"input field \"%s\" on measurement \"%s\" is too long, %d > %d",
2931
iter.FieldKey(), point.Name(), sz, MaxFieldValueLength),
@@ -33,14 +35,9 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
3335
}
3436
}
3537

38+
fieldKey := iter.FieldKey()
3639
// Skip fields named "time"; they are illegal.
37-
if bytes.Equal(iter.FieldKey(), timeBytes) {
38-
continue
39-
}
40-
41-
// If the fields is not present, there cannot be a conflict.
42-
f := mf.FieldBytes(iter.FieldKey())
43-
if f == nil {
40+
if bytes.Equal(fieldKey, timeBytes) {
4441
continue
4542
}
4643

@@ -49,18 +46,26 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
4946
continue
5047
}
5148

52-
// If the types are not the same, there is a conflict.
53-
if f.Type != dataType {
54-
return PartialWriteError{
49+
// If the field is not present, remember to create it.
50+
f := mf.FieldBytes(fieldKey)
51+
if f == nil {
52+
fieldsToCreate = append(fieldsToCreate, &FieldCreate{
53+
Measurement: point.Name(),
54+
Field: &Field{
55+
Name: string(fieldKey),
56+
Type: dataType,
57+
}})
58+
} else if f.Type != dataType {
59+
// If the types are not the same, there is a conflict.
60+
return nil, PartialWriteError{
5561
Reason: fmt.Sprintf(
5662
"%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s",
57-
ErrFieldTypeConflict, iter.FieldKey(), point.Name(), dataType, f.Type),
63+
ErrFieldTypeConflict, fieldKey, point.Name(), dataType, f.Type),
5864
Dropped: 1,
5965
}
6066
}
6167
}
62-
63-
return nil
68+
return fieldsToCreate, nil
6469
}
6570

6671
// dataTypeFromModelsFieldType returns the influxql.DataType that corresponds to the

0 commit comments

Comments
 (0)