Description
Checklist
- I added a descriptive title
- I searched for other speed issues and didn't find a duplicate
What happened?
This is a continuation of an email thread with @LtDan33 and @pzwang
I wrote a hacky little experiment that pre-populates the pkgs cache by using asyncio to download the packages listed in a conda-lock lockfile. I found it to be much faster than the default conda implementation, and I suspect it would also be faster than a multi-threaded approach.
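For context, each entry in the lockfile's `package` list carries the handful of fields the script relies on. A minimal hand-built stand-in (field names match what the script reads; the package name, URL, and digest here are placeholders, not real artifacts):

```python
# Stand-in for one conda-lock v1 package entry, reduced to the fields the
# downloader reads: platform, manager, url, and hash.sha256.
entry = {
    "name": "numpy",
    "platform": "linux-aarch64",
    "manager": "conda",
    "url": "https://conda.anaconda.org/conda-forge/linux-aarch64/numpy-1.24.0-py310_0.conda",
    "hash": {"sha256": "0" * 64},
}

packages = [entry, {**entry, "platform": "osx-64"}]

# Same filter the script applies: one platform, conda-managed packages only.
wanted = [
    p
    for p in packages
    if p["platform"] == "linux-aarch64" and p["manager"] == "conda"
]
print(len(wanted))  # only the linux-aarch64 entry survives
```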
I'm mostly looking to optimize start-up time for Dask clusters. I can create the environment ahead of time and wrap it up as a Docker image, but that ends up being slower than doing this.
In benchmarking I found this to be as fast as or faster than mamba's implementation, and definitely much faster when mamba falls back to what I believe is conda's single-threaded implementation!
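The shape of that speedup is easy to reproduce with simulated downloads (`asyncio.sleep` standing in for network latency here; the numbers are illustrative, not the actual benchmark):

```python
import asyncio
import time

LATENCY = 0.05  # pretend each download spends 50 ms waiting on the network
N = 20

async def fake_download() -> None:
    # Stand-in for an HTTP GET: pure waiting, which asyncio can overlap.
    await asyncio.sleep(LATENCY)

async def sequential() -> float:
    start = time.perf_counter()
    for _ in range(N):
        await fake_download()
    return time.perf_counter() - start

async def concurrent() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_download() for _ in range(N)))
    return time.perf_counter() - start

seq = asyncio.run(sequential())
conc = asyncio.run(concurrent())
# Sequential waits N * LATENCY in total; gather overlaps all the waits.
print(f"sequential: {seq:.2f}s  concurrent: {conc:.2f}s")
```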
My very hacky use of a ProcessPoolExecutor to farm out validation/extraction of the package tarballs probably needs some extra thought, but it was quite fast!
```python
import asyncio
import hashlib
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from pathlib import Path
from typing import Dict, List
from urllib.parse import urlparse

import aiofiles
import aiohttp
import yaml
from conda_package_handling import api

cheap = yaml.load(Path("./conda-lock.yml").open(), Loader=yaml.SafeLoader)

root = Path("~/miniconda3/pkgs").expanduser()
root.mkdir(exist_ok=True)


def extract(sha256: str, file: str, target: str):
    # Verify the sha256 from the lockfile before extracting.
    hasher = hashlib.new("sha256")
    with open(file, "rb") as fh:
        for chunk in iter(partial(fh.read, 8192), b""):
            hasher.update(chunk)
    assert hasher.hexdigest() == sha256
    print("Extracting", file, target)
    api.extract(file, target)


async def main(packages: List[Dict]):
    async with aiohttp.ClientSession() as session:
        tasks = []
        urls = []
        for package in packages:
            if package["platform"] == "linux-aarch64" and package["manager"] == "conda":
                url = package["url"]
                parsed = urlparse(url)
                pth = Path(parsed.path)
                tasks.append(
                    download_url(
                        sha256=package["hash"]["sha256"],
                        session=session,
                        fp=root / pth.name,
                        url=url,
                    )
                )
                urls.append(url)
        await asyncio.gather(*tasks)
    # Record the downloaded URLs so conda treats the packages as cached.
    async with aiofiles.open(root / "urls.txt", mode="w") as f:
        for url in urls:
            await f.write(url + "\n")
    async with aiofiles.open(root / "urls", mode="w") as f:
        await f.write("\n")


async def download_url(session: aiohttp.ClientSession, sha256: str, fp: Path, url: str):
    print("Downloading", url)
    async with session.get(url) as resp:
        async with aiofiles.open(fp, mode="wb") as f:
            async for chunk in resp.content.iter_chunked(10000):
                await f.write(chunk)
    loop = asyncio.get_running_loop()
    target_dir = (
        str(fp).removesuffix(".tar.bz2").removesuffix(".tar").removesuffix(".conda")
    )
    # `p` is the module-level ProcessPoolExecutor created below (hacky, as noted).
    await loop.run_in_executor(p, extract, sha256, str(fp), target_dir)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as p:
        asyncio.run(main(cheap["package"]))
```
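The run_in_executor pattern at the heart of the script (CPU-bound hashing handed to worker processes while the event loop keeps scheduling I/O) can be isolated like this, with toy data instead of package tarballs:

```python
import asyncio
import hashlib
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def checksum(data: bytes) -> str:
    # CPU-bound work runs in a worker process, so the event loop stays
    # free to keep scheduling downloads in the meantime.
    return hashlib.sha256(data).hexdigest()

async def hash_all(pool, blobs):
    loop = asyncio.get_running_loop()
    return await asyncio.gather(
        *(loop.run_in_executor(pool, checksum, b) for b in blobs)
    )

# The "fork" context keeps this sketch self-contained on Linux; spawn-based
# platforms would need the usual __main__ guard around pool creation.
ctx = multiprocessing.get_context("fork")
with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
    digests = asyncio.run(hash_all(pool, [b"a" * 4096, b"b" * 4096]))
print(digests[0][:16])
```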
Debug
na
Conda info
na
Conda config
na
Conda list
na
Additional Context
No response