k8s: Resource[T], an implementation of informers with per-sub queues #21352
Full /test not needed as this PR adds a new, for now unused, package and the change to |
pkg/k8s/resource/event.go
	b.done(err)
}

// SyncEvent is emitted when the store has completed the initial synchronization
What does this mean, exactly? Does it indicate simply that the store is loaded? Does it mean that every pre-existing item has been Observed at least once?
This means that the set of updated objects preceding the sync event constitutes a complete snapshot and that the store can now be used (e.g. ListKeys() will not return a partially synced view).
In practical terms, we need the sync event for garbage collection. For example, when the agent starts we would synchronize state to BPF maps and then run a garbage collection pass to find entries in the BPF maps that no longer have a corresponding k8s object.
I'll try to improve the documentation!
This looks very good. I have one question. Separately, would you consider making an |
Ah good idea! I'll make some examples (already have one in https://github.com/joamaki/goreactive/tree/main/sources/k8s/example, I'll maybe adapt that). EDIT: Added an example to pkg/k8s/resource/example.
/test
ToChannel is often used with multiple observables in a for-select loop with a shared errs channel. The channel should be created and closed by the caller. Signed-off-by: Jussi Maki <jussi@isovalent.com>
This significantly simplifies the output for cells that export lots of objects. We're not that interested in the actual constructor name anyway as that can be easily found. Signed-off-by: Jussi Maki <jussi@isovalent.com>
The Resource[T] provides access to a Kubernetes resource as an observable stream of events or via a read-only Store[T]:

    type Resource[T k8sRuntime.Object] interface {
        stream.Observable[Event[T]]
        Store(context.Context) (Store[T], error)
    }

An event is either *SyncEvent, *UpdateEvent or *DeleteEvent. The event can be processed synchronously with event.Handle(onSync, onUpdate, onDelete), or asynchronously with a type switch and a call to event.Done(err). If event handling returns a non-nil error, the key corresponding to the object being handled is requeued for later processing. Once retries are exhausted, the stream completes with the error from the final retry.

The Store() method blocks until the resource has been synchronized with the api-server (e.g. CachesSynced == true). Store[T] is a wrapper around cache.Store that only provides the methods that are safe to use (Get, GetByKey, List, ListKeys) and that are specialized to type T.

This abstracts the management of an informer and cache behind the `NewResourceConstructor` function that takes a cache.ListerWatcher and provides a constructor that can be used from a cell to provide the Resource to the application:

    var exampleCell = hive.NewCell(
        "example",
        fx.Provide(
            // Provide `Resource[*slim_corev1.Pod]` to the application:
            resource.NewResourceConstructor(func(c k8sClient.Clientset) cache.ListerWatcher {
                return utils.ListerWatcherFromTyped[*slim_corev1.PodList](c.Slim().CoreV1().Pods(""))
            }),
        ),
        ...
    )

Resource is implemented as an informer whose handlers enqueue the changed keys to per-subscriber queues. A key is dequeued from a subscriber's queue, the object is retrieved from the cache.Store and it is then provided to the subscriber. This allows subscribers to process the changes at their own pace and allows coalescing changes to the same key when the subscriber is slow.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
Example for how Resource[T] can be used in a complete application. Signed-off-by: Jussi Maki <jussi@isovalent.com>
LGTM
Marking as ready-to-merge since we've got 2 reviews in, unit tests passing and this is not integrating to existing code yet so should have no adverse impact.
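The Resource[T] provides access to a Kubernetes resource as an
observable stream of events or via a read-only Store[T]:

    type Resource[T k8sRuntime.Object] interface {
        stream.Observable[Event[T]]
        Store(context.Context) (Store[T], error)
    }
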
An event is either *SyncEvent, *UpdateEvent or *DeleteEvent. The
event can be processed synchronously with event.Handle(onSync,
onUpdate, onDelete), or asynchronously with a type switch and a call
to event.Done(err). If event handling returns a non-nil error, the key
corresponding to the object being handled is requeued for later
processing. Once retries are exhausted, the stream completes with
the error from the final retry.
The Store() method blocks until the resource has been synchronized with
the api-server (e.g. CachesSynced == true). Store[T] is a wrapper around
cache.Store that only provides the methods that are safe to use (Get,
GetByKey, List, ListKeys) and that are specialized to type T.
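A typed, read-only wrapper of this kind might look roughly like the sketch below. `untypedStore`, `typedStore`, `mapStore` and `pod` are hypothetical names invented for the illustration; only the shape of GetByKey, which returns an untyped item plus an exists flag and an error, mirrors cache.Store.

```go
package main

import "fmt"

// untypedStore is a hypothetical stand-in for the read side of cache.Store,
// which hands out objects as interface values.
type untypedStore interface {
	GetByKey(key string) (item any, exists bool, err error)
}

// typedStore wraps an untypedStore and exposes only a read method returning T.
type typedStore[T any] struct {
	inner untypedStore
}

// GetByKey looks up the key and converts the result to T, reporting an error
// if the underlying store holds an object of another type.
func (s typedStore[T]) GetByKey(key string) (T, bool, error) {
	var zero T
	item, exists, err := s.inner.GetByKey(key)
	if err != nil || !exists {
		return zero, false, err
	}
	obj, ok := item.(T)
	if !ok {
		return zero, false, fmt.Errorf("key %q: unexpected type %T", key, item)
	}
	return obj, true, nil
}

// mapStore is a toy untypedStore backed by a map.
type mapStore map[string]any

func (m mapStore) GetByKey(key string) (any, bool, error) {
	v, ok := m[key]
	return v, ok, nil
}

// pod is a placeholder object type for the example.
type pod struct{ Name string }

func main() {
	store := typedStore[*pod]{inner: mapStore{"default/nginx": &pod{Name: "nginx"}}}
	p, exists, err := store.GetByKey("default/nginx")
	fmt.Println(p.Name, exists, err) // nginx true <nil>
}
```

Callers never see the untyped values or the mutating methods, so they cannot modify the cache or perform an unchecked type assertion.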
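This abstracts the management of an informer and cache behind the
NewResourceConstructor function that takes a cache.ListerWatcher and
provides a constructor that can be used from a cell to provide the
Resource to the application:

    var exampleCell = hive.NewCell(
        "example",
        fx.Provide(
            // Provide `Resource[*slim_corev1.Pod]` to the application:
            resource.NewResourceConstructor(func(c k8sClient.Clientset) cache.ListerWatcher {
                return utils.ListerWatcherFromTyped[*slim_corev1.PodList](c.Slim().CoreV1().Pods(""))
            }),
        ),
        ...
    )
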
If multiple resources are being used, it may make sense to go via channels to ensure
sequential processing and to avoid locks: