Swift SDK for Apache Kafka

Overview

SwiftKafka

A Swift client for Apache Kafka, for producing to and consuming from event streams.

This works by wrapping the librdkafka C library.

Swift version

The latest version of SwiftKafka requires Swift 5.0 or later. You can download the Swift binaries from swift.org.

Usage

Swift Package Manager

Add dependencies

Add the SwiftKafka package to the dependencies within your application’s Package.swift file. Substitute "x.x.x" with the latest SwiftKafka release.

.package(url: "https://github.com/IBM-Swift/SwiftKafka.git", from: "x.x.x")

Add SwiftKafka to your target's dependencies:

.target(name: "example", dependencies: ["SwiftKafka"]),
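
For reference, a complete minimal Package.swift combining both steps might look like the sketch below (the package and target names are placeholders; substitute "x.x.x" with the latest SwiftKafka release):

// swift-tools-version:5.0
import PackageDescription

let package = Package(
    name: "example",
    dependencies: [
        // Substitute "x.x.x" with the latest SwiftKafka release.
        .package(url: "https://github.com/IBM-Swift/SwiftKafka.git", from: "x.x.x")
    ],
    targets: [
        // The application target depends on SwiftKafka.
        .target(name: "example", dependencies: ["SwiftKafka"])
    ]
)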

Import package

import SwiftKafka

Getting Started

To use SwiftKafka you will need to install the librdkafka package:

macOS

brew install librdkafka

Linux

Install librdkafka from the Confluent APT repositories: follow steps 1 and 2 of Confluent's installation instructions to add the Confluent package signing key and APT repository, and then install librdkafka:

sudo apt install librdkafka

Running a Kafka instance locally

To experiment locally, you can set up your own Kafka server to produce/consume from.

On macOS you can follow this guide on Kafka Installation using Homebrew to run a local server.

On Linux, you can follow this guide for a manual install on Ubuntu.

KafkaConfig

The KafkaConfig class contains your configuration settings for a KafkaConsumer/KafkaProducer.

The class is initialized with default values which can then be changed using the helper functions. For example, to enable all logging you would set the debug variable:

let config = KafkaConfig()
config.debug = [.all]

Alternatively, you can access the configuration dictionary directly on the KafkaConfig object:

let config = KafkaConfig()
config["debug"] = "all"

The list of configuration keys and descriptions can be found in the librdkafka CONFIGURATION.md.

When you pass this class to a producer or consumer, a copy is made, so further changes to the instance will not affect clients that have already been created.
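
For example, a minimal sketch of this copy behaviour (assuming a KafkaProducer(config:) initializer that mirrors the KafkaConsumer(config:) initializer shown later):

do {
    let config = KafkaConfig()
    config.debug = [.all]

    // The producer takes a copy of `config` at initialization time.
    let producer = try KafkaProducer(config: config)

    // Later changes to `config` only affect clients created afterwards;
    // `producer` keeps the settings it was created with.
    config.debug = []
    _ = producer
} catch {
    print("Error creating producer: \(error)")
}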

KafkaProducer

The KafkaProducer class produces messages to a Kafka server.

You can initialize a KafkaProducer using a KafkaConfig instance or with the default configuration.

The producer sends a KafkaProducerRecord with the following fields:

  • topic: The topic where the record will be sent. If this topic doesn't exist, the producer will try to create it.
  • value: The message body that will be sent with the record.
  • partition: The topic partition the record will be sent to. If this is not set, the partition will be assigned automatically.
  • key: If the partition is not set, records with the same key will be sent to the same partition. Since order is guaranteed within a partition, these records will be read in the order they were produced.

The send() function is asynchronous. The result is returned in a callback which contains a KafkaConsumerRecord on success or a KafkaError on failure.

The following example produces a message with the value "Hello World" to a "test" topic of a Kafka server running on localhost.

do {
    let producer = try KafkaProducer()
    guard producer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    producer.send(producerRecord: KafkaProducerRecord(topic: "test", value: "Hello world", key: "Key")) { result in
        switch result {
        case .success(let message):
            print("Message at offset \(message.offset) successfully sent")
        case .failure(let error):
            print("Error producing: \(error)")
        }
    }
} catch {
    print("Error creating producer: \(error)")
}

KafkaConsumer

The KafkaConsumer class consumes messages from a Kafka server.

You can initialize a KafkaConsumer using a KafkaConfig instance or with the default configuration.

You can then subscribe to topics using subscribe(). This will distribute the topic partitions evenly between consumers with the same group id. If you do not set a group id, a random UUID will be used.

Alternatively, you can use assign() to manually set the partition and offset for the consumer.

Both subscribe() and assign() are asynchronous and return immediately; however, it may take up to twice the sessionTimeoutMs (default 10 seconds) before the consumer is fully connected.
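
A sketch of manual assignment is shown below; the assign(topic:partition:offset:) parameter names and types are assumptions here, so check the API reference for the exact signature:

do {
    let consumer = try KafkaConsumer(config: KafkaConfig())
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    // Read partition 0 of the "test" topic from offset 0 onwards.
    // (Parameter names are assumed; see the API reference.)
    try consumer.assign(topic: "test", partition: 0, offset: 0)
    let records = try consumer.poll()
    print(records)
} catch {
    print("Error assigning consumer: \(error)")
}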

To consume messages from Kafka you call poll(timeout:). This will poll Kafka, blocking for timeout seconds. When it completes, it returns an array of KafkaConsumerRecord with the following fields:

  • value: The message value, if it can be UTF-8 decoded to a String.
  • valueData: The message value as raw data.
  • key: The message key, if it can be UTF-8 decoded to a String.
  • keyData: The message key as raw data.
  • offset: The message offset.
  • topic: The topic that the message was consumed from.
  • partition: The partition that the message was consumed from.

When you have finished consuming, you can call close() to close the connection and unassign the consumer. The unassigned partitions will then be rebalanced among the other consumers in the group. If close() is not called, the consumer will be closed when the class is deallocated.

The following example consumes and prints all unread messages from the "test" topic of the Kafka server.

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    config.autoOffsetReset = .beginning
    let consumer = try KafkaConsumer(config: config)
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    try consumer.subscribe(topics: ["test"])
    while true {
        let records = try consumer.poll()
        print(records)
    }
} catch {
    print("Error creating consumer: \(error)")
}
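
As a variation on the example above, the sketch below polls a fixed number of times and then calls close() so that the assigned partitions can be rebalanced to other consumers in the group (close() is assumed to be non-throwing here; see the API reference):

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    let consumer = try KafkaConsumer(config: config)
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    try consumer.subscribe(topics: ["test"])
    // Poll ten times, then release the assigned partitions.
    for _ in 0..<10 {
        let records = try consumer.poll()
        print(records)
    }
    consumer.close()
} catch {
    print("Error consuming: \(error)")
}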

API Documentation

For more information visit our API reference.

Community

We love to talk server-side Swift and Kitura. Join our Slack to meet the team!

License

This library is licensed under Apache 2.0. Full license text is available in LICENSE.

Comments
  • Xcode 12.2 fails to link rdkafka

    Attempting to run SwiftKafkaTests on macOS target is resulting in a build error:

    ld: warning: Could not find or use auto-linked library 'rdkafka'
    
    ...(all the librdkafka C API calls listed here)...
    
    ld: symbol(s) not found for architecture x86_64
    clang: error: linker command failed with exit code 1 (use -v to see invocation)
    

    What's interesting is that it seems Xcode's package loading is failing to find rdkafka even before I attempt to compile showing this warning message:

    You may be able to install rdkafka using your system package manager: brew install librdkafka
    

    This is despite the fact that I'm absolutely positive librdkafka is installed.

    rami@machine: ~ % brew info librdkafka
    librdkafka: stable 1.5.2 (bottled), HEAD
    Apache Kafka C/C++ library
    https://github.com/edenhill/librdkafka
    /usr/local/Cellar/librdkafka/1.5.2 (36 files, 4MB) *
      Poured from bottle on 2020-11-24 at 21:53:28
    From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/librdkafka.rb
    License: BSD-2-Clause
    ==> Dependencies
    Build: pkg-config ✔, [email protected] ✔
    Required: lz4 ✔, lzlib ✔, [email protected] ✔, zstd ✔
    ==> Options
    --HEAD
    	Install HEAD version
    ==> Analytics
    install: 9,191 (30 days), 19,972 (90 days), 71,599 (365 days)
    install-on-request: 5,667 (30 days), 11,336 (90 days), 35,858 (365 days)
    build-error: 0 (30 days)
    

    Config details

    macOS Big Sur 11.0.1 (20B29) Xcode 12.2 (12B45b) + Command Line Tools 12B45b Swift version 5.3.1 clang version 12.0.0

    Homebrew 2.5.11 Homebrew/homebrew-core (git revision 699c9; last commit 2020-11-25) Homebrew/homebrew-cask (git revision 18071; last commit 2020-11-25) librdkafka stable 1.5.2

    opened by ramikay 8
  • Compression / Buffer issue

    Hi, I've been trying to use SwiftKafka for a project of mine, but run into an issue when sending messages of around 1.5KB. Instead of the JSON I try to send, it sends some malformed data like this:

    2t_id":"16315612"??????
    

    followed by the rest of the JSON in the correct format. Because of this the JSON becomes invalid and unusable.

    I've tried multiple config options to increase buffer sizes to allow for bigger messages, but so far nothing has seemed to work. The options I've tried are setting

    config.messageMaxBytes = 1000000000
    config.socketSendBufferBytes = 1000000000
    

    If I do this however, the message is still malformed and I get the following error in my console:

    Failed to set socket send buffer size to 10000000: No buffer space available
    

    Any and all help is much appreciated!

    opened by MrLotU 2
  • Add locking around dictionary as computed property

    KafkaProducer.kafkaHandleToMessageCallback is a dictionary that is accessed by multiple threads. The code in KafkaProducer used a semaphore to prevent concurrent access, but this was missed in accesses outside of that class (such as in KafkaConfig.setDeliveredMessageCallback()).

    This replaces the semaphore with a computed property and NSLock approach similar to https://github.com/IBM-Swift/LoggerAPI/pull/47 .
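
    A minimal sketch of the pattern being described (the type and property names here are hypothetical, not the actual SwiftKafka internals):

    import Foundation

    // Hypothetical illustration of a lock-protected dictionary exposed
    // as a computed property; not the actual SwiftKafka implementation.
    final class CallbackStore {
        private var _callbacks = [Int: (String) -> Void]()
        private let lock = NSLock()

        // Every read or write of the backing dictionary goes through the
        // lock, so the property is safe to access from multiple threads.
        var callbacks: [Int: (String) -> Void] {
            get {
                lock.lock()
                defer { lock.unlock() }
                return _callbacks
            }
            set {
                lock.lock()
                defer { lock.unlock() }
                _callbacks = newValue
            }
        }
    }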

    opened by djones6 1
  • Add Travis CI

    Adds CI testing against a local Kafka install.

    This was complicated by the fact that we typically test our Linux Kitura builds under Docker, but it is not recommended to install all of the required Kafka components into a single Docker image. I tried Confluent's dockerized images (using their provided 'all-in-one' docker-compose script) but, while this worked locally for me, it seems to fail in Travis. To get it working, I've removed the Docker step entirely, so Linux will run on the Travis xenial environment.

    For the benefit of anyone trying to get that working in the future, I've left the code for it in setupKafka.sh and left the sequence of test commits in this PR.

    opened by djones6 0
  • ci: Add build script for travis

    This pull request changes the .travis file to the standard Kitura one. It also adds the terminal commands to start the Zookeeper and Kafka services that are required for the tests.

    opened by Andrew-Lees11 0
  • Config properties not reflecting in rdkafka

    I have added the following config property in my Swift file.

    config["bootstrap.servers"] = "localhost:9092"
    

    I tested the property with a breakpoint, and it is successfully reflected.

    (lldb) po config["bootstrap.servers"]
    ▿ Optional<String>
      - some : "localhost:9092"
    

    However, I still get the following warning and producer messages do not pass through.

    %5|1667541464.556|CONFWARN|rdkafka#producer-1| [thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster
    
    opened by aneee004 0
  • Using SwiftKafka for watchOS

    I am developing a watchOS app that uses data coming from a Kafka stream, and I want to use this library for that purpose, but it keeps ignoring my local librdkafka library.

    Detailed error description:

    • Warnings:
    1. ignoring file /usr/local/Cellar/librdkafka/1.3.0/lib/librdkafka.dylib, building for watchOS-arm64_32 but attempting to link with file built for macOS-x86_64
    • Errors: Undefined symbols: _rd_kafka_produce, _rd_kafka_poll, _rd_kafka_conf_dump, _rd_kafka_topic_name, _rd_kafka_consumer_close, _rd_kafka_consumer_poll, _rd_kafka_flush, _rd_kafka_brokers_add, _rd_kafka_commit, _rd_kafka_topic_partition_list_new, _rd_kafka_topic_partition_list_destroy, _rd_kafka_message_destroy, _rd_kafka_topic_new, _rd_kafka_topic_partition_list_add, _rd_kafka_subscribe, _rd_kafka_conf_set_dr_msg_cb, _rd_kafka_conf_set, _rd_kafka_conf_dup, _rd_kafka_conf_new, _rd_kafka_conf_destroy, _rd_kafka_conf_dump_free, _rd_kafka_poll_set_consumer, _rd_kafka_topic_destroy, _rd_kafka_assign, _rd_kafka_new, _rd_kafka_destroy

    What I did:

    1. Installed librdkafka
    • brew install librdkafka
    2. Created app
    • Created a new watchOS standalone app in Xcode
    • Added a new Swift package with the built-in SPM from Xcode, using the URL of the SwiftKafka GitHub project
    • Added SwiftKafka to "Link Binary With Libraries" in my WatchKit Extension
    3. Built the app -> build failed with the above errors
    opened by MFabi23 2