Closed
Labels: kind/feature (New feature or request), wontfix (This will not be worked on)
Description
Is your feature request related to a problem? Please describe.
Feature transformations should be composed of reusable components, written as Python functions, that can be built into a feature transformation pipeline.
Describe the solution you'd like
Chaining transformations into a pipeline that can be reused across feature views (FVs) would reduce duplicated transformation code across the feature repo.
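A minimal sketch of the reuse idea in plain Python, assuming each transformation is a function from a list of row dicts to a list of row dicts. The names `pipeline`, `drop_null_balances`, `latest_per_user`, `add_balance_bucket`, and the two per-FV pipelines are hypothetical illustrations, not Feast API. The point is that the cleaning step is written once and shared by two feature-view pipelines instead of being copied into each.

```python
from functools import reduce
from typing import Callable, Dict, List

Row = Dict[str, object]
Step = Callable[[List[Row]], List[Row]]


def pipeline(*steps: Step) -> Step:
    """Hypothetical helper: chain steps left to right into one reusable transformation."""
    return lambda rows: reduce(lambda acc, step: step(acc), steps, rows)


# Reusable building blocks, each defined once.
def drop_null_balances(rows: List[Row]) -> List[Row]:
    return [r for r in rows if r.get("current_balance") is not None]


def latest_per_user(rows: List[Row]) -> List[Row]:
    # Keep the most recent row per user_id.
    latest: Dict[object, Row] = {}
    for r in rows:
        uid = r["user_id"]
        if uid not in latest or r["timestamp"] > latest[uid]["timestamp"]:
            latest[uid] = r
    return list(latest.values())


def add_balance_bucket(rows: List[Row]) -> List[Row]:
    # Illustrative threshold; not taken from the issue.
    return [{**r, "balance_bucket": "high" if r["current_balance"] >= 1000 else "low"} for r in rows]


# Two hypothetical feature views share `drop_null_balances` instead of duplicating it.
last_balance_pipeline = pipeline(drop_null_balances, latest_per_user)
balance_bucket_pipeline = pipeline(drop_null_balances, add_balance_bucket)

rows = [
    {"user_id": "u1", "timestamp": 1, "current_balance": 50.0},
    {"user_id": "u1", "timestamp": 2, "current_balance": 1200.0},
    {"user_id": "u2", "timestamp": 1, "current_balance": None},
]
print(last_balance_pipeline(rows))
print(balance_bucket_pipeline(rows))
```

A linear chain like this covers single-input steps only; the SQL example below also needs steps with two inputs, which is why it is described as a DAG rather than a simple sequence.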
Here is an example adapted from Tecton's transformation API: https://docs.tecton.ai/docs/defining-features/feature-views/transformations#a-feature-view-that-calls-a-pyspark-transformation-passing-two-pyspark-transformation-outputs
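from datetime import timedelta

from feast import Field
from feast.types import Float64, String, UnixTimestamp

# Proposed API: `transformation` and `feature_view(mode="pipeline")` do not exist in Feast yet,
# and `batch_schedule` is borrowed from Tecton. `credit_data_batch` and `user` are assumed to be
# an existing batch source and entity defined elsewhere in the repo.

@transformation(mode="big_query")
def last_balance_time(transactions, max_user_transaction):
    return f"""SELECT t.user_id, t.current_balance, last_t.last_transaction_date AS timestamp
    FROM {transactions} t
    INNER JOIN {max_user_transaction} last_t
    ON t.user_id = last_t.user_id AND t.timestamp = last_t.last_transaction_date"""

@transformation(mode="big_query")
def user_last_transaction_time(transactions):
    return f"""SELECT user_id, MAX(timestamp) AS last_transaction_date
    FROM {transactions}
    GROUP BY user_id"""

@feature_view(
    sources=[credit_data_batch],
    entities=[user],
    mode="pipeline",  # creates a DAG from reusable transformation functions
    batch_schedule=timedelta(days=1),
    schema=[
        Field(name="user_id", dtype=String),
        Field(name="timestamp", dtype=UnixTimestamp),
        Field(name="current_balance", dtype=Float64),
    ],
)
def user_last_balance(transactions):
    # Use a distinct local name: assigning to `user_last_transaction_time` here would make it a
    # local variable and the call on the right-hand side would raise UnboundLocalError.
    max_user_transaction = user_last_transaction_time(transactions)
    return last_balance_time(transactions, max_user_transaction)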
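To make the proposed `mode="pipeline"` more concrete, here is a minimal, hypothetical sketch of how such a pipeline could be compiled for BigQuery: calling a decorated transformation only records a node, and the resulting DAG is rendered as a single query with one common table expression (CTE) per transformation, upstream nodes first. `Source`, `Node`, `compile_sql`, and this argument-less `transformation` decorator are illustrative assumptions, not Feast or Tecton API. The trailing semicolon from the original SQL is dropped because each body is embedded inside a `WITH` clause.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple, Union


@dataclass(frozen=True)
class Source:
    """A named input table, standing in for a batch source such as `credit_data_batch`."""
    table: str


@dataclass(frozen=True)
class Node:
    """One SQL transformation applied to upstream inputs: a vertex in the DAG."""
    name: str
    fn: Callable[..., str]
    inputs: Tuple["Input", ...]


Input = Union[Source, Node]


def transformation(fn: Callable[..., str]) -> Callable[..., Node]:
    """Hypothetical decorator: calling the function records a DAG node instead of running SQL."""
    def build(*inputs: Input) -> Node:
        return Node(fn.__name__, fn, tuple(inputs))
    return build


def compile_sql(root: Node) -> str:
    """Render the DAG rooted at `root` as one query with one CTE per distinct node."""
    aliases: Dict[Node, str] = {}
    ctes: List[str] = []

    def ref(x: Input) -> str:
        if isinstance(x, Source):
            return x.table
        if x not in aliases:
            upstream = [ref(i) for i in x.inputs]  # emit dependencies first
            alias = f"{x.name}_{len(aliases)}"
            ctes.append(f"{alias} AS (\n{x.fn(*upstream)}\n)")
            aliases[x] = alias
        return aliases[x]

    final = ref(root)
    return "WITH " + ",\n".join(ctes) + f"\nSELECT * FROM {final}"


@transformation
def user_last_transaction_time(transactions):
    return f"""SELECT user_id, MAX(timestamp) AS last_transaction_date
FROM {transactions}
GROUP BY user_id"""


@transformation
def last_balance_time(transactions, max_user_transaction):
    return f"""SELECT t.user_id, t.current_balance, last_t.last_transaction_date AS timestamp
FROM {transactions} t
INNER JOIN {max_user_transaction} last_t
ON t.user_id = last_t.user_id AND t.timestamp = last_t.last_transaction_date"""


def user_last_balance(transactions):
    max_user_transaction = user_last_transaction_time(transactions)
    return last_balance_time(transactions, max_user_transaction)


# "credit_data" is a placeholder table name.
sql = compile_sql(user_last_balance(Source("credit_data")))
print(sql)
```

Because nodes are frozen dataclasses that compare by value, the same transformation applied to the same inputs in two places within one pipeline compiles to a single CTE rather than being repeated.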
Describe alternatives you've considered
Additional context