Skip to content

Commit

Permalink
Throttle refactor (#35)
Browse files Browse the repository at this point in the history
* refactor: simplify throttle

* fix: formatting
  • Loading branch information
reddavis authored Jun 26, 2022
1 parent aaf2e9f commit 657bfed
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 124 deletions.
157 changes: 40 additions & 117 deletions Asynchrone/Source/Sequences/ThrottleAsyncSequence.swift
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import Foundation


/// An async sequence that emits either the most-recent or first element emitted
/// by the base async sequence in a specified time interval.
///
/// ThrottleAsyncSequence selectively emits elements from a base async sequence during an
/// interval you specify. Other elements received within the throttling interval aren’t emitted.
///
/// ```swift
/// let stream = AsyncStream<Int> { continuation in
/// let sequence = AsyncStream<Int> { continuation in
/// continuation.yield(0)
/// try? await Task.sleep(nanoseconds: 100_000_000)
/// continuation.yield(1)
Expand All @@ -20,28 +19,23 @@ import Foundation
/// continuation.finish()
/// }
///
/// for element in try await self.stream.throttle(for: 0.05, latest: true) {
/// for element in try await sequence.throttle(for: 0.05, latest: true) {
/// print(element)
/// }
///
/// // Prints:
/// // 0
/// // 1
/// // 2
/// // 5
/// ```
public struct ThrottleAsyncSequence<T: AsyncSequence>: AsyncSequence {

/// The kind of elements streamed.
public typealias Element = T.Element

// MARK: ThrottleAsyncSequence (Private Properties)

// Private
private var base: T
private var stream: AsyncThrowingStream<T.Element, Error>
private var iterator: AsyncThrowingStream<T.Element, Error>.Iterator
private var continuation: AsyncThrowingStream<T.Element, Error>.Continuation
private var inner: ThrottleAsyncSequence.Inner<T>
private var interval: TimeInterval
private var latest: Bool

// MARK: Initialization

Expand All @@ -57,141 +51,71 @@ public struct ThrottleAsyncSequence<T: AsyncSequence>: AsyncSequence {
interval: TimeInterval,
latest: Bool
) {
var streamContinuation: AsyncThrowingStream<T.Element, Error>.Continuation!
let stream = AsyncThrowingStream<T.Element, Error> { streamContinuation = $0 }

self.base = base
self.stream = stream
self.iterator = stream.makeAsyncIterator()
self.continuation = streamContinuation
self.inner = ThrottleAsyncSequence.Inner<T>(
base: base,
continuation: streamContinuation,
interval: interval,
latest: latest
)

Task { [inner] in
await inner.startAwaitingForBaseSequence()
}
self.interval = interval
self.latest = latest
}

// MARK: AsyncSequence

/// Creates an async iterator that emits elements of this async sequence.
/// - Returns: An instance that conforms to `AsyncIteratorProtocol`.
public func makeAsyncIterator() -> AsyncThrowingStream<Element, Error>.Iterator {
self.iterator
}
}

// MARK: AsyncIteratorProtocol

extension ThrottleAsyncSequence: AsyncIteratorProtocol {

/// Produces the next element in the sequence.
/// - Returns: The next element or `nil` if the end of the sequence is reached.
public mutating func next() async throws -> Element? {
try await self.iterator.next()
public func makeAsyncIterator() -> Iterator {
Iterator(base: self.base.makeAsyncIterator(), interval: self.interval, latest: self.latest)
}
}



// MARK: ThrottleAsyncSequence > Inner
// MARK: Iterator

extension ThrottleAsyncSequence {

fileprivate actor Inner<T: AsyncSequence> {

fileprivate typealias Element = T.Element

// Private
private var base: T
private var continuation: AsyncThrowingStream<Element, Error>.Continuation?
private let interval: TimeInterval
private let latest: Bool
public struct Iterator: AsyncIteratorProtocol {
var base: T.AsyncIterator
var interval: TimeInterval
var latest: Bool

// Private
private var collectedElements: [Element] = []
private var lastEmission: Date?

// MARK: Initialization

fileprivate init(
base: T,
continuation: AsyncThrowingStream<Element, Error>.Continuation,

init(
base: T.AsyncIterator,
interval: TimeInterval,
latest: Bool
) {
)
{
self.base = base
self.continuation = continuation
self.interval = interval
self.latest = latest
}

deinit {
self.continuation = nil
}

// MARK: API
// MARK: AsyncIteratorProtocol

fileprivate func startAwaitingForBaseSequence() async {
defer { self.continuation = nil }

do {
for try await element in self.base {
self.handle(element: element)
public mutating func next() async rethrows -> Element? {
while true {
guard let value = try await self.base.next() else {
return nil
}

if let lastTime = self.lastEmission {
let gap = Date().timeIntervalSince(lastTime)
if gap < self.interval {
let delay = self.interval - gap
try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))
}
guard let lastEmission = self.lastEmission else {
self.lastEmission = Date()
return value
}

self.collectedElements.append(value)
let element = (self.latest ? self.collectedElements.last : self.collectedElements.first) ?? value
let gap = Date().timeIntervalSince(lastEmission)
if gap >= self.interval {
self.lastEmission = Date()
self.collectedElements.removeAll()
return element
}

self.emitNextElement()
self.continuation?.finish()
} catch {
self.continuation?.finish(throwing: error)
}
}

// MARK: Inner (Private Methods)

private func nextElement() -> T.Element? {
self.latest
? self.collectedElements.last
: self.collectedElements.first
}

private func handle(element: T.Element) {
self.collectedElements.append(element)

guard let lastTime = self.lastEmission else {
self.emitNextElement()
return
}

let gap = Date().timeIntervalSince(lastTime)
if gap >= self.interval {
self.emitNextElement()
}
}

private func emitNextElement() {
self.lastEmission = Date()
if let element = self.nextElement() {
self.continuation?.yield(element)
}
self.collectedElements = []
}
}
}



// MARK: Throttle

extension AsyncSequence {
Expand All @@ -203,7 +127,7 @@ extension AsyncSequence {
/// interval you specify. Other elements received within the throttling interval aren’t emitted.
///
/// ```swift
/// let stream = AsyncStream<Int> { continuation in
/// let sequence = AsyncStream<Int> { continuation in
/// continuation.yield(0)
/// try? await Task.sleep(nanoseconds: 100_000_000)
/// continuation.yield(1)
Expand All @@ -215,15 +139,14 @@ extension AsyncSequence {
/// continuation.finish()
/// }
///
/// for element in try await self.stream.throttle(for: 0.05, latest: true) {
/// for element in try await sequence.throttle(for: 0.05, latest: true) {
/// print(element)
/// }
///
/// // Prints:
/// // 0
/// // 1
/// // 2
/// // 5
/// ```
/// - Parameters:
/// - interval: The interval in which to emit the most recent element.
Expand Down
13 changes: 6 additions & 7 deletions AsynchroneTests/Tests/Sequences/ThrottleAsyncSequenceTests.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import XCTest
@testable import Asynchrone


final class ThrottleAsyncSequenceTests: XCTestCase {
private var sequence: AsyncStream<Int>!

Expand All @@ -24,23 +23,23 @@ final class ThrottleAsyncSequenceTests: XCTestCase {
// MARK: Tests

func testThrottle() async throws {
let values = try await self.sequence
let values = await self.sequence
.throttle(for: 0.05, latest: false)
.collect()

XCTAssertEqual(values, [0, 1, 2, 3])
XCTAssertEqual(values, [0, 1, 2])
}

func testThrottleLatest() async throws {
let values = try await self.sequence
let values = await self.sequence
.throttle(for: 0.05, latest: true)
.collect()

XCTAssertEqual(values, [0, 1, 2, 5])
XCTAssertEqual(values, [0, 1, 2])
}

func testThrottleWithNoValues() async throws {
let values = try await AsyncStream<Int> {
let values = await AsyncStream<Int> {
$0.finish()
}
.throttle(for: 0.05, latest: true)
Expand All @@ -50,7 +49,7 @@ final class ThrottleAsyncSequenceTests: XCTestCase {
}

func testThrottleWithOneValue() async throws {
let values = try await Just(0)
let values = await Just(0)
.throttle(for: 0.05, latest: true)
.collect()

Expand Down

0 comments on commit 657bfed

Please sign in to comment.