Skip to content

Commit 52894f8

Browse files
Takashi Kusumi and superbrothers authored
Support resuming from the last log when retrying (#230)
* Support resuming from the last log when retrying * Tweak rate limiter of retrying * Apply suggestions from code review Co-authored-by: Kazuki Suda <230185+superbrothers@users.noreply.github.com> * Use k8s.io/utils/pointer in stern.go --------- Co-authored-by: Kazuki Suda <230185+superbrothers@users.noreply.github.com>
1 parent 72a5854 commit 52894f8

File tree

4 files changed

+303
-106
lines changed

4 files changed

+303
-106
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ require (
1515
k8s.io/cli-runtime v0.26.0
1616
k8s.io/client-go v0.26.0
1717
k8s.io/klog/v2 v2.80.1
18+
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448
1819
)
1920

2021
require (
@@ -63,7 +64,6 @@ require (
6364
gopkg.in/yaml.v2 v2.4.0 // indirect
6465
gopkg.in/yaml.v3 v3.0.1 // indirect
6566
k8s.io/kube-openapi v0.0.0-20230109183929-3758b55a6596 // indirect
66-
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect
6767
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
6868
sigs.k8s.io/kustomize/api v0.12.1 // indirect
6969
sigs.k8s.io/kustomize/kyaml v0.13.9 // indirect

stern/stern.go

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ import (
3030
"golang.org/x/time/rate"
3131

3232
"k8s.io/apimachinery/pkg/labels"
33+
"k8s.io/utils/pointer"
3334

3435
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3536
clientset "k8s.io/client-go/kubernetes"
@@ -97,7 +98,7 @@ func Run(ctx context.Context, config *Config) error {
9798
return NewTail(client.CoreV1(), t.Node, t.Namespace, t.Pod, t.Container, config.Template, config.Out, config.ErrOut, &TailOptions{
9899
Timestamps: config.Timestamps,
99100
Location: config.Location,
100-
SinceSeconds: int64(config.Since.Seconds()),
101+
SinceSeconds: pointer.Int64(int64(config.Since.Seconds())),
101102
Exclude: config.Exclude,
102103
Include: config.Include,
103104
Namespace: config.AllNamespaces || len(namespaces) > 1,
@@ -140,14 +141,20 @@ func Run(ctx context.Context, config *Config) error {
140141
// We use a rate limiter to prevent a burst of retries.
141142
// It also enables us to retry immediately, in most cases,
142143
// when it is disconnected on the way.
143-
limiter := rate.NewLimiter(rate.Every(time.Second*10), 3)
144+
limiter := rate.NewLimiter(rate.Every(time.Second*20), 2)
145+
var resumeRequest *ResumeRequest
144146
for {
145147
if err := limiter.Wait(ctx); err != nil {
146148
fmt.Fprintf(config.ErrOut, "failed to retry: %v\n", err)
147149
return
148150
}
149151
tail := newTail(target)
150-
err := tail.Start(ctx)
152+
var err error
153+
if resumeRequest == nil {
154+
err = tail.Start(ctx)
155+
} else {
156+
err = tail.Resume(ctx, resumeRequest)
157+
}
151158
tail.Close()
152159
if err == nil {
153160
return
@@ -157,6 +164,9 @@ func Run(ctx context.Context, config *Config) error {
157164
return
158165
}
159166
fmt.Fprintf(config.ErrOut, "failed to tail: %v, will retry\n", err)
167+
if resumeReq := tail.GetResumeRequest(); resumeReq != nil {
168+
resumeRequest = resumeReq
169+
}
160170
}
161171
}
162172

stern/tail.go

Lines changed: 138 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -26,9 +26,11 @@ import (
2626
"strings"
2727
"text/template"
2828
"time"
29+
"unicode"
2930

3031
"github.com/fatih/color"
3132
corev1 "k8s.io/api/core/v1"
33+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3234
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
3335
"k8s.io/client-go/rest"
3436
)
@@ -45,15 +47,21 @@ type Tail struct {
4547
podColor *color.Color
4648
containerColor *color.Color
4749
tmpl *template.Template
48-
out io.Writer
49-
errOut io.Writer
50+
last struct {
51+
timestamp string // RFC3339 timestamp (not RFC3339Nano)
52+
lines int // the number of lines seen during this timestamp
53+
}
54+
resumeRequest *ResumeRequest
55+
out io.Writer
56+
errOut io.Writer
5057
}
5158

5259
type TailOptions struct {
5360
Timestamps bool
5461
Location *time.Location
5562

56-
SinceSeconds int64
63+
SinceSeconds *int64
64+
SinceTime *metav1.Time
5765
Exclude []*regexp.Regexp
5866
Include []*regexp.Regexp
5967
Namespace bool
@@ -62,6 +70,11 @@ type TailOptions struct {
6270
OnlyLogLines bool
6371
}
6472

73+
// ResumeRequest describes where Tail.Resume should continue after a
// disconnect: the last second that was seen and how many lines carrying
// that second were already consumed.
type ResumeRequest struct {
	// Timestamp is an RFC3339 timestamp (not RFC3339Nano).
	Timestamp string
	// LinesToSkip is the number of lines to skip during this timestamp.
	LinesToSkip int
}
77+
6578
func (o TailOptions) IsExclude(msg string) bool {
6679
for _, rex := range o.Exclude {
6780
if rex.MatchString(msg) {
@@ -86,23 +99,12 @@ func (o TailOptions) IsInclude(msg string) bool {
8699
return false
87100
}
88101

89-
func (o TailOptions) UpdateTimezoneIfNeeded(message string) (string, error) {
90-
if !o.Timestamps {
91-
return message, nil
92-
}
93-
94-
idx := strings.IndexRune(message, ' ')
95-
if idx == -1 {
96-
return message, errors.New("missing timestamp")
97-
}
98-
99-
datetime := message[:idx]
100-
t, err := time.ParseInLocation(time.RFC3339Nano, datetime, time.UTC)
102+
func (o TailOptions) UpdateTimezone(timestamp string) (string, error) {
103+
t, err := time.ParseInLocation(time.RFC3339Nano, timestamp, time.UTC)
101104
if err != nil {
102-
return message, errors.New("missing timestamp")
105+
return "", errors.New("missing timestamp")
103106
}
104-
105-
return t.In(o.Location).Format("2006-01-02T15:04:05.000000000Z07:00") + message[idx:], nil
107+
return t.In(o.Location).Format("2006-01-02T15:04:05.000000000Z07:00"), nil
106108
}
107109

108110
// NewTail returns a new tail for a Kubernetes container inside a pod
@@ -156,9 +158,10 @@ func (t *Tail) Start(ctx context.Context) error {
156158

157159
req := t.clientset.Pods(t.Namespace).GetLogs(t.PodName, &corev1.PodLogOptions{
158160
Follow: t.Options.Follow,
159-
Timestamps: t.Options.Timestamps,
161+
Timestamps: true,
160162
Container: t.ContainerName,
161-
SinceSeconds: &t.Options.SinceSeconds,
163+
SinceSeconds: t.Options.SinceSeconds,
164+
SinceTime: t.Options.SinceTime,
162165
TailLines: t.Options.TailLines,
163166
})
164167

@@ -171,6 +174,19 @@ func (t *Tail) Start(ctx context.Context) error {
171174
return err
172175
}
173176

177+
func (t *Tail) Resume(ctx context.Context, resumeRequest *ResumeRequest) error {
178+
sinceTime, err := resumeRequest.sinceTime()
179+
if err != nil {
180+
fmt.Fprintf(t.errOut, "failed to resume: %s, fallback to Start()\n", err)
181+
return t.Start(ctx)
182+
}
183+
t.resumeRequest = resumeRequest
184+
t.Options.SinceTime = sinceTime
185+
t.Options.SinceSeconds = nil
186+
t.Options.TailLines = nil
187+
return t.Start(ctx)
188+
}
189+
174190
// Close stops tailing
175191
func (t *Tail) Close() {
176192
t.printStopping()
@@ -217,21 +233,7 @@ func (t *Tail) ConsumeRequest(ctx context.Context, request rest.ResponseWrapper)
217233
for {
218234
line, err := r.ReadBytes('\n')
219235
if len(line) != 0 {
220-
msg := string(line)
221-
// Remove a line break
222-
msg = strings.TrimSuffix(msg, "\n")
223-
224-
if t.Options.IsExclude(msg) || !t.Options.IsInclude(msg) {
225-
continue
226-
}
227-
228-
msg, err := t.Options.UpdateTimezoneIfNeeded(msg)
229-
if err != nil {
230-
t.Print(fmt.Sprintf("[%v] %s", err, msg))
231-
continue
232-
}
233-
234-
t.Print(msg)
236+
t.consumeLine(strings.TrimSuffix(string(line), "\n"))
235237
}
236238

237239
if err != nil {
@@ -264,6 +266,53 @@ func (t *Tail) Print(msg string) {
264266
fmt.Fprint(t.out, buf.String())
265267
}
266268

269+
func (t *Tail) GetResumeRequest() *ResumeRequest {
270+
if t.last.timestamp == "" {
271+
return nil
272+
}
273+
return &ResumeRequest{Timestamp: t.last.timestamp, LinesToSkip: t.last.lines}
274+
}
275+
276+
func (t *Tail) consumeLine(line string) {
277+
rfc3339Nano, content, err := splitLogLine(line)
278+
if err != nil {
279+
t.Print(fmt.Sprintf("[%v] %s", err, line))
280+
return
281+
}
282+
283+
// PodLogOptions.SinceTime is RFC3339, not RFC3339Nano.
284+
// We convert it to RFC3339 to skip the lines seen during this timestamp when resuming.
285+
rfc3339 := removeSubsecond(rfc3339Nano)
286+
t.rememberLastTimestamp(rfc3339)
287+
if t.resumeRequest.shouldSkip(rfc3339) {
288+
return
289+
}
290+
291+
if t.Options.IsExclude(content) || !t.Options.IsInclude(content) {
292+
return
293+
}
294+
295+
msg := content
296+
if t.Options.Timestamps {
297+
updatedTs, err := t.Options.UpdateTimezone(rfc3339Nano)
298+
if err != nil {
299+
t.Print(fmt.Sprintf("[%v] %s", err, line))
300+
return
301+
}
302+
msg = updatedTs + " " + msg
303+
}
304+
t.Print(msg)
305+
}
306+
307+
func (t *Tail) rememberLastTimestamp(timestamp string) {
308+
if t.last.timestamp == timestamp {
309+
t.last.lines++
310+
return
311+
}
312+
t.last.timestamp = timestamp
313+
t.last.lines = 1
314+
}
315+
267316
// Log is the object which will be used together with the template to generate
268317
// the output.
269318
type Log struct {
@@ -285,3 +334,57 @@ type Log struct {
285334
PodColor *color.Color `json:"-"`
286335
ContainerColor *color.Color `json:"-"`
287336
}
337+
338+
func (r *ResumeRequest) sinceTime() (*metav1.Time, error) {
339+
sinceTime, err := time.Parse(time.RFC3339, r.Timestamp)
340+
341+
if err != nil {
342+
return nil, err
343+
}
344+
metaTime := metav1.NewTime(sinceTime)
345+
return &metaTime, nil
346+
}
347+
348+
func (r *ResumeRequest) shouldSkip(timestamp string) bool {
349+
if r == nil {
350+
return false
351+
}
352+
if r.Timestamp == "" {
353+
return false
354+
}
355+
if r.Timestamp != timestamp {
356+
return false
357+
}
358+
if r.LinesToSkip <= 0 {
359+
return false
360+
}
361+
r.LinesToSkip--
362+
return true
363+
}
364+
365+
// splitLogLine separates a timestamped log line at its first space into the
// leading timestamp and the remaining content. A line without any space
// yields a "missing timestamp" error.
func splitLogLine(line string) (timestamp string, content string, err error) {
	head, tail, found := strings.Cut(line, " ")
	if !found {
		return "", "", errors.New("missing timestamp")
	}
	return head, tail, nil
}
372+
373+
// removeSubsecond removes the subsecond of the timestamp.
// It converts RFC3339Nano to RFC3339 quickly, e.g.
// "2023-01-02T03:04:05.123456789Z" becomes "2023-01-02T03:04:05Z".
//
// Only the run of digits directly after the first '.' is removed, so a
// numeric zone offset such as "+09:00" is preserved. A timestamp without a
// '.', or with no digits right after it, is returned unchanged.
func removeSubsecond(timestamp string) string {
	dot := strings.IndexRune(timestamp, '.')
	if dot == -1 {
		return timestamp
	}
	// Stop at the first non-digit (the zone designator). Looking for the last
	// digit anywhere after the dot would also consume the digits of a numeric
	// offset and drop the zone entirely.
	end := dot + 1
	for end < len(timestamp) && unicode.IsDigit(rune(timestamp[end])) {
		end++
	}
	if end == dot+1 {
		return timestamp
	}
	return timestamp[:dot] + timestamp[end:]
}

0 commit comments

Comments
 (0)