diff --git a/twitchdl/http.py b/twitchdl/http.py index 6b4ec96..f5e9d1b 100644 --- a/twitchdl/http.py +++ b/twitchdl/http.py @@ -84,7 +84,7 @@ async def download( async with client.stream("GET", source) as response: response.raise_for_status() size = int(response.headers.get("content-length")) - progress.start(task_id, size) + progress.start(task_id, size, source, target) async for chunk in response.aiter_bytes(chunk_size=CHUNK_SIZE): f.write(chunk) size = len(chunk) @@ -101,8 +101,9 @@ async def download_with_retries( target: Path, progress: Progress, token_bucket: TokenBucket, + skip_existing: bool, ): - if target.exists(): + if skip_existing and target.exists(): size = os.path.getsize(target) progress.already_downloaded(task_id, size) return @@ -110,7 +111,7 @@ async def download_with_retries( for n in range(RETRY_COUNT): try: return await download(client, task_id, source, target, progress, token_bucket) - except httpx.RequestError as ex: + except httpx.HTTPError as ex: logger.exception(f"Task {task_id} failed. Retrying. Maybe.") if n + 1 >= RETRY_COUNT: progress.failed(task_id, ex) @@ -136,7 +137,8 @@ async def download_all( source_targets: Iterable[Tuple[str, Path]], worker_count: int, *, - allow_failures: bool = False, + allow_failures: bool = True, + skip_existing: bool = True, rate_limit: Optional[int] = None, progress: Optional[Progress] = None, ) -> DownloadAllResult: @@ -157,7 +159,13 @@ async def worker(client: httpx.AsyncClient, worker_id: int): # print(f"worker {worker_id} starting item {item.task_id}") try: await download_with_retries( - client, item.task_id, item.url, item.target, progress, token_bucket + client, + item.task_id, + item.url, + item.target, + progress, + token_bucket, + skip_existing, ) # print(f"worker {worker_id} finished item {item.task_id}") except Exception: @@ -166,7 +174,7 @@ async def worker(client: httpx.AsyncClient, worker_id: int): # print("raising because allow_failures is False") raise finally: - print(f"worker {worker_id} task done {item.task_id}") + # print(f"worker {worker_id} task done {item.task_id}") queue.task_done() async with httpx.AsyncClient(timeout=TIMEOUT) as client: @@ -180,14 +188,11 @@ async def worker(client: httpx.AsyncClient, worker_id: int): # Task to monitor if all the items have been processed queue_complete = asyncio.create_task(producer(), name="Producer") - # return_when = asyncio.ALL_COMPLETED if allow_failures else asyncio.FIRST_COMPLETED - return_when = asyncio.FIRST_COMPLETED - # Wait for queue to deplete or tasks to finish, whichever comes first - await asyncio.wait([queue_complete, *worker_tasks], return_when=return_when) - # Since the worker tasks run an endless loop, they will only finish if # an exception is raised. + await asyncio.wait([queue_complete, *worker_tasks], return_when=asyncio.FIRST_COMPLETED) + success = queue_complete.done() # Cancel tasks and wait until they are cancelled diff --git a/twitchdl/progress.py b/twitchdl/progress.py index 182f641..a836ff6 100644 --- a/twitchdl/progress.py +++ b/twitchdl/progress.py @@ -60,7 +60,7 @@ def end(self, task_id: int): class BasicProgress(Progress): def start(self, task_id: int, size: int, source: str, target: Path): - print("start", task_id, size) + print("start", task_id, source) def already_downloaded(self, task_id: int, size: int): print("already_downloaded", task_id, size) @@ -72,7 +72,7 @@ def failed(self, task_id: int, ex: Exception): print("failed", task_id, ex) def end(self, task_id: int): - print("end", task_id) + print("downloaded", task_id) class VideoDownloadProgress(Progress):