Extensions and additions to AsyncSequence, AsyncStream and AsyncThrowingStream.
Extensions and additions for Swift’s async sequence.
In Xcode:
Project
.Package Dependencies
.+
.https://github.com/reddavis/Asynchrone
.Asynchrone
to your app target.Documentation can be found here.
class MyClass {
var value: Int = 0 {
didSet { print("Set to \(self.value)") }
}
}
let sequence = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()
}
let object = MyClass()
sequence.assign(to: \.value, on: object)
// Prints:
// Set to 1
// Set to 2
// Set to 3
let sequence = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()
}
print(await sequence.first())
// Prints:
// 1
let sequence = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()
}
print(await sequence.last())
// Prints:
// 3
let sequence = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()
}
print(await sequence.collect())
// Prints:
// [1, 2, 3]
let sequence = .init { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()
}
sequence.sink { print($0) }
// Prints:
// 1
// 2
// 3
let sequence = .init { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish(throwing: TestError())
}
sequence.sink(
receiveValue: { print("Value: \($0)") },
receiveCompletion: { print("Complete: \($0)") }
)
// Prints:
// Value: 1
// Value: 2
// Value: 3
// Complete: failure(TestError())
let sequence = Just(1)
.map(String.init)
.eraseToAnyAsyncSequenceable()
let stream = Fail<Int, TestError>(error: TestError.a)
.eraseToAnyThrowingAsyncSequenceable()
let sequence = Fail<Int, TestError>(
error: TestError()
)
.catch { error in
Just(-1)
}
for await value in sequence {
print(value)
}
// Prints:
// -1
let sequenceA = AsyncStream<Int> { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()
}
let sequenceB = AsyncStream<Int> { continuation in
continuation.yield(4)
continuation.yield(5)
continuation.yield(6)
continuation.finish()
}
let sequenceC = AsyncStream<Int> { continuation in
continuation.yield(7)
continuation.yield(8)
continuation.yield(9)
continuation.finish()
}
for await value in sequenceA.chain(with: sequenceB).chain(with: sequenceC) {
print(value)
}
// Prints:
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
let streamA = .init { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.finish()
}
let streamB = .init { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.yield(8)
continuation.yield(9)
continuation.finish()
}
for await value in streamA.combineLatest(streamB) {
print(value)
}
// Prints:
// (1, 5)
// (2, 6)
// (3, 7)
// (4, 8)
// (4, 9)
let streamA = .init { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.finish()
}
let streamB = .init { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.yield(8)
continuation.yield(9)
continuation.finish()
}
let streamC = .init { continuation in
continuation.yield(10)
continuation.yield(11)
continuation.finish()
}
for await value in streamA.combineLatest(streamB, streamC) {
print(value)
}
// Prints:
// (1, 5, 10)
// (2, 6, 11)
// (3, 7, 11)
// (4, 8, 11)
// (4, 9, 11)
let sequence = CurrentElementAsyncSequence(0)
print(await sequence.element)
await stream.yield(1)
print(await sequence.element)
await stream.yield(2)
await stream.yield(3)
await stream.yield(4)
print(await sequence.element)
// Prints:
// 0
// 1
// 4
let stream = AsyncStream<Int> { continuation in
continuation.yield(0)
try? await Task.sleep(nanoseconds: 200_000_000)
continuation.yield(1)
try? await Task.sleep(nanoseconds: 200_000_000)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.yield(5)
continuation.finish()
}
for element in try await self.stream.debounce(for: 0.1) {
print(element)
}
// Prints:
// 0
// 1
// 5
let stream = AsyncStream<Int> { continuation in
continuation.yield(0)
continuation.yield(1)
continuation.yield(2)
continuation.finish()
}
let start = Date.now
for element in try await self.stream.delay(for: 0.5) {
print("\(element) - \(Date.now.timeIntervalSince(start))")
}
// Prints:
// 0 - 0.5
// 1 - 1.0
// 2 - 1.5
>>>>>>> main
Empty<Int>().sink(
receiveValue: { print($0) },
receiveCompletion: { completion in
switch completion {
case .finished:
print("Finished")
case .failure:
print("Failed")
}
}
)
// Prints:
// Finished
let stream = Fail<Int, TestError>(error: TestError())
do {
for try await value in stream {
print(value)
}
} catch {
print("Error!")
}
// Prints:
// Error!
let stream = Just(1)
for await value in stream {
print(value)
}
// Prints:
// 1
let streamA = .init { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.finish()
}
let streamB = .init { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.yield(8)
continuation.yield(9)
continuation.finish()
}
for await value in streamA.merge(with: streamB) {
print(value)
}
// Prints:
// 1
// 5
// 2
// 6
// 3
// 7
// 4
// 8
// 9
let streamA = .init { continuation in
continuation.yield(1)
continuation.yield(4)
continuation.finish()
}
let streamB = .init { continuation in
continuation.yield(2)
continuation.finish()
}
let streamC = .init { continuation in
continuation.yield(3)
continuation.finish()
}
for await value in self.streamA.merge(with: self.streamB, self.streamC) {
print(value)
}
// Prints:
// 1
// 2
// 3
// 4
let sequence = NotificationCenter.default.sequence(for: UIDevice.orientationDidChangeNotification)
for await element in sequence {
print(element)
}
let sequence = PassthroughAsyncSequence<Int>()
sequence.yield(0)
sequence.yield(1)
sequence.yield(2)
sequence.finish()
for await value in sequence {
print(value)
}
// Prints:
// 0
// 1
// 2
let stream = .init { continuation in
continuation.yield(1)
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()
}
for await value in stream.removeDuplicates() {
print(value)
}
// Prints:
// 1
// 2
// 3
let sequence = Fail<Int, TestError>(
error: TestError()
)
.replaceError(with: 0)
for await value in stream {
print(value)
}
// Prints:
// 0
let sequence = [0, 1, 2, 3].async
for await value in sequence {
print(value)
}
// Prints:
// 1
// 2
// 3
let values = [
"a",
"ab",
"abc",
"abcd"
]
let stream = AsyncStream { continuation in
for value in values {
continuation.yield(value)
}
continuation.finish()
}
.shared()
Task {
let values = try await self.stream.collect()
// ...
}
Task.detached {
let values = try await self.stream.collect()
// ...
}
let values = try await self.stream.collect()
// ...
let stream = AsyncStream<Int> { continuation in
continuation.yield(0)
try? await Task.sleep(nanoseconds: 100_000_000)
continuation.yield(1)
try? await Task.sleep(nanoseconds: 100_000_000)
continuation.yield(2)
continuation.yield(3)
continuation.yield(4)
continuation.yield(5)
continuation.finish()
}
for element in try await self.stream.throttle(for: 0.05, latest: true) {
print(element)
}
// Prints:
// 0
// 1
// 2
// 5
let sequence = ThrowingPassthroughAsyncSequence<Int>()
sequence.yield(0)
sequence.yield(1)
sequence.yield(2)
sequence.finish(throwing: TestError())
do {
for try await value in sequence {
print(value)
}
} catch {
print("Error!")
}
// Prints:
// 0
// 1
// 2
// Error!
let sequence = TimerAsyncSequence(interval: 1)
let start = Date.now
for element in await sequence {
print(element)
}
// Prints:
// 2022-03-19 20:49:30 +0000
// 2022-03-19 20:49:31 +0000
// 2022-03-19 20:49:32 +0000
let streamA = .init { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.finish()
}
let streamB = .init { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.finish()
}
for await value in streamA.zip(streamB) {
print(value)
}
// Prints:
// (1, 5)
// (2, 6)
let streamA = .init { continuation in
continuation.yield(1)
continuation.yield(2)
continuation.finish()
}
let streamB = .init { continuation in
continuation.yield(5)
continuation.yield(6)
continuation.yield(7)
continuation.finish()
}
let streamC = .init { continuation in
continuation.yield(8)
continuation.yield(9)
continuation.finish()
}
for await value in streamA.zip(streamB, streamC) {
print(value)
}
// Prints:
// (1, 5, 8)
// (2, 6, 9)