Swift SDK for Apache Kafka

SwiftKafka

A Swift client for Apache Kafka that produces to and consumes from event streams.

This works by wrapping the librdkafka C library.

Swift version

The latest version of SwiftKafka requires Swift 5.0 or later. You can download this version of the Swift binaries by following this link.

Usage

Swift Package Manager

Add dependencies

Add the SwiftKafka package to the dependencies within your application’s Package.swift file. Substitute "x.x.x" with the latest SwiftKafka release.

.package(url: "https://github.com/IBM-Swift/SwiftKafka.git", from: "x.x.x")

Add SwiftKafka to your target's dependencies:

.target(name: "example", dependencies: ["SwiftKafka"]),
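Put together, a complete manifest might look like the following sketch. The package name "example" and the tools version are assumptions for illustration; "x.x.x" is still a placeholder and must be replaced with a real release before the manifest will resolve.

```swift
// swift-tools-version:5.0
// Minimal Package.swift sketch; the package name and tools version are assumed.
import PackageDescription

let package = Package(
    name: "example",
    dependencies: [
        // Replace "x.x.x" with the latest SwiftKafka release.
        .package(url: "https://github.com/IBM-Swift/SwiftKafka.git", from: "x.x.x")
    ],
    targets: [
        // The target name matches the one used in the snippet above.
        .target(name: "example", dependencies: ["SwiftKafka"])
    ]
)
```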

Import package

import SwiftKafka

Getting Started

To use SwiftKafka you will need to install the librdkafka package:

macOS

brew install librdkafka

Linux

Install librdkafka from the Confluent APT repositories - see instructions here (following steps 1 and 2 to add the Confluent package signing key and apt repository), and then install librdkafka:

sudo apt install librdkafka

Running a Kafka instance locally

To experiment locally, you can set up your own Kafka server to produce to and consume from.

On macOS you can follow this guide on Kafka Installation using Homebrew to run a local server.

On Linux, you can follow this guide for a manual install on Ubuntu.

KafkaConfig

The KafkaConfig class contains your configuration settings for a KafkaConsumer/KafkaProducer.

The class is initialized with default values which can then be changed using the helper functions. For example, to enable all logging you would set the debug variable:

let config = KafkaConfig()
config.debug = [.all]

Alternatively, you can access the configuration dictionary directly on the KafkaConfig object:

let config = KafkaConfig()
config["debug"] = "all"

The list of configuration keys and descriptions can be found in the librdkafka CONFIGURATION.md.

When you pass this class to a producer/consumer, a copy is made so further changes to the instance will not affect existing configurations.

KafkaProducer

The KafkaProducer class produces messages to a Kafka server.

You can initialize a KafkaProducer using a KafkaConfig instance or with the default configuration.

The producer sends a KafkaProducerRecord with the following fields:

  • topic: The topic where the record will be sent. If this topic doesn't exist the producer will try to create it.
  • value: The message body that will be sent with the record.
  • partition: The topic partition the record will be sent to. If this is not set the partition will be automatically assigned.
  • key: If the partition is not set, records with the same key will be sent to the same partition. Since order is guaranteed within a partition, these records will be read in the order they were produced.
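The relationship between key and partition can be illustrated with a small self-contained sketch. This is not librdkafka's actual partitioner: the FNV-1a hash and the `partition(forKey:partitionCount:)` helper are assumptions chosen for illustration. It only shows why a deterministic hash of the key keeps records with equal keys on one partition.

```swift
/// Hypothetical helper: maps a key to a partition with a deterministic hash.
/// librdkafka ships its own partitioner; FNV-1a is used here only to illustrate.
func partition(forKey key: String, partitionCount: Int) -> Int {
    precondition(partitionCount > 0, "a topic has at least one partition")
    var hash: UInt32 = 2166136261      // FNV-1a 32-bit offset basis
    for byte in key.utf8 {
        hash ^= UInt32(byte)
        hash = hash &* 16777619        // FNV prime, wrapping multiply
    }
    return Int(hash % UInt32(partitionCount))
}

// Two records with the same key resolve to the same partition.
let first = partition(forKey: "Key", partitionCount: 3)
let second = partition(forKey: "Key", partitionCount: 3)
print(first == second)
```

Because the mapping depends only on the key and the partition count, ordering per key holds as long as the number of partitions does not change.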

The send() function is asynchronous. The result is returned in a callback which contains a KafkaConsumerRecord on success or a KafkaError on failure.

The following example produces a message with the value "Hello world" and the key "Key" to a "test" topic of a Kafka server running on localhost.

do {
    let producer = try KafkaProducer()
    guard producer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    producer.send(producerRecord: KafkaProducerRecord(topic: "test", value: "Hello world", key: "Key")) { result in
        switch result {
        case .success(let message):
            print("Message at offset \(message.offset) successfully sent")
        case .failure(let error):
            print("Error producing: \(error)")
        }
    }
} catch {
    print("Error creating producer: \(error)")
}

KafkaConsumer

The KafkaConsumer class consumes messages from a Kafka server.

You can initialize a KafkaConsumer using a KafkaConfig instance or with the default configuration.

You can then subscribe to topics using subscribe(). This will distribute the topic partitions evenly between consumers with the same group id. If you do not set a group id, a random UUID will be used.

Alternatively, you can use assign() to manually set the partition and offset for the consumer.

Both subscribe() and assign() are asynchronous and return immediately. However, it may take up to twice sessionTimeoutMs (default 10 seconds, so up to 20 seconds) before the consumer is fully connected.
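One way to cope with this start-up delay is to keep polling until a deadline passes. The sketch below is self-contained and does not call SwiftKafka: `pollUntilRecords(deadline:poll:)` is a hypothetical helper, and the closure stands in for a call such as `consumer.poll(timeout:)`. The 20-second deadline is simply twice the default session timeout mentioned above.

```swift
import Foundation

/// Hypothetical helper: calls `poll` repeatedly until it returns a non-empty
/// batch or `deadline` seconds have elapsed, whichever comes first.
func pollUntilRecords<Record>(deadline: TimeInterval,
                              poll: () throws -> [Record]) rethrows -> [Record] {
    let giveUpAt = Date().addingTimeInterval(deadline)
    repeat {
        let records = try poll()
        if !records.isEmpty { return records }
    } while Date() < giveUpAt
    return []   // nothing arrived before the deadline
}

// Stand-in for a consumer that returns nothing for its first two polls.
var attempts = 0
let records = pollUntilRecords(deadline: 20) { () -> [String] in
    attempts += 1
    return attempts < 3 ? [] : ["first message"]
}
print(records, attempts)
```

With a real consumer each poll blocks for its own timeout, so the loop does not spin.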

To consume messages from Kafka you call poll(timeout:). This will poll Kafka, blocking for timeout seconds. When it completes, it returns an array of KafkaConsumerRecord with the following fields:

  • value: The message value if it can be UTF-8 decoded to a String.
  • valueData: The message value as raw data.
  • key: The message key if it can be UTF-8 decoded to a String.
  • keyData: The message key as raw data.
  • offset: The message offset.
  • topic: The topic that the message was consumed from.
  • partition: The partition that the message was consumed from.
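The difference between the String fields and the raw-data fields can be shown with plain Foundation, without SwiftKafka. This sketch assumes the library decodes in a comparable way; it only demonstrates that UTF-8 decoding of arbitrary bytes can fail, in which case the raw data is the only usable view of the payload.

```swift
import Foundation

// A payload that is valid UTF-8: both the raw bytes and a String view are usable.
let textPayload = Data("Hello world".utf8)
let decodedText = String(data: textPayload, encoding: .utf8)

// A payload that is not valid UTF-8 (the byte 0xFF never occurs in UTF-8).
let binaryPayload = Data([0xFF, 0xFE, 0x00, 0x01])
let decodedBinary = String(data: binaryPayload, encoding: .utf8)

print(decodedText ?? "<not UTF-8>")
print(decodedBinary ?? "<not UTF-8>")
```

For binary formats, reading the raw data and decoding it yourself avoids depending on the String fields being present.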

When you have finished consuming, you can call close() to close the connection and unassign the consumer. The unassigned partitions will then be rebalanced between the other consumers in the group. If close() is not called, the consumer will be closed when the class is deallocated.

The following example consumes and prints all unread messages from the "test" topic of the Kafka server.

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    config.autoOffsetReset = .beginning
    let consumer = try KafkaConsumer(config: config)
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    try consumer.subscribe(topics: ["test"])
    while true {
        let records = try consumer.poll()
        print(records)
    }
} catch {
    print("Error creating consumer: \(error)")
}

API Documentation

For more information visit our API reference.

Community

We love to talk server-side Swift, and Kitura. Join our Slack to meet the team!

License

This library is licensed under Apache 2.0. Full license text is available in LICENSE.

Comments
  • Xcode 12.2 fails to link rdkafka

    Attempting to run SwiftKafkaTests on macOS target is resulting in a build error:

    ld: warning: Could not find or use auto-linked library 'rdkafka'
    
    ...(all the librdkafka C API calls listed here)...
    
    ld: symbol(s) not found for architecture x86_64
    clang: error: linker command failed with exit code 1 (use -v to see invocation)
    

    What's interesting is that it seems Xcode's package loading is failing to find rdkafka even before I attempt to compile showing this warning message:

    You may be able to install rdkafka using your system package manager: brew install librdkafka
    

    This is despite the fact that I'm absolutely positive librdkafka is installed.

    rami@machine: ~ % brew info librdkafka
    librdkafka: stable 1.5.2 (bottled), HEAD
    Apache Kafka C/C++ library
    https://github.com/edenhill/librdkafka
    /usr/local/Cellar/librdkafka/1.5.2 (36 files, 4MB) *
      Poured from bottle on 2020-11-24 at 21:53:28
    From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/librdkafka.rb
    License: BSD-2-Clause
    ==> Dependencies
    Build: pkg-config ✔, [email protected] ✔
    Required: lz4 ✔, lzlib ✔, [email protected] ✔, zstd ✔
    ==> Options
    --HEAD
    	Install HEAD version
    ==> Analytics
    install: 9,191 (30 days), 19,972 (90 days), 71,599 (365 days)
    install-on-request: 5,667 (30 days), 11,336 (90 days), 35,858 (365 days)
    build-error: 0 (30 days)
    

    Config details

    macOS Big Sur 11.0.1 (20B29) Xcode 12.2 (12B45b) + Command Line Tools 12B45b Swift version 5.3.1 clang version 12.0.0

    Homebrew 2.5.11 Homebrew/homebrew-core (git revision 699c9; last commit 2020-11-25) Homebrew/homebrew-cask (git revision 18071; last commit 2020-11-25) librdkafka stable 1.5.2

    opened by ramikay 8
  • Compression / Buffer issue

    Hi, I've been trying to use SwiftKafka for a project of mine, but run into an issue when sending messages of around 1.5KB. Instead of the JSON I try to send, it sends some malformed data like this:

    2t_id":"16315612"??????
    

    followed by the rest of the JSON in the correct format. Because of this the JSON becomes invalid and unusable.

    I've tried multiple config options to increase buffer sizes to allow for bigger messages, but so far nothing has seemed to work. The options I've tried are:

    config.messageMaxBytes = 1000000000
    config.socketSendBufferBytes = 1000000000
    

    If I do this however, the message is still malformed and I get the following error in my console:

    Failed to set socket send buffer size to 10000000: No buffer space available
    

    Any and all help is much appreciated!

    opened by MrLotU 2
  • Add locking around dictionary as computed property

    KafkaProducer.kafkaHandleToMessageCallback is a dictionary that is accessed by multiple threads. The code in KafkaProducer used a semaphore to prevent concurrent access, but this was missed in accesses outside of that class (such as in KafkaConfig.setDeliveredMessageCallback()).

    This replaces the semaphore with a computed property and NSLock approach similar to https://github.com/IBM-Swift/LoggerAPI/pull/47 .

    opened by djones6 1
  • Add Travis CI

    Adds CI testing against a local Kafka install.

    This was complicated by the fact that we typically test our Linux Kitura builds under Docker, but it is not recommended to install all of the required Kafka components into a single Docker image. I tried Confluent's dockerized images (using their provided 'all-in-one' docker-compose script) but, while this worked locally for me, it seems to fail in Travis. To get it working, I've removed the Docker step entirely, so Linux will run on the Travis xenial environment.

    For the benefit of anyone trying to get that working in the future, I've left the code for it in setupKafka.sh and left the sequence of test commits in this PR.

    opened by djones6 0
  • ci: Add build script for travis

    This pull request changed the .travis file to the standard Kitura one. We also add the terminal commands to start a Zookeeper and Kafka service which is required for the tests.

    opened by Andrew-Lees11 0
  • Config properties not reflecting in rdkafka

    I have added the following config property in my Swift file.

    config["bootstrap.servers"] = "localhost:9092"
    

    I tested the property using breakpoint, and it is successfully reflected.

    (lldb) po config["bootstrap.servers"]
    ▿ Optional<String>
      - some : "localhost:9092"
    

    However, I still get the following warning and producer messages do not pass through.

    %5|1667541464.556|CONFWARN|rdkafka#producer-1| [thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster
    
    opened by aneee004 0
  • Using SwiftKafka for watchOS

    I am developing a watchOS app that uses data coming from a Kafka stream, and I want to use this library for this purpose, but it keeps ignoring my local "librdkafka" library.

    Detailed error description:

    • Warnings:
      1. ignoring file /usr/local/Cellar/librdkafka/1.3.0/lib/librdkafka.dylib, building for watchOS-arm64_32 but attempting to link with file built for macOS-x86_64
    • Errors:
      Undefined symbol: _rd_kafka_produce
      Undefined symbol: _rd_kafka_poll
      Undefined symbol: _rd_kafka_conf_dump
      Undefined symbol: _rd_kafka_topic_name
      Undefined symbol: _rd_kafka_consumer_close
      Undefined symbol: _rd_kafka_consumer_poll
      Undefined symbol: _rd_kafka_flush
      Undefined symbol: _rd_kafka_brokers_add
      Undefined symbol: _rd_kafka_commit
      Undefined symbol: _rd_kafka_topic_partition_list_new
      Undefined symbol: _rd_kafka_topic_partition_list_destroy
      Undefined symbol: _rd_kafka_message_destroy
      Undefined symbol: _rd_kafka_topic_new
      Undefined symbol: _rd_kafka_topic_partition_list_add
      Undefined symbol: _rd_kafka_subscribe
      Undefined symbol: _rd_kafka_conf_set_dr_msg_cb
      Undefined symbol: _rd_kafka_conf_set
      Undefined symbol: _rd_kafka_conf_dup
      Undefined symbol: _rd_kafka_conf_new
      Undefined symbol: _rd_kafka_conf_destroy
      Undefined symbol: _rd_kafka_conf_dump_free
      Undefined symbol: _rd_kafka_poll_set_consumer
      Undefined symbol: _rd_kafka_topic_destroy
      Undefined symbol: _rd_kafka_assign
      Undefined symbol: _rd_kafka_new
      Undefined symbol: _rd_kafka_destroy

    What I did:

    1. Installed librdkafka
       • brew install librdkafka
    2. Created app
       • Created a new watchOS standalone app in Xcode
       • Added a new SwiftPackage with the built-in SPM from Xcode using the URL from your GitHub SwiftKafka project
       • Added SwiftKafka to "Link Binary With Libraries" in my WatchKit Extension
    3. Built the app -> build failed with the above errors
    opened by MFabi23 2
Releases(0.0.1)