@joamaki (Contributor) commented Sep 19, 2022

The Resource[T] provides access to a Kubernetes resource as an
observable stream of events or via a read-only Store[T]:

type Resource[T k8sRuntime.Object] interface {
    stream.Observable[Event[T]]
    Store(context.Context) (Store[T], error)
}

An event is either *SyncEvent, *UpdateEvent or *DeleteEvent. The
event can be processed synchronously with event.Handle(onSync,
onUpdate, onDelete), or asynchronously with a type switch followed
by a call to event.Done(err). If handling an event returns a non-nil
error, the key of the object being handled is requeued for later
processing; once retries are exhausted, the stream completes with
the error from the final retry.
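The asynchronous style can be sketched as follows. This is a self-contained approximation, not the package's actual code: `Key`, `baseEvent`, the generic event structs and `handleAsync` are hypothetical stand-ins modelled on the description, with the object type fixed to `string` for brevity. What it illustrates is that every branch of the type switch ends in exactly one call to `Done`, and that a non-nil error is the signal to requeue.

```go
package main

import "errors"

// Key is a hypothetical stand-in for resource.Key.
type Key struct{ Namespace, Name string }

// baseEvent holds the completion callback; Done reports the handling result.
type baseEvent struct{ done func(error) }

func (b *baseEvent) Done(err error) { b.done(err) }

// Hypothetical minimal event kinds mirroring *SyncEvent, *UpdateEvent and
// *DeleteEvent; the real types may carry different fields.
type SyncEvent struct{ baseEvent }

type UpdateEvent[T any] struct {
	baseEvent
	Key    Key
	Object T
}

type DeleteEvent[T any] struct {
	baseEvent
	Key    Key
	Object T
}

// Event is any event value that can be marked done.
type Event interface{ Done(error) }

// errEmptyObject is an example processing failure.
var errEmptyObject = errors.New("empty object")

// handleAsync processes an event with a type switch and always calls Done:
// nil acknowledges the event, a non-nil error asks for the key to be requeued.
func handleAsync(ev Event) {
	switch ev := ev.(type) {
	case *SyncEvent:
		// The updates seen so far form a complete snapshot.
		ev.Done(nil)
	case *UpdateEvent[string]:
		if ev.Object == "" {
			ev.Done(errEmptyObject)
			return
		}
		ev.Done(nil)
	case *DeleteEvent[string]:
		ev.Done(nil)
	default:
		ev.Done(nil)
	}
}
```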

The Store() method blocks until the resource has been synchronized with
the api-server (i.e. CachesSynced == true). Store[T] is a wrapper around
cache.Store that exposes only the methods that are safe to use (Get,
GetByKey, List, ListKeys), specialized to type T.
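A typed, read-only wrapper of this kind could look roughly like the sketch below. To stay dependency-free, `untypedStore` re-declares the read-only method signatures of client-go's `cache.Store` locally; `Store`, `typedStore`, `cast` and the toy `mapStore` are hypothetical names, not the package's API. The mutating methods of `cache.Store` (Add, Update, Delete, Replace) are simply absent from the interface, so a subscriber cannot modify the informer's cache through it.

```go
package main

import "sort"

// untypedStore mirrors the read-only method signatures of client-go's
// cache.Store; declared locally so the sketch has no dependencies.
type untypedStore interface {
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)
	List() []interface{}
	ListKeys() []string
}

// Store is a hypothetical typed, read-only view in the spirit of Store[T].
type Store[T any] interface {
	Get(obj T) (item T, exists bool, err error)
	GetByKey(key string) (item T, exists bool, err error)
	List() []T
	ListKeys() []string
}

// typedStore adapts an untypedStore to Store[T] by asserting items to T.
type typedStore[T any] struct{ s untypedStore }

// cast converts an untyped lookup result; missing items yield T's zero value.
func cast[T any](raw interface{}, exists bool, err error) (T, bool, error) {
	var zero T
	if err != nil || !exists {
		return zero, exists, err
	}
	return raw.(T), true, nil
}

func (t typedStore[T]) Get(obj T) (T, bool, error) {
	raw, exists, err := t.s.Get(obj)
	return cast[T](raw, exists, err)
}

func (t typedStore[T]) GetByKey(key string) (T, bool, error) {
	raw, exists, err := t.s.GetByKey(key)
	return cast[T](raw, exists, err)
}

func (t typedStore[T]) List() []T {
	raw := t.s.List()
	items := make([]T, 0, len(raw))
	for _, item := range raw {
		items = append(items, item.(T))
	}
	return items
}

func (t typedStore[T]) ListKeys() []string { return t.s.ListKeys() }

// mapStore is a toy untypedStore whose items are their own keys.
type mapStore map[string]interface{}

func (m mapStore) Get(obj interface{}) (interface{}, bool, error) {
	return m.GetByKey(obj.(string))
}

func (m mapStore) GetByKey(key string) (interface{}, bool, error) {
	item, ok := m[key]
	return item, ok, nil
}

func (m mapStore) ListKeys() []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the toy store
	return keys
}

func (m mapStore) List() []interface{} {
	items := make([]interface{}, 0, len(m))
	for _, k := range m.ListKeys() {
		items = append(items, m[k])
	}
	return items
}
```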

This abstracts the management of an informer and cache behind the
NewResourceConstructor function, which takes a function returning a
cache.ListerWatcher and provides a constructor that can be used from a
cell to provide the Resource to the application:

var exampleCell = hive.NewCell(
    "example",
    fx.Provide(
        // Provide `Resource[*slim_corev1.Pod]` to the application:
        resource.NewResourceConstructor(func(c k8sClient.Clientset) cache.ListerWatcher {
            return utils.ListerWatcherFromTyped[*slim_corev1.PodList](c.Slim().CoreV1().Pods(""))
        }),
        // Use the resource:
        newExample,
    ),
)

func newExample(pods resource.Resource[*slim_corev1.Pod]) *Example {
    e := &Example{...}
    pods.Observe(e.ctx, e.onPodEvent, e.onPodsComplete)
    return e
}

func (e *Example) onPodEvent(ev resource.Event[*slim_corev1.Pod]) {
    // Process the event, e.g. with ev.Handle(onSync, onUpdate, onDelete),
    // or with a type switch followed by ev.Done(err).
}

func (e *Example) onPodsComplete(err error) {
    // Handle the error. We may end up here when:
    // 1. The observing context was cancelled (e.g. we're stopping).
    // 2. The resource is being stopped.
    // 3. Processing an event failed after the maximum number of retries.
}

If multiple resources are used, it may make sense to consume them via channels
in a single loop, which ensures sequential processing and avoids locks:

type Foo struct {
    ctx      context.Context
    pods     resource.Resource[*slim_corev1.Pod]
    services resource.Resource[*slim_corev1.Service]
}

func (f *Foo) processLoop() {
    // One error channel shared by all streams, created by the caller.
    errs := make(chan error)
    pods := stream.ToChannel(f.ctx, errs, f.pods)
    services := stream.ToChannel(f.ctx, errs, f.services)
    for {
        select {
        case <-f.ctx.Done():
            return
        case ev, ok := <-pods:
            if !ok {
                pods = nil // stream completed; stop selecting on it
                continue
            }
            ev.Handle(f.podsSynced, f.podUpdated, f.podDeleted)
        case ev, ok := <-services:
            if !ok {
                services = nil
                continue
            }
            ev.Handle(f.servicesSynced, f.serviceUpdated, f.serviceDeleted)
        case err := <-errs:
            // Hypothetical handler, e.g. full state reset and replay.
            f.handleError(err)
        }
    }
}

@maintainer-s-little-helper maintainer-s-little-helper bot added the dont-merge/needs-release-note-label The author needs to describe the release impact of these changes. label Sep 19, 2022
@joamaki joamaki force-pushed the pr/joamaki/k8s-resources-minimal branch from f613ce9 to 76eede8 Compare September 19, 2022 15:08
@joamaki joamaki force-pushed the pr/joamaki/k8s-resources-minimal branch from 76eede8 to b9b1b58 Compare September 27, 2022 12:04
@joamaki joamaki added the release-note/misc This PR makes changes that have no direct user impact. label Sep 27, 2022
@maintainer-s-little-helper maintainer-s-little-helper bot removed the dont-merge/needs-release-note-label The author needs to describe the release impact of these changes. label Sep 27, 2022
@joamaki (Contributor Author) commented Sep 27, 2022

A full /test is not needed, as this PR adds a new, currently unused package, and the change to pkg/logging/fx_logger.go only affects the cilium-agent objects command. The relevant check to pass is Travis CI, which runs the unit tests for pkg/k8s/resource.

@joamaki joamaki marked this pull request as ready for review September 27, 2022 12:16
@joamaki joamaki requested review from a team as code owners September 27, 2022 12:16
@joamaki joamaki force-pushed the pr/joamaki/k8s-resources-minimal branch from b9b1b58 to 747bd49 Compare September 27, 2022 14:09
@christarazi christarazi self-requested a review September 27, 2022 23:38
@christarazi christarazi added area/daemon Impacts operation of the Cilium daemon. area/k8s Impacts the kubernetes API, or kubernetes -> cilium internals translation layers. labels Sep 27, 2022
b.done(err)
}

// SyncEvent is emitted when the store has completed the initial synchronization
Contributor

What does this mean, exactly? Does it indicate simply that the store is loaded? Does it mean that every pre-existing item has been Observed at least once?

Contributor Author

This means that the set of updated objects preceding the sync event constitutes a complete snapshot and that the store can now be used (e.g. ListKeys() will not return a partially synced view).

In practical terms, we need the sync event for garbage collection. For example, when the agent starts, we would synchronize state to the BPF maps and then run a garbage collection pass to find entries in the BPF maps that no longer have a corresponding k8s object.
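A hypothetical sketch of that garbage-collection pattern, with plain maps standing in for the BPF map and for the set of known objects (`gcState` and its methods are illustrative names only). The property it shows is that stale entries are only collected in `onSync`: before the sync event the set of seen keys may be partial, and collecting against it could delete entries for objects that simply have not been delivered yet.

```go
package main

import "sort"

// gcState is a hypothetical reconciler: desired tracks keys seen in update
// events, datapath stands in for a BPF map that may hold stale entries
// left over from a previous agent run.
type gcState struct {
	desired  map[string]bool
	datapath map[string]bool
}

// onUpdate writes the object's entry to the datapath and records its key.
func (g *gcState) onUpdate(key string) {
	g.desired[key] = true
	g.datapath[key] = true
}

// onDelete removes the entry of a deleted object.
func (g *gcState) onDelete(key string) {
	delete(g.desired, key)
	delete(g.datapath, key)
}

// onSync runs once the preceding updates form a complete snapshot: every
// datapath entry without a corresponding object is stale and removed.
// It returns the removed keys in sorted order.
func (g *gcState) onSync() []string {
	var stale []string
	for key := range g.datapath {
		if !g.desired[key] {
			stale = append(stale, key)
		}
	}
	sort.Strings(stale)
	for _, key := range stale {
		delete(g.datapath, key)
	}
	return stale
}
```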

I'll try to improve the documentation!

@squeed (Contributor) commented Sep 28, 2022

This looks very good. I have one question.

Separately, would you consider making an examples folder? That way there's some basic code to crib from -- code that is compiled (since comments might rot out). Or, a dead-simple test that is meant for copying rather than testing.

@joamaki (Contributor Author) commented Sep 28, 2022

> This looks very good. I have one question.
>
> Separately, would you consider making an examples folder? That way there's some basic code to crib from -- code that is compiled (since comments might rot out). Or, a dead-simple test that is meant for copying rather than testing.

Ah, good idea! I'll make some examples (I already have one in https://github.com/joamaki/goreactive/tree/main/sources/k8s/example, which I may adapt).

EDIT: Added an example to pkg/k8s/resource/example.

@joamaki joamaki force-pushed the pr/joamaki/k8s-resources-minimal branch from 747bd49 to 3d5cac5 Compare September 28, 2022 13:30
@joamaki joamaki requested a review from a team as a code owner September 28, 2022 13:30
@joamaki joamaki force-pushed the pr/joamaki/k8s-resources-minimal branch from 3d5cac5 to 9001a31 Compare September 28, 2022 15:01
@joamaki (Contributor Author) commented Sep 29, 2022

/test

@joamaki joamaki requested a review from squeed September 29, 2022 12:17
ToChannel is often used with multiple observables in a for-select loop
with a shared errs channel. The channel should be created and closed by
the caller.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
This significantly simplifies the output for cells that export lots
of objects. We're not that interested in the actual constructor name
anyway as that can be easily found.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
The Resource[T] provides access to a Kubernetes resource as an
observable stream of events or via a read-only Store[T]:

   type Resource[T k8sRuntime.Object] interface {
   	stream.Observable[Event[T]]
   	Store(context.Context) (Store[T], error)
   }

An event is either *SyncEvent, *UpdateEvent or *DeleteEvent. The
event can be processed synchronously with event.Handle(onSync,
onUpdate, onDelete), or asynchronously with a type switch followed
by a call to event.Done(err). If handling an event returns a non-nil
error, the key of the object being handled is requeued for later
processing; once retries are exhausted, the stream completes with
the error from the final retry.

The Store() method blocks until the resource has been synchronized with
the api-server (i.e. CachesSynced == true). Store[T] is a wrapper around
cache.Store that exposes only the methods that are safe to use (Get,
GetByKey, List, ListKeys), specialized to type T.

This abstracts the management of an informer and cache behind the
`NewResourceConstructor` function, which takes a function returning a
cache.ListerWatcher and provides a constructor that can be used from a
cell to provide the Resource to the application:

    var exampleCell = hive.NewCell(
        "example",
        fx.Provide(
            // Provide `Resource[*slim_corev1.Pod]` to the application:
            resource.NewResourceConstructor(func(c k8sClient.Clientset) cache.ListerWatcher {
                return utils.ListerWatcherFromTyped[*slim_corev1.PodList](c.Slim().CoreV1().Pods(""))
            }),
        ),
        ...
    )

Resource is implemented as an informer whose handlers enqueue the
keys of changed objects into per-subscriber queues. A key is dequeued,
the corresponding object is retrieved from the cache.Store, and it is
then provided to the subscriber. This allows subscribers to process
changes at their own pace and allows changes to the same key to be
coalesced when a subscriber is slow.
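The per-subscriber queue with coalescing can be sketched as a FIFO of keys plus a set of keys currently waiting. This is a simplified, hypothetical model (`keyQueue` and `drain` are illustrative names, and a plain map stands in for cache.Store); client-go's workqueue provides similar de-duplication of queued items. Because only the key is queued, a subscriber that falls behind receives each changed key once and reads the latest object from the store when it gets to it; in this sketch, a key whose object is gone by then is reported as deleted.

```go
package main

import "sync"

// keyQueue is a FIFO of keys that ignores a key already waiting in it.
type keyQueue struct {
	mu      sync.Mutex
	order   []string
	pending map[string]struct{}
}

func newKeyQueue() *keyQueue {
	return &keyQueue{pending: make(map[string]struct{})}
}

// Add enqueues key unless it is already waiting, in which case the change
// is coalesced with the pending entry.
func (q *keyQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, waiting := q.pending[key]; waiting {
		return
	}
	q.pending[key] = struct{}{}
	q.order = append(q.order, key)
}

// Get dequeues the oldest key, reporting false when the queue is empty.
func (q *keyQueue) Get() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.order) == 0 {
		return "", false
	}
	key := q.order[0]
	q.order = q.order[1:]
	delete(q.pending, key)
	return key, true
}

// drain hands every queued key to deliver along with the object currently
// in store; exists is false if the object was deleted in the meantime.
func drain(q *keyQueue, store map[string]string, deliver func(key, obj string, exists bool)) {
	for {
		key, ok := q.Get()
		if !ok {
			return
		}
		obj, exists := store[key]
		deliver(key, obj, exists)
	}
}
```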

Signed-off-by: Jussi Maki <jussi@isovalent.com>
Example of how Resource[T] can be used in a complete application.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
@joamaki joamaki force-pushed the pr/joamaki/k8s-resources-minimal branch from 9001a31 to fa5864b Compare September 30, 2022 08:36
@dylandreimerink (Member) left a comment:

LGTM

@joamaki (Contributor Author) commented Sep 30, 2022

Marking as ready-to-merge since we've got two reviews in, the unit tests pass, and this is not yet integrated into existing code, so it should have no adverse impact.

@joamaki joamaki added the ready-to-merge This PR has passed all tests and received consensus from code owners to merge. label Sep 30, 2022
@ti-mo ti-mo merged commit f687dd5 into cilium:master Sep 30, 2022
@joamaki joamaki deleted the pr/joamaki/k8s-resources-minimal branch September 30, 2022 13:41
Labels
area/daemon Impacts operation of the Cilium daemon. area/k8s Impacts the kubernetes API, or kubernetes -> cilium internals translation layers. ready-to-merge This PR has passed all tests and received consensus from code owners to merge. release-note/misc This PR makes changes that have no direct user impact.
5 participants