A complete set of primitives for concurrency and reactive programming on Swift
- 1.4.0 is the latest and greatest, but only for Swift 4.2 and 5.0
 - use 1.3.0 for Swift 4.0+
 - use 1.2.4, the latest release for Swift 3
 
| Features | |
|---|---|
| 
     powerful primitives  |  
   Future, Promise, Channel, Producer, Sink, Cache, ... |  
  
| 
     versatile transformations  |  
   map, filter, recover, debounce, distinct, ... |  
  
| 
     convenient combination  |  
   flatMap, merge, zip, sample, scan, reduce, ... |  
  
| 
     improves existing things  |  
   Key-Value Observing, target-action, notifications, bindings | 
| 
     less boilerplate code  |  
   neat cancellation, threading, memory management | 
| 
     extendable  |  
   powerful extensions for URLSession, UI controls, CoreData, ... |  
  
| 
     all platforms  |  
   
     |  
  
| 
     documentation  |  
   100% + sample code, see full documentation | 
| 
     simple integration  |  
   SPM, CocoaPods, Carthage | 
- Related articles
 - Known users
 
Communication
Reactive Programming
reactive properties
// Derives search results from the text of the search bar.
let searchResults = searchBar.rp.text
  .debounce(interval: 0.3)
  .distinct()
  .flatMap(behavior: .keepLatestTransform) { (query) -> Future<[SearchResult]> in
    // An empty query produces an empty result list without calling searchGitHub(query:).
    guard !query.isEmpty else { return .just([]) }
    // A failed search is recovered with an empty result list.
    return searchGitHub(query: query).recover([])
  }
bindings
- unbinds automatically
 - dispatches to a correct queue automatically
 - no `.observeOn(MainScheduler.instance)` and `.disposed(by: disposeBag)`
/// Example view controller that binds the device orientation to a label.
class MyViewController: UIViewController {
  /* ... */
  @IBOutlet weak var myLabel: UILabel!
  override func viewDidLoad() {
    super.viewDidLoad()
    // Maps each orientation value to its description and binds the result to the label's text.
    UIDevice.current.rp.orientation
      .map { $0.description }
      .bind(myLabel.rp.text)
  }
  
  /* ... */
}
contexts usage
- no `[weak self]`
 - no `DispatchQueue.main.async { ... }`
 - no `.observeOn(MainScheduler.instance)`
/// Example view controller that performs a request through `MyService`
/// and presents either the extracted items or the error.
class MyViewController: NSViewController {
  let service: MyService
  /* ... */
  
  /// Performs `request`, extracts items from the response, and presents the outcome.
  func fetchAndPresentItems(for request: Request) {
    service.perform(request: request)
      .map(context: self, executor: .primary) { (self, response) in
        // items(from:) is declared `throws`, so the call must be marked with `try`.
        return try self.items(from: response)
      }
      .onSuccess(context: self) { (self, items) in
        self.present(items: items)
      }
      .onFailure(context: self) { (self, error) in
        self.present(error: error)
      }
  }
  
  /// Extracts items from the response; throws when extraction fails.
  func items(from response: Response) throws -> [Items] {
    /* ... extract items from response ... */
  }
  
  /// Presents the fetched items.
  func present(items: [Items]) {
    /* ... update UI ... */
  }
}
/// Example service that performs requests asynchronously.
class MyService {
  /// Performs `request` and returns a future of the response.
  func perform(request: Request) -> Future<Response> {
    /* ... */
  }
}
In Depth
Let's assume that we have:
- `Person` is an example of a struct that contains information about the person.
- `MyService` is an example of a class that serves as an entry point to the model. Works in the background.
- `MyViewController` is an example of a class that manages UI-related instances. Works on the main queue.
Code on callbacks
extension MyViewController {
  /// Fetches the person with the given identifier via a callback
  /// and presents either the person or the error on the main queue.
  func present(personWithID identifier: String) {
    myService.fetch(personWithID: identifier) {
      /* do not forget the [weak self] */
      // The capture list belongs on the outermost escaping closure: capturing weakly
      // only in the nested closure still lets this callback hold `self` strongly.
      [weak self] (person, error) in
      /* do not forget to dispatch to the main queue */
      DispatchQueue.main.async {
        guard let strongSelf = self
          else { return }
        if let person = person {
          strongSelf.present(person: person)
        } else if let error = error {
          strongSelf.present(error: error)
        } else {
          fatalError("There is neither person nor error. What has happened to this world?")
        }
      }
    }
  }
}
extension MyService {
  /// Fetches a person by identifier and reports the outcome through `callback`
  /// as an optional person and an optional error.
  func fetch(personWithID: String, callback: @escaping (Person?, Error?) -> Void) {
    /* ... */
  }
}
- "do not forget" comment x2
 - the block will be retained and called even if MyViewController was already deallocated
 
Code with other libraries that provide futures
extension MyViewController {
  /// Fetches the person with the given identifier as a future
  /// and presents either the person or the error.
  func present(personWithID identifier: String) {
    myService.fetch(personWithID: identifier)
      /* do not forget to dispatch to the main queue */
      .onComplete(executor: .main) {
        /* do not forget the [weak self] */
        [weak self] (completion) in
        // Nothing to present to when the view controller is already gone.
        guard let strongSelf = self else { return }
        completion.onSuccess(strongSelf.present(person:))
        completion.onFailure(strongSelf.present(error:))
      }
  }
}
extension MyService {
  /// Fetches a person by identifier and returns a future of that person.
  func fetch(personWithID: String) -> Future<Person> {
    /* ... */
  }
}
- "do not forget" comment x2
 - the block will be retained and called even if MyViewController was already deallocated
 
Code with AsyncNinja
extension MyViewController {
  /// Fetches the person with the given identifier and presents either the person or the error.
  func present(personWithID identifier: String) {
    myService.fetch(personWithID: identifier)
      // The view controller is passed as `context` and handed back to the closure as its first argument.
      .onSuccess(context: self) { (self, person) in
        self.present(person: person)
      }
      .onFailure(context: self) { (self, error) in
        self.present(error: error)
      }
  }
}
extension MyService {
  /// Fetches a person by identifier and returns a future of that person.
  func fetch(personWithID: String) -> Future<Person> {
    /* ... */
  }
}
- "do not forget" comment NONE
 - the block will be retained and called as long as specified context (MyViewController) exists
 - Want to see extended explanation?
 
Using Futures
Let's assume that we have a function that finds all prime numbers less than n
/// Returns all prime numbers less than `n`.
func primeNumbers(to n: Int) -> [Int] { /* ... */ }
Making future
// Passes the primeNumbers(to:) call as a closure to `future`, yielding a Future<[Int]>.
let futurePrimeNumbers: Future<[Int]> = future { primeNumbers(to: 10_000_000) }
Applying transformation
// Transforms the future array of prime numbers into a future array of their squares.
let futureSquaredPrimeNumbers = futurePrimeNumbers
  .map { (primeNumbers) -> [Int] in
    // The closure signature must be terminated with `in` before the body starts.
    return primeNumbers.map { (number) -> Int in
      return number * number
    }
  }
Synchronously waiting for completion
// wait(seconds:) returns nil when no completion is available within the given time.
if let fallibleNumbers = futurePrimeNumbers.wait(seconds: 1.0) {
  // Unwrap the success value before printing: interpolating the optional
  // directly would print "Optional(...)" and trigger a compiler warning.
  if let primeNumbers = fallibleNumbers.success {
    print("Number of prime numbers is \(primeNumbers.count)")
  } else {
    print("Failed to calculate prime numbers")
  }
} else {
  print("Did not calculate prime numbers yet")
}
Subscribing for completion
// Registers a closure that receives the completion of the future.
futurePrimeNumbers.onComplete { (falliblePrimeNumbers) in
  // Unwrap the success value before printing: interpolating the optional
  // directly would print "Optional(...)" and trigger a compiler warning.
  if let primeNumbers = falliblePrimeNumbers.success {
    print("Number of prime numbers is \(primeNumbers.count)")
  } else {
    print("Failed to calculate prime numbers")
  }
}
Combining futures
// Each future carries its own success type; zip combines them into one future of a tuple.
let futureA: Future<A> = /* ... */
let futureB: Future<B> = /* ... */
let futureC: Future<C> = /* ... */
let futureABC: Future<(A, B, C)> = zip(futureA, futureB, futureC)
Transition from callbacks-based flow to futures-based flow:
class MyService {
  /* implementation */
  
  /// Futures-based wrapper around the callback-based `fetchPerson(withID:callback:)`.
  /// The promise is completed by the callback and returned to the caller as a future.
  func fetchPerson(withID personID: Person.Identifier) -> Future<Person> {
    let promise = Promise<Person>()
    self.fetchPerson(withID: personID, callback: promise.complete)
    return promise
  }
}
Transition from futures-based flow to callbacks-based flow
class MyService {
  /* implementation */
  
  /// Callback-based wrapper around the futures-based `fetchPerson(withID:)`.
  /// `callback` is registered as the completion handler of the returned future.
  func fetchPerson(withID personID: Person.Identifier,
                   callback: @escaping (Fallible<Person>) -> Void) {
    self.fetchPerson(withID: personID)
      .onComplete(callback)
  }
}
Using Channels
Let's assume we have a function that returns a channel of prime numbers: it sends prime numbers as it finds them and sends the number of found primes as its completion
/// Returns a channel whose updates are prime numbers and whose completion is the number of primes found.
func makeChannelOfPrimeNumbers(to n: Int) -> Channel<Int, Int> { /* ... */ }
Applying transformation
// Maps every number passed to the transform to its square.
let channelOfSquaredPrimeNumbers = channelOfPrimeNumbers
  .map { (prime) -> Int in prime * prime }
Synchronously iterating over update values.
// Iterates over the update values of the channel and prints each one.
for number in channelOfPrimeNumbers {
  print(number)
}
Synchronously waiting for completion
// wait(seconds:) returns nil when no completion is available within the given time.
if let fallibleNumberOfPrimes = channelOfPrimeNumbers.wait(seconds: 1.0) {
  // Unwrap the success value before printing: interpolating the optional
  // directly would print "Optional(...)" and trigger a compiler warning.
  if let numberOfPrimes = fallibleNumberOfPrimes.success {
    print("Number of prime numbers is \(numberOfPrimes)")
  } else {
    print("Failed to calculate prime numbers")
  }
} else {
  print("Did not calculate prime numbers yet")
}
Synchronously waiting for completion #2
// waitForAll() returns a pair that is destructured here into two values.
let (primeNumbers, numberOfPrimeNumbers) = channelOfPrimeNumbers.waitForAll()
Subscribing for update
// Registers a closure that prints each value it is called with.
channelOfPrimeNumbers.onUpdate { print("Update: \($0)") }
Subscribing for completion
// Registers a closure that prints the completion value it is called with.
channelOfPrimeNumbers.onComplete { print("Completed: \($0)") }
Making Channel
 
/// Makes a channel that sends every prime number less than `n` as an update
/// and returns the number of primes found as the result of the producing closure.
///
/// - Parameter n: Exclusive upper bound of the search. Values of 2 or less yield no primes.
/// - Returns: A channel with `Int` updates (the primes) and an `Int` completion (their count).
func makeChannelOfPrimeNumbers(to n: Int) -> Channel<Int, Int> {
  return channel { (update) -> Int in
    // There are no primes below 2. Bailing out early also avoids forming the
    // invalid range 2..<n and a negative array count, both of which trap at runtime.
    guard n > 2 else { return 0 }

    var numberOfPrimeNumbers = 0
    var isPrime = Array(repeating: true, count: n)
    for number in 2..<n where isPrime[number] {
      numberOfPrimeNumbers += 1
      update(number)
      // updating sieve: every multiple of the found prime is composite
      for multiple in stride(from: number + number, to: n, by: number) {
        isPrime[multiple] = false
      }
    }
    return numberOfPrimeNumbers
  }
}





