Honestly, I don't even *need* AsyncDownloader
This commit is contained in:
parent
66fa2a87f4
commit
21977244fd
59
make.py
59
make.py
|
@ -1,11 +1,10 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import json
|
import json
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, Union
|
from typing import Union
|
||||||
import asyncio
|
import asyncio
|
||||||
from httpx import AsyncClient
|
from httpx import AsyncClient
|
||||||
from asyncio.queues import Queue
|
from rich.progress import Progress
|
||||||
from rich.progress import TaskID, Progress
|
|
||||||
from zipfile import ZipFile
|
from zipfile import ZipFile
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
|
|
||||||
|
@ -43,60 +42,6 @@ meta_url: str = (
|
||||||
meta_filename = INPUT_PATH / f"meta-{meta_url.split('/')[-1]}"
|
meta_filename = INPUT_PATH / f"meta-{meta_url.split('/')[-1]}"
|
||||||
|
|
||||||
|
|
||||||
class AsyncDownloader:
    """Download a batch of files concurrently, showing one progress bar per worker.

    Use it as an async context manager and call :meth:`run` inside the
    ``async with`` block::

        async with AsyncDownloader(files, maxfiles=4) as downloader:
            downloader.add_file(url, path)
            await downloader.run()

    ``maxfiles`` is the number of concurrent workers (and progress bars); it
    does not limit how many files may be queued.
    """

    def __init__(
        self, files: Optional[list[tuple[str, Path]]] = None, maxfiles: int = 8
    ):
        """Queue the initial ``(url, path)`` pairs.

        Args:
            files: Optional initial ``(url, destination)`` pairs.
            maxfiles: Number of downloads allowed to run at the same time.

        Raises:
            ValueError: If ``maxfiles`` is smaller than 1, because no worker
                would ever be started and ``run()`` would silently do nothing.
        """
        if maxfiles < 1:
            raise ValueError("maxfiles must be at least 1")
        self._maxfiles = maxfiles
        # Unbounded on purpose: the concurrency limit is the worker count, not
        # the queue capacity.  A bounded queue makes put_nowait() raise
        # QueueFull as soon as more than `maxfiles` files are queued.
        self._queue: Queue[tuple[str, Path]] = Queue()
        # The HTTP client and progress display are created in __aenter__ so
        # that an instance which is never entered holds no open resources.
        self._client: Optional[AsyncClient] = None
        self._progress: Optional[Progress] = None
        self._tasks: list[TaskID] = []

        for url, path in files or []:
            self.add_file(url, path)

    def add_file(self, url: str, path: Union[Path, str]):
        """Queue ``url`` to be downloaded to ``path``."""
        self._queue.put_nowait((url, Path(path)))

    async def __aenter__(self):
        """Open the HTTP client and the progress display, one bar per worker."""
        client = AsyncClient()
        await client.__aenter__()
        self._client = client
        self._progress = Progress()
        self._progress.__enter__()
        # Rebuilt (not appended to) so re-entering never duplicates workers.
        self._tasks = [
            self._progress.add_task(f"Downloader {i + 1}", start=False)
            for i in range(self._maxfiles)
        ]
        return self

    async def _worker(self, task: "TaskID"):
        """Drain the queue, downloading one file at a time on progress bar ``task``.

        Returns immediately when called outside the ``async with`` block.
        HTTP error statuses and transport errors propagate to the caller.
        """
        if self._progress is None or self._client is None:
            return
        while True:
            try:
                url, path = self._queue.get_nowait()
            except asyncio.QueueEmpty:
                return

            # Stream into a sibling ".part" file and rename only on success, so
            # the final path never holds an empty or truncated download that a
            # later "does the file exist?" check would treat as complete.
            partial = path.with_name(path.name + ".part")
            try:
                async with self._client.stream("GET", url) as req:
                    # Do not save an error page as if it were the payload.
                    req.raise_for_status()
                    size = int(req.headers.get("Content-Length", 0))
                    self._progress.start_task(task)
                    self._progress.update(
                        task,
                        total=size,
                        completed=0,
                        description=f"{path.name}",
                    )
                    with partial.open("wb") as fout:
                        async for chunk in req.aiter_bytes(8192):
                            self._progress.advance(task, fout.write(chunk))
                partial.replace(path)
            except BaseException:
                # Includes cancellation: never leave a stale partial file behind.
                partial.unlink(missing_ok=True)
                raise
            finally:
                self._progress.stop_task(task)

    async def run(self):
        """Run all workers until the queue is empty."""
        await asyncio.gather(*(self._worker(task) for task in self._tasks))

    async def __aexit__(self, a, b, c):
        """Close the progress display and the HTTP client.

        Raises:
            ValueError: If the context was never entered.
        """
        if self._progress is None or self._client is None:
            raise ValueError("how did that happen?")
        try:
            self._progress.__exit__(a, b, c)
        finally:
            # Always close the client, even if tearing down the progress
            # display fails, and pass along the in-flight exception info.
            await self._client.__aexit__(a, b, c)
|
|
||||||
|
|
||||||
|
|
||||||
async def download_if_missing(url: str, path: Union[Path, str]):
|
async def download_if_missing(url: str, path: Union[Path, str]):
|
||||||
path = Path(path)
|
path = Path(path)
|
||||||
if path.exists():
|
if path.exists():
|
||||||
|
|
Loading…
Reference in New Issue