Skip to content

Commit 4b43418

Browse files
committed
[Go API change] extend api.ETLObject to support ETL inline transform metadata passing
* add `etl_args` argument * add `TestETLInlineObjWithArgs` integration test TODO: * deprecate `etl_meta` query field * update CLI `ais etl object ...` to align with updated API Signed-off-by: Tony Chen <a122774007@gmail.com>
1 parent 8d9f2d1 commit 4b43418

File tree

10 files changed

+155
-66
lines changed

10 files changed

+155
-66
lines changed

ais/dpq.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ type dpq struct {
3131
path, mime, regx, mmode string // QparamArchpath et al. (plus archmode below)
3232
}
3333
etl struct {
34-
name, meta string // QparamETLName, QparamETLMeta
34+
name, targs string // QparamETLName, QparamETLTransformArgs
35+
meta string // QparamETLMeta (DEPRECATED - Replace with QparamETLTransformArgs soon)
3536
}
3637

3738
ptime string // req timestamp at calling/redirecting proxy (QparamUnixTime)
3839
uuid string // xaction
3940
origURL string // ht://url->
4041
owt string // object write transaction { OwtPut, ... }
4142
fltPresence string // QparamFltPresence
42-
// etlName string // QparamETLName
43-
binfo string // bucket info, with or without requirement to summarize remote obj-s
43+
binfo string // bucket info, with or without requirement to summarize remote obj-s
4444

4545
skipVC bool // QparamSkipVC (skip loading existing object's metadata)
4646
isGFN bool // QparamIsGFNRequest
@@ -150,7 +150,9 @@ func (dpq *dpq) parse(rawQuery string) (err error) {
150150

151151
case apc.QparamETLName:
152152
dpq.etl.name = value
153-
case apc.QparamETLMeta:
153+
case apc.QparamETLTransformArgs:
154+
dpq.etl.targs = value
155+
case apc.QparamETLMeta: // DEPRECATED - Replace with QparamETLTransformArgs soon.
154156
dpq.etl.meta = value
155157
case apc.QparamSilent:
156158
dpq.silent = cos.IsParseBool(value)

ais/target.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ func (t *target) getObject(w http.ResponseWriter, r *http.Request, dpq *dpq, bck
709709

710710
// two special flows
711711
if dpq.etl.name != "" {
712-
t.getFromETL(w, r, dpq.etl.name, dpq.etl.meta, lom)
712+
t.getFromETL(w, r, dpq, lom)
713713
return lom, nil
714714
}
715715
if cos.IsParseBool(r.Header.Get(apc.HdrBlobDownload)) {

ais/test/etl_test.go

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
cryptorand "crypto/rand"
1010
"fmt"
1111
"io"
12+
"math/rand/v2"
1213
"net/http"
1314
"net/url"
1415
"os"
@@ -38,6 +39,7 @@ import (
3839
"github.com/NVIDIA/aistore/tools/trand"
3940
"github.com/NVIDIA/aistore/xact"
4041
"github.com/NVIDIA/go-tfdata/tfdata/core"
42+
"github.com/OneOfOne/xxhash"
4143
)
4244

4345
const (
@@ -117,7 +119,7 @@ func readExamples(fileName string) (examples []*core.TFExample, err error) {
117119
return core.NewTFRecordReader(f).ReadAllExamples()
118120
}
119121

120-
func testETLObject(t *testing.T, etlName, inPath, outPath string, fTransform transformFunc, fEq filesEqualFunc) {
122+
func testETLObject(t *testing.T, etlName string, args any, inPath, outPath string, fTransform transformFunc, fEq filesEqualFunc) {
121123
var (
122124
inputFilePath string
123125
expectedOutputFilePath string
@@ -161,17 +163,20 @@ func testETLObject(t *testing.T, etlName, inPath, outPath string, fTransform tra
161163
tassert.CheckFatal(t, err)
162164
defer fho.Close()
163165

164-
tlog.Logf("GET %s via etl[%s]\n", bck.Cname(objName), etlName)
165-
err = api.ETLObject(baseParams, etlName, bck, objName, fho)
166+
tlog.Logf("GET %s via etl[%s], args=%v\n", bck.Cname(objName), etlName, args)
167+
oah, err := api.ETLObject(baseParams, &api.ETLObjArgs{ETLName: etlName, Metadata: args}, bck, objName, fho)
166168
tassert.CheckFatal(t, err)
167169

170+
stat, _ := fho.Stat()
171+
tassert.Fatalf(t, stat.Size() == oah.Size(), "expected %d bytes, got %d", oah.Size(), stat.Size())
172+
168173
tlog.Logln("Compare output")
169174
same, err := fEq(outputFileName, expectedOutputFilePath)
170175
tassert.CheckError(t, err)
171176
tassert.Errorf(t, same, "file contents after transformation differ")
172177
}
173178

174-
func testETLObjectCloud(t *testing.T, bck cmn.Bck, etlName string, onlyLong, cached bool) {
179+
func testETLObjectCloud(t *testing.T, bck cmn.Bck, etlName, args string, onlyLong, cached bool) {
175180
var (
176181
proxyURL = tools.RandomProxyURL(t)
177182
baseParams = tools.BaseAPIParams(proxyURL)
@@ -208,7 +213,7 @@ func testETLObjectCloud(t *testing.T, bck cmn.Bck, etlName string, onlyLong, cac
208213

209214
bf := bytes.NewBuffer(nil)
210215
tlog.Logf("Use ETL[%s] to read transformed object\n", etlName)
211-
err = api.ETLObject(baseParams, etlName, bck, objName, bf)
216+
_, err = api.ETLObject(baseParams, &api.ETLObjArgs{ETLName: etlName, TransformArgs: args}, bck, objName, bf)
212217
tassert.CheckFatal(t, err)
213218
tassert.Errorf(t, bf.Len() == cos.KiB, "Expected %d bytes, got %d", cos.KiB, bf.Len())
214219
}
@@ -263,7 +268,7 @@ func TestETLObject(t *testing.T) {
263268
_ = tetl.InitSpec(t, baseParams, test.transformer, test.comm)
264269
t.Cleanup(func() { tetl.StopAndDeleteETL(t, baseParams, test.transformer) })
265270

266-
testETLObject(t, test.transformer, test.inPath, test.outPath, test.transform, test.filesEqual)
271+
testETLObject(t, test.transformer, "", test.inPath, test.outPath, test.transform, test.filesEqual)
267272
})
268273
}
269274
}
@@ -295,7 +300,7 @@ func TestETLObjectCloud(t *testing.T) {
295300

296301
for _, conf := range configs {
297302
t.Run(fmt.Sprintf("cached=%t", conf.cached), func(t *testing.T) {
298-
testETLObjectCloud(t, cliBck, tetl.Echo, conf.onlyLong, conf.cached)
303+
testETLObjectCloud(t, cliBck, tetl.Echo, "", conf.onlyLong, conf.cached)
299304
})
300305
}
301306
})
@@ -396,6 +401,48 @@ func TestETLInlineMD5SingleObj(t *testing.T) {
396401
got[:min(len(got), 16)])
397402
}
398403

404+
func TestETLInlineObjWithArgs(t *testing.T) {
405+
tools.CheckSkip(t, &tools.SkipTestArgs{RequiredDeployment: tools.ClusterTypeK8s})
406+
tetl.CheckNoRunningETLContainers(t, baseParams)
407+
408+
var (
409+
proxyURL = tools.RandomProxyURL(t)
410+
baseParams = tools.BaseAPIParams(proxyURL)
411+
transformer = tetl.HashWithMetadata
412+
413+
tests = []struct {
414+
name string
415+
commType string
416+
onlyLong bool
417+
}{
418+
{name: "etl-args-hpush", commType: etl.Hpush},
419+
{name: "etl-args-hpull", commType: etl.Hpull},
420+
{name: "etl-args-hrev", commType: etl.Hrev},
421+
}
422+
)
423+
424+
for _, test := range tests {
425+
t.Run(test.name, func(t *testing.T) {
426+
t.Cleanup(func() { tetl.StopAndDeleteETL(t, baseParams, transformer) })
427+
_ = tetl.InitSpec(t, baseParams, transformer, test.commType)
428+
429+
var seed = rand.Uint64N(1000)
430+
431+
testETLObject(t, transformer, seed, "", "",
432+
func(r io.Reader) io.Reader {
433+
// Read the object and calculate the hash using the same hash algorithm as the ETL (same random seed).
434+
// The results should match because the hash is calculated in the same way.
435+
data, _ := io.ReadAll(r)
436+
hash := xxhash.Checksum64S(data, seed)
437+
hashHex := fmt.Sprintf("%016x", hash)
438+
439+
return bytes.NewReader([]byte(hashHex))
440+
}, tools.FilesEqual,
441+
)
442+
})
443+
}
444+
}
445+
399446
func TestETLAnyToAnyBucket(t *testing.T) {
400447
tools.CheckSkip(t, &tools.SkipTestArgs{RequiredDeployment: tools.ClusterTypeK8s})
401448
tetl.CheckNoRunningETLContainers(t, baseParams)
@@ -648,7 +695,7 @@ def transform(input_bytes: bytes) -> bytes:
648695
case "etl_object":
649696
t.Cleanup(func() { tetl.StopAndDeleteETL(t, baseParams, test.etlName) })
650697

651-
testETLObject(t, test.etlName, "", "", func(r io.Reader) io.Reader {
698+
testETLObject(t, test.etlName, "", "", "", func(r io.Reader) io.Reader {
652699
return r // TODO: Write function to transform input to md5.
653700
}, func(_, _ string) (bool, error) {
654701
return true, nil // TODO: Write function to compare output from md5.

ais/tgtetl.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,13 @@ func (t *target) stopETL(w http.ResponseWriter, r *http.Request, etlName string)
150150
}
151151
}
152152

153-
func (t *target) getFromETL(w http.ResponseWriter, r *http.Request, etlName, etlMeta string, lom *core.LOM) {
154-
comm, err := etl.GetCommunicator(etlName)
153+
func (t *target) getFromETL(w http.ResponseWriter, r *http.Request, dpq *dpq, lom *core.LOM) {
154+
var (
155+
name = dpq.etl.name // apc.QparamETLName
156+
targs = dpq.etl.targs // apc.QparamETLTransformArgs
157+
meta = dpq.etl.meta // apc.QparamETLMeta, DEPRECATED - Replace with QparamETLTransformArgs soon
158+
)
159+
comm, err := etl.GetCommunicator(name)
155160
if err != nil {
156161
if cos.IsErrNotFound(err) {
157162
smap := t.owner.smap.Get()
@@ -164,8 +169,8 @@ func (t *target) getFromETL(w http.ResponseWriter, r *http.Request, etlName, etl
164169
return
165170
}
166171

167-
if err := comm.InlineTransform(w, r, lom, etlMeta); err != nil {
168-
errV := cmn.NewErrETL(&cmn.ETLErrCtx{ETLName: etlName, ETLMeta: etlMeta, PodName: comm.PodName(), SvcName: comm.SvcName()},
172+
if err := comm.InlineTransform(w, r, lom, targs, meta); err != nil {
173+
errV := cmn.NewErrETL(&cmn.ETLErrCtx{ETLName: name, ETLTransformArgs: targs, PodName: comm.PodName(), SvcName: comm.SvcName()},
169174
err.Error())
170175
xetl := comm.Xact()
171176
xetl.AddErr(errV)

api/apc/query.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ const (
1414
QparamJobID = "jobid" // job
1515

1616
// etl
17-
QparamETLName = "etl_name"
18-
QparamETLMeta = "etl_meta"
17+
QparamETLName = "etl_name"
18+
QparamETLMeta = "etl_meta" // TODO: DEPRECATED - Replace with QparamETLTransformArgs soon.
19+
QparamETLTransformArgs = "etl_args"
1920

2021
QparamRegex = "regex" // dsort: list regex
2122
QparamOnlyActive = "only_active" // dsort: list only active

api/etl.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,18 @@ import (
1717
"github.com/NVIDIA/aistore/ext/etl"
1818
)
1919

20+
type ETLObjArgs struct {
21+
// ETLName specifies the running ETL instance to be used in inline transform.
22+
ETLName string
23+
24+
// TransformArgs holds the arguments to be used in ETL inline transform,
25+
// which will be sent as `apc.QparamETLTransformArgs` query parameter in the request.
26+
TransformArgs any
27+
28+
// DEPRECATED - Replace with TransformArgs soon.
29+
Metadata any
30+
}
31+
2032
// Initiate custom ETL workload by executing one of the documented `etl.InitMsg`
2133
// message types.
2234
// The API call results in deploying multiple ETL containers (K8s pods):
@@ -145,13 +157,25 @@ func etlPostAction(bp BaseParams, etlName, action string) (err error) {
145157
return
146158
}
147159

148-
// TODO: add ETL-specific query param and change the examples/docs (!4455)
149-
func ETLObject(bp BaseParams, etlName string, bck cmn.Bck, objName string, w io.Writer) (err error) {
150-
_, err = GetObject(bp, bck, objName, &GetArgs{
151-
Writer: w,
152-
Query: url.Values{apc.QparamETLName: []string{etlName}},
153-
})
154-
return
160+
func ETLObject(bp BaseParams, args *ETLObjArgs, bck cmn.Bck, objName string, w io.Writer) (oah ObjAttrs, err error) {
161+
query := url.Values{apc.QparamETLName: []string{args.ETLName}}
162+
if args.TransformArgs != nil {
163+
targs, err := cos.ConvertToString(args.TransformArgs)
164+
if err != nil {
165+
return oah, err
166+
}
167+
query.Add(apc.QparamETLTransformArgs, targs)
168+
}
169+
170+
if args.Metadata != nil {
171+
meta, err := cos.ConvertToString(args.Metadata)
172+
if err != nil {
173+
return oah, err
174+
}
175+
query.Add(apc.QparamETLMeta, meta)
176+
}
177+
178+
return GetObject(bp, bck, objName, &GetArgs{Writer: w, Query: query})
155179
}
156180

157181
// NOTE: for ETLBucket(), see api/bucket

cmn/err.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,11 @@ type (
189189
ETLErrCtx
190190
}
191191
ETLErrCtx struct {
192-
TID string
193-
ETLName string
194-
ETLMeta string
195-
PodName string
196-
SvcName string
192+
TID string
193+
ETLName string
194+
ETLTransformArgs string
195+
PodName string
196+
SvcName string
197197
}
198198
ErrWarning struct {
199199
what string

ext/etl/comm_internal_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ var _ = Describe("CommunicatorTest", func() {
4646
dataSize = int64(50 * cos.MiB)
4747
transformData = make([]byte, dataSize)
4848

49-
bck = cmn.Bck{Name: "commBck", Provider: apc.AIS, Ns: cmn.NsGlobal}
50-
objName = "commObj"
51-
etlMeta = "{\"from_time\":2.43,\"to_time\":3.43}"
52-
clusterBck = meta.NewBck(
49+
bck = cmn.Bck{Name: "commBck", Provider: apc.AIS, Ns: cmn.NsGlobal}
50+
objName = "commObj"
51+
etlTransformArgs = "{\"from_time\":2.43,\"to_time\":3.43}"
52+
clusterBck = meta.NewBck(
5353
bck.Name, bck.Provider, bck.Ns,
5454
&cmn.Bprops{Cksum: cmn.CksumConf{Type: cos.ChecksumXXHash}},
5555
)
@@ -88,15 +88,15 @@ var _ = Describe("CommunicatorTest", func() {
8888

8989
// Initialize the HTTP servers.
9090
transformerServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
91-
receivedEtlMeta := r.URL.Query().Get(apc.QparamETLMeta)
92-
Expect(receivedEtlMeta).To(Equal(etlMeta))
91+
receivedEtlTransformArgs := r.URL.Query().Get(apc.QparamETLTransformArgs)
92+
Expect(receivedEtlTransformArgs).To(Equal(etlTransformArgs))
9393

9494
_, err := w.Write(transformData)
9595
Expect(err).NotTo(HaveOccurred())
9696
}))
9797
targetServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
98-
receivedEtlMeta := r.URL.Query().Get(apc.QparamETLMeta)
99-
err := comm.InlineTransform(w, r, lom, receivedEtlMeta)
98+
receivedEtlTransformArgs := r.URL.Query().Get(apc.QparamETLTransformArgs)
99+
err := comm.InlineTransform(w, r, lom, receivedEtlTransformArgs, "")
100100
Expect(err).NotTo(HaveOccurred())
101101
}))
102102
proxyServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -140,7 +140,7 @@ var _ = Describe("CommunicatorTest", func() {
140140
comm = newCommunicator(nil, boot)
141141

142142
q := url.Values{}
143-
q.Add(apc.QparamETLMeta, etlMeta)
143+
q.Add(apc.QparamETLTransformArgs, etlTransformArgs)
144144
resp, err := http.Get(proxyServer.URL + "?" + q.Encode())
145145
Expect(err).NotTo(HaveOccurred())
146146
defer resp.Body.Close()

0 commit comments

Comments
 (0)