-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
Describe the current behavior
Currently, to retrieve the count of flow runs by state, I need to fetch all the flow run records from the API and compute the count manually. For example:
from collections import Counter
from prefect import get_client
async def count_flow_runs_by_state():
async with get_client() as client:
results = await client.read_flow_runs()
count = Counter()
for result in results:
count[result.state.type] += 1
return count
print(await count_flow_runs_by_state())
This approach requires loading all run data from the API into memory just to compute simple aggregates like counts, which is inefficient and not scalable for large datasets.
Describe the proposed behavior
There is a similar function count_task_runs_by_state
which already exists on the server side, it would be helpful to expose a similar method count_flow_runs_by_state
in the prefect.client
API.
This would allow users to query task or flow run counts by state without having to fetch all individual records, which improves performance and reduces API overhead—especially for large-scale deployments.
Ideally, this could look something like:
async with get_client() as client:
counts = await client.count_flow_runs_by_state(flow_filter=None, flow_run_filter=None, task_run_filter=None, deployment_filter=None)
This would align with how other server-side summary endpoints are exposed via the client.
Example Use
The ability to query flow or task run counts by state directly from the client is valuable for building monitoring and alerting systems, such as dashboards or CloudWatch metrics.
Fetching the full list of runs and computing counts client-side introduces unnecessary network and memory overhead, especially at scale. Having a lightweight, server-side aggregate endpoint exposed through the client would make it much easier to integrate Prefect into observability pipelines in a performant and reliable way.
Supporting this in the prefect.client
interface means it will be supported in the lightweight prefect-client
package.
Example of use case:
from collections import Counter
from prefect import get_client
async def count_flow_runs_by_state():
async with get_client() as client:
return await client.count_flow_runs_by_state()
# push to monitoring service
Additional context
No response