Skip to content

Commit 8312782

Browse files
author
Takashi Kusumi
authored
Improve handling of container termination (#221)
1 parent fc51906 commit 8312782

File tree

7 files changed: 504 additions (+504) and 252 deletions (-252)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/spf13/cobra v1.6.1
1010
github.com/spf13/pflag v1.0.5
1111
golang.org/x/sync v0.1.0
12+
golang.org/x/time v0.3.0
1213
k8s.io/api v0.26.0
1314
k8s.io/apimachinery v0.26.0
1415
k8s.io/cli-runtime v0.26.0
@@ -56,7 +57,6 @@ require (
5657
golang.org/x/sys v0.4.0 // indirect
5758
golang.org/x/term v0.4.0 // indirect
5859
golang.org/x/text v0.6.0 // indirect
59-
golang.org/x/time v0.3.0 // indirect
6060
google.golang.org/appengine v1.6.7 // indirect
6161
google.golang.org/protobuf v1.28.1 // indirect
6262
gopkg.in/inf.v0 v0.9.1 // indirect

stern/list.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,8 @@ func ListTargets(ctx context.Context, i corev1client.PodInterface, labelSelector
9898
}
9999
var targets []*Target
100100
for i := range list.Items {
101-
filter.visit(&list.Items[i], func(t *Target, containerStateMatched bool) {
102-
if containerStateMatched {
103-
targets = append(targets, t)
104-
}
101+
filter.visit(&list.Items[i], func(t *Target) {
102+
targets = append(targets, t)
105103
})
106104
}
107105
return targets, nil

stern/stern.go

Lines changed: 28 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -19,41 +19,20 @@ import (
1919
"fmt"
2020
"regexp"
2121
"strings"
22-
"sync"
22+
"time"
2323

2424
"github.com/pkg/errors"
2525

2626
"github.com/stern/stern/kubernetes"
2727
"golang.org/x/sync/errgroup"
28+
"golang.org/x/time/rate"
2829

2930
"k8s.io/apimachinery/pkg/labels"
3031

3132
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3233
clientset "k8s.io/client-go/kubernetes"
3334
)
3435

35-
var tails = make(map[string]*Tail)
36-
var tailLock sync.RWMutex
37-
38-
func getTail(targetID string) (*Tail, bool) {
39-
tailLock.RLock()
40-
defer tailLock.RUnlock()
41-
tail, ok := tails[targetID]
42-
return tail, ok
43-
}
44-
45-
func setTail(targetID string, tail *Tail) {
46-
tailLock.Lock()
47-
defer tailLock.Unlock()
48-
tails[targetID] = tail
49-
}
50-
51-
func clearTail(targetID string) {
52-
tailLock.Lock()
53-
defer tailLock.Unlock()
54-
delete(tails, targetID)
55-
}
56-
5736
// Run starts the main run loop
5837
func Run(ctx context.Context, config *Config) error {
5938
clientConfig := kubernetes.NewClientConfig(config.KubeConfig, config.ContextName)
@@ -103,15 +82,15 @@ func Run(ctx context.Context, config *Config) error {
10382
}
10483
}
10584

106-
filter := &targetFilter{
85+
filter := newTargetFilter(targetFilterConfig{
10786
podFilter: config.PodQuery,
10887
excludePodFilter: config.ExcludePodQuery,
10988
containerFilter: config.ContainerQuery,
11089
containerExcludeFilter: config.ExcludeContainerQuery,
11190
initContainers: config.InitContainers,
11291
ephemeralContainers: config.EphemeralContainers,
11392
containerStates: config.ContainerStates,
114-
}
93+
})
11594
newTail := func(t *Target) *Tail {
11695
return NewTail(client.CoreV1(), t.Node, t.Namespace, t.Pod, t.Container, config.Template, config.Out, config.ErrOut, &TailOptions{
11796
Timestamps: config.Timestamps,
@@ -155,19 +134,17 @@ func Run(ctx context.Context, config *Config) error {
155134
}
156135

157136
added := make(chan *Target)
158-
removed := make(chan *Target)
159137
errCh := make(chan error)
160138

161139
defer close(added)
162-
defer close(removed)
163140
defer close(errCh)
164141

165142
for _, n := range namespaces {
166143
selector, err := chooseSelector(ctx, client, n, resource.kind, resource.name, config.LabelSelector)
167144
if err != nil {
168145
return err
169146
}
170-
a, r, err := WatchTargets(ctx,
147+
a, err := WatchTargets(ctx,
171148
client.CoreV1().Pods(n),
172149
selector,
173150
config.FieldSelector,
@@ -186,50 +163,40 @@ func Run(ctx context.Context, config *Config) error {
186163
return
187164
}
188165
added <- v
189-
case v, ok := <-r:
190-
if !ok {
191-
errCh <- fmt.Errorf("lost watch connection")
192-
return
193-
}
194-
removed <- v
195166
case <-ctx.Done():
196167
return
197168
}
198169
}
199170
}()
200171
}
201172

202-
go func() {
203-
for p := range added {
204-
targetID := p.GetID()
205-
206-
if tail, ok := getTail(targetID); ok {
207-
if tail.isActive() {
208-
continue
209-
} else {
210-
tail.Close()
211-
clearTail(targetID)
212-
}
173+
addTarget := func(ctx context.Context, target *Target) {
174+
// We use a rate limiter to prevent a burst of retries.
175+
// It also enables us to retry immediately, in most cases,
176+
// when it is disconnected on the way.
177+
limiter := rate.NewLimiter(rate.Every(time.Second*10), 3)
178+
for {
179+
if err := limiter.Wait(ctx); err != nil {
180+
fmt.Fprintf(config.ErrOut, "failed to retry: %v\n", err)
181+
return
213182
}
214-
215-
tail := newTail(p)
216-
setTail(targetID, tail)
217-
218-
go func(tail *Tail) {
219-
if err := tail.Start(ctx); err != nil {
220-
fmt.Fprintf(config.ErrOut, "unexpected error: %v\n", err)
221-
}
222-
}(tail)
183+
tail := newTail(target)
184+
err := tail.Start(ctx)
185+
tail.Close()
186+
if err == nil {
187+
return
188+
}
189+
if !filter.isActive(target) {
190+
fmt.Fprintf(config.ErrOut, "failed to tail: %v\n", err)
191+
return
192+
}
193+
fmt.Fprintf(config.ErrOut, "failed to tail: %v, will retry\n", err)
223194
}
224-
}()
195+
}
225196

226197
go func() {
227-
for p := range removed {
228-
targetID := p.GetID()
229-
if tail, ok := getTail(targetID); ok {
230-
tail.Close()
231-
clearTail(targetID)
232-
}
198+
for target := range added {
199+
go addTarget(ctx, target)
233200
}
234201
}()
235202

stern/tail.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ type Tail struct {
4545
podColor *color.Color
4646
containerColor *color.Color
4747
tmpl *template.Template
48-
active bool
4948
out io.Writer
5049
errOut io.Writer
5150
}
@@ -119,7 +118,6 @@ func NewTail(clientset corev1client.CoreV1Interface, nodeName, namespace, podNam
119118
Options: options,
120119
closed: make(chan struct{}),
121120
tmpl: tmpl,
122-
active: true,
123121
podColor: podColor,
124122
containerColor: containerColor,
125123

@@ -165,7 +163,6 @@ func (t *Tail) Start(ctx context.Context) error {
165163
})
166164

167165
err := t.ConsumeRequest(ctx, req)
168-
t.active = false
169166

170167
if errors.Is(err, context.Canceled) {
171168
return nil
@@ -267,11 +264,6 @@ func (t *Tail) Print(msg string) {
267264
fmt.Fprint(t.out, buf.String())
268265
}
269266

270-
// isActive returns false if the log stream is closed.
271-
func (t *Tail) isActive() bool {
272-
return t.active
273-
}
274-
275267
// Log is the object which will be used together with the template to generate
276268
// the output.
277269
type Log struct {

0 commit comments

Comments
 (0)