@@ -19,41 +19,20 @@ import (
19
19
"fmt"
20
20
"regexp"
21
21
"strings"
22
- "sync "
22
+ "time "
23
23
24
24
"github.com/pkg/errors"
25
25
26
26
"github.com/stern/stern/kubernetes"
27
27
"golang.org/x/sync/errgroup"
28
+ "golang.org/x/time/rate"
28
29
29
30
"k8s.io/apimachinery/pkg/labels"
30
31
31
32
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32
33
clientset "k8s.io/client-go/kubernetes"
33
34
)
34
35
35
- var tails = make (map [string ]* Tail )
36
- var tailLock sync.RWMutex
37
-
38
- func getTail (targetID string ) (* Tail , bool ) {
39
- tailLock .RLock ()
40
- defer tailLock .RUnlock ()
41
- tail , ok := tails [targetID ]
42
- return tail , ok
43
- }
44
-
45
- func setTail (targetID string , tail * Tail ) {
46
- tailLock .Lock ()
47
- defer tailLock .Unlock ()
48
- tails [targetID ] = tail
49
- }
50
-
51
- func clearTail (targetID string ) {
52
- tailLock .Lock ()
53
- defer tailLock .Unlock ()
54
- delete (tails , targetID )
55
- }
56
-
57
36
// Run starts the main run loop
58
37
func Run (ctx context.Context , config * Config ) error {
59
38
clientConfig := kubernetes .NewClientConfig (config .KubeConfig , config .ContextName )
@@ -103,15 +82,15 @@ func Run(ctx context.Context, config *Config) error {
103
82
}
104
83
}
105
84
106
- filter := & targetFilter {
85
+ filter := newTargetFilter ( targetFilterConfig {
107
86
podFilter : config .PodQuery ,
108
87
excludePodFilter : config .ExcludePodQuery ,
109
88
containerFilter : config .ContainerQuery ,
110
89
containerExcludeFilter : config .ExcludeContainerQuery ,
111
90
initContainers : config .InitContainers ,
112
91
ephemeralContainers : config .EphemeralContainers ,
113
92
containerStates : config .ContainerStates ,
114
- }
93
+ })
115
94
newTail := func (t * Target ) * Tail {
116
95
return NewTail (client .CoreV1 (), t .Node , t .Namespace , t .Pod , t .Container , config .Template , config .Out , config .ErrOut , & TailOptions {
117
96
Timestamps : config .Timestamps ,
@@ -155,19 +134,17 @@ func Run(ctx context.Context, config *Config) error {
155
134
}
156
135
157
136
added := make (chan * Target )
158
- removed := make (chan * Target )
159
137
errCh := make (chan error )
160
138
161
139
defer close (added )
162
- defer close (removed )
163
140
defer close (errCh )
164
141
165
142
for _ , n := range namespaces {
166
143
selector , err := chooseSelector (ctx , client , n , resource .kind , resource .name , config .LabelSelector )
167
144
if err != nil {
168
145
return err
169
146
}
170
- a , r , err := WatchTargets (ctx ,
147
+ a , err := WatchTargets (ctx ,
171
148
client .CoreV1 ().Pods (n ),
172
149
selector ,
173
150
config .FieldSelector ,
@@ -186,50 +163,40 @@ func Run(ctx context.Context, config *Config) error {
186
163
return
187
164
}
188
165
added <- v
189
- case v , ok := <- r :
190
- if ! ok {
191
- errCh <- fmt .Errorf ("lost watch connection" )
192
- return
193
- }
194
- removed <- v
195
166
case <- ctx .Done ():
196
167
return
197
168
}
198
169
}
199
170
}()
200
171
}
201
172
202
- go func () {
203
- for p := range added {
204
- targetID := p .GetID ()
205
-
206
- if tail , ok := getTail (targetID ); ok {
207
- if tail .isActive () {
208
- continue
209
- } else {
210
- tail .Close ()
211
- clearTail (targetID )
212
- }
173
+ addTarget := func (ctx context.Context , target * Target ) {
174
+ // We use a rate limiter to prevent a burst of retries.
175
+ // It also enables us to retry immediately, in most cases,
176
+ // when it is disconnected on the way.
177
+ limiter := rate .NewLimiter (rate .Every (time .Second * 10 ), 3 )
178
+ for {
179
+ if err := limiter .Wait (ctx ); err != nil {
180
+ fmt .Fprintf (config .ErrOut , "failed to retry: %v\n " , err )
181
+ return
213
182
}
214
-
215
- tail := newTail (p )
216
- setTail (targetID , tail )
217
-
218
- go func (tail * Tail ) {
219
- if err := tail .Start (ctx ); err != nil {
220
- fmt .Fprintf (config .ErrOut , "unexpected error: %v\n " , err )
221
- }
222
- }(tail )
183
+ tail := newTail (target )
184
+ err := tail .Start (ctx )
185
+ tail .Close ()
186
+ if err == nil {
187
+ return
188
+ }
189
+ if ! filter .isActive (target ) {
190
+ fmt .Fprintf (config .ErrOut , "failed to tail: %v\n " , err )
191
+ return
192
+ }
193
+ fmt .Fprintf (config .ErrOut , "failed to tail: %v, will retry\n " , err )
223
194
}
224
- }()
195
+ }
225
196
226
197
go func () {
227
- for p := range removed {
228
- targetID := p .GetID ()
229
- if tail , ok := getTail (targetID ); ok {
230
- tail .Close ()
231
- clearTail (targetID )
232
- }
198
+ for target := range added {
199
+ go addTarget (ctx , target )
233
200
}
234
201
}()
235
202
0 commit comments