Skip to content

Commit 0d44e81

Browse files
authored
feat: improve dropped point logging (#26303) (#26412)
1 parent a2f1b41 commit 0d44e81

File tree

3 files changed

+159
-30
lines changed

3 files changed

+159
-30
lines changed

tsdb/shard.go

Lines changed: 12 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -102,15 +102,23 @@ func (e ShardError) Unwrap() error {
102102
// PartialWriteError indicates a write request could only write a portion of the
// requested values.
type PartialWriteError struct {
	// Reason describes why points were dropped.
	Reason string
	// Dropped is the number of points that were not written.
	Dropped int
	// Database and RetentionPolicy, when non-empty, identify the write target
	// and are appended to the error message.
	Database        string
	RetentionPolicy string
	// A sorted slice of series keys that were dropped.
	DroppedKeys [][]byte
}

// Error implements the error interface. The message always carries the reason
// and the dropped count; the database and retention policy are appended only
// when they are set.
func (e PartialWriteError) Error() string {
	msg := fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped)
	if e.Database != "" {
		msg += " for database: " + e.Database
	}
	if e.RetentionPolicy != "" {
		msg += " for retention policy: " + e.RetentionPolicy
	}
	return msg
}
115123

116124
// Shard represents a self-contained time series database. An inverted index of

v1/coordinator/points_writer.go

Lines changed: 89 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -75,21 +75,91 @@ func NewPointsWriter(writeTimeout time.Duration, path string) *PointsWriter {
7575
}
7676
}
7777

78+
// BoundType identifies which time bound, if any, a point violated when it was
// mapped to a shard.
type BoundType int

const (
	WithinBounds BoundType = iota
	RetentionPolicyBound
	WriteWindowUpperBound
	WriteWindowLowerBound
	MaxBoundType // always the largest bound type, not for actual use
)

// String returns a human-readable name for the bound. Values outside the
// known range (including MaxBoundType) are reported as "Unknown".
func (b BoundType) String() string {
	names := [...]string{
		WithinBounds:          "Within Bounds",
		RetentionPolicyBound:  "Retention Policy Lower Bound",
		WriteWindowUpperBound: "Write Window Upper Bound",
		WriteWindowLowerBound: "Write Window Lower Bound",
	}
	if b < 0 || int(b) >= len(names) {
		return "Unknown"
	}
	return names[b]
}
102+
103+
type DroppedPoint struct {
104+
Point models.Point
105+
ViolatedBound time.Time
106+
Reason BoundType
107+
}
108+
109+
func (d *DroppedPoint) String() string {
110+
return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), d.Point.Time().UTC().Format(time.RFC3339Nano), d.Reason.String(), d.ViolatedBound.UTC().Format(time.RFC3339Nano))
111+
}
112+
78113
// ShardMapping contains a mapping of shards to points.
79114
type ShardMapping struct {
80-
n int
81-
Points map[uint64][]models.Point // The points associated with a shard ID
82-
Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
83-
Dropped []models.Point // Points that were dropped
115+
n int
116+
Points map[uint64][]models.Point // The points associated with a shard ID
117+
Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
118+
MaxDropped DroppedPoint
119+
MinDropped DroppedPoint
120+
RetentionDropped int
121+
WriteWindowDropped int
122+
rpi *meta.RetentionPolicyInfo
84123
}
85124

86125
// NewShardMapping creates an empty ShardMapping.
87-
func NewShardMapping(n int) *ShardMapping {
126+
func NewShardMapping(rpi *meta.RetentionPolicyInfo, n int) *ShardMapping {
88127
return &ShardMapping{
89128
n: n,
90129
Points: map[uint64][]models.Point{},
91130
Shards: map[uint64]*meta.ShardInfo{},
131+
rpi: rpi,
132+
}
133+
}
134+
135+
func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) {
136+
if s.MaxDropped.Point == nil || p.Time().After(s.MaxDropped.Point.Time()) {
137+
s.MaxDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b}
92138
}
139+
if s.MinDropped.Point == nil || p.Time().Before(s.MinDropped.Point.Time()) {
140+
s.MinDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b}
141+
}
142+
switch b {
143+
case RetentionPolicyBound:
144+
s.RetentionDropped++
145+
case WriteWindowLowerBound, WriteWindowUpperBound:
146+
s.WriteWindowDropped++
147+
}
148+
}
149+
150+
func (s *ShardMapping) Dropped() int {
151+
return s.RetentionDropped + s.WriteWindowDropped
152+
}
153+
154+
func (s *ShardMapping) SummariseDropped() string {
155+
if s.Dropped() <= 0 {
156+
return ""
157+
}
158+
return fmt.Sprintf("dropped %d points outside retention policy of duration %s - oldest %s, newest %s",
159+
s.RetentionDropped,
160+
s.rpi.Duration.String(),
161+
s.MinDropped.String(),
162+
s.MaxDropped.String())
93163
}
94164

95165
// MapPoint adds the point to the ShardMapping, associated with the given shardInfo.
@@ -247,13 +317,13 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
247317
list.Add(*sg)
248318
}
249319

250-
mapping := NewShardMapping(len(wp.Points))
320+
mapping := NewShardMapping(rp, len(wp.Points))
251321
for _, p := range wp.Points {
252322
sg := list.ShardGroupAt(p.Time())
253323
if sg == nil {
254324
// We didn't create a shard group because the point was outside the
255-
// scope of the RP.
256-
mapping.Dropped = append(mapping.Dropped, p)
325+
// scope of the RP
326+
mapping.AddDropped(p, min, RetentionPolicyBound)
257327
continue
258328
} else if len(sg.Shards) <= 0 {
259329
// Shard groups should have at least one shard.
@@ -361,7 +431,7 @@ func (w *PointsWriter) WritePoints(
361431
ctx context.Context,
362432
database, retentionPolicy string,
363433
consistencyLevel models.ConsistencyLevel,
364-
user meta.User,
434+
_ meta.User,
365435
points []models.Point,
366436
) error {
367437
return w.WritePointsPrivileged(ctx, database, retentionPolicy, consistencyLevel, points)
@@ -406,12 +476,18 @@ func (w *PointsWriter) WritePointsPrivileged(
406476
}(shardMappings.Shards[shardID], database, retentionPolicy, points)
407477
}
408478

409-
if len(shardMappings.Dropped) > 0 {
410-
w.stats.pointsWriteDropped.Observe(float64(len(shardMappings.Dropped)))
411-
err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)}
412-
}
413479
timeout := time.NewTimer(w.WriteTimeout)
414480
defer timeout.Stop()
481+
482+
if err == nil && shardMappings.Dropped() > 0 {
483+
w.stats.pointsWriteDropped.Observe(float64(shardMappings.Dropped()))
484+
err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(),
485+
Dropped: shardMappings.Dropped(),
486+
Database: database,
487+
RetentionPolicy: retentionPolicy,
488+
}
489+
}
490+
415491
for range shardMappings.Points {
416492
select {
417493
case <-w.closing:

v1/coordinator/points_writer_test.go

Lines changed: 58 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,9 @@ package coordinator_test
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"github.com/stretchr/testify/require"
68
"sync"
79
"sync/atomic"
810
"testing"
@@ -51,6 +53,47 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
5153
}
5254
}
5355

56+
func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint, summary string) {
57+
var (
58+
shardMappings *coordinator.ShardMapping
59+
err error
60+
)
61+
if shardMappings, err = c.MapShards(pr); err != nil {
62+
t.Fatalf("unexpected error: %v", err)
63+
}
64+
65+
if exp := 1; len(shardMappings.Points) != exp {
66+
t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp)
67+
}
68+
69+
p := func() []models.Point {
70+
for _, v := range shardMappings.Points {
71+
return v
72+
}
73+
return nil
74+
}()
75+
verify :=
76+
func(p []models.Point, values []float64) {
77+
require.Equal(t, len(values), len(p), "unexpected number of points")
78+
for i, expV := range values {
79+
f, err := p[i].Fields()
80+
require.NoError(t, err, "error retrieving fields")
81+
v, ok := f["value"]
82+
require.True(t, ok, "\"value\" field not found")
83+
require.Equal(t, expV, v, "unexpected value")
84+
}
85+
}
86+
verify(p, values)
87+
require.Equal(t, shardMappings.Dropped(), droppedCount, "wrong number of points dropped")
88+
if shardMappings.Dropped() > 0 {
89+
require.Equal(t, minDropped.Point, shardMappings.MinDropped.Point, "minimum dropped point mismatch")
90+
require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch")
91+
require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch")
92+
require.Equal(t, maxDropped.Reason, shardMappings.MaxDropped.Reason, "maximum dropped reason mismatch")
93+
require.Contains(t, shardMappings.SummariseDropped(), summary, "summary mismatch")
94+
}
95+
}
96+
5497
// Ensures the points writer maps to a new shard group when the shard duration
5598
// is changed.
5699
func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) {
@@ -239,9 +282,11 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) {
239282
t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp)
240283
}
241284

242-
if got, exp := len(shardMappings.Dropped), 1; got != exp {
285+
if got, exp := shardMappings.RetentionDropped, 1; got != exp {
243286
t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp)
244287
}
288+
289+
require.Equal(t, coordinator.RetentionPolicyBound, shardMappings.MinDropped.Reason, "unexpected reason for dropped point")
245290
}
246291

247292
func TestPointsWriter_WritePoints(t *testing.T) {
@@ -288,7 +333,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
288333
pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil)
289334

290335
// copy to prevent data race
291-
sm := coordinator.NewShardMapping(16)
336+
sm := coordinator.NewShardMapping(nil, 16)
292337
sm.MapPoint(
293338
&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
294339
{NodeID: 1},
@@ -360,16 +405,9 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
360405
// are created.
361406
ms := NewPointsWriterMetaClient()
362407

363-
// Three points that range over the shardGroup duration (1h) and should map to two
364-
// distinct shards
408+
// Add a point earlier than the retention period
365409
pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil)
366410

367-
// copy to prevent data race
368-
sm := coordinator.NewShardMapping(16)
369-
370-
// ShardMapper dropped this point
371-
sm.Dropped = append(sm.Dropped, pr.Points[0])
372-
373411
// Local coordinator.Node ShardWriter
374412
// lock on the write increment since these functions get called in parallel
375413
var mu sync.Mutex
@@ -392,13 +430,20 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
392430
c.TSDBStore = store
393431
c.Node = &influxdb.Node{ID: 1}
394432

395-
c.Open()
396-
defer c.Close()
433+
require.NoError(t, c.Open(), "failure opening PointsWriter")
434+
defer func(pw *coordinator.PointsWriter) {
435+
require.NoError(t, pw.Close(), "failure closing PointsWriter")
436+
}(c)
397437

398438
err := c.WritePointsPrivileged(context.Background(), pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
399-
if _, ok := err.(tsdb.PartialWriteError); !ok {
439+
require.Error(t, err, "unexpected success writing points")
440+
var pwErr tsdb.PartialWriteError
441+
if !errors.As(err, &pwErr) {
400442
t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{})
401443
}
444+
require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped")
445+
require.ErrorContains(t, pwErr, "partial write: dropped 1 points outside retention policy of duration 1h0m0s")
446+
require.ErrorContains(t, pwErr, "Retention Policy Lower Bound")
402447
}
403448

404449
var shardID uint64

0 commit comments

Comments
 (0)