Skip to content

Commit

Permalink
wip
Browse files: browse the repository at this point in the history
  • Loading branch information
ihabunek committed Sep 15, 2024
1 parent 3cf26b0 commit 4e36ce1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
27 changes: 16 additions & 11 deletions twitchdl/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def download(
async with client.stream("GET", source) as response:
response.raise_for_status()
size = int(response.headers.get("content-length"))
progress.start(task_id, size)
progress.start(task_id, size, source, target)
async for chunk in response.aiter_bytes(chunk_size=CHUNK_SIZE):
f.write(chunk)
size = len(chunk)
Expand All @@ -101,16 +101,17 @@ async def download_with_retries(
target: Path,
progress: Progress,
token_bucket: TokenBucket,
skip_existing: bool,
):
if target.exists():
if skip_existing and target.exists():
size = os.path.getsize(target)
progress.already_downloaded(task_id, size)
return

for n in range(RETRY_COUNT):
try:
return await download(client, task_id, source, target, progress, token_bucket)
except httpx.RequestError as ex:
except httpx.HTTPError as ex:
logger.exception(f"Task {task_id} failed. Retrying. Maybe.")
if n + 1 >= RETRY_COUNT:
progress.failed(task_id, ex)
Expand All @@ -136,7 +137,8 @@ async def download_all(
source_targets: Iterable[Tuple[str, Path]],
worker_count: int,
*,
allow_failures: bool = False,
allow_failures: bool = True,
skip_existing: bool = True,
rate_limit: Optional[int] = None,
progress: Optional[Progress] = None,
) -> DownloadAllResult:
Expand All @@ -157,7 +159,13 @@ async def worker(client: httpx.AsyncClient, worker_id: int):
# print(f"worker {worker_id} starting item {item.task_id}")
try:
await download_with_retries(
client, item.task_id, item.url, item.target, progress, token_bucket
client,
item.task_id,
item.url,
item.target,
progress,
token_bucket,
skip_existing,
)
# print(f"worker {worker_id} finished item {item.task_id}")
except Exception:
Expand All @@ -166,7 +174,7 @@ async def worker(client: httpx.AsyncClient, worker_id: int):
# print("raising because allow_failures is False")
raise
finally:
print(f"worker {worker_id} task done {item.task_id}")
# print(f"worker {worker_id} task done {item.task_id}")
queue.task_done()

async with httpx.AsyncClient(timeout=TIMEOUT) as client:
Expand All @@ -180,14 +188,11 @@ async def worker(client: httpx.AsyncClient, worker_id: int):
# Task to monitor if all the items have been processed
queue_complete = asyncio.create_task(producer(), name="Producer")

# return_when = asyncio.ALL_COMPLETED if allow_failures else asyncio.FIRST_COMPLETED
return_when = asyncio.FIRST_COMPLETED

# Wait for queue to deplete or tasks to finish, whichever comes first
await asyncio.wait([queue_complete, *worker_tasks], return_when=return_when)

# Since the worker tasks run an endless loop, they will only finish if
# an exception is raised.
await asyncio.wait([queue_complete, *worker_tasks], return_when=asyncio.FIRST_COMPLETED)

success = queue_complete.done()

# Cancel tasks and wait until they are cancelled
Expand Down
4 changes: 2 additions & 2 deletions twitchdl/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def end(self, task_id: int):

class BasicProgress(Progress):
def start(self, task_id: int, size: int, source: str, target: Path):
print("start", task_id, size)
print("start", task_id, source)

def already_downloaded(self, task_id: int, size: int):
print("already_downloaded", task_id, size)
Expand All @@ -72,7 +72,7 @@ def failed(self, task_id: int, ex: Exception):
print("failed", task_id, ex)

def end(self, task_id: int):
print("end", task_id)
print("downloaded", task_id)


class VideoDownloadProgress(Progress):
Expand Down

0 comments on commit 4e36ce1

Please sign in to comment.