diff --git a/av/error.pyi b/av/error.pyi index 527d8c00d..088f78d45 100644 --- a/av/error.pyi +++ b/av/error.pyi @@ -11,6 +11,9 @@ def make_error( filename: str | None = None, log: tuple[int, tuple[int, str, str] | None] | None = None, ) -> None: ... +def err_check(res: int, filename: str | None = None) -> int: ... + +BUFFER_TOO_SMALL: EnumItem class ErrorType(EnumItem): BSF_NOT_FOUND: int diff --git a/av/frame.pyi b/av/frame.pyi index 6e5348119..7c34e91bf 100644 --- a/av/frame.pyi +++ b/av/frame.pyi @@ -1,4 +1,10 @@ from fractions import Fraction +from typing import TypedDict + +from av.sidedata.motionvectors import MotionVectors + +class SideData(TypedDict, total=False): + MOTION_VECTORS: MotionVectors class Frame: dts: int | None @@ -6,7 +12,7 @@ class Frame: time: float | None time_base: Fraction is_corrupt: bool - side_data: dict[str, str] + side_data: SideData opaque: object def make_writable(self) -> None: ... diff --git a/av/video/frame.pyi b/av/video/frame.pyi index de84faaa0..29f961ba4 100644 --- a/av/video/frame.pyi +++ b/av/video/frame.pyi @@ -46,8 +46,8 @@ class VideoFrame(Frame): width: int | None = None, height: int | None = None, format: str | None = None, - src_colorspace: int | None = None, - dst_colorspace: int | None = None, + src_colorspace: str | int | None = None, + dst_colorspace: str | int | None = None, interpolation: int | str | None = None, src_color_range: int | str | None = None, dst_color_range: int | str | None = None, diff --git a/tests/test_codec_context.py b/tests/test_codec_context.py index fa56a5e4f..6425c4dd1 100644 --- a/tests/test_codec_context.py +++ b/tests/test_codec_context.py @@ -169,7 +169,7 @@ def test_bits_per_coded_sample(self): stream.bits_per_coded_sample = 31 with pytest.raises(av.error.InvalidDataError): - for frame in container.decode(stream): + for _ in container.decode(stream): pass with av.open(self.sandboxed("output.mov"), "w") as output: @@ -396,7 +396,7 @@ def video_encoding( assert i == gop_size final_gop_size = decoded_frame_count - max(keyframe_indices) - self.assertLessEqual(final_gop_size, gop_size) + assert final_gop_size < gop_size def test_encoding_pcm_s24le(self) -> None: self.audio_encoding("pcm_s24le") diff --git a/tests/test_decode.py b/tests/test_decode.py index 7622693b5..20abdf840 100644 --- a/tests/test_decode.py +++ b/tests/test_decode.py @@ -41,7 +41,7 @@ def test_decode_audio_sample_count(self) -> None: audio_stream = next(s for s in container.streams if s.type == "audio") assert audio_stream is container.streams.audio[0] - assert isinstance(audio_stream, av.audio.AudioStream) + assert isinstance(audio_stream, av.AudioStream) sample_count = 0 diff --git a/tests/test_encode.py b/tests/test_encode.py index 617231409..89d2e3eb3 100644 --- a/tests/test_encode.py +++ b/tests/test_encode.py @@ -1,9 +1,9 @@ import io import math from fractions import Fraction -from unittest import SkipTest import numpy as np +import pytest import av from av import AudioFrame, VideoFrame @@ -19,7 +19,7 @@ def write_rgb_rotate(output: av.container.OutputContainer) -> None: if not has_pillow: - raise SkipTest("Don't have Pillow") + pytest.skip() import PIL.Image as Image @@ -233,8 +233,8 @@ def test_stream_index(self) -> None: astream = output.add_stream("mp2", 48000) assert astream in output.streams.audio - astream.layout = "stereo" # type: ignore - astream.format = "s16" # type: ignore + astream.layout = "stereo" + astream.format = "s16" assert vstream.index == 0 assert astream.index == 1 @@ -385,4 +385,4 @@ def test_max_b_frames(self) -> None: for max_b_frames in range(4): file = encode_file_with_max_b_frames(max_b_frames) actual_max_b_frames = max_b_frame_run_in_file(file) - self.assertTrue(actual_max_b_frames <= max_b_frames) + assert actual_max_b_frames <= max_b_frames diff --git a/tests/test_errors.py b/tests/test_errors.py index 5f7440402..1cad5d086 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -53,11 +53,11 @@ def test_filenotfound(): assert False, "No exception raised!" -def test_buffertoosmall(): +def test_buffertoosmall() -> None: """Throw an exception from an enum.""" try: av.error.err_check(-av.error.BUFFER_TOO_SMALL.value) - except av.BufferTooSmallError as e: + except av.error.BufferTooSmallError as e: assert e.errno == av.error.BUFFER_TOO_SMALL.value else: assert False, "No exception raised!" diff --git a/tests/test_file_probing.py b/tests/test_file_probing.py index 87d462264..f71391697 100644 --- a/tests/test_file_probing.py +++ b/tests/test_file_probing.py @@ -9,7 +9,7 @@ class TestAudioProbe(TestCase): def setUp(self): self.file = av.open(fate_suite("aac/latm_stereo_to_51.ts")) - def test_container_probing(self): + def test_container_probing(self) -> None: assert self.file.bit_rate == 269558 assert self.file.duration == 6165333 assert str(self.file.format) == "" @@ -20,9 +20,10 @@ def test_container_probing(self): assert self.file.start_time == 1400000 assert len(self.file.streams) == 1 - def test_stream_probing(self): + def test_stream_probing(self) -> None: stream = self.file.streams[0] + assert isinstance(stream, av.AudioStream) assert str(stream).startswith( " None: # write an empty file path = self.sandboxed("empty.flac") with open(path, "wb"): diff --git a/tests/test_filters.py b/tests/test_filters.py index e7bbeca1a..a87d6b5b0 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -126,7 +126,7 @@ def test_audio_buffer_sink(self): if e.errno != errno.EAGAIN: raise - def test_audio_buffer_resample(self): + def test_audio_buffer_resample(self) -> None: graph = Graph() graph.link_nodes( graph.add_abuffer( @@ -147,6 +147,7 @@ def test_audio_buffer_resample(self): ) ) out_frame = graph.pull() + assert isinstance(out_frame, av.AudioFrame) assert out_frame.format.name == "s16" assert out_frame.layout.name == "stereo" assert out_frame.sample_rate == 44100 @@ -202,9 +203,7 @@ def test_audio_buffer_volume_filter(self): input_data = input_frame.to_ndarray() output_data = out_frame.to_ndarray() - self.assertTrue( - np.allclose(input_data * 0.5, output_data), "Check that volume is reduced" - ) + assert np.allclose(input_data * 0.5, output_data) def test_video_buffer(self): input_container = av.open(format="lavfi", file="color=c=pink:duration=1:r=30") diff --git a/tests/test_logging.py b/tests/test_logging.py index 8e863791c..c8c705b1c 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -5,80 +5,81 @@ import av.error import av.logging -from .common import TestCase - def do_log(message: str) -> None: av.logging.log(av.logging.INFO, "test", message) -class TestLogging(TestCase): - def test_adapt_level(self): - assert av.logging.adapt_level(av.logging.ERROR) == logging.ERROR - assert av.logging.adapt_level(av.logging.WARNING) == logging.WARNING - assert ( - av.logging.adapt_level((av.logging.WARNING + av.logging.ERROR) // 2) - == logging.WARNING - ) - - def test_threaded_captures(self): - av.logging.set_level(av.logging.VERBOSE) - - with av.logging.Capture(local=True) as logs: - do_log("main") - thread = threading.Thread(target=do_log, args=("thread",)) - thread.start() - thread.join() - - assert (av.logging.INFO, "test", "main") in logs - av.logging.set_level(None) - - def test_global_captures(self): - av.logging.set_level(av.logging.VERBOSE) - - with av.logging.Capture(local=False) as logs: - do_log("main") - thread = threading.Thread(target=do_log, args=("thread",)) - thread.start() - thread.join() - - assert (av.logging.INFO, "test", "main") in logs - assert (av.logging.INFO, "test", "thread") in logs - av.logging.set_level(None) - - def test_repeats(self): - av.logging.set_level(av.logging.VERBOSE) - - with av.logging.Capture() as logs: - do_log("foo") - do_log("foo") - do_log("bar") - do_log("bar") - do_log("bar") - do_log("baz") - - logs = [log for log in logs if log[1] == "test"] - - assert logs == [ - (av.logging.INFO, "test", "foo"), - (av.logging.INFO, "test", "foo"), - (av.logging.INFO, "test", "bar"), - (av.logging.INFO, "test", "bar (repeated 2 more times)"), - (av.logging.INFO, "test", "baz"), - ] - - av.logging.set_level(None) - - def test_error(self): - av.logging.set_level(av.logging.VERBOSE) - - log = (av.logging.ERROR, "test", "This is a test.") - av.logging.log(*log) - try: - av.error.err_check(-errno.EPERM) - except OSError as e: - assert e.log == log - else: - self.fail() - - av.logging.set_level(None) +def test_adapt_level() -> None: + assert av.logging.adapt_level(av.logging.ERROR) == logging.ERROR + assert av.logging.adapt_level(av.logging.WARNING) == logging.WARNING + assert ( + av.logging.adapt_level((av.logging.WARNING + av.logging.ERROR) // 2) + == logging.WARNING + ) + + +def test_threaded_captures() -> None: + av.logging.set_level(av.logging.VERBOSE) + + with av.logging.Capture(local=True) as logs: + do_log("main") + thread = threading.Thread(target=do_log, args=("thread",)) + thread.start() + thread.join() + + assert (av.logging.INFO, "test", "main") in logs + av.logging.set_level(None) + + +def test_global_captures() -> None: + av.logging.set_level(av.logging.VERBOSE) + + with av.logging.Capture(local=False) as logs: + do_log("main") + thread = threading.Thread(target=do_log, args=("thread",)) + thread.start() + thread.join() + + assert (av.logging.INFO, "test", "main") in logs + assert (av.logging.INFO, "test", "thread") in logs + av.logging.set_level(None) + + +def test_repeats() -> None: + av.logging.set_level(av.logging.VERBOSE) + + with av.logging.Capture() as logs: + do_log("foo") + do_log("foo") + do_log("bar") + do_log("bar") + do_log("bar") + do_log("baz") + + logs = [log for log in logs if log[1] == "test"] + + assert logs == [ + (av.logging.INFO, "test", "foo"), + (av.logging.INFO, "test", "foo"), + (av.logging.INFO, "test", "bar"), + (av.logging.INFO, "test", "bar (repeated 2 more times)"), + (av.logging.INFO, "test", "baz"), + ] + + av.logging.set_level(None) + + +def test_error() -> None: + av.logging.set_level(av.logging.VERBOSE) + + log = (av.logging.ERROR, "test", "This is a test.") + av.logging.log(*log) + try: + av.error.err_check(-errno.EPERM) + except av.error.PermissionError as e: + assert e.log == log + else: + assert False + + av.logging.set_level(None) diff --git a/tests/test_python_io.py b/tests/test_python_io.py index 3cb4b7720..d657404d9 100644 --- a/tests/test_python_io.py +++ b/tests/test_python_io.py @@ -355,13 +355,13 @@ def test_writing_to_pipe_readonly(self) -> None: ) as cm: write(buf) - def test_writing_to_pipe_writeonly(self): + def test_writing_to_pipe_writeonly(self) -> None: av.logging.set_level(av.logging.VERBOSE) buf = WriteOnlyPipe() with pytest.raises( ValueError, match=escape("[mp4] muxer does not support non seekable output") - ) as cm: + ): write(buf) av.logging.set_level(None) diff --git a/tests/test_videoframe.py b/tests/test_videoframe.py index 6b856be86..8dcdb838a 100644 --- a/tests/test_videoframe.py +++ b/tests/test_videoframe.py @@ -170,17 +170,17 @@ def test_to_image_rgb24(self): assert img.tobytes() == expected -def test_basic_to_ndarray(): +def test_basic_to_ndarray() -> None: array = VideoFrame(640, 480, "rgb24").to_ndarray() assert array.shape == (480, 640, 3) -def test_to_image_with_dimensions(): +def test_to_image_with_dimensions() -> None: img = VideoFrame(640, 480, format="rgb24").to_image(width=320, height=240) assert img.size == (320, 240) -def test_ndarray_gray(): +def test_ndarray_gray() -> None: array = numpy.random.randint(0, 256, size=(480, 640), dtype=numpy.uint8) for format in ("gray", "gray8"): frame = VideoFrame.from_ndarray(array, format=format) @@ -189,7 +189,7 @@ def test_ndarray_gray(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_gray_align(): +def test_ndarray_gray_align() -> None: array = numpy.random.randint(0, 256, size=(238, 318), dtype=numpy.uint8) for format in ("gray", "gray8"): frame = VideoFrame.from_ndarray(array, format=format) @@ -198,7 +198,7 @@ def test_ndarray_gray_align(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_rgb(): +def test_ndarray_rgb() -> None: array = numpy.random.randint(0, 256, size=(480, 640, 3), dtype=numpy.uint8) for format in ("rgb24", "bgr24"): frame = VideoFrame.from_ndarray(array, format=format) @@ -207,7 +207,7 @@ def test_ndarray_rgb(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_rgb_align(): +def test_ndarray_rgb_align() -> None: array = numpy.random.randint(0, 256, size=(238, 318, 3), dtype=numpy.uint8) for format in ("rgb24", "bgr24"): frame = VideoFrame.from_ndarray(array, format=format) @@ -216,7 +216,7 @@ def test_ndarray_rgb_align(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_rgba(): +def test_ndarray_rgba() -> None: array = numpy.random.randint(0, 256, size=(480, 640, 4), dtype=numpy.uint8) for format in ("argb", "rgba", "abgr", "bgra"): frame = VideoFrame.from_ndarray(array, format=format) @@ -225,7 +225,7 @@ def test_ndarray_rgba(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_rgba_align(): +def test_ndarray_rgba_align() -> None: array = numpy.random.randint(0, 256, size=(238, 318, 4), dtype=numpy.uint8) for format in ("argb", "rgba", "abgr", "bgra"): frame = VideoFrame.from_ndarray(array, format=format) @@ -234,7 +234,7 @@ def test_ndarray_rgba_align(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_gbrp(): +def test_ndarray_gbrp() -> None: array = numpy.random.randint(0, 256, size=(480, 640, 3), dtype=numpy.uint8) frame = VideoFrame.from_ndarray(array, format="gbrp") assert frame.width == 640 and frame.height == 480 @@ -242,7 +242,7 @@ def test_ndarray_gbrp(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_gbrp_align(): +def test_ndarray_gbrp_align() -> None: array = numpy.random.randint(0, 256, size=(238, 318, 3), dtype=numpy.uint8) frame = VideoFrame.from_ndarray(array, format="gbrp") assert frame.width == 318 and frame.height == 238 @@ -250,7 +250,7 @@ def test_ndarray_gbrp_align(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_gbrp10(): +def test_ndarray_gbrp10() -> None: array = numpy.random.randint(0, 1024, size=(480, 640, 3), dtype=numpy.uint16) for format in ("gbrp10be", "gbrp10le"): frame = VideoFrame.from_ndarray(array, format=format) @@ -259,7 +259,7 @@ def test_ndarray_gbrp10(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_gbrp10_align(): +def test_ndarray_gbrp10_align() -> None: array = numpy.random.randint(0, 1024, size=(238, 318, 3), dtype=numpy.uint16) for format in ("gbrp10be", "gbrp10le"): frame = VideoFrame.from_ndarray(array, format=format) @@ -268,7 +268,7 @@ def test_ndarray_gbrp10_align(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_gbrp12(): +def test_ndarray_gbrp12() -> None: array = numpy.random.randint(0, 4096, size=(480, 640, 3), dtype=numpy.uint16) for format in ("gbrp12be", "gbrp12le"): frame = VideoFrame.from_ndarray(array, format=format) @@ -277,7 +277,7 @@ def test_ndarray_gbrp12(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_ndarray_gbrp12_align(): +def test_ndarray_gbrp12_align() -> None: array = numpy.random.randint(0, 4096, size=(238, 318, 3), dtype=numpy.uint16) for format in ("gbrp12be", "gbrp12le"): frame = VideoFrame.from_ndarray(array, format=format) @@ -407,7 +407,7 @@ def test_ndarray_gray16be() -> None: assertPixelValue16(frame.planes[0], array[0][0], "big") -def test_ndarray_gray16le(): +def test_ndarray_gray16le() -> None: array = numpy.random.randint(0, 65536, size=(480, 640), dtype=numpy.uint16) frame = VideoFrame.from_ndarray(array, format="gray16le") assert frame.width == 640 and frame.height == 480 @@ -418,7 +418,7 @@ def test_ndarray_gray16le(): assertPixelValue16(frame.planes[0], array[0][0], "little") -def test_ndarray_rgb48be(): +def test_ndarray_rgb48be() -> None: array = numpy.random.randint(0, 65536, size=(480, 640, 3), dtype=numpy.uint16) frame = VideoFrame.from_ndarray(array, format="rgb48be") assert frame.width == 640 and frame.height == 480 @@ -561,7 +561,7 @@ def test_shares_memory_bgr8() -> None: assertNdarraysEqual(frame.to_ndarray(), array) -def test_shares_memory_rgb24(): +def test_shares_memory_rgb24() -> None: array = numpy.random.randint(0, 256, size=(357, 318, 3), dtype=numpy.uint8) frame = VideoFrame.from_numpy_buffer(array, "rgb24") assertNdarraysEqual(frame.to_ndarray(), array) @@ -572,7 +572,7 @@ def test_shares_memory_rgb24(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_shares_memory_yuv420p(): +def test_shares_memory_yuv420p() -> None: array = numpy.random.randint(0, 256, size=(512 * 6 // 4, 256), dtype=numpy.uint8) frame = VideoFrame.from_numpy_buffer(array, "yuv420p") assertNdarraysEqual(frame.to_ndarray(), array) @@ -583,7 +583,7 @@ def test_shares_memory_yuv420p(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_shares_memory_yuvj420p(): +def test_shares_memory_yuvj420p() -> None: array = numpy.random.randint(0, 256, size=(512 * 6 // 4, 256), dtype=numpy.uint8) frame = VideoFrame.from_numpy_buffer(array, "yuvj420p") assertNdarraysEqual(frame.to_ndarray(), array) @@ -594,7 +594,7 @@ def test_shares_memory_yuvj420p(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_shares_memory_nv12(): +def test_shares_memory_nv12() -> None: array = numpy.random.randint(0, 256, size=(512 * 6 // 4, 256), dtype=numpy.uint8) frame = VideoFrame.from_numpy_buffer(array, "nv12") assertNdarraysEqual(frame.to_ndarray(), array) @@ -605,7 +605,7 @@ def test_shares_memory_nv12(): assertNdarraysEqual(frame.to_ndarray(), array) -def test_shares_memory_bgr24(): +def test_shares_memory_bgr24() -> None: array = numpy.random.randint(0, 256, size=(357, 318, 3), dtype=numpy.uint8) frame = VideoFrame.from_numpy_buffer(array, "bgr24") assertNdarraysEqual(frame.to_ndarray(), array) @@ -624,13 +624,13 @@ def test_reformat_pts() -> None: assert frame.pts == 123 and frame.time_base == 456 -def test_reformat_identity(): +def test_reformat_identity() -> None: frame1 = VideoFrame(640, 480, "rgb24") frame2 = frame1.reformat(640, 480, "rgb24") assert frame1 is frame2 -def test_reformat_colorspace(): +def test_reformat_colorspace() -> None: # This is allowed. frame = VideoFrame(640, 480, "rgb24") frame.reformat(src_colorspace=None, dst_colorspace="smpte240") @@ -640,7 +640,7 @@ def test_reformat_colorspace(): frame.reformat(src_colorspace=None, dst_colorspace="smpte240") -def test_reformat_pixel_format_align(): +def test_reformat_pixel_format_align() -> None: height = 480 for width in range(2, 258, 2): frame_yuv = VideoFrame(width, height, "yuv420p")