Skip to content

[Bug]: Some jobs fail when too many are submitted at once #2790

@peterschmidt85

Description

@peterschmidt85

Steps to reproduce

  1. Set up a dstack server on a c6g.large instance (connected to a Postgres database running on the same instance)
  2. Run the following script:
from logging import exception
import os
import sys
import requests
import random
import string
import time
from rich.console import Console
from rich.live import Live
from rich.table import Table

# Number of tasks submitted in one batch by the loop below.
number_of_tests=100

# dstack server base URL and the project that the test runs are submitted to.
url = "http://dev-dstack-server.tail7da59.ts.net:3000"
project = "dstack_testing"
# API token sent as a Bearer token; raises KeyError at startup if DSTACK_TOKEN is unset.
token = os.environ["DSTACK_TOKEN"]
# Names of the runs the server accepted, appended after each submission.
runs = []

def id_generator(size=6, chars=string.ascii_lowercase + string.digits):
    """Return a random string of ``size`` characters drawn from ``chars``.

    Used as a random prefix so each batch of stress-test run names differs.
    Not suitable for anything security-sensitive (uses ``random``).
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return "".join(picked)

def run_status(run_name: str):
    """Fetch the current state of one run from the dstack server.

    Args:
        run_name: Name of the run to look up.

    Returns:
        A ``(run_status, latest_job_status, last_processed_at)`` tuple taken
        from the ``runs/get`` response. ``latest_job_status`` is ``None`` when
        the response's ``latest_job_submission`` is null.

    Raises:
        requests.exceptions.RequestException: The request timed out, could not
            be sent, or the server answered with an HTTP error status. The
            failure is logged before it is re-raised.
    """
    try:
        resp = requests.post(
            url=f"{url}/api/project/{project}/runs/get",
            headers={"Authorization": f"Bearer {token}"},
            json={"run_name": run_name},
            timeout=60,
        )
        resp.raise_for_status()
    except requests.exceptions.Timeout:
        exception("Request Timeout")
        # Re-raise: falling through would read ``resp`` while it is unbound
        # (timeout) or parse an error body as if it described a run.
        raise
    except requests.exceptions.RequestException as e:
        exception(f"An error occurred: {e}")
        raise

    data = resp.json()  # parse the body once instead of once per field
    latest_job = data["latest_job_submission"]
    return (
        data["status"],
        latest_job["status"] if latest_job is not None else None,
        data["last_processed_at"],
    )

def abort_runs(run_names: list):
    """Ask the dstack server to abort the given runs.

    Args:
        run_names: Names of the runs to abort (any iterable of strings).

    Returns:
        The ``requests.Response`` on success, or a human-readable error
        string if the request timed out or failed.
    """
    try:
        resp = requests.post(
            url=f"{url}/api/project/{project}/runs/stop",
            headers={"Authorization": f"Bearer {token}"},
            json={
                # Use the argument rather than the module-level ``runs`` list
                # so exactly the runs the caller asked for are aborted.
                "runs_names": list(run_names),
                "abort": True,
            },
            # Stopping a whole batch on an already-loaded server can take a
            # while; a short timeout would leave the runs running.
            timeout=60,
        )
        resp.raise_for_status()
    except requests.exceptions.Timeout:
        return "Request Timeout"
    except requests.exceptions.RequestException as e:
        return f"An error occurred: {e}"
    return resp


def generate_table(run_names: list) -> Table:
    """Build a Rich table with one row per run showing its latest status.

    Args:
        run_names: Names of the runs to display, in display order.

    Returns:
        A ``Table`` with columns Run Name / Status / Message / Last Processed.
        A run whose status could not be fetched gets a row with status
        "error" and the exception type as its message, instead of reusing
        another run's values.
    """
    table = Table("Run Name", "Status", "Message", "Last Processed")

    for run_name in run_names:
        try:
            status, message, last_processed = run_status(run_name)
        except Exception as e:
            # A bare ``except: pass`` here left these names unbound on the
            # first row or holding the previous run's values, and swallowed
            # KeyboardInterrupt so Ctrl+C could be ignored mid-request.
            status, message, last_processed = "error", type(e).__name__, None
        table.add_row(run_name, status, message, last_processed)
    return table

# Submit number_of_tests identical stress-ng tasks under run names sharing a
# random prefix, recording each name the server accepts in ``runs``.
rand_id = id_generator()

for i in range(number_of_tests):
    run_name = "stresstest-%s-%s" % (rand_id, i)
    print("Submitting task %s" % (run_name))
    try:
        resp = requests.post(
            url=f"{url}/api/project/{project}/runs/apply",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "plan":{
                    "run_spec": {
                        "run_name": run_name,
                        "configuration": {
                            "type": "task",
                            "image": "ghcr.io/colinianking/stress-ng:latest",
                            "max_price": "0.10",
                            "spot_policy": "auto",
                            "retry": {
                                "on_events": [
                                    "no-capacity"
                                    ],
                                "duration": "10m"
                                },
                            "backends": [
                                "aws"
                            ],
                            "regions": [
                                "us-west-2"
                            ],
                            "commands": [
                                "/usr/bin/stress-ng --fork 1 --cpu-load 70 --timeout 5m --verbose"
                            ],
                        },
                        "ssh_key_pub": "dummy",
                    }
                },
                "force": False
            }
        )
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Without this check a rejected submission only surfaced as an opaque
        # KeyError on the line below, and the runs submitted so far kept
        # running after the script died. Report the failure (the response
        # body may carry the server's error detail) and clean up.
        print(f"Failed to submit {run_name}: {e}")
        if e.response is not None:
            print(e.response.text)
        if runs:
            print(abort_runs(runs))
        sys.exit(1)
    runs.append(resp.json()["run_spec"]["run_name"])

console = Console()

# Redraw the status table every 2 seconds until interrupted; on Ctrl+C or an
# unexpected error, abort every submitted run before exiting.
with Live(console=console, auto_refresh=False) as live:
    try:
        print("Running Live table")
        while True:
            live.update(generate_table(runs), refresh=True)
            time.sleep(2)
    except KeyboardInterrupt:
        print("\nProgram interrupted by user. Attempting to abort runs.")
        result = abort_runs(runs)
        print(result)
        print("Exiting gracefully...")
        sys.exit(0) # Exit the program with a success code
    except Exception as e:
        # ``e.with_traceback()`` requires a traceback argument, so the old
        # message line itself raised TypeError and the runs were never
        # aborted. Print the error and its traceback, then clean up.
        print(f"An error occurred: {e!r}")
        console.print_exception()
        result = abort_runs(runs)
        print(result)
        sys.exit(1)  # signal failure instead of falling through with status 0

Actual behaviour

  1. I started with 20 jobs (`number_of_tests = 20`) and everything worked; when I increased it to 40, some jobs failed with errors (see the attached server logs).

Expected behaviour

  1. No jobs fail, regardless of how many are submitted at once.

dstack version

0.19.12

Server logs

Additional information

Server logs: dstack-server-2-logs.txt

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions