Skip to content

Commit 2f18344

Browse files
committed
cli: refactor get-batch CLI
* split request parsing in two parts:
  - parse/validate CLI flags and arguments
  - populate `apc.MossReq` from native spec or Lhotse manifest
* output file:
  - clarify and maybe warn when Lhotse multi-batch
  - enforce for single-batch
* with refactoring

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent c72af15 commit 2f18344

File tree

2 files changed

+130
-122
lines changed

2 files changed

+130
-122
lines changed

cmd/cli/cli/lhotse.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func parseLhotseSrc(uri string) (bck cmn.Bck, objName string, err error) {
230230
// generate multiple batches in a streaming fashion
231231
//
232232

233-
func lhotseMultiBatch(c *cli.Context, outCtx *mossReqParseCtx) error {
233+
func lhotseMultiBatch(c *cli.Context, outCtx *mossReqCtx) error {
234234
inCtx, err := openLhotseReader(c)
235235
if err != nil {
236236
return err
@@ -310,7 +310,7 @@ func lhotseMultiBatch(c *cli.Context, outCtx *mossReqParseCtx) error {
310310
return nil
311311
}
312312

313-
func genLhotseBatch(c *cli.Context, outCtx *mossReqParseCtx, batch []apc.MossIn, outputFile string) error {
313+
func genLhotseBatch(c *cli.Context, outCtx *mossReqCtx, batch []apc.MossIn, outputFile string) error {
314314
// 1) create MossReq for this batch
315315
req := apc.MossReq{
316316
In: batch,

cmd/cli/cli/ml.go

Lines changed: 128 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -91,22 +91,78 @@ var (
9191
}
9292
)
9393

94-
type (
95-
mossReqParseCtx struct {
96-
req apc.MossReq
97-
outFile string
98-
bck cmn.Bck
99-
pt *cos.ParsedTemplate // valid when batchSize > 0
100-
batchSize int
94+
type mossReqCtx struct {
95+
// get-batch API
96+
req apc.MossReq
97+
bck cmn.Bck
98+
99+
// output file
100+
outFile string
101+
102+
// additional/optional command line input
103+
names []string
104+
shift int
105+
106+
// Lhotse-specific
107+
pt *cos.ParsedTemplate
108+
batchSize int
109+
}
110+
111+
func getBatchHandler(c *cli.Context) error {
112+
ctx, err := parseMlArgs(c)
113+
if err != nil {
114+
return err
115+
}
116+
if err := ctx.load(c); err != nil {
117+
return err
118+
}
119+
120+
if ctx.batchSize > 0 {
121+
debug.Assert(flagIsSet(c, lhotseManifestFlag), "native (non-lhotse) batching not implemented yet")
122+
return lhotseMultiBatch(c, ctx)
123+
}
124+
outFile, bck := ctx.outFile, ctx.bck
125+
126+
// output
127+
w, wfh, err := createDstFile(c, outFile, false /*allow stdout*/)
128+
if err != nil {
129+
if err == errUserCancel {
130+
return nil
131+
}
132+
return err
133+
}
134+
135+
// do
136+
req := &ctx.req
137+
resp, err := api.GetBatch(apiBP, bck, req, w)
138+
139+
if wfh != nil {
140+
wfh.Close()
141+
}
142+
if err == nil {
143+
debug.Assert(req.StreamingGet || len(resp.Out) == len(req.In))
144+
145+
if !flagIsSet(c, nonverboseFlag) {
146+
var msg string
147+
if req.StreamingGet {
148+
msg = fmt.Sprintf("Streamed %d objects to %s", len(req.In), outFile)
149+
} else {
150+
msg = fmt.Sprintf("Created ML batch archive %s with %d objects", outFile, len(resp.Out))
151+
}
152+
actionDone(c, msg)
153+
}
154+
return nil
101155
}
102-
)
103156

104-
// TODO -- FIXME: refactor - split in parts
105-
func buildMossReq(c *cli.Context) (*mossReqParseCtx, error) {
106-
var (
107-
req apc.MossReq
108-
shift int
109-
)
157+
// cleanup
158+
if wfh != nil {
159+
cos.RemoveFile(outFile)
160+
}
161+
return err
162+
}
163+
164+
// parse CLI arguments and setup context
165+
func parseMlArgs(c *cli.Context) (ctx *mossReqCtx, _ error) {
110166
if err := errMutuallyExclusive(c, listFlag, templateFlag, specFlag); err != nil {
111167
return nil, err
112168
}
@@ -118,9 +174,6 @@ func buildMossReq(c *cli.Context) (*mossReqParseCtx, error) {
118174
}
119175

120176
// bucket [+list|template]
121-
var (
122-
names []string
123-
)
124177
bck, objNameOrTmpl, err := parseBckObjURI(c, c.Args().Get(0), true /*emptyObjnameOK*/)
125178
if flagIsSet(c, listFlag) && err != nil {
126179
return nil, fmt.Errorf("option %s requires bucket in the command line [err: %v]", qflprn(listFlag), err)
@@ -129,8 +182,9 @@ func buildMossReq(c *cli.Context) (*mossReqParseCtx, error) {
129182
return nil, fmt.Errorf("option %s requires bucket in the command line [err: %v]", qflprn(templateFlag), err)
130183
}
131184

185+
ctx = &mossReqCtx{bck: bck}
132186
if err == nil {
133-
shift++
187+
ctx.shift++
134188
oltp, err := dopOLTP(c, bck, objNameOrTmpl)
135189
if err != nil {
136190
return nil, err
@@ -144,99 +198,104 @@ func buildMossReq(c *cli.Context) (*mossReqParseCtx, error) {
144198
if err := pt.CheckIsRange(); err != nil {
145199
return nil, err
146200
}
147-
if names, err = pt.Expand(); err != nil {
201+
if ctx.names, err = pt.Expand(); err != nil {
148202
return nil, err
149203
}
150204
case oltp.list == "":
151205
if objNameOrTmpl != "" {
152-
names = []string{objNameOrTmpl}
206+
ctx.names = []string{objNameOrTmpl}
153207
}
154208
default:
155-
names = splitCsv(oltp.list)
209+
ctx.names = splitCsv(oltp.list)
156210
}
157211
}
158212

159-
if len(names) == 0 && !flagIsSet(c, specFlag) && !flagIsSet(c, lhotseManifestFlag) {
213+
if len(ctx.names) == 0 && !flagIsSet(c, specFlag) && !flagIsSet(c, lhotseManifestFlag) {
160214
return nil, fmt.Errorf("with no (%s, %s) options expecting object names and/or archived filenames in the command line",
161215
qflprn(specFlag), qflprn(lhotseManifestFlag))
162216
}
163217

164-
// native spec
165-
if flagIsSet(c, specFlag) {
166-
specBytes, ext, err := loadSpec(c)
218+
// lhotse only: given (batchSizeFlag & outputTemplateFlag) outFile(s) are computed from the latter
219+
if c.NArg() > ctx.shift {
220+
// Note: may be unused in Lhotse multi-batch mode
221+
ctx.outFile = c.Args().Get(ctx.shift)
222+
outputFormat, err := archive.Strict("", cos.Ext(ctx.outFile))
167223
if err != nil {
168224
return nil, err
169225
}
170-
if err := parseSpec(ext, specBytes, &req); err != nil {
171-
return nil, err
172-
}
226+
ctx.req.OutputFormat = outputFormat
173227
}
174228

175-
var (
176-
outputFormat string
177-
outFile string
178-
)
179-
// given (batchSizeFlag and outputTemplateFlag)
180-
// outFile can be computed from the latter
181-
// (currently, lhotse only)
182-
if c.NArg() > shift {
183-
outFile = c.Args().Get(shift)
184-
outputFormat, err = archive.Strict("", cos.Ext(outFile))
229+
// there's no real way to check these assorted overrides; common expectation, though,
230+
// is for command line to take precedence
231+
232+
ctx.req.ContinueOnErr = flagIsSet(c, continueOnErrorFlag)
233+
ctx.req.StreamingGet = flagIsSet(c, streamingGetFlag)
234+
ctx.req.OnlyObjName = flagIsSet(c, omitSrcBucketNameFlag)
235+
236+
return ctx, nil
237+
}
238+
239+
// populate get-batch request from native (single-batch) spec or Lhotse manifest
240+
func (ctx *mossReqCtx) load(c *cli.Context) error {
241+
// native spec
242+
if flagIsSet(c, specFlag) {
243+
specBytes, ext, err := loadSpec(c)
185244
if err != nil {
186-
return nil, err
245+
return err
246+
}
247+
248+
outputFormat := ctx.req.OutputFormat
249+
if err := parseSpec(ext, specBytes, &ctx.req); err != nil {
250+
return err
187251
}
188-
if req.OutputFormat != "" && outputFormat != "" && req.OutputFormat != outputFormat {
252+
// warn
253+
if ctx.req.OutputFormat != "" && outputFormat != "" && ctx.req.OutputFormat != outputFormat {
189254
if !flagIsSet(c, nonverboseFlag) {
190255
warn := fmt.Sprintf("output format %s in the command line takes precedence (over %s specified %s)",
191-
outputFormat, qflprn(specFlag), req.OutputFormat)
256+
outputFormat, qflprn(specFlag), ctx.req.OutputFormat)
192257
actionWarn(c, warn)
193258
}
194259
}
195-
req.OutputFormat = outputFormat
196260
}
197261

198-
// NOTE: no real way to check these assorted overrides; common expectation, though,
199-
// is for command line to take precedence
200-
req.ContinueOnErr = flagIsSet(c, continueOnErrorFlag)
201-
req.StreamingGet = flagIsSet(c, streamingGetFlag)
202-
req.OnlyObjName = flagIsSet(c, omitSrcBucketNameFlag)
203-
204-
var (
205-
ctx = mossReqParseCtx{outFile: outFile, bck: bck}
206-
)
207262
// lhotse spec
208263
if flagIsSet(c, lhotseManifestFlag) {
264+
var err error
209265
ctx.batchSize, ctx.pt, err = parseLhotseBatchFlags(c)
210266
if err != nil {
211-
return nil, err
267+
return err
212268
}
213269
if ctx.batchSize > 0 {
214270
// note: early return to generate multiple apc.MossReq requests and batches
215-
return &ctx, nil
271+
if ctx.outFile != "" {
272+
warn := fmt.Sprintf("output file %s is ignored in multi-batch mode (batch size %d)", ctx.outFile, ctx.batchSize)
273+
actionWarn(c, warn)
274+
}
275+
return nil
216276
}
217-
218-
if outFile == "" {
219-
return nil, missingArgumentsError(c, c.Command.ArgsUsage)
277+
if ctx.outFile == "" {
278+
return errors.New("output file is required for single-batch mode")
220279
}
221280

222281
ins, err := loadAndParseLhotse(c)
223282
if err != nil {
224-
return nil, err
283+
return err
225284
}
226-
req.In = ins
227-
} else if outFile == "" {
228-
return nil, missingArgumentsError(c, c.Command.ArgsUsage)
285+
ctx.req.In = ins
286+
} else if ctx.outFile == "" {
287+
return missingArgumentsError(c, c.Command.ArgsUsage)
229288
}
230289

231-
if len(names) > 0 {
232-
if len(req.In) == 0 {
233-
req.In = make([]apc.MossIn, 0, len(names))
290+
if len(ctx.names) > 0 {
291+
if len(ctx.req.In) == 0 {
292+
ctx.req.In = make([]apc.MossIn, 0, len(ctx.names))
234293
} else {
235294
warn := fmt.Sprintf("adding %d command-line defined name%s to the %d spec-defined",
236-
len(names), cos.Plural(len(names)), len(req.In))
295+
len(ctx.names), cos.Plural(len(ctx.names)), len(ctx.req.In))
237296
actionWarn(c, warn)
238297
}
239-
for _, o := range names {
298+
for _, o := range ctx.names {
240299
in := apc.MossIn{
241300
ObjName: o,
242301
// no need to insert command-line bck -
@@ -246,64 +305,13 @@ func buildMossReq(c *cli.Context) (*mossReqParseCtx, error) {
246305
in.ObjName = oname
247306
in.ArchPath = archpath
248307
}
249-
req.In = append(req.In, in)
250-
}
251-
}
252-
253-
if len(req.In) == 0 {
254-
return nil, errors.New("empty get-batch request")
255-
}
256-
257-
ctx.req = req
258-
return &ctx, nil
259-
}
260-
261-
func getBatchHandler(c *cli.Context) error {
262-
ctx, err := buildMossReq(c)
263-
if err != nil {
264-
return err
265-
}
266-
267-
if ctx.batchSize > 0 {
268-
debug.Assert(flagIsSet(c, lhotseManifestFlag), "native (non-lhotse) batching not implemented yet")
269-
return lhotseMultiBatch(c, ctx)
270-
}
271-
outFile, bck := ctx.outFile, ctx.bck
272-
273-
// output
274-
w, wfh, err := createDstFile(c, outFile, false /*allow stdout*/)
275-
if err != nil {
276-
if err == errUserCancel {
277-
return nil
308+
ctx.req.In = append(ctx.req.In, in)
278309
}
279-
return err
280-
}
281-
282-
// do
283-
req := &ctx.req
284-
resp, err := api.GetBatch(apiBP, bck, req, w)
285-
286-
if wfh != nil {
287-
wfh.Close()
288310
}
289-
if err == nil {
290-
debug.Assert(req.StreamingGet || len(resp.Out) == len(req.In))
291311

292-
if !flagIsSet(c, nonverboseFlag) {
293-
var msg string
294-
if req.StreamingGet {
295-
msg = fmt.Sprintf("Streamed %d objects to %s", len(req.In), outFile)
296-
} else {
297-
msg = fmt.Sprintf("Created ML batch archive %s with %d objects", outFile, len(resp.Out))
298-
}
299-
actionDone(c, msg)
300-
}
301-
return nil
312+
if len(ctx.req.In) == 0 {
313+
return errors.New("empty get-batch request")
302314
}
303315

304-
// cleanup
305-
if wfh != nil {
306-
cos.RemoveFile(outFile)
307-
}
308-
return err
316+
return nil
309317
}

0 commit comments

Comments
 (0)