ReactKit

Swift Reactive Programming.

How to install

See the Wiki page.

Example

For a UI demo, see ReactKit/ReactKitCatalog.

Key-Value Observing

// create stream via KVO
self.obj1Stream = KVO.stream(obj1, "value")

// bind stream via KVC (`<~` as binding operator)
(obj2, "value") <~ self.obj1Stream

XCTAssertEqual(obj1.value, "initial")
XCTAssertEqual(obj2.value, "initial")

obj1.value = "REACT"

XCTAssertEqual(obj1.value, "REACT")
XCTAssertEqual(obj2.value, "REACT")

To remove stream bindings, release the stream itself (or call stream.cancel()).

self.obj1Stream = nil   // release stream & its bindings

obj1.value = "Done"

XCTAssertEqual(obj1.value, "Done")
XCTAssertEqual(obj2.value, "REACT")
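
The release behaviour above can be illustrated without ReactKit or KVO. The following is a minimal sketch in current Swift, assuming a hand-rolled observer list; `ValueBox` and `SketchBinding` are hypothetical names invented for this illustration and are not part of ReactKit.

```swift
// Hypothetical sketch: none of these types belong to ReactKit.
// A box that notifies registered observers whenever `value` changes.
final class ValueBox {
    var value: String {
        didSet { observers.values.forEach { $0(value) } }
    }
    private var observers: [Int: (String) -> Void] = [:]
    private var nextToken = 0

    init(_ value: String) { self.value = value }

    func addObserver(_ handler: @escaping (String) -> Void) -> Int {
        let token = nextToken
        nextToken += 1
        observers[token] = handler
        return token
    }

    func removeObserver(_ token: Int) { observers[token] = nil }
}

// Forwards changes from `source` to `target` until cancelled or released.
final class SketchBinding {
    private let source: ValueBox
    private let token: Int

    init(from source: ValueBox, to target: ValueBox) {
        self.source = source
        self.token = source.addObserver { [weak target] newValue in
            target?.value = newValue
        }
    }

    func cancel() { source.removeObserver(token) }

    deinit { cancel() }  // releasing the binding also removes the observer
}

let obj1 = ValueBox("initial")
let obj2 = ValueBox("initial")
var binding: SketchBinding? = SketchBinding(from: obj1, to: obj2)

obj1.value = "REACT"   // forwarded to obj2
binding = nil          // release the binding
obj1.value = "Done"    // no longer forwarded
```

Tying removal of the observer to `deinit` is what makes "just release it" sufficient in this sketch; an explicit `cancel()` does the same thing earlier.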

If you want to observe changes in Swift.Array or NSMutableArray,
use the DynamicArray feature in Pull Request #23.

NSNotification

self.stream = Notification.stream("MyNotification", obj1)
    |> map { notification -> NSString? in
        return "hello" // convert NSNotification? to NSString?
    }

(obj2, "value") <~ self.stream

Normally, an NSNotification is not itself a useful value to bind to other objects, so use Stream Operations such as map(f: T -> U) to convert it.

To learn more about the |> pipelining operator, see Stream Pipelining.
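
As a rough illustration of how a curried map operation and a pipelining operator can fit together, here is a minimal sketch in current Swift. It assumes a tiny push-based stream; `MiniStream`, `FakeNotification`, and this `map` and `|>` are hypothetical stand-ins written for this example, not ReactKit's own definitions.

```swift
// Hypothetical sketch: these definitions are not ReactKit's.
precedencegroup PipelinePrecedence {
    associativity: left
    higherThan: AssignmentPrecedence
}
infix operator |>: PipelinePrecedence

// A tiny push-based stream: values sent to it are passed to every handler.
final class MiniStream<T> {
    private var handlers: [(T) -> Void] = []
    func react(_ handler: @escaping (T) -> Void) { handlers.append(handler) }
    func send(_ value: T) { handlers.forEach { $0(value) } }
}

// Curried operation: returns a function from a stream of T to a stream of U.
func map<T, U>(_ transform: @escaping (T) -> U) -> (MiniStream<T>) -> MiniStream<U> {
    return { upstream in
        let downstream = MiniStream<U>()
        upstream.react { downstream.send(transform($0)) }
        return downstream
    }
}

// Pipelining: apply the operation on the right to the stream on the left.
func |> <T, U>(stream: MiniStream<T>, operation: (MiniStream<T>) -> MiniStream<U>) -> MiniStream<U> {
    return operation(stream)
}

struct FakeNotification { let name: String }

let notifications = MiniStream<FakeNotification>()
let greetings = notifications
    |> map { (note: FakeNotification) -> String in "hello from \(note.name)" }

var received: [String] = []
greetings.react { received.append($0) }
notifications.send(FakeNotification(name: "MyNotification"))
```

Because each operation is just a function from stream to stream, the operator itself stays trivial and new operations can be added without touching the stream type.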

Target-Action

// UIButton
self.buttonStream = self.button.buttonStream("OK")

// UITextField
self.textFieldStream = self.textField.textChangedStream()

^{ println($0) } <~ self.buttonStream     // prints "OK" on tap

// NOTE: ^{ ... } = closure-first operator, same as `stream ~> { ... }`
^{ println($0) } <~ self.textFieldStream  // prints textField.text on change

Complex example

The example below is taken from

where it describes four UITextFields that enable or disable a UIButton under certain conditions (demo available in ReactKit/ReactKitCatalog):

let usernameTextStream = self.usernameTextField.textChangedStream()
let emailTextStream = self.emailTextField.textChangedStream()
let passwordTextStream = self.passwordTextField.textChangedStream()
let password2TextStream = self.password2TextField.textChangedStream()

let allTextStreams = [usernameTextStream, emailTextStream, passwordTextStream, password2TextStream]

let combinedTextStream = allTextStreams |> merge2All

// create button-enabling stream via any textField change
self.buttonEnablingStream = combinedTextStream
    |> map { (values, changedValue) -> NSNumber? in

        let username: NSString? = values[0] ?? nil
        let email: NSString? = values[1] ?? nil
        let password: NSString? = values[2] ?? nil
        let password2: NSString? = values[3] ?? nil

        // validation
        let buttonEnabled = username?.length > 0 && email?.length > 0 && password?.length >= MIN_PASSWORD_LENGTH && password == password2

        // NOTE: use NSNumber because KVO does not understand Bool
        return NSNumber(bool: buttonEnabled)
    }

// REACT: enable/disable okButton
(self.okButton, "enabled") <~ self.buttonEnablingStream!
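
The validation rule inside the map closure can also be pulled out into a pure function, which makes it easy to test without any UI. This is a sketch in current Swift; `isSignUpFormValid` and the minimum length of 4 are hypothetical choices for illustration, and the function assumes the four values arrive in the same order as in the example above.

```swift
// Hypothetical helper, not part of ReactKit: the same rule as the closure above,
// written against plain optional strings.
func isSignUpFormValid(_ values: [String?], minPasswordLength: Int) -> Bool {
    guard values.count == 4 else { return false }

    // A field that has not produced a value yet is treated as empty.
    let username = values[0] ?? ""
    let email = values[1] ?? ""
    let password = values[2] ?? ""
    let password2 = values[3] ?? ""

    return !username.isEmpty
        && !email.isEmpty
        && password.count >= minPasswordLength
        && password == password2
}

let nothingTyped = isSignUpFormValid([nil, nil, nil, nil], minPasswordLength: 4)
let complete = isSignUpFormValid(["alice", "a@example.com", "pass", "pass"], minPasswordLength: 4)
let mismatch = isSignUpFormValid(["alice", "a@example.com", "pass", "pasx"], minPasswordLength: 4)
let tooShort = isSignUpFormValid(["alice", "a@example.com", "abc", "abc"], minPasswordLength: 4)
```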

For more examples, please see XCTestCases.

How it works

ReactKit is built on SwiftTask, a JavaScript Promise-like library. Using SwiftTask's resume and progress features (exposed in ReactKit as react() or the <~ operator), it can start and deliver multiple events (KVO, NSNotification, Target-Action, etc.) continuously over time.

Unlike Reactive Extensions (Rx) libraries, which distinguish between “hot” and “cold” observables, ReactKit integrates both into a single Stream<T> class that is hot and paused (lazy). Lazy streams are auto-resumed by react() and the <~ operator.

Here are some differences in architecture:

|               | Reactive Extensions (Rx)                                   | ReactKit                                           |
|---------------|------------------------------------------------------------|----------------------------------------------------|
| Basic Classes | Hot Observable (broadcasting), Cold Observable (laziness)  | Stream<T>                                          |
| Generating    | Cold Observable (cloneability)                             | Void -> Stream<T> (= Stream<T>.Producer)           |
| Subscribing   | observable.subscribe(onNext, onError, onComplete)          | stream.react {...}.then {...} (method-chainable)   |
| Pausing       | pausableObservable.pause()                                 | stream.pause()                                     |
| Disposing     | disposable.dispose()                                       | stream.cancel()                                    |
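
The idea of a stream that is paused until someone reacts to it, and hot afterwards, can be sketched in a few lines of current Swift. `LazyStream` is a hypothetical type written for this illustration only; it says nothing about how ReactKit or SwiftTask implement resuming internally.

```swift
// Hypothetical sketch: not ReactKit's Stream<T>.
// The start closure is stored at creation and only runs on the first react().
final class LazyStream<T> {
    private let start: (@escaping (T) -> Void) -> Void
    private var handlers: [(T) -> Void] = []
    private(set) var isRunning = false

    init(_ start: @escaping (@escaping (T) -> Void) -> Void) {
        self.start = start
    }

    func react(_ handler: @escaping (T) -> Void) {
        handlers.append(handler)
        guard !isRunning else { return }   // already resumed: just join the broadcast
        isRunning = true
        start({ [weak self] value in
            self?.handlers.forEach { $0(value) }
        })
    }
}

var startCount = 0
var log: [Int] = []

let numbers = LazyStream<Int> { send in
    startCount += 1
    send(1)
    send(2)
}

let startCountBeforeReact = startCount   // still paused, nothing has run

numbers.react { log.append($0) }         // first react() resumes the stream
numbers.react { log.append($0 * 10) }    // joins late; the work is not started again
```

The second handler receives nothing here because the values were already broadcast, which is the "hot" half of the behaviour; the "lazy" half is that the start closure did not run until the first `react`.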

Stream Pipelining

Streams can be composed using the |> stream-pipelining operator and Stream Operations.

For example, a very common incremental search technique using searchTextStream will look like this:

let searchResultsStream: Stream<[Result]> = searchTextStream
    |> debounce(0.3)
    |> distinctUntilChanged
    |> map { text -> Stream<[Result]> in
        return API.getSearchResultsStream(text)
    }
    |> switchLatestInner
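
To make the first two steps of that pipeline concrete, the sketch below models a stream as an array of timestamped values and applies the usual meaning of debounce and distinctUntilChanged to it. It is written in current Swift; `TimedEvent`, `sketchDebounce`, and `sketchDistinctUntilChanged` are hypothetical names, and the exact timing semantics of ReactKit's own operations are not confirmed here.

```swift
// Hypothetical model: a stream is represented as an array of timestamped values.
struct TimedEvent<T> {
    let time: Double   // seconds since the stream started
    let value: T
}

// Keeps a value only if no newer value arrives within `interval` seconds.
func sketchDebounce<T>(_ events: [TimedEvent<T>], interval: Double) -> [T] {
    var output: [T] = []
    for (index, event) in events.enumerated() {
        let isLast = index == events.count - 1
        if isLast || events[index + 1].time - event.time >= interval {
            output.append(event.value)
        }
    }
    return output
}

// Drops a value when it equals the value emitted immediately before it.
func sketchDistinctUntilChanged<T: Equatable>(_ values: [T]) -> [T] {
    var output: [T] = []
    for value in values where output.last != value {
        output.append(value)
    }
    return output
}

let typed = [
    TimedEvent(time: 0.0, value: "s"),
    TimedEvent(time: 0.1, value: "sw"),
    TimedEvent(time: 0.2, value: "swi"),
    TimedEvent(time: 1.0, value: "swif"),
    TimedEvent(time: 1.1, value: "swift"),
]
let debounced = sketchDebounce(typed, interval: 0.3)
let distinct = sketchDistinctUntilChanged(["s", "sw", "sw", "swift", "sw"])
```

In an incremental search, debouncing avoids firing a request for every keystroke, and dropping consecutive duplicates avoids repeating a request for text that has not changed.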

In some scenarios (e.g. repeat()), you need a cloneable Stream<T>.Producer (Void -> Stream<T>) rather than a plain Stream<T>. In this case, use the |>> streamProducer-pipelining operator instead.

// first, wrap stream with closure
let timerProducer: Void -> Stream<Int> = {
    return createTimerStream(interval: 1)
        |> map { ... }
        |> filter { ... }
}

// then, use `|>>`  (streamProducer-pipelining operator)
let repeatTimerProducer = timerProducer |>> repeat(3)

Wrapping every stream in a closure quickly becomes cumbersome, so the |>> operator also accepts a plain Stream together with Stream Operations (thanks to @autoclosure).

let repeatTimerProducer = createTimerStream(interval: 1)
    |>> map { ... }
    |>> filter { ... }
    |>> repeat(3)
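
Why repeating needs a producer rather than a stream can be shown with a small model in current Swift. The sketch assumes a stream that delivers its values only once; `OneShotStream` and `repeated(_:times:)` are hypothetical names for this illustration and do not reflect ReactKit's implementation.

```swift
// Hypothetical sketch: a stream that can only be consumed once.
final class OneShotStream<T> {
    private let values: [T]
    private var consumed = false

    init(_ values: [T]) { self.values = values }

    // Delivers every value once; a second call delivers nothing.
    func react(_ handler: (T) -> Void) {
        guard !consumed else { return }
        consumed = true
        values.forEach(handler)
    }
}

// Builds a producer whose stream replays `count` fresh streams back to back.
// Each repetition calls the original producer again to obtain a new stream.
func repeated<T>(_ producer: @escaping () -> OneShotStream<T>, times count: Int) -> () -> OneShotStream<T> {
    return {
        var all: [T] = []
        for _ in 0..<count {
            producer().react { all.append($0) }
        }
        return OneShotStream(all)
    }
}

var created = 0
let timerProducer: () -> OneShotStream<Int> = {
    created += 1
    return OneShotStream([1, 2])
}

let repeatTimerProducer = repeated(timerProducer, times: 3)

var ticks: [Int] = []
repeatTimerProducer().react { ticks.append($0) }
```

A single `OneShotStream` could not be replayed three times, but a closure that builds a new one on each call can, which is the role the producer plays.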

Functions

Stream Operations

  • For Single Stream

    • Transforming
      • asStream(ValueType)
      • map(f: T -> U)
      • flatMap(f: T -> Stream<U>)
      • map2(f: (old: T?, new: T) -> U)
      • mapAccumulate(initialValue, accumulator) (alias: scan)
      • buffer(count)
      • bufferBy(stream)
      • groupBy(classifier: T -> Key)
    • Filtering
      • filter(f: T -> Bool)
      • filter2(f: (old: T?, new: T) -> Bool)
      • take(count)
      • takeUntil(stream)
      • skip(count)
      • skipUntil(stream)
      • sample(stream)
      • distinct()
      • distinctUntilChanged()
    • Combining
      • merge(stream)
      • concat(stream)
      • startWith(initialValue)
      • combineLatest(stream)
      • zip(stream)
      • recover(stream)
    • Timing
      • delay(timeInterval)
      • interval(timeInterval)
      • throttle(timeInterval)
      • debounce(timeInterval)
    • Collecting
      • reduce(initialValue, accumulator)
    • Other Utilities
      • peek(f: T -> Void) (for injecting side effects e.g. debug-logging)
      • customize(...)
  • For Array Streams

    • mergeAll(streams)
    • merge2All(streams) (generalized method for mergeAll & combineLatestAll)
    • combineLatestAll(streams)
    • zipAll(streams)
  • For Nested Stream (Stream<Stream<T>>)

    • mergeInner(nestedStream)
    • concatInner(nestedStream)
    • switchLatestInner(nestedStream)
  • For Stream Producer (Void -> Stream<T>)

    • prestart(bufferCapacity) (alias: replay)
    • times(count)
    • retry(count)
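
For readers unfamiliar with some of the transforming operations listed above, the sketch below shows the usual meaning of three of them on plain arrays, in current Swift. The `sketch...` functions are hypothetical array-based models rather than ReactKit functions, and flushing a partially filled buffer at the end is an assumption of this sketch.

```swift
// Hypothetical array-based models of three stream operations.

// mapAccumulate (scan): emits the running accumulated value for each input.
func sketchMapAccumulate<T, U>(_ values: [T], _ initialValue: U, _ accumulator: (U, T) -> U) -> [U] {
    var state = initialValue
    var output: [U] = []
    for value in values {
        state = accumulator(state, value)
        output.append(state)
    }
    return output
}

// map2: like map, but the transform also sees the previous value (nil at first).
func sketchMap2<T, U>(_ values: [T], _ transform: (T?, T) -> U) -> [U] {
    var old: T? = nil
    var output: [U] = []
    for new in values {
        output.append(transform(old, new))
        old = new
    }
    return output
}

// buffer(count): groups values into arrays of `count` elements.
func sketchBuffer<T>(_ values: [T], count: Int) -> [[T]] {
    precondition(count > 0, "count must be positive")
    var output: [[T]] = []
    var current: [T] = []
    for value in values {
        current.append(value)
        if current.count == count {
            output.append(current)
            current = []
        }
    }
    if !current.isEmpty { output.append(current) }  // assumption: flush the remainder
    return output
}

let runningTotals = sketchMapAccumulate([1, 2, 3, 4], 0, +)
let deltas = sketchMap2([1, 3, 6]) { old, new in new - (old ?? 0) }
let chunks = sketchBuffer([1, 2, 3, 4, 5], count: 2)
```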

Helpers

  • Creating

    • Stream.once(value) (alias: just)
    • Stream.never()
    • Stream.fulfilled() (alias: empty)
    • Stream.rejected() (alias: error)
    • Stream.sequence(values) (a.k.a. Rx.fromArray)
    • Stream.infiniteSequence(initialValue, iterator) (a.k.a. Rx.iterate)
  • Other Utilities

    • ownedBy(owner: NSObject) (easy strong referencing to keep streams alive)
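
The shape of infiniteSequence(initialValue, iterator) is close to the Swift standard library's `sequence(first:next:)`, which can serve as a pull-based analogy. The sketch below uses only the standard library and makes no claim about how Stream.infiniteSequence delivers its values.

```swift
// Standard-library analogy only: a lazily generated, unbounded sequence
// built from an initial value and an iterator function.
let doubling = sequence(first: 1) { $0 * 2 }

// Nothing is computed until values are requested; take the first five.
let powersOfTwo = Array(doubling.prefix(5))
```

Because the sequence is unbounded, it has to be limited with something like `prefix`, much as an infinite stream would typically be combined with take(count).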

Dependencies

References

Licence

MIT