Using asyncio/threads to download packages #11608

@shughes-uk

Description

Checklist

  • I added a descriptive title
  • I searched for other speed issues and didn't find a duplicate

What happened?

This is a continuation of an email thread with @LtDan33 and @pzwang

I wrote a hacky little experiment that pre-populates the pkgs cache using asyncio to download packages listed in a conda-lock lockfile. I found it to be much faster than the default conda implementation. I suspect it would be faster than using multiple threads too.
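For context, a conda-lock unified lockfile lists entries under a top-level `package:` key. A sketch with placeholder values (showing only the fields the script below reads: `platform`, `manager`, `url`, and `hash.sha256`):

```yaml
# Sketch of a conda-lock.yml entry (placeholder values, not a real package);
# the script only reads platform, manager, url, and hash.sha256.
version: 1
package:
  - name: example-package
    version: "1.0.0"
    manager: conda
    platform: linux-aarch64
    url: https://conda.anaconda.org/conda-forge/linux-aarch64/example-package-1.0.0-0.conda
    hash:
      sha256: 0000000000000000000000000000000000000000000000000000000000000000
```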

I'm mostly looking to optimize start-up time for Dask clusters. I could create the environment ahead of time and wrap it up as a Docker image, but that ends up being slower than doing this.

In benchmarking I found this to be as fast as or faster than mamba's implementation, and definitely much faster when mamba falls back to what I think is conda's single-threaded implementation!
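Most of the speedup just comes from overlapping network waits. A toy simulation (not my actual benchmark — `asyncio.sleep` stands in for the HTTP round-trip of one package download) shows the effect:

```python
import asyncio
import time

# Toy model: each "download" is ~0.1 s of pure latency. Sequentially that
# is n * 0.1 s; with asyncio.gather the waits overlap, so wall time stays
# near a single request's latency.

async def fake_download(latency: float = 0.1) -> None:
    await asyncio.sleep(latency)  # placeholder for session.get() + file write

async def timed(coro) -> float:
    start = time.perf_counter()
    await coro
    return time.perf_counter() - start

async def main(n: int = 10) -> tuple[float, float]:
    async def sequential():
        for _ in range(n):
            await fake_download()

    async def concurrent():
        await asyncio.gather(*(fake_download() for _ in range(n)))

    return await timed(sequential()), await timed(concurrent())

seq_s, conc_s = asyncio.run(main())
print(f"sequential: {seq_s:.2f}s  concurrent: {conc_s:.2f}s")
```

Real numbers obviously depend on mirror bandwidth and package sizes, but the shape is the same.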

My very hacky use of a ProcessPoolExecutor to farm out validation/extraction of the package tarballs probably needs some extra thought, but it was quite fast!
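The core pattern is just `loop.run_in_executor` to keep CPU-bound verification off the event loop. A self-contained sketch (using a ThreadPoolExecutor here so it runs without pickling concerns; the script below uses a ProcessPoolExecutor, and hashlib releases the GIL for large buffers, so threads can help too):

```python
import asyncio
import hashlib
from concurrent.futures import ThreadPoolExecutor

def sha256_hex(data: bytes) -> str:
    # CPU-bound stand-in for the script's verify-then-extract step.
    return hashlib.sha256(data).hexdigest()

async def verify_all(payloads: list[bytes]) -> list[str]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # The event loop keeps scheduling downloads while hashing runs
        # in the executor's workers.
        return await asyncio.gather(
            *(loop.run_in_executor(pool, sha256_hex, data) for data in payloads)
        )

digests = asyncio.run(verify_all([b"foo", b"bar"]))
print(digests)
```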

import asyncio
import hashlib
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from pathlib import Path
from typing import Dict
from urllib.parse import urlparse

import aiofiles
import aiohttp
import yaml
from conda_package_handling import api

# Parse the conda-lock lockfile and make sure the pkgs cache exists.
cheap = yaml.safe_load(Path("./conda-lock.yml").read_text())
root = Path("~/miniconda3/pkgs").expanduser()
root.mkdir(exist_ok=True)


def extract(sha256: str, file: str, target: str):
    # Verify the download's sha256 before extracting; this is CPU-bound,
    # so it runs in a worker process (see run_in_executor below).
    hasher = hashlib.new("sha256")
    with open(file, "rb") as fh:
        for chunk in iter(partial(fh.read, 8192), b""):
            hasher.update(chunk)
    assert hasher.hexdigest() == sha256
    print("Extracting", file, target)
    api.extract(file, target)


async def main(packages: list[dict]):
    async with aiohttp.ClientSession() as session:
        tasks = []
        urls = []
        for package in packages:
            if package["platform"] == "linux-aarch64" and package["manager"] == "conda":
                url = package["url"]
                parsed = urlparse(url)
                pth = Path(parsed.path)
                tasks.append(
                    download_url(
                        sha256=package["hash"]["sha256"],
                        session=session,
                        fp=root / pth.name,
                        url=package["url"],
                    )
                )
                urls.append(url)
        await asyncio.gather(*tasks)
        async with aiofiles.open(root / "urls.txt", mode="w") as f:
            for url in urls:
                await f.write(url + "\n")
        async with aiofiles.open(root / "urls", mode="w") as f:
            await f.write("\n")


async def download_url(session: aiohttp.ClientSession, sha256: str, fp: Path, url: str):
    print("Downloading", url)
    async with session.get(url) as resp:
        async with aiofiles.open(fp, mode="wb") as f:
            async for chunk in resp.content.iter_chunked(10000):
                await f.write(chunk)

    loop = asyncio.get_running_loop()
    target_dir = (
        str(fp).removesuffix(".tar.bz2").removesuffix(".tar").removesuffix(".conda")
    )
    # "p" is the module-level ProcessPoolExecutor created under __main__;
    # hashing + extraction are CPU-bound, so farm them out of the event loop.
    await loop.run_in_executor(p, extract, sha256, str(fp), target_dir)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as p:
        asyncio.run(main(cheap["package"]))

Debug

na

Conda info

na

Conda config

na

Conda list

na

Additional Context

No response

Metadata

Labels

in-progress (issue is actively being worked on) · locked ([bot] locked due to inactivity) · source::community (catch-all for issues filed by community members) · tag::performance (related to degraded performance) · type::feature (request for a new feature or capability)

Projects

Status

🏁 Done
