Update imports
mthrok committed Nov 15, 2024
1 parent 3e09067 commit 687f9f6
Showing 9 changed files with 18 additions and 19 deletions.
4 changes: 2 additions & 2 deletions docs/source/best_practice.rst
@@ -34,13 +34,13 @@ separately.
buffer = spdl.io.convert_frames(frames)
return spdl.io.to_torch(buffer)
-They can be combined in :py:class:`~spdl.dataloader.Pipeline`, which automatically
+They can be combined in :py:class:`~spdl.pipeline.Pipeline`, which automatically
discards the items that fail to process (for example, due to invalid data), and
keeps the batch size consistent by using other successfully processed items.

.. code-block::
-from spdl.dataloader import PipelineBuilder
+from spdl.pipeline import PipelineBuilder
pipeline = (
PipelineBuilder()
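For reference, a minimal sketch of the updated usage. The ``decode`` stage
function here is hypothetical, and ``add_source``/``add_sink``/``build``/
``auto_stop`` are assumed from the SPDL pipeline API:

    # Minimal sketch; `decode` is a hypothetical stage function.
    from spdl.pipeline import PipelineBuilder

    def decode(path: str) -> str:
        # Placeholder: load and decode one item.
        return path

    pipeline = (
        PipelineBuilder()
        .add_source(["a.jpg", "b.jpg"])  # iterable of inputs
        .pipe(decode, concurrency=4)     # process items concurrently
        .add_sink(3)                     # buffered output queue
        .build(num_threads=4)
    )

    with pipeline.auto_stop():
        for item in pipeline:
            print(item)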
3 changes: 1 addition & 2 deletions docs/source/faq.rst
@@ -24,7 +24,7 @@ What if a function does not release GIL?
In case you need to use a function that takes a long time to execute (e.g., network utilities)
but does not release the GIL, you can delegate the stage to a sub-process.

-:py:meth:`spdl.dataloader.PipelineBuilder.pipe` method takes an optional ``executor`` argument.
+:py:meth:`spdl.pipeline.PipelineBuilder.pipe` method takes an optional ``executor`` argument.
The default behavior of the ``Pipeline`` is to use the thread pool shared among all stages.
You can pass an instance of :py:class:`concurrent.futures.ProcessPoolExecutor`,
and that stage will execute the function in the sub-process.
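A sketch of delegating such a stage to a sub-process; only the ``executor``
argument of ``pipe`` is documented above, and the rest of the scaffolding
(``add_source``/``add_sink``/``build``) is assumed:

    # Sketch: run a GIL-holding function in a sub-process pool.
    from concurrent.futures import ProcessPoolExecutor

    import numpy as np
    from spdl.pipeline import PipelineBuilder

    def load(path: str) -> np.ndarray:
        return np.load(path)  # np.load holds the GIL (see the list below)

    executor = ProcessPoolExecutor(max_workers=4)

    pipeline = (
        PipelineBuilder()
        .add_source(["a.npy", "b.npy"])
        .pipe(load, executor=executor)  # this stage runs in sub-processes
        .add_sink(3)
        .build(num_threads=2)
    )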
@@ -101,7 +101,6 @@ Which functions hold the GIL?
The following is a list of functions that we are aware hold the GIL. It is advised to use them with ``ProcessPoolExecutor`` or to avoid using them in SPDL.

* `np.load <https://github.com/numpy/numpy/blob/maintenance/2.1.x/numpy/lib/_npyio_impl.py#L312-L500>`_


Why Async IO?
-------------
2 changes: 1 addition & 1 deletion examples/image_dataloading.py
@@ -210,7 +210,7 @@ class PerfResult:
def worker_entrypoint(args: list[str]) -> PerfResult:
"""Entrypoint for worker process. Load images to a GPU and measure its performance.
-It builds a :py:class:`~spdl.dataloader.Pipeline` object using :py:func:`get_pipeline`
+It builds a :py:class:`~spdl.pipeline.Pipeline` object using :py:func:`get_pipeline`
function and runs it with the :py:func:`benchmark` function.
"""
args = _parse_args(args)
4 changes: 2 additions & 2 deletions src/spdl/dataloader/__init__.py
@@ -11,8 +11,6 @@
import warnings
from typing import Any

-import spdl.pipeline
-
from . import _dataloader

_mods = [
@@ -32,6 +30,8 @@ def __getattr__(name: str) -> Any:
return getattr(mod, name)

# For backward compatibility
+import spdl.pipeline
+
if name in spdl.pipeline.__all__:
warnings.warn(
f"{name} has been moved to {spdl.pipeline.__name__}. "
2 changes: 1 addition & 1 deletion src/spdl/dataloader/_dataloader.py
@@ -173,7 +173,7 @@ class DataLoader(Generic[Source, Output]):
- :py:func:`spdl.io.demux_image`, :py:func:`spdl.io.decode_packets`: Decoding image.
- :py:func:`spdl.io.cpu_storage`: Allocate page-locked memory.
- :py:func:`spdl.io.convert_frames`: Merging the decoded frames into pre-allocated memory
-without creating intermediate arrays.
+without creating intermediate arrays.
- :py:func:`spdl.io.transfer_buffer`: Sending the data to GPU.
- :py:func:`spdl.io.to_torch`, :py:func:`spdl.io.to_numba`, :py:func:`spdl.io.to_jax`: Casting
the memory buffer to array type.
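Taken together, a hypothetical image-loading function chaining the functions
listed above might look like the following (signatures are assumed from the
names; consult the ``spdl.io`` documentation for exact arguments):

    # Sketch chaining the spdl.io functions listed above.
    import spdl.io

    def load_image(path: str):
        packets = spdl.io.demux_image(path)       # demux the image file
        frames = spdl.io.decode_packets(packets)  # decode into frames
        buffer = spdl.io.convert_frames(frames)   # merge into one buffer
        return spdl.io.to_torch(buffer)           # cast to a torch.Tensor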
12 changes: 6 additions & 6 deletions src/spdl/pipeline/_builder.py
@@ -425,7 +425,7 @@ async def _sink(input_queue: AsyncQueue[T], output_queue: AsyncQueue[T]):
# TODO [Python 3.11]: Migrate to ExceptionGroup
class PipelineFailure(RuntimeError):
"""PipelineFailure()
-Thrown by :py:class:`spdl.dataloader.Pipeline` when pipeline encounters an error.
+Thrown by :py:class:`spdl.pipeline.Pipeline` when pipeline encounters an error.
"""

def __init__(self, errs):
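A sketch of handling this error, assuming ``PipelineFailure`` is re-exported
from ``spdl.pipeline`` and ``pipeline`` is a built ``Pipeline`` object:

    # Sketch: catching PipelineFailure when running a pipeline.
    from spdl.pipeline import PipelineFailure

    try:
        with pipeline.auto_stop():
            for item in pipeline:
                ...
    except PipelineFailure as e:
        # The exception aggregates the per-stage errors (``errs`` above).
        print(f"Pipeline failed: {e}")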
@@ -505,9 +505,9 @@ async def disaggregate(items):


class PipelineBuilder:
"""Build :py:class:`~spdl.dataloader.Pipeline` object.
"""Build :py:class:`~spdl.pipeline.Pipeline` object.
See :py:class:`~spdl.dataloader.Pipeline` for details.
See :py:class:`~spdl.pipeline.Pipeline` for details.
"""

def __init__(self):
@@ -618,15 +618,15 @@ def pipe(
hooks: Hook objects to be attached to the stage. Hooks are intended for
collecting stats of the stage.
If ``None``, a default hook,
-:py:class:`~spdl.dataloader.TaskStatsHook` is used.
+:py:class:`~spdl.pipeline.TaskStatsHook` is used.
report_stats_interval:
The interval (in seconds) to log the stats of this stage when no
``hooks`` is provided. This argument is passed to
-:py:class:`~spdl.dataloader.TaskStatsHook`.
+:py:class:`~spdl.pipeline.TaskStatsHook`.
This argument is effective only when ``hooks`` are not provided.
If ``hooks`` is provided and stats report is needed,
the ``hooks`` argument should
-include one of :py:class:`~spdl.dataloader.TaskStatsHook`
+include one of :py:class:`~spdl.pipeline.TaskStatsHook`
instance with the desired interval.
output_order: If ``"completion"`` (default), the items are put to the output queue
in the order their processing is completed.
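For example, to log the default stats every 60 seconds, a stage could rely on
the default hook; a sketch, where ``transform`` and the surrounding builder
calls are assumed:

    # Sketch: use the default TaskStatsHook via report_stats_interval.
    builder = PipelineBuilder()
    builder.add_source(range(100))
    builder.pipe(
        transform,                  # hypothetical stage function
        report_stats_interval=60,   # log TaskStatsHook stats every 60 s
        output_order="completion",  # default: emit items as they complete
    )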
4 changes: 2 additions & 2 deletions src/spdl/pipeline/_hook.py
@@ -66,15 +66,15 @@ class PipelineHook(ABC):
To add a custom hook, subclass this class, override ``task_hook`` and
optionally ``stage_hook``, and pass an instance to methods such as
-:py::meth:`spdl.dataloader.AsyncPipeline.pipe`.
+:py:meth:`spdl.pipeline.Pipeline.pipe`.
.. tip::
When implementing a hook, you can decide how to react to task/stage failures
by choosing where to place the specific logic.
See :py:obj:`contextlib.contextmanager` for details, and
-:py:class:`spdl.dataloader.TaskStatsHook` for an example implementation.
+:py:class:`spdl.pipeline.TaskStatsHook` for an example implementation.
.. code-block:: python
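A sketch of such a custom hook; that ``task_hook`` is an async context manager
is assumed here from the pipeline's async design:

    # Sketch: a custom hook counting task failures.
    from contextlib import asynccontextmanager

    from spdl.pipeline import PipelineHook

    class FailureCountHook(PipelineHook):
        def __init__(self):
            self.num_failures = 0

        @asynccontextmanager
        async def task_hook(self):
            try:
                yield  # the task executes here
            except Exception:
                self.num_failures += 1  # react to task failure
                raise  # re-raise so the failed item is discarded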
4 changes: 2 additions & 2 deletions src/spdl/pipeline/_pipeline.py
@@ -278,7 +278,7 @@ def __init__(
try:
from spdl.lib import _libspdl

_libspdl.log_api_usage("spdl.dataloader.Pipeline")
_libspdl.log_api_usage("spdl.pipeline.Pipeline")
except Exception:
pass # ignore if not supported.

@@ -431,7 +431,7 @@ def get_iterator(self, *, timeout: float | None = None) -> Iterator[T]:
return PipelineIterator(self, timeout)

def __iter__(self) -> Iterator[T]:
"""Call :py:meth:`~spdl.dataloader.Pipeline.get_iterator` without arguments."""
"""Call :py:meth:`~spdl.pipeline.Pipeline.get_iterator` without arguments."""
return self.get_iterator()
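A usage sketch, assuming ``pipeline`` is a built ``Pipeline`` and the
``auto_stop`` context manager from the SPDL docs:

    # Plain `for item in pipeline:` is equivalent to get_iterator()
    # with no arguments; a timeout (in seconds) can be passed explicitly.
    with pipeline.auto_stop():
        for item in pipeline.get_iterator(timeout=30.0):
            ...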


2 changes: 1 addition & 1 deletion tests/spdl_unittest/dataloader/pipeline_test.py
@@ -14,7 +14,7 @@
from multiprocessing import Process

import pytest
-from spdl.dataloader import PipelineBuilder, PipelineHook, TaskStatsHook
+from spdl.pipeline import PipelineBuilder, PipelineHook, TaskStatsHook
from spdl.pipeline._builder import _enqueue, _EOF, _pipe, _sink, _SKIP
from spdl.pipeline._hook import _periodic_dispatch

