ReactKit 
 
Swift Reactive Programming.
How to install
See Wiki page.
Example
For UI Demo, please see ReactKit/ReactKitCatalog.
Key-Value Observing
// create stream via KVO
self.obj1Stream = KVO.stream(obj1, "value")
// bind stream via KVC (`<~` as binding operator)
(obj2, "value") <~ self.obj1Stream
XCTAssertEqual(obj1.value, "initial")
XCTAssertEqual(obj2.value, "initial")
obj1.value = "REACT"
XCTAssertEqual(obj1.value, "REACT")
XCTAssertEqual(obj2.value, "REACT")
To remove stream-bindings, just release stream itself (or call stream.cancel()).
self.obj1Stream = nil   // release stream & its bindings
obj1.value = "Done"
XCTAssertEqual(obj1.value, "Done")
XCTAssertEqual(obj2.value, "REACT")
If you want to observe changes in Swift.Array or NSMutableArray, use DynamicArray feature in Pull Request #23.
NSNotification
self.stream = Notification.stream("MyNotification", obj1)
    |> map { notification -> NSString? in
        return "hello" // convert NSNotification? to NSString?
    }
(obj2, "value") <~ self.stream
Normally, NSNotification itself is useless value for binding with other objects, so use Stream Operations e.g. map(f: T -> U) to convert it.
To understand more about |> pipelining operator, see Stream Pipelining.
Target-Action
// UIButton
self.buttonStream = self.button.buttonStream("OK")
// UITextField
self.textFieldStream = self.textField.textChangedStream()
^{ println($0) } <~ self.buttonStream     // prints "OK" on tap
// NOTE: ^{ ... } = closure-first operator, same as `stream ~> { ... }`
^{ println($0) } <~ self.textFieldStream  // prints textField.text on change
Complex example
The example below is taken from
- iOS - ReactiveCocoaをかじってみた - Qiita (well-illustrated)
 
where it describes 4 UITextFields which enables/disables UIButton at certain condition (demo available in ReactKit/ReactKitCatalog):
let usernameTextStream = self.usernameTextField.textChangedStream()
let emailTextStream = self.emailTextField.textChangedStream()
let passwordTextStream = self.passwordTextField.textChangedStream()
let password2TextStream = self.password2TextField.textChangedStream()
let allTextStreams = [usernameTextStream, emailTextStream, passwordTextStream, password2TextStream]
let combinedTextStream = allTextStreams |> merge2All
// create button-enabling stream via any textField change
self.buttonEnablingStream = combinedTextStream
    |> map { (values, changedValue) -> NSNumber? in
        let username: NSString? = values[0] ?? nil
        let email: NSString? = values[1] ?? nil
        let password: NSString? = values[2] ?? nil
        let password2: NSString? = values[3] ?? nil
        // validation
        let buttonEnabled = username?.length > 0 && email?.length > 0 && password?.length >= MIN_PASSWORD_LENGTH && password == password2
        // NOTE: use NSNumber because KVO does not understand Bool
        return NSNumber(bool: buttonEnabled)
    }
// REACT: enable/disable okButton
(self.okButton, "enabled") <~ self.buttonEnablingStream!
For more examples, please see XCTestCases.
How it works
ReactKit is based on powerful SwiftTask (JavaScript Promise-like) library, allowing to start & deliver multiple events (KVO, NSNotification, Target-Action, etc) continuously over time using its resume & progress feature (react() or <~ operator in ReactKit).
Unlike Reactive Extensions (Rx) libraries which has a basic concept of "hot" and "cold" observables, ReactKit gracefully integrated them into one hot + paused (lazy) stream Stream<T> class. Lazy streams will be auto-resumed via react() & <~ operator.
Here are some differences in architecture:
| Reactive Extensions (Rx) | ReactKit | |
|---|---|---|
| Basic Classes | Hot Observable (broadcasting) Cold Observable (laziness)  |  
   Stream<T> |  
  
| Generating | Cold Observable (cloneability) | Void -> Stream<T>(= Stream<T>.Producer) |  
  
| Subscribing | observable.subscribe(onNext, onError, onComplete) |  
   stream.react {...}.then {...}(method-chainable)  |  
  
| Pausing | pausableObservable.pause() |  
   stream.pause() |  
  
| Disposing | disposable.dispose() |  
   stream.cancel() |  
  
Stream Pipelining
Streams can be composed by using |> stream-pipelining operator and Stream Operations.
For example, a very common incremental search technique using searchTextStream will look like this:
let searchResultsStream: Stream<[Result]> = searchTextStream
    |> debounce(0.3)
    |> distinctUntilChanged
    |> map { text -> Stream<[Result]> in
        return API.getSearchResultsStream(text)
    }
    |> switchLatestInner
There are some scenarios (e.g. repeat()) when you want to use a cloneable Stream<T>.Producer (Void -> Stream<T>) rather than plain Stream<T>. In this case, you can use |>> streamProducer-pipelining operator instead.
// first, wrap stream with closure
let timerProducer: Void -> Stream<Int> = {
    return createTimerStream(interval: 1)
        |> map { ... }
        |> filter { ... }
}
// then, use `|>>`  (streamProducer-pipelining operator)
let repeatTimerProducer = timerProducer |>> repeat(3)
But in the above case, wrapping with closure will always become cumbersome, so you can also use |>> operator for Stream & Stream Operations as well (thanks to @autoclosure).
let repeatTimerProducer = createTimerStream(interval: 1)
    |>> map { ... }
    |>> filter { ... }
    |>> repeat(3)
Functions
Stream Operations
-  
For Single Stream
- Transforming 
    
asStream(ValueType)map(f: T -> U)flatMap(f: T -> Stream<U>)map2(f: (old: T?, new: T) -> U)mapAccumulate(initialValue, accumulator)(alias:scan)buffer(count)bufferBy(stream)groupBy(classifier: T -> Key)
 - Filtering 
    
filter(f: T -> Bool)filter2(f: (old: T?, new: T) -> Bool)take(count)takeUntil(stream)skip(count)skipUntil(stream)sample(stream)distinct()distinctUntilChanged()
 - Combining 
    
merge(stream)concat(stream)startWith(initialValue)combineLatest(stream)zip(stream)recover(stream)
 - Timing 
    
delay(timeInterval)interval(timeInterval)throttle(timeInterval)debounce(timeInterval)
 - Collecting 
    
reduce(initialValue, accumulator)
 - Other Utilities 
    
peek(f: T -> Void)(for injecting side effects e.g. debug-logging)customize(...)
 
 - Transforming 
    
 -  
For Array Streams
mergeAll(streams)merge2All(streams)(generalized method formergeAll&combineLatestAll)combineLatestAll(streams)zipAll(streams)
 -  
For Nested Stream (
Stream<Stream<T>>)mergeInner(nestedStream)concatInner(nestedStream)switchLatestInner(nestedStream)
 -  
For Stream Producer (
Void -> Stream<T>)prestart(bufferCapacity)(alias:replay)times(count)retry(count)
 
Helpers
-  
Creating
Stream.once(value)(alias:just)Stream.never()Stream.fulfilled()(alias:empty)Stream.rejected()(alias:error)Stream.sequence(values)(a.k.a Rx.fromArray)Stream.infiniteSequence(initialValue, iterator)(a.k.a Rx.iterate)
 -  
Other Utilities
ownedBy(owner: NSObject)(easy strong referencing to keep streams alive)
 
Dependencies
References
- Introducing ReactKit // Speaker Deck (ver 0.3.0)