Skip to content

Commit 8d9f2d1

Browse files
python: add ETL transformation args QParam support
- Introduced `ETLConfig` dataclass to encapsulate ETL-related parameters. - Updated `get_reader` and `get` methods to support `ETLConfig`, ensuring consistent handling of ETL metadata. - Added ETL-related query parameter (`QPARAM_ETL_ARGS`) in Python SDK. - Refactored `get_reader` and `get` to use the new ETL configuration approach. Signed-off-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com>
1 parent 8630b85 commit 8d9f2d1

File tree

16 files changed

+216
-110
lines changed

16 files changed

+216
-110
lines changed

docs/examples/aisio_webdataset/etl_webdataset.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import webdataset as wds
66
from PIL import Image
77
from aistore.sdk import Client
8+
from aistore.sdk.etl import ETLConfig
89
from torch.utils.data import IterableDataset
910
from torch.utils.data.dataset import T_co
1011

@@ -100,7 +101,7 @@ def read_object_tar(shard_data):
100101
def transform_object_inline():
101102
single_object = client.bucket(bucket_name).object("samples-00.tar")
102103
# Get object contents with ETL applied
103-
processed_shard = single_object.get_reader(etl_name=etl_name).read_all()
104+
processed_shard = single_object.get_reader(etl=ETLConfig(name=etl_name)).read_all()
104105
read_object_tar(processed_shard)
105106

106107

python/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ We structure this changelog in accordance with [Keep a Changelog](https://keepac
99
## Unreleased
1010

1111
### Added
12-
12+
- Added support for ETL transformation arguments in GET requests for inline objects
13+
1314
### Changed
1415

1516
## [1.11.1] - 2025-02-06

python/aistore/pytorch/aisio.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""
22
AIS IO Datapipe
3-
Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
3+
Copyright (c) 2022-2025, NVIDIA CORPORATION. All rights reserved.
44
"""
55

66
from typing import Iterator, Tuple, List
@@ -12,6 +12,7 @@
1212
from torchdata.datapipes.utils import StreamWrapper
1313

1414
from aistore.sdk.ais_source import AISSource
15+
from aistore.sdk.etl import ETLConfig
1516

1617
try:
1718
from aistore.sdk import Client
@@ -146,7 +147,7 @@ def __iter__(self) -> Iterator[Tuple[str, StreamWrapper]]:
146147
yield url, StreamWrapper(
147148
self.client.bucket(bck_name=bck_name, provider=provider)
148149
.object(obj_name=obj_name)
149-
.get_reader(etl_name=self.etl_name)
150+
.get_reader(etl=ETLConfig(name=self.etl_name))
150151
.raw()
151152
)
152153

@@ -156,25 +157,27 @@ def __len__(self) -> int:
156157

157158
@functional_datapipe("ais_list_sources")
158159
class AISSourceLister(IterDataPipe[str]):
159-
def __init__(self, ais_sources: List[AISSource], prefix="", etl_name=None):
160+
def __init__(
161+
self, ais_sources: List[AISSource], prefix="", etl: ETLConfig = None
162+
) -> None:
160163
"""
161164
Iterable DataPipe over the full URLs for each of the provided AIS source object types
162165
163166
Args:
164167
ais_sources (List[AISSource]): List of types implementing the AISSource interface: Bucket, ObjectGroup,
165168
Object, etc.
166169
prefix (str, optional): Filter results to only include objects with names starting with this prefix
167-
etl_name (str, optional): Pre-existing ETL on AIS to apply to all selected objects on the cluster side
170+
etl (ETLConfig, optional): Pre-existing ETL on AIS to apply to all selected objects on the cluster side
168171
"""
169172
_assert_aistore()
170173
self.sources = ais_sources
171174
self.prefix = prefix
172-
self.etl_name = etl_name
175+
self.etl = etl
173176

174177
def __getitem__(self, index) -> T_co:
175178
raise NotImplementedError
176179

177180
def __iter__(self) -> Iterator[T_co]:
178181
for source in self.sources:
179-
for url in source.list_urls(prefix=self.prefix, etl_name=self.etl_name):
182+
for url in source.list_urls(prefix=self.prefix, etl=self.etl):
180183
yield url

python/aistore/pytorch/multishard_dataset.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
"""
22
Multishard Stream Dataset for AIS.
33
4-
Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
4+
Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
55
"""
66

7+
from typing import Iterator, List, Iterable
8+
79
from aistore.sdk import ArchiveConfig, DataShard, ListObjectFlag
810
from aistore.sdk import Bucket
9-
from typing import Iterator, List, Iterable
11+
from aistore.sdk.etl import ETLConfig
1012
from torch.utils.data import IterableDataset
1113

1214

@@ -59,6 +61,6 @@ def _get_shard_objects_iterator(
5961
if obj.name != path:
6062
obj_name = obj.name.replace(f"{path}/", "", 1)
6163
yield bucket.object(path).get_reader(
62-
etl_name=etl_name,
64+
etl=ETLConfig(name=etl_name),
6365
archive_config=ArchiveConfig(archpath=obj_name),
6466
).read_all()

python/aistore/sdk/ais_source.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
#
2-
# Copyright (c) 2023 - 2024, NVIDIA CORPORATION. All rights reserved.
2+
# Copyright (c) 2023 - 2025, NVIDIA CORPORATION. All rights reserved.
33
#
44
from abc import ABC, abstractmethod
5-
from typing import Iterable
5+
from typing import Iterable, Optional
66
from aistore.sdk.obj.object import Object
77
from aistore.sdk.request_client import RequestClient
8+
from aistore.sdk.etl import ETLConfig
89

910

1011
# pylint: disable=too-few-public-methods
@@ -35,12 +36,15 @@ def list_all_objects_iter(
3536
"""
3637

3738
@abstractmethod
38-
def list_urls(self, prefix: str = "", etl_name: str = None) -> Iterable[str]:
39+
def list_urls(
40+
self, prefix: str = "", etl: Optional[ETLConfig] = None
41+
) -> Iterable[str]:
3942
"""
4043
Get an iterable of full urls to reference the objects contained in this source (bucket, group, etc.)
4144
Args:
4245
prefix (str, optional): Only include objects with names matching this prefix
43-
etl_name (str, optional): Apply an ETL when retrieving object contents
46+
etl (Optional[ETLConfig], optional): An optional ETL configuration. If provided, the URLs
47+
will include ETL processing parameters. Defaults to None.
4448
4549
Returns:
4650
Iterable over selected object URLS

python/aistore/sdk/bucket.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@
99
import os
1010
from pathlib import Path
1111
import time
12-
from typing import Dict, List, NewType, Iterable, Union
12+
from typing import Dict, List, NewType, Iterable, Union, Optional
1313
import requests
1414
from requests import structures
1515

1616
from aistore.sdk.ais_source import AISSource
1717
from aistore.sdk.etl.etl_const import DEFAULT_ETL_TIMEOUT
1818
from aistore.sdk.obj.object_iterator import ObjectIterator
19+
from aistore.sdk.etl import ETLConfig
1920
from aistore.sdk.const import (
2021
ACT_COPY_BCK,
2122
ACT_CREATE_BCK,
@@ -136,20 +137,23 @@ def namespace(self) -> Namespace:
136137
"""The namespace for this bucket."""
137138
return self._namespace
138139

139-
def list_urls(self, prefix: str = "", etl_name: str = None) -> Iterable[str]:
140+
def list_urls(
141+
self, prefix: str = "", etl: Optional[ETLConfig] = None
142+
) -> Iterable[str]:
140143
"""
141-
Implementation of the abstract method from AISSource that provides an iterator
142-
of full URLs to every object in this bucket matching the specified prefix
144+
Generates full URLs for all objects in the bucket that match the specified prefix.
143145
144146
Args:
145-
prefix (str, optional): Limit objects selected by a given string prefix
146-
etl_name (str, optional): ETL to include in URLs
147+
prefix (str, optional): A string prefix to filter objects. Only objects with names starting
148+
with this prefix will be included. Defaults to an empty string (no filtering).
149+
etl (Optional[ETLConfig], optional): An optional ETL configuration. If provided, the URLs
150+
will include ETL processing parameters. Defaults to None.
147151
148152
Returns:
149-
Iterator of full URLs of all objects matching the prefix
153+
Iterable[str]: An iterator yielding full URLs of all objects matching the prefix.
150154
"""
151155
for entry in self.list_objects_iter(prefix=prefix, props="name"):
152-
yield self.object(entry.name).get_url(etl_name=etl_name)
156+
yield self.object(entry.name).get_url(etl=etl)
153157

154158
def list_all_objects_iter(
155159
self, prefix: str = "", props: str = "name,size"

python/aistore/sdk/const.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
# URL Params
4545
# See api/apc/query.go
4646
QPARAM_WHAT = "what"
47-
QPARAM_ETL_NAME = "etl_name"
4847
QPARAM_PROVIDER = "provider"
4948
QPARAM_BCK_TO = "bck_to"
5049
QPARAM_FLT_PRESENCE = "presence"
@@ -62,6 +61,9 @@
6261
QPARAM_UUID = "uuid"
6362
QPARAM_LATEST = "latest-ver"
6463
QPARAM_NEW_CUSTOM = "set-new-custom"
64+
# etl
65+
QPARAM_ETL_NAME = "etl_name"
66+
QPARAM_ETL_ARGS = "etl_meta"
6567

6668
# URL Param values
6769
# See api/apc/query.go

python/aistore/sdk/etl/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#
2+
# ETL Module
3+
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
4+
#
5+
6+
from aistore.sdk.etl.etl_config import ETLConfig

python/aistore/sdk/etl/etl_config.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#
2+
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
3+
#
4+
from dataclasses import dataclass
5+
from typing import Dict, Optional, Union
6+
7+
8+
@dataclass
9+
class ETLConfig:
10+
"""
11+
Configuration for ETL operations.
12+
13+
Attributes:
14+
name (str): Name of the ETL pipeline.
15+
args (Optional[Union[str, Dict]]): Optional parameters for configuring the ETL pipeline.
16+
Can be provided as:
17+
- A string for simple arguments.
18+
- A dictionary for structured key-value configurations.
19+
Defaults to None.
20+
21+
Example:
22+
etl_config = ETLConfig(
23+
name="image-transform",
24+
args={"format": "jpeg", "resize": "256x256"}
25+
)
26+
"""
27+
28+
name: str
29+
args: Optional[Union[str, Dict]] = None

python/aistore/sdk/multiobj/object_group.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#
2-
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
2+
# Copyright (c) 2023-2025, NVIDIA CORPORATION. All rights reserved.
33
#
44
import logging
5-
from typing import Dict, List, Iterable
5+
from typing import Dict, List, Iterable, Optional
66

77
from aistore.sdk.ais_source import AISSource
88
from aistore.sdk.const import (
@@ -30,6 +30,7 @@
3030
PrefetchMsg,
3131
)
3232
from aistore.sdk.request_client import RequestClient
33+
from aistore.sdk.etl import ETLConfig
3334

3435

3536
class ObjectGroup(AISSource):
@@ -79,19 +80,22 @@ def client(self, client) -> RequestClient:
7980
"""Update the client bound to the bucket used by the ObjectGroup."""
8081
self.bck.client = client
8182

82-
def list_urls(self, prefix: str = "", etl_name: str = None) -> Iterable[str]:
83+
def list_urls(
84+
self, prefix: str = "", etl: Optional[ETLConfig] = None
85+
) -> Iterable[str]:
8386
"""
8487
Implementation of the abstract method from AISSource that provides an iterator
8588
of full URLs to every object in this group
8689
Args:
8790
prefix (str, optional): Not used by this implementation; every object in the group is listed
88-
etl_name (str, optional): ETL to include in URLs
91+
etl (Optional[ETLConfig], optional): An optional ETL configuration. If provided, the URLs
92+
will include ETL processing parameters. Defaults to None.
8993
9094
Returns:
9195
Iterator of all object URLs in the group
9296
"""
9397
for obj_name in self._obj_collection:
94-
yield self.bck.object(obj_name).get_url(etl_name=etl_name)
98+
yield self.bck.object(obj_name).get_url(etl=etl)
9599

96100
def list_all_objects_iter(
97101
self, prefix: str = "", props: str = "name,size"

0 commit comments

Comments
 (0)