diff --git a/reactivex/__init__.py b/reactivex/__init__.py index a707f978..704185cb 100644 --- a/reactivex/__init__.py +++ b/reactivex/__init__.py @@ -1153,6 +1153,9 @@ def timer( [ timer(2) ] --0-| + [ timer(2, 4) ] + --0----1----2-- + Examples: >>> res = reactivex.timer(datetime(...)) >>> res = reactivex.timer(datetime(...), 0.1) diff --git a/reactivex/observable/timer.py b/reactivex/observable/timer.py index 3bde5f40..9865ba41 100644 --- a/reactivex/observable/timer.py +++ b/reactivex/observable/timer.py @@ -35,14 +35,14 @@ def subscribe( observer: abc.ObserverBase[int], scheduler_: Optional[abc.SchedulerBase] = None ) -> abc.DisposableBase: _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() - nonlocal duetime + due_time = duetime - if not isinstance(duetime, datetime): - duetime = _scheduler.now + _scheduler.to_timedelta(duetime) + if not isinstance(due_time, datetime): + due_time = _scheduler.now + _scheduler.to_timedelta(due_time) p = max(0.0, _scheduler.to_seconds(period)) mad = MultipleAssignmentDisposable() - dt = duetime + dt = due_time count = 0 def action(scheduler: abc.SchedulerBase, state: Any) -> None: @@ -107,7 +107,7 @@ def action(count: Optional[int] = None) -> Optional[int]: return None if not isinstance(_scheduler, PeriodicScheduler): - raise ValueError("Sceduler must be PeriodicScheduler") + raise ValueError("Scheduler must be PeriodicScheduler") return _scheduler.schedule_periodic(period, action, state=0) return Observable(subscribe) diff --git a/tests/test_observable/test_timer.py b/tests/test_observable/test_timer.py index 7e0a9971..98af4d85 100644 --- a/tests/test_observable/test_timer.py +++ b/tests/test_observable/test_timer.py @@ -1,6 +1,7 @@ import unittest import reactivex +from reactivex import operators from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -126,3 +127,92 @@ def create(): results = scheduler.start(create) assert results.messages == [on_next(500, 0), on_next(800, 1)] + + def test_periodic_timer_repeat(self): + scheduler = TestScheduler() + t = reactivex.timer(duetime=130, period=200, scheduler=scheduler) + + def create(): + return t.pipe(operators.take(3), operators.repeat()) + + results = scheduler.start(create) + assert results.messages == [ + on_next(330, 0), + on_next(530, 1), + on_next(730, 2), + on_next(860, 0), + ] + + def test_periodic_timer_repeat_with_absolute_datetime(self): + scheduler = TestScheduler() + t = reactivex.timer( + duetime=scheduler.to_datetime(360), period=200, scheduler=scheduler + ) # here we have an absolute first value, so on second subscription, the timer should emit immediately + + def create(): + return t.pipe(operators.take(3), operators.repeat()) + + results = scheduler.start(create) + assert results.messages == [ + on_next(360, 0), + on_next(560, 1), + on_next(760, 2), + on_next( + 760, 0 + ), # our duetime is absolute and in the past so new sub emits immediately + on_next(960, 1), + ] + + def test_periodic_timer_repeat_with_relative_timespan(self): + scheduler = TestScheduler() + t = reactivex.timer( + duetime=scheduler.to_timedelta(130), + period=scheduler.to_timedelta(250), + scheduler=scheduler, + ) + + def create(): + return t.pipe(operators.take(3), operators.repeat()) + + results = scheduler.start(create) + assert results.messages == [ + on_next(330, 0), + on_next(580, 1), + on_next(830, 2), + on_next(960, 0), + ] + + def test_periodic_timer_second_subscription(self): + scheduler = TestScheduler() + t = reactivex.timer(duetime=200, period=300, scheduler=scheduler) + + def create(): + return reactivex.merge( + t.pipe(operators.map(lambda x: (x, "first"))), + reactivex.concat(reactivex.timer(100, scheduler=scheduler), t).pipe( + operators.map(lambda x: (x, "second")) + ), + ) + + results = scheduler.start(create) + assert results.messages == [ + on_next(300, (0, "second")), + on_next(400, (0, "first")), + on_next(500, (0, "second")), + on_next(700, (1, "first")), + on_next(800, (1, "second")), + ] + + def test_one_off_timer_repeat(self): + scheduler = TestScheduler() + t = reactivex.timer(duetime=230, scheduler=scheduler) + + def create(): + return t.pipe(operators.repeat()) + + results = scheduler.start(create) + assert results.messages == [ + on_next(430, 0), + on_next(660, 0), + on_next(890, 0), + ]