Skip to content

Commit 726da0d

Browse files
committed
ml: Lhotse multi-batch integration
- multi-batch generation with template-based output naming
- archive-path extraction for files stored inside TAR/ZIP shards
- new scripted integration test, with test cases and validation
- backward compatible with the existing single-batch workflow
- minor refactoring (`splitPrefixShardBoundary` renamed to `splitArchivePath`)

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent f6b07d0 commit 726da0d

File tree

9 files changed

+284
-34
lines changed

9 files changed

+284
-34
lines changed

ais/test/scripted_cli_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,3 +345,14 @@ func TestRateLimitBackendUsingScript(t *testing.T) {
345345
}
346346
tassert.CheckFatal(t, err)
347347
}
348+
349+
// as in: `ais ml lhotse-get-batch`
350+
func TestLhotseManifestBatch(t *testing.T) {
351+
cmd := exec.Command("./scripts/lhotse_test_suite.sh")
352+
353+
out, err := cmd.CombinedOutput()
354+
if len(out) > 0 {
355+
tlog.Logln(string(out))
356+
}
357+
tassert.CheckFatal(t, err)
358+
}

ais/test/scripts/lhotse_test_suite.sh

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
#!/bin/bash

## Prerequisites: #################################################################################
# - aistore cluster
# - ais (CLI)
#
## Example:
## ./lhotse_test_suite.sh --bucket ais://lhotse-test --cuts 247 --batch-size 50
#################################################################################################

if ! [ -x "$(command -v ais)" ]; then
  echo "Error: ais (CLI) not installed" >&2
  exit 1
fi

## Command line options and respective defaults
bucket="ais://lhotse-test"
num_cuts=247
batch_size=50
num_shards=50

## Options that take a value must actually get one: with a trailing `--cuts` (say),
## a bare `shift; shift` silently leaves the variable empty and the failure surfaces
## much later (or, for --bucket, hands an empty bucket name to `ais rmb` on cleanup).
require_value() {
  if [[ -z "$2" ]]; then
    echo "fatal: option '$1' requires a value. Use --help for usage." >&2
    exit 1
  fi
}

while (( "$#" )); do
  case "${1}" in
    --bucket)     require_value "$1" "$2"; bucket=$2; shift 2;;
    --cuts)       require_value "$1" "$2"; num_cuts=$2; shift 2;;
    --batch-size) require_value "$1" "$2"; batch_size=$2; shift 2;;
    --shards)     require_value "$1" "$2"; num_shards=$2; shift 2;;
    --help|-h)
      echo "Usage: $0 [options]"
      echo "Options:"
      echo " --bucket BUCKET Test bucket (default: ais://lhotse-test)"
      echo " --cuts NUM Number of cuts to generate (default: 247)"
      echo " --batch-size NUM Cuts per batch (default: 50)"
      echo " --shards NUM Number of audio shards to create (default: 50)"
      echo " --help, -h Show this help"
      exit 0;;
    *) echo "fatal: unknown argument '${1}'. Use --help for usage."; exit 1;;
  esac
done

## Numeric options drive loop counts, shell arithmetic (ceiling division, modulo) and
## range templates such as "batch-{001..NNN}.tar"; reject anything that is not a
## positive integer (including 0) up front.
for opt in num_cuts batch_size num_shards; do
  if ! [[ "${!opt}" =~ ^[1-9][0-9]*$ ]]; then
    echo "fatal: ${opt} must be a positive integer (got '${!opt}')" >&2
    exit 1
  fi
done

## uncomment for verbose output
## set -x

TEST_DIR="/tmp/lhotse_test_output_$$"
MANIFEST="$TEST_DIR/test_manifest.jsonl"
46+
47+
## Generate a synthetic Lhotse cuts manifest (JSONL, one cut per line).
##
## Arguments:
##   $1 - number of cuts to generate
##   $2 - output file (overwritten)
##   $3 - optional: number of shards to reference (default: global $num_shards)
##   $4 - optional: bucket holding the shards    (default: global $bucket)
##
## Every cut points at a file *inside* a shard, "<bucket>/audio-NNN.tar/audio-file-MM.wav",
## matching what `ais archive gen-shards` produces in this script (5 files per shard).
gen_test_manifest() {
  local num_cuts=$1
  local output=$2
  local nshards=${3:-$num_shards}
  local bck=${4:-$bucket}
  local i shard_num file_num start_ms dur_ms

  echo "Generating $num_cuts cuts in $output..."

  for (( i = 1; i <= num_cuts; i++ )); do
    # random shard (001 .. nshards) and random file within that shard (01 .. 05)
    printf -v shard_num "%03d" $(( (RANDOM % nshards) + 1 ))
    printf -v file_num "%02d" $(( (RANDOM % 5) + 1 ))

    # random timing in milliseconds using integer shell arithmetic - no `bc` dependency
    # and no per-cut subprocesses: start in [0, 10] s, duration in [1, 6] s
    start_ms=$(( RANDOM * 10000 / 32767 ))
    dur_ms=$(( RANDOM * 5000 / 32767 + 1000 ))

    # "%d.%03d" always yields a leading digit, so the numbers are valid JSON
    printf '{"id": "cut_%d", "start": %d.%03d, "duration": %d.%03d, "recording": {"sources": [{"source": "%s/audio-%s.tar/audio-file-%s.wav"}]}}\n' \
      "$i" $(( start_ms / 1000 )) $(( start_ms % 1000 )) \
      $(( dur_ms / 1000 )) $(( dur_ms % 1000 )) \
      "$bck" "$shard_num" "$file_num"
  done > "$output"

  echo "Generated $num_cuts cuts referencing audio-001.tar through audio-$(printf "%03d" "$nshards").tar"
  echo "Each shard contains: audio-file-01.wav through audio-file-05.wav"
}
76+
77+
echo "Lhotse Multi-Batch Test Suite"
echo "================================="
echo "Bucket: $bucket"
echo "Cuts: $num_cuts"
echo "Batch size: $batch_size"
echo "Shards: $num_shards"
echo

# Setup test environment
echo "Setting up test environment..."
rm -rf "$TEST_DIR"
mkdir -p "$TEST_DIR" || { echo "Error: failed to create test directory"; exit 1; }
cd "$TEST_DIR" || { echo "Error: failed to enter test directory"; exit 1; }

## Remove the scratch directory and the test bucket, preserving the script's exit status.
cleanup() {
  local rc=$?
  echo "Cleaning up..."
  # step out of $TEST_DIR before removing it (the path is absolute, any other cwd will do)
  cd / || true
  rm -rf "$TEST_DIR" 2>/dev/null
  ais rmb "$bucket" -y 1>/dev/null 2>&1
  exit $rc
}

# Run cleanup exactly once, on EXIT. INT/TERM are turned into the conventional
# 128+signo exit status, which then fires the EXIT trap with that status in $?.
# (Trapping INT/TERM with `cleanup` directly runs it twice - its `exit` re-fires
# the EXIT trap - and may report the previous command's status, e.g. 0, after Ctrl-C.)
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 143' TERM
101+
102+
# Generate test shards
echo "Generating test audio shards..."
ais archive gen-shards "${bucket}/audio-{001..$(printf "%03d" $num_shards)}.tar" \
  --fext '.wav' --fcount 5 --fsize 512KB \
  --output-template "audio-file-{01..05}.wav" \
  --cleanup || { echo "Error: failed to generate shards"; exit 1; }

# Verify shards were created
shard_count=$(ais ls "$bucket" -H --name-only | wc -l)
echo "Created $shard_count shards"

# Generate test manifest
gen_test_manifest $num_cuts "$MANIFEST"
gzip "$MANIFEST" || { echo "Error: failed to compress manifest"; exit 1; }

# Verify manifest
echo "Manifest info:"
echo " Size: $(ls -lh "${MANIFEST}.gz" | awk '{print $5}')"
echo " Sample cut:"
if command -v jq >/dev/null; then
  zcat "${MANIFEST}.gz" | head -1 | jq . || { echo "Error: invalid JSON in manifest"; exit 1; }
else
  zcat "${MANIFEST}.gz" | head -1
fi

echo

# Test 1: Single batch mode (backward compatibility)
echo "Test 1: Single batch mode..."
ais ml lhotse-get-batch --cuts "${MANIFEST}.gz" single_output.tar || { echo "Error: single batch test failed"; exit 1; }
echo "Single batch: $(ls -lh single_output.tar | awk '{print $5}')"

# Test 2: Multi-batch mode
echo "Test 2: Multi-batch mode..."
expected_batches=$(( (num_cuts + batch_size - 1) / batch_size )) # ceiling division
ais ml lhotse-get-batch --cuts "${MANIFEST}.gz" \
  --batch-size $batch_size \
  --output-template "batch-{001..$(printf "%03d" $expected_batches)}.tar" || { echo "Error: multi-batch test failed"; exit 1; }

# Verify outputs
batch_count=$(ls batch-*.tar 2>/dev/null | wc -l)
echo "Generated $batch_count batch files (expected: $expected_batches):"
ls -lh batch-*.tar | head -3
if [ $batch_count -gt 3 ]; then
  echo " ..."
  ls -lh batch-*.tar | tail -1
fi

# Verify batch count matches expectation
[ "$batch_count" -eq "$expected_batches" ] || { echo "Error: batch count mismatch"; exit 1; }

# Test 3: TBD: Sample rate conversion
## NOTE: range read not supported yet - once it is, add `--sample-rate 16000` to the command below.
## Until then this only checks that plain multi-batch generation with a different batch size succeeds.
echo "Test 3: TBD: Sample rate conversion..."
ais ml lhotse-get-batch --cuts "${MANIFEST}.gz" \
  --batch-size 25 \
  --output-template "audio-{01..20}.tar" || { echo "Error: sample rate test failed"; exit 1; }

audio_count=$(ls audio-*.tar 2>/dev/null | wc -l)
echo "Generated $audio_count audio batches (sample rate conversion not yet exercised)"

# Test 4: Template exhaustion
echo "Test 4: Template exhaustion..."
if ais ml lhotse-get-batch --cuts "${MANIFEST}.gz" \
  --batch-size 100 \
  --output-template "small-{01..02}.tar" 2>&1 | grep -q "template exhausted"; then
  echo "Template exhaustion handled correctly"
else
  echo "Error: template exhaustion not detected"
  exit 1
fi

# Test 5: Streaming mode
echo "Test 5: Streaming mode..."
ais ml lhotse-get-batch --cuts "${MANIFEST}.gz" \
  --batch-size 30 \
  --output-template "stream-{001..010}.tar" \
  --streaming || { echo "Error: streaming test failed"; exit 1; }

stream_count=$(ls stream-*.tar 2>/dev/null | wc -l)
echo "Generated $stream_count streaming batches"

# Test 6: Verify batch contents
# The batches are *local* TAR files in $TEST_DIR, so list them with `tar`:
# `ais ls <file>` would treat the file name as a bucket name. Capturing the listing
# first (instead of piping straight into `head`) keeps a failure from being masked
# by the pipeline's exit status.
echo "Test 6: Verifying batch contents..."
first_batch=""
for candidate in batch-001.tar audio-01.tar; do
  if [[ -f "$candidate" ]]; then
    first_batch=$candidate
    break
  fi
done
if [[ -z "$first_batch" ]]; then
  echo "Error: no batch files found for content verification"
  exit 1
fi
echo "Contents of $(basename "$first_batch"):"
listing=$(tar -tf "$first_batch") || { echo "Error: failed to list archive contents"; exit 1; }
[[ -n "$listing" ]] || { echo "Error: archive $first_batch is empty"; exit 1; }
printf '%s\n' "$listing" | head -5
echo "Archive contents verified"

# Test 7: Performance test with larger manifest
echo "Test 7: Performance test..."
large_cuts=$((num_cuts * 4))
gen_test_manifest $large_cuts "large_manifest.jsonl"
echo "Running performance test with $large_cuts cuts..."
time ais ml lhotse-get-batch --cuts "large_manifest.jsonl" \
  --batch-size 100 \
  --output-template "perf-{001..100}.tar" >/dev/null || { echo "Error: performance test failed"; exit 1; }

perf_count=$(ls perf-*.tar 2>/dev/null | wc -l)
echo "Performance test completed: $perf_count batches generated"

echo
echo "All tests passed:"
echo " - Single batch files: $(ls single_output.tar 2>/dev/null | wc -l)"
echo " - Multi-batch files: $(ls batch-*.tar audio-*.tar stream-*.tar perf-*.tar 2>/dev/null | wc -l)"
echo " - Total output size: $(du -sh . | awk '{print $1}')"
echo " - Test directory: $TEST_DIR"
echo " - Test bucket: $bucket (will be cleaned up)"

cmd/cli/cli/const.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,9 @@ const (
303303

304304
bucketObjectOrTemplateMultiArg = "BUCKET[/OBJECT_NAME_or_TEMPLATE] [BUCKET[/OBJECT_NAME_or_TEMPLATE] ...]"
305305

306-
getBatchSpecArgument = "[BUCKET[/NAME_or_TEMPLATE] ...] DST_ARCHIVE --spec [JSON_SPECIFICATION|YAML_SPECIFICATION]"
306+
// Lhotse: DST_ARCHIVE is optional when using --output-template (multi-batch mode)
307+
getBatchSpecArgument = "[BUCKET[/NAME_or_TEMPLATE] ...] DST_ARCHIVE --spec [JSON_SPECIFICATION|YAML_SPECIFICATION]"
308+
getBatchLhotseSpecArgument = "[BUCKET[/NAME_or_TEMPLATE] ...] [DST_ARCHIVE] --spec [JSON_SPECIFICATION|YAML_SPECIFICATION]"
307309

308310
bucketEmbeddedPrefixArg = "[BUCKET[/PREFIX]]"
309311

cmd/cli/cli/get.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func getMultiObj(c *cli.Context, bck cmn.Bck, outFile string, lsarch, extract bo
178178
)
179179
if flagIsSet(c, listArchFlag) && prefix != "" {
180180
// when prefix crosses shard boundary
181-
if external, internal := splitPrefixShardBoundary(prefix); internal != "" {
181+
if external, internal := splitArchivePath(prefix); internal != "" {
182182
prefix = external
183183
debug.Assert(prefix != origPrefix)
184184
lstFilter._add(func(obj *cmn.LsoEnt) bool {
@@ -393,7 +393,7 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, outFile string, a qparamArc
393393
return errors.New("cannot extract and discard archived files - " + NIY)
394394
}
395395
if flagIsSet(c, listArchFlag) && a.archpath == "" {
396-
if external, internal := splitPrefixShardBoundary(objName); internal != "" {
396+
if external, internal := splitArchivePath(objName); internal != "" {
397397
objName, a.archpath = external, internal
398398
}
399399
}

cmd/cli/cli/lhotse.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,18 +174,23 @@ func (cut *lhotseCut) toMossIn(c *cli.Context) (in *apc.MossIn, err error) {
174174
if err != nil {
175175
return
176176
}
177+
if oname, archpath := splitArchivePath(in.ObjName); archpath != "" {
178+
in.ObjName = oname
179+
in.ArchPath = archpath
180+
}
177181
in.Bucket, in.Provider = bck.Name, bck.Provider
178182

179183
if !flagIsSet(c, sampleRateFlag) {
180184
return
181185
}
186+
182187
rate := int64(parseIntFlag(c, sampleRateFlag))
183188
if err := validateSampleRate(rate); err != nil {
184189
return nil, err
185190
}
186191
in.Start = int64(cut.Start) * rate
187192
in.Length = int64(cut.Duration) * rate
188-
return
193+
return in, nil
189194
}
190195

191196
// minor sanity check

cmd/cli/cli/ls.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func listObjects(c *cli.Context, bck cmn.Bck, prefix string, listArch, printEmpt
264264
}
265265

266266
// when prefix crosses shard boundary
267-
if external, internal := splitPrefixShardBoundary(prefix); internal != "" {
267+
if external, internal := splitArchivePath(prefix); internal != "" {
268268
origPrefix := prefix
269269
prefix = external
270270
lstFilter._add(func(obj *cmn.LsoEnt) bool { return strings.HasPrefix(obj.Name, origPrefix) })
@@ -747,7 +747,7 @@ func (o *lstFilter) apply(entries cmn.LsoEntries) (matching, rest cmn.LsoEntries
747747

748748
// prefix that crosses shard boundary, e.g.:
749749
// `ais ls bucket --prefix virt-subdir/A.tar.gz/dir-or-prefix-inside`
750-
func splitPrefixShardBoundary(prefix string) (external, internal string) {
750+
func splitArchivePath(prefix string) (external, internal string) {
751751
if prefix == "" {
752752
return
753753
}

cmd/cli/cli/make_alias.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ import (
3434
// [NOTE] for `makeAlias usage guidelines, please refer to [make_alias.md](https://github.com/NVIDIA/aistore/blob/main/cmd/cli/cli/make_alias.md)
3535

3636
type mkaliasOpts struct {
37-
newName string
38-
aliasFor string
39-
addFlags []cli.Flag
40-
delFlags []cli.Flag
41-
replace cos.StrKVs
42-
usage string
37+
newName string
38+
aliasFor string
39+
addFlags []cli.Flag
40+
delFlags []cli.Flag
41+
replace cos.StrKVs
42+
usage string
43+
argsUsage string
4344
}
4445

4546
func makeAlias(cmd *cli.Command, opts *mkaliasOpts) cli.Command {
@@ -88,6 +89,9 @@ func makeAlias(cmd *cli.Command, opts *mkaliasOpts) cli.Command {
8889
} else if opts.replace != nil {
8990
_updAliasedHelp(&aliasCmd, opts.replace)
9091
}
92+
if opts.argsUsage != "" {
93+
aliasCmd.ArgsUsage = opts.argsUsage
94+
}
9195

9296
return aliasCmd
9397
}

0 commit comments

Comments
 (0)