Use Case
Created from a discussion in the Prefect Community Slack.
I'd like to propose a new type of storage. For the sake of this conversation, I'll refer to it as `Webhook` storage.

With `Webhook` storage, flows are stored and retrieved by HTTP requests. The storage object contains the details needed to construct those requests. I think this could be a lightweight but powerful way to allow users to integrate Prefect with their existing stack.
benefit 1: custom storage with external services
This could be a route to using any type of external service that exposes writing and reading binary files over HTTP. It would allow users to write their own storage classes, and would allow the use of other cloud providers' object stores that Prefect doesn't have first-class support for (like Alibaba Cloud Object Storage Service or IBM Cloud Object Storage).
benefit 2: integration with internal services
In companies I've worked at or with before, I've seen the pattern where public cloud services can only be used directly by infrastructure teams, while data scientists and other application developers are restricted to using the company's own internal microservices.
Adding `Webhook` storage would allow users in such a situation to integrate with Prefect Core server (Cloud or run themselves) without needing any credentials that grant direct access to cloud providers (which is necessary to use `S3`, `GCS`, or `Azure` storage).
Solution
This might look something like the rough sketch below:
```python
import io
import os
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import cloudpickle
import requests

from prefect.client import Secret
from prefect.environments.storage import Storage

if TYPE_CHECKING:
    from prefect.core.flow import Flow


class Webhook(Storage):
    """
    Args:
        - build_kwargs (dict): Dictionary of keyword arguments to the
            function from ``requests`` used to store the flow.
        - build_http_method (str): HTTP method identifying the type of request
            to execute when storing the flow. For example, ``"POST"`` for
            ``requests.post()``.
        - get_flow_kwargs (dict): Dictionary of keyword arguments to the
            function from ``requests`` used to retrieve the flow.
        - get_flow_http_method (str): HTTP method identifying the type of
            request to execute when retrieving the flow. For example,
            ``"GET"`` for ``requests.get()``.
        - secret_config (dict, optional): A dictionary describing how to set
            request headers from environment variables or Prefect Cloud
            secrets. See example for details on specifying this.

    Passing sensitive data in headers
    ---------------------------------

    For services which require authentication, use ``secret_config`` to pass
    sensitive data like API keys without storing their values in this
    Storage object.

    This should be a dictionary whose keys are headers, and whose values
    indicate whether to retrieve real values from environment variables
    (``"type": "environment"``) or Prefect secrets (``"type": "secret"``).

    So, for example, to get an API key from an environment variable you
    can do the following

    .. code-block:: python

        storage = Webhook(
            build_kwargs={
                "url": "some-random-service.place.thing",
                "headers": {
                    "Content-Type": "application/octet-stream"
                }
            },
            build_http_method="POST",
            ...
            ...
            secret_config={
                "X-Api-Key": {
                    "value": "MY_COOL_ENV_VARIABLE",
                    "type": "environment"
                }
            }
        )
    """

    def __init__(
        self,
        build_kwargs: dict,
        build_http_method: str,
        get_flow_kwargs: dict,
        get_flow_http_method: str,
        secret_config: Optional[dict] = None,
        **kwargs: Any
    ) -> None:
        self.flows = dict()  # type: Dict[str, str]
        self._flows = dict()  # type: Dict[str, "Flow"]
        self.build_kwargs = build_kwargs
        self.build_http_method = build_http_method
        self.get_flow_kwargs = get_flow_kwargs
        self.get_flow_http_method = get_flow_http_method
        # default to None instead of a mutable default argument
        self.secret_config = secret_config or {}
        self._method_to_function = {
            "GET": requests.get,
            "POST": requests.post,
            "PUT": requests.put,
        }
        super().__init__(**kwargs)

    @property
    def default_labels(self) -> List[str]:
        return ["webhook-flow-storage"]

    def _render_headers(self, headers: dict) -> dict:
        """
        Fill in the headers named in ``secret_config`` with real values
        pulled from environment variables or Prefect secrets, so those
        values are only resolved at request time.
        """
        out_headers = headers.copy()
        for header, header_config in self.secret_config.items():
            name = header_config["value"]
            if header_config["type"] == "environment":
                out_headers[header] = os.environ[name]
            elif header_config["type"] == "secret":
                out_headers[header] = Secret(name).get()
        return out_headers

    def get_flow(self, flow_name: str) -> "Flow":
        # execute one HTTP request to fetch the pickled flow, then unpickle it
        req_function = self._method_to_function[self.get_flow_http_method]
        get_flow_kwargs = self.get_flow_kwargs.copy()
        get_flow_kwargs["headers"] = self._render_headers(
            get_flow_kwargs.get("headers", {})
        )
        response = req_function(**get_flow_kwargs)
        response.raise_for_status()
        return cloudpickle.loads(response.content)

    def build(self) -> "Storage":
        self.run_basic_healthchecks()

        for flow_name, flow in self._flows.items():
            # pickle the flow
            data = cloudpickle.dumps(flow)

            # write the pickled flow to a binary stream
            try:
                stream = io.BytesIO(data)
            except TypeError:
                stream = io.BytesIO(data.encode())

            # write the flow to the service with one HTTP request
            req_function = self._method_to_function[self.build_http_method]
            build_kwargs = self.build_kwargs.copy()
            build_kwargs["headers"] = self._render_headers(
                build_kwargs.get("headers", {})
            )
            build_kwargs["data"] = stream  # send the pickled flow as the body
            response = req_function(**build_kwargs)
            response.raise_for_status()

        return self
```
My basic proposal is that `build()` executes one HTTP request to store the flow and `get_flow()` executes another to retrieve it.
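To make that concrete, here is a hypothetical example of attaching the sketch above to a flow. The service URLs and header values are made up for illustration; the `secret_config` entry reuses the environment-variable pattern from the docstring example:

```python
from prefect import Flow, task

@task
def say_hello():
    print("hello")

with Flow("example") as flow:
    say_hello()

# hypothetical service that accepts an upload at a URL and
# serves the same bytes back from that URL
flow.storage = Webhook(
    build_kwargs={
        "url": "https://artifact-service.example.com/flows/example",
        "headers": {"Content-Type": "application/octet-stream"},
    },
    build_http_method="POST",
    get_flow_kwargs={
        "url": "https://artifact-service.example.com/flows/example",
        "headers": {"Accept": "application/octet-stream"},
    },
    get_flow_http_method="GET",
    # the X-Api-Key header is filled in at request time from an
    # environment variable, so no secret is stored on this object
    secret_config={
        "X-Api-Key": {"value": "MY_COOL_ENV_VARIABLE", "type": "environment"},
    },
)
```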
Open Questions
How could this work with multiple flows?
- I personally don't understand the use case for multiple flows in one Storage object, and I'm not certain how that could work with `Webhook` storage (but I'm sure it could be figured out).
How could this support services where you have to write a file before you know enough to read it?
- for example, services that generate a unique ID when you upload a file and then return that ID in the response. `get_flow()` needs that ID to work.
- there is probably a workaround where, for such a service, you can `build()`, update details of the storage based on the response, and then use `flow.register(build=False)` (a sketch of this follows below).
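As an illustration only, here is a rough sketch of that workaround. It assumes `build()` is extended to keep the service's most recent response on a hypothetical `last_build_response` attribute, and it uses the `add_flow()` method provided by Prefect's `Storage` base class:

```python
storage = Webhook(
    build_kwargs={"url": "https://artifact-service.example.com/upload"},
    build_http_method="POST",
    get_flow_kwargs={},  # the URL isn't known until after the upload
    get_flow_http_method="GET",
)
flow.storage = storage
storage.add_flow(flow)

# upload the flow; assume the service responds with a generated ID
storage.build()
artifact_id = storage.last_build_response.json()["id"]  # hypothetical attribute

# point get_flow() at the newly-created artifact
storage.get_flow_kwargs["url"] = (
    "https://artifact-service.example.com/artifacts/" + artifact_id
)

# register without rebuilding, so the updated kwargs are preserved
flow.register(build=False)
```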
Do any details of the HTTP client need to be customizable?
- in my proposal, the only customization offered is the keyword arguments passed through to `requests`. I think that's a good starting point.
- Do client details like retry logic need to be customizable, or could they be hardcoded to reasonable defaults? (one possible default is sketched after this list)
- are flow objects ever so large that you need things like multipart upload?
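On the retry question in particular, one way to hardcode a reasonable default would be for `Webhook` storage to send its requests through a `requests.Session` configured with a retry-enabled adapter, instead of the module-level `requests.get` / `requests.post` functions. A minimal sketch using `urllib3`'s `Retry` (these defaults are my assumption, not part of the proposal above):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def _default_session() -> requests.Session:
    # retry transient failures a few times with exponential backoff
    retry = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retry))
    session.mount("http://", HTTPAdapter(max_retries=retry))
    return session

# Webhook.__init__() could then build its method-to-function mapping
# from such a session, e.g. {"GET": session.get, "POST": session.post}
```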
Alternatives
Prefect Cloud Flow Storage
Some of the use cases mentioned above might be solved by introducing a Prefect Cloud storage service, where you just authenticate with Prefect Cloud and it acts as the cloud storage service.
- pros
  - purpose-built for storing flows, so some things like the `Content-Type` header and expectations about how the object is named can be hard-coded and hidden from users
- cons
  - violates the design principle of all user code staying in users' infrastructure
Doing Nothing
Maybe this isn't a big enough concern to warrant growing the `prefect` codebase. All new code comes with maintenance costs, and maybe the added maintenance cost of this feature outweighs the benefit to users of an extension like this.
Closing Thoughts
If the maintainers here agree that this feature is worth pursuing, I want to note that I'd be happy to attempt a pull request. You all have been so careful and thoughtful in the design of the boundaries between different components that I feel confident I could come up with a reasonable implementation.
Thanks for your time and consideration!