Skip to content

Commit 3d4a02a

Browse files
committed
Rename Until to UntilWithoutRetry and move to using context so it's
cancelable
1 parent ccb92f6 commit 3d4a02a

File tree

16 files changed

+178
-88
lines changed

16 files changed

+178
-88
lines changed

pkg/client/tests/listwatch_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/apimachinery/pkg/watch"
3131
restclient "k8s.io/client-go/rest"
3232
. "k8s.io/client-go/tools/cache"
33+
watchtools "k8s.io/client-go/tools/watch"
3334
utiltesting "k8s.io/client-go/util/testing"
3435
"k8s.io/kubernetes/pkg/api/testapi"
3536
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -201,7 +202,7 @@ func TestListWatchUntil(t *testing.T) {
201202
watch: fw,
202203
}
203204

204-
conditions := []watch.ConditionFunc{
205+
conditions := []watchtools.ConditionFunc{
205206
func(event watch.Event) (bool, error) {
206207
t.Logf("got %#v", event)
207208
return event.Type == watch.Added, nil

pkg/kubectl/cmd/get/get.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package get
1818

1919
import (
20+
"context"
2021
"encoding/json"
2122
"fmt"
2223
"io"
@@ -36,6 +37,7 @@ import (
3637
"k8s.io/apimachinery/pkg/util/sets"
3738
"k8s.io/apimachinery/pkg/watch"
3839
"k8s.io/client-go/rest"
40+
watchtools "k8s.io/client-go/tools/watch"
3941
"k8s.io/kubernetes/pkg/api/legacyscheme"
4042
api "k8s.io/kubernetes/pkg/apis/core"
4143
"k8s.io/kubernetes/pkg/kubectl"
@@ -564,9 +566,11 @@ func (o *GetOptions) watch(f cmdutil.Factory, cmd *cobra.Command, args []string)
564566
}
565567

566568
first := true
567-
intr := interrupt.New(nil, w.Stop)
569+
ctx, cancel := context.WithCancel(context.Background())
570+
defer cancel()
571+
intr := interrupt.New(nil, cancel)
568572
intr.Run(func() error {
569-
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
573+
_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
570574
if !isList && first {
571575
// drop the initial watch event in the single resource case
572576
first = false

pkg/kubectl/cmd/rollout/rollout_status.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@ limitations under the License.
1717
package rollout
1818

1919
import (
20+
"context"
2021
"fmt"
22+
"time"
2123

2224
"github.com/spf13/cobra"
2325

2426
"k8s.io/apimachinery/pkg/api/meta"
2527
"k8s.io/apimachinery/pkg/watch"
28+
watchtools "k8s.io/client-go/tools/watch"
2629
"k8s.io/kubernetes/pkg/kubectl"
2730
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
2831
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
@@ -191,9 +194,13 @@ func (o *RolloutStatusOptions) Run() error {
191194
}
192195

193196
// if the rollout isn't done yet, keep watching deployment status
194-
intr := interrupt.New(nil, w.Stop)
197+
// TODO: expose timeout
198+
timeout := 0 * time.Second
199+
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
200+
defer cancel()
201+
intr := interrupt.New(nil, cancel)
195202
return intr.Run(func() error {
196-
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
203+
_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
197204
// print deployment's status
198205
status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision)
199206
if err != nil {

pkg/kubectl/cmd/run.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ limitations under the License.
1717
package cmd
1818

1919
import (
20+
"context"
2021
"fmt"
22+
"time"
2123

2224
"github.com/docker/distribution/reference"
23-
"github.com/spf13/cobra"
24-
2525
"github.com/golang/glog"
26+
"github.com/spf13/cobra"
2627

2728
corev1 "k8s.io/api/core/v1"
2829
"k8s.io/apimachinery/pkg/api/errors"
@@ -34,6 +35,7 @@ import (
3435
"k8s.io/client-go/dynamic"
3536
"k8s.io/client-go/kubernetes"
3637
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
38+
watchtools "k8s.io/client-go/tools/watch"
3739
"k8s.io/kubernetes/pkg/api/legacyscheme"
3840
"k8s.io/kubernetes/pkg/kubectl"
3941
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
@@ -467,16 +469,19 @@ func (o *RunOptions) removeCreatedObjects(f cmdutil.Factory, createdObjects []*R
467469
}
468470

469471
// waitForPod watches the given pod until the exitCondition is true
470-
func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watch.ConditionFunc) (*corev1.Pod, error) {
472+
func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watchtools.ConditionFunc) (*corev1.Pod, error) {
471473
w, err := podClient.Pods(ns).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name}))
472474
if err != nil {
473475
return nil, err
474476
}
475477

476-
intr := interrupt.New(nil, w.Stop)
478+
// TODO: expose the timeout
479+
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0*time.Second)
480+
defer cancel()
481+
intr := interrupt.New(nil, cancel)
477482
var result *corev1.Pod
478483
err = intr.Run(func() error {
479-
ev, err := watch.Until(0, w, func(ev watch.Event) (bool, error) {
484+
ev, err := watchtools.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) {
480485
return exitCondition(ev)
481486
})
482487
if ev != nil {

pkg/kubectl/cmd/wait/wait.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package wait
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"strings"
@@ -33,6 +34,7 @@ import (
3334
"k8s.io/apimachinery/pkg/util/wait"
3435
"k8s.io/apimachinery/pkg/watch"
3536
"k8s.io/client-go/dynamic"
37+
watchtools "k8s.io/client-go/tools/watch"
3638
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
3739
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
3840
"k8s.io/kubernetes/pkg/kubectl/genericclioptions"
@@ -272,11 +274,14 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
272274
// we're out of time
273275
return gottenObj, false, wait.ErrWaitTimeout
274276
}
275-
watchEvent, err := watch.Until(o.Timeout, objWatch, isDeleted)
277+
278+
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
279+
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, isDeleted)
280+
cancel()
276281
switch {
277282
case err == nil:
278283
return watchEvent.Object, true, nil
279-
case err == watch.ErrWatchClosed:
284+
case err == watchtools.ErrWatchClosed:
280285
continue
281286
case err == wait.ErrWaitTimeout:
282287
if watchEvent != nil {
@@ -334,11 +339,14 @@ func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (ru
334339
// we're out of time
335340
return gottenObj, false, wait.ErrWaitTimeout
336341
}
337-
watchEvent, err := watch.Until(o.Timeout, objWatch, w.isConditionMet)
342+
343+
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
344+
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, w.isConditionMet)
345+
cancel()
338346
switch {
339347
case err == nil:
340348
return watchEvent.Object, true, nil
341-
case err == watch.ErrWatchClosed:
349+
case err == watchtools.ErrWatchClosed:
342350
continue
343351
case err == wait.ErrWaitTimeout:
344352
if watchEvent != nil {

pkg/kubectl/polymorphichelpers/helpers.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package polymorphichelpers
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"sort"
2223
"time"
@@ -33,6 +34,7 @@ import (
3334
"k8s.io/apimachinery/pkg/runtime"
3435
"k8s.io/apimachinery/pkg/watch"
3536
coreclient "k8s.io/client-go/kubernetes/typed/core/v1"
37+
watchtools "k8s.io/client-go/tools/watch"
3638
"k8s.io/kubernetes/pkg/apis/apps"
3739
"k8s.io/kubernetes/pkg/apis/batch"
3840
api "k8s.io/kubernetes/pkg/apis/core"
@@ -69,7 +71,10 @@ func GetFirstPod(client coreclient.PodsGetter, namespace string, selector string
6971
condition := func(event watch.Event) (bool, error) {
7072
return event.Type == watch.Added || event.Type == watch.Modified, nil
7173
}
72-
event, err := watch.Until(timeout, w, condition)
74+
75+
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
76+
defer cancel()
77+
event, err := watchtools.UntilWithoutRetry(ctx, w, condition)
7378
if err != nil {
7479
return nil, 0, err
7580
}

staging/src/k8s.io/client-go/tools/cache/listwatch.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/apimachinery/pkg/watch"
2929
restclient "k8s.io/client-go/rest"
3030
"k8s.io/client-go/tools/pager"
31+
watchtools "k8s.io/client-go/tools/watch"
3132
)
3233

3334
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
@@ -116,7 +117,7 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)
116117
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
117118
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
118119
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
119-
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
120+
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
120121
if len(conditions) == 0 {
121122
return nil, nil
122123
}
@@ -178,8 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch
178179
return nil, err
179180
}
180181

181-
evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
182-
if err == watch.ErrWatchClosed {
182+
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
183+
defer cancel()
184+
evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
185+
if err == watchtools.ErrWatchClosed {
183186
// present a consistent error interface to callers
184187
err = wait.ErrWaitTimeout
185188
}

staging/src/k8s.io/client-go/tools/watch/until.go

+32-17
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,39 @@ limitations under the License.
1717
package watch
1818

1919
import (
20+
"context"
2021
"errors"
2122
"time"
2223

24+
"github.com/golang/glog"
2325
"k8s.io/apimachinery/pkg/util/wait"
26+
"k8s.io/apimachinery/pkg/watch"
2427
)
2528

2629
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
2730
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
2831
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
2932
// from false to true).
30-
type ConditionFunc func(event Event) (bool, error)
33+
type ConditionFunc func(event watch.Event) (bool, error)
3134

32-
// ErrWatchClosed is returned when the watch channel is closed before timeout in Until.
33-
var ErrWatchClosed = errors.New("watch closed before Until timeout")
35+
// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
36+
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
3437

35-
// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
38+
// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
3639
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
3740
// If no event has been received, the returned event will be nil.
3841
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
39-
// A zero timeout means to wait forever.
40-
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
42+
// Waits until context deadline or until context is canceled.
43+
//
44+
// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
45+
// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
46+
// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
47+
// Warning: solving such issues.
48+
// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
49+
func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
4150
ch := watcher.ResultChan()
4251
defer watcher.Stop()
43-
var after <-chan time.Time
44-
if timeout > 0 {
45-
after = time.After(timeout)
46-
} else {
47-
ch := make(chan time.Time)
48-
defer close(ch)
49-
after = ch
50-
}
51-
var lastEvent *Event
52+
var lastEvent *watch.Event
5253
for _, condition := range conditions {
5354
// check the next condition against the previous event and short circuit waiting for the next watch
5455
if lastEvent != nil {
@@ -69,7 +70,6 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
6970
}
7071
lastEvent = &event
7172

72-
// TODO: check for watch expired error and retry watch from latest point?
7373
done, err := condition(event)
7474
if err != nil {
7575
return lastEvent, err
@@ -78,10 +78,25 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
7878
break ConditionSucceeded
7979
}
8080

81-
case <-after:
81+
case <-ctx.Done():
8282
return lastEvent, wait.ErrWaitTimeout
8383
}
8484
}
8585
}
8686
return lastEvent, nil
8787
}
88+
89+
// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
90+
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
91+
if timeout < 0 {
92+
// This should be handled in validation
93+
glog.Errorf("Timeout for context shall not be negative!")
94+
timeout = 0
95+
}
96+
97+
if timeout == 0 {
98+
return context.WithCancel(parent)
99+
}
100+
101+
return context.WithTimeout(parent, timeout)
102+
}

0 commit comments

Comments
 (0)