@@ -26,9 +26,11 @@ import (
26
26
"strings"
27
27
"text/template"
28
28
"time"
29
+ "unicode"
29
30
30
31
"github.com/fatih/color"
31
32
corev1 "k8s.io/api/core/v1"
33
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32
34
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
33
35
"k8s.io/client-go/rest"
34
36
)
@@ -45,15 +47,21 @@ type Tail struct {
45
47
podColor * color.Color
46
48
containerColor * color.Color
47
49
tmpl * template.Template
48
- out io.Writer
49
- errOut io.Writer
50
+ last struct {
51
+ timestamp string // RFC3339 timestamp (not RFC3339Nano)
52
+ lines int // the number of lines seen during this timestamp
53
+ }
54
+ resumeRequest * ResumeRequest
55
+ out io.Writer
56
+ errOut io.Writer
50
57
}
51
58
52
59
type TailOptions struct {
53
60
Timestamps bool
54
61
Location * time.Location
55
62
56
- SinceSeconds int64
63
+ SinceSeconds * int64
64
+ SinceTime * metav1.Time
57
65
Exclude []* regexp.Regexp
58
66
Include []* regexp.Regexp
59
67
Namespace bool
@@ -62,6 +70,11 @@ type TailOptions struct {
62
70
OnlyLogLines bool
63
71
}
64
72
73
// ResumeRequest describes the point from which a tail should be resumed.
type ResumeRequest struct {
	// Timestamp is an RFC3339 timestamp (not RFC3339Nano).
	Timestamp string
	// LinesToSkip is the number of lines to skip during Timestamp.
	LinesToSkip int
}
77
+
65
78
func (o TailOptions ) IsExclude (msg string ) bool {
66
79
for _ , rex := range o .Exclude {
67
80
if rex .MatchString (msg ) {
@@ -86,23 +99,12 @@ func (o TailOptions) IsInclude(msg string) bool {
86
99
return false
87
100
}
88
101
89
- func (o TailOptions ) UpdateTimezoneIfNeeded (message string ) (string , error ) {
90
- if ! o .Timestamps {
91
- return message , nil
92
- }
93
-
94
- idx := strings .IndexRune (message , ' ' )
95
- if idx == - 1 {
96
- return message , errors .New ("missing timestamp" )
97
- }
98
-
99
- datetime := message [:idx ]
100
- t , err := time .ParseInLocation (time .RFC3339Nano , datetime , time .UTC )
102
+ func (o TailOptions ) UpdateTimezone (timestamp string ) (string , error ) {
103
+ t , err := time .ParseInLocation (time .RFC3339Nano , timestamp , time .UTC )
101
104
if err != nil {
102
- return message , errors .New ("missing timestamp" )
105
+ return "" , errors .New ("missing timestamp" )
103
106
}
104
-
105
- return t .In (o .Location ).Format ("2006-01-02T15:04:05.000000000Z07:00" ) + message [idx :], nil
107
+ return t .In (o .Location ).Format ("2006-01-02T15:04:05.000000000Z07:00" ), nil
106
108
}
107
109
108
110
// NewTail returns a new tail for a Kubernetes container inside a pod
@@ -156,9 +158,10 @@ func (t *Tail) Start(ctx context.Context) error {
156
158
157
159
req := t .clientset .Pods (t .Namespace ).GetLogs (t .PodName , & corev1.PodLogOptions {
158
160
Follow : t .Options .Follow ,
159
- Timestamps : t . Options . Timestamps ,
161
+ Timestamps : true ,
160
162
Container : t .ContainerName ,
161
- SinceSeconds : & t .Options .SinceSeconds ,
163
+ SinceSeconds : t .Options .SinceSeconds ,
164
+ SinceTime : t .Options .SinceTime ,
162
165
TailLines : t .Options .TailLines ,
163
166
})
164
167
@@ -171,6 +174,19 @@ func (t *Tail) Start(ctx context.Context) error {
171
174
return err
172
175
}
173
176
177
+ func (t * Tail ) Resume (ctx context.Context , resumeRequest * ResumeRequest ) error {
178
+ sinceTime , err := resumeRequest .sinceTime ()
179
+ if err != nil {
180
+ fmt .Fprintf (t .errOut , "failed to resume: %s, fallback to Start()\n " , err )
181
+ return t .Start (ctx )
182
+ }
183
+ t .resumeRequest = resumeRequest
184
+ t .Options .SinceTime = sinceTime
185
+ t .Options .SinceSeconds = nil
186
+ t .Options .TailLines = nil
187
+ return t .Start (ctx )
188
+ }
189
+
174
190
// Close stops tailing
175
191
func (t * Tail ) Close () {
176
192
t .printStopping ()
@@ -217,21 +233,7 @@ func (t *Tail) ConsumeRequest(ctx context.Context, request rest.ResponseWrapper)
217
233
for {
218
234
line , err := r .ReadBytes ('\n' )
219
235
if len (line ) != 0 {
220
- msg := string (line )
221
- // Remove a line break
222
- msg = strings .TrimSuffix (msg , "\n " )
223
-
224
- if t .Options .IsExclude (msg ) || ! t .Options .IsInclude (msg ) {
225
- continue
226
- }
227
-
228
- msg , err := t .Options .UpdateTimezoneIfNeeded (msg )
229
- if err != nil {
230
- t .Print (fmt .Sprintf ("[%v] %s" , err , msg ))
231
- continue
232
- }
233
-
234
- t .Print (msg )
236
+ t .consumeLine (strings .TrimSuffix (string (line ), "\n " ))
235
237
}
236
238
237
239
if err != nil {
@@ -264,6 +266,53 @@ func (t *Tail) Print(msg string) {
264
266
fmt .Fprint (t .out , buf .String ())
265
267
}
266
268
269
+ func (t * Tail ) GetResumeRequest () * ResumeRequest {
270
+ if t .last .timestamp == "" {
271
+ return nil
272
+ }
273
+ return & ResumeRequest {Timestamp : t .last .timestamp , LinesToSkip : t .last .lines }
274
+ }
275
+
276
+ func (t * Tail ) consumeLine (line string ) {
277
+ rfc3339Nano , content , err := splitLogLine (line )
278
+ if err != nil {
279
+ t .Print (fmt .Sprintf ("[%v] %s" , err , line ))
280
+ return
281
+ }
282
+
283
+ // PodLogOptions.SinceTime is RFC3339, not RFC3339Nano.
284
+ // We convert it to RFC3339 to skip the lines seen during this timestamp when resuming.
285
+ rfc3339 := removeSubsecond (rfc3339Nano )
286
+ t .rememberLastTimestamp (rfc3339 )
287
+ if t .resumeRequest .shouldSkip (rfc3339 ) {
288
+ return
289
+ }
290
+
291
+ if t .Options .IsExclude (content ) || ! t .Options .IsInclude (content ) {
292
+ return
293
+ }
294
+
295
+ msg := content
296
+ if t .Options .Timestamps {
297
+ updatedTs , err := t .Options .UpdateTimezone (rfc3339Nano )
298
+ if err != nil {
299
+ t .Print (fmt .Sprintf ("[%v] %s" , err , line ))
300
+ return
301
+ }
302
+ msg = updatedTs + " " + msg
303
+ }
304
+ t .Print (msg )
305
+ }
306
+
307
+ func (t * Tail ) rememberLastTimestamp (timestamp string ) {
308
+ if t .last .timestamp == timestamp {
309
+ t .last .lines ++
310
+ return
311
+ }
312
+ t .last .timestamp = timestamp
313
+ t .last .lines = 1
314
+ }
315
+
267
316
// Log is the object which will be used together with the template to generate
268
317
// the output.
269
318
type Log struct {
@@ -285,3 +334,57 @@ type Log struct {
285
334
PodColor * color.Color `json:"-"`
286
335
ContainerColor * color.Color `json:"-"`
287
336
}
337
+
338
+ func (r * ResumeRequest ) sinceTime () (* metav1.Time , error ) {
339
+ sinceTime , err := time .Parse (time .RFC3339 , r .Timestamp )
340
+
341
+ if err != nil {
342
+ return nil , err
343
+ }
344
+ metaTime := metav1 .NewTime (sinceTime )
345
+ return & metaTime , nil
346
+ }
347
+
348
+ func (r * ResumeRequest ) shouldSkip (timestamp string ) bool {
349
+ if r == nil {
350
+ return false
351
+ }
352
+ if r .Timestamp == "" {
353
+ return false
354
+ }
355
+ if r .Timestamp != timestamp {
356
+ return false
357
+ }
358
+ if r .LinesToSkip <= 0 {
359
+ return false
360
+ }
361
+ r .LinesToSkip --
362
+ return true
363
+ }
364
+
365
// splitLogLine cuts line at its first space: the text before the space is
// returned as timestamp and the text after it as content. A line without any
// space yields a "missing timestamp" error and empty strings.
func splitLogLine(line string) (timestamp string, content string, err error) {
	sep := strings.IndexByte(line, ' ')
	if sep < 0 {
		return "", "", errors.New("missing timestamp")
	}
	timestamp = line[:sep]
	content = line[sep+1:]
	return timestamp, content, nil
}
372
+
373
// removeSubsecond strips the fractional-second part from an RFC3339Nano
// timestamp, converting it to RFC3339 without parsing it.
//
// Only the dot and the run of digits immediately following it are removed, so
// whatever comes after the fraction ("Z" or a numeric offset such as
// "+09:00") is preserved. The timestamp is returned unchanged when it has no
// dot or when no digit follows the dot.
func removeSubsecond(timestamp string) string {
	dot := strings.IndexRune(timestamp, '.')
	if dot == -1 {
		return timestamp
	}

	// Stop at the first non-digit: scanning to the end of the string would
	// also swallow the digits of a numeric zone offset.
	end := dot + 1
	for end < len(timestamp) && unicode.IsDigit(rune(timestamp[end])) {
		end++
	}
	if end == dot+1 {
		return timestamp
	}
	return timestamp[:dot] + timestamp[end:]
}
0 commit comments