A Go client implementing a client-side distributed consumer group client for Amazon Kinesis.

Kinesumer

Run tests Release

Kinesumer is a Go client implementing a client-side distributed consumer group client for Amazon Kinesis. It supports following features:

  • Implement the client-side distributed Kinesis consumer group client.
  • A client can consume messages from multiple Kinesis streams.
  • Clients are automatically assigned a shard id range for each stream.
  • Rebalance each shard id range when clients or upstream shards are changed. (by restart or scaling issues)
  • Manage the checkpoint for each shard, so that clients can continue to consume from the last checkpoints.
  • Able to consume from the Kinesis stream in a different AWS account.
  • Manage all the consumer group client states with a DynamoDB table. (we call this table as state store.)

architecture

Setup

Kinesumer manages the state of the distributed clients with a database, called "state store". It uses the DynamoDB as the state store, so you need to create a DynamoDB table first. Create a table with LSI schema. See the details in here.

Current state store implementation supports multiple applications (you will pass the app name when initialize the client). So, if you already have a kinesumer state store, you don't need to create another state store table.

If your Kinesis stream is in different account

If you want to connect to Kinesis in a different account, you need to set up the IAM role to access to the target account, and pass the role arn (kinesumer.Config.RoleARN) when initialze the Kinesumer client: Reference.

Usage

package main

import (
    "fmt"
    "time"

    "github.com/daangn/kinesumer"
)

func main() {
    client, err := kinesumer.NewKinesumer(
        &kinesumer.Config{
            App:            "myapp",
            KinesisRegion:  "ap-northeast-2",
            DynamoDBRegion: "ap-northeast-2",
            DynamoDBTable:  "kinesumer-state-store",
            ScanLimit:      1500,
            ScanTimeout:    2 * time.Second,
        },
    )
    if err != nil {
        // Error handling.
    }

    go func() {
        for err := range client.Errors() {
            // Error handling.
        }
    }()

    // Consume multiple streams.
    // You can refresh the streams with `client.Refresh()` method.
    records, err := client.Consume([]string{"stream1", "stream2"})
    if err != nil {
        // Error handling.
    }

    for record := range records {
        fmt.Printf("record: %v\n", record)
    }
}

How it works

Kinesumer implements the client-side distributed consumer group client without any communications between clients. Then, how do clients know the state of an entire system? The answer is the distributed key-value store.

To evenly distribute the shard range among clients, the Kinesumer relies on a centralized database, called state store. State store manages the states of the distributed clients, shard cache, and checkpoints.

This is the overview architecture of Kinesumer:

how-it-works

Following explains how the Kinesumer works:

  • Leader election: Clients register themselves to the state store and set their indexes. The index is determined by sorting all active client ids. And, a client who has zero index will be a leader. So, when clients are scaled or restarted, the leader could be changed.
  • Shard rebalancing: A client will fetch the full shard id list and client list from the state store. Then, divide the shard id list by the number of clients and assign a range of shard id corresponding to their index. All clients will repeat this process periodically.
  • Synchronization: The leader client is responsible to sync the shard cache with the latest shard list, and pruning the outdated client list (to prevent the orphan shard range) periodically.
  • Offset checkpoint: Whenever a client consumes messages from its assigned shards, it updates a per-shard checkpoint with the sequence number of the last message read from each shard.

License

See LICENSE.

Owner
당근마켓
당신의 근처에서 만나는 중고 직거래 앱, 당근마켓을 만들고 있습니다
당근마켓
Comments
  • feat: add manual commit feature to kinesumer

    feat: add manual commit feature to kinesumer

    Manual commit feature to kinesumer

    There are things to consider,

    1. exception handling: current write error to error channel. is logging better?
    2. performance: wrap updating state store with goroutine?
    3. accept context: add parameter to accept context? (for example, custom configuring timeout)
    4. enforcing latest sequence number: enforcing latest sequence number means if sequenceNumber paramter lower than current sequence number, doesn't update sequence number. I don't think so it is good.

    Thank you for Any feedback & Reviews

  • feat(option): make layer interface to implement logic state store

    feat(option): make layer interface to implement logic state store

    hello @dlsrb6342, please kindly review my pr.

    make layer interface to implement logic state store to support implement store with another store (mongodb, redis, etc)

    thank you

  • docs: need to document consuming errors.

    docs: need to document consuming errors.

    if not handle errors using Errors(), maybe stop consuming forever.

    if first time error happens when consuming, then send errors to kinesumer errors. If not read from errors channel, channel blocked after second time error happen

    Need Documentation for Errors()

  • fix: update checkpoint when cancel subscription on efo mode

    fix: update checkpoint when cancel subscription on efo mode

    If canceled in the middle of time interval, not update check point.

    I want to add test case. but it is hard to add test case for efo mode. because with localstack kinesis, subscription not work correctly.

    If you care about test case in this case, tell me a advice

  • Drain the remaining messages from CLOSED shards

    Drain the remaining messages from CLOSED shards

    Current implementation ignores the shards in the CLOSED state immediately, but it can cause missing the messages that remained in the previous (parent) shards.

  • fix: hanging if error channel is full (#11)

    fix: hanging if error channel is full (#11)

    Solved:issue-11

    Kinesumer occurs deadlock when errors don't be consumed properly.

    So possibility of deadlock should be clarified in Errors function doc.

  • I have a question about PK of DynamoDB

    I have a question about PK of DynamoDB

    Nice project. I have a question. I think it's better to use UUID(like event id or client id) of PK for DynamoDB FYI, https://aws.amazon.com/blogs/database/choosing-the-right-dynamodb-partition-key/

    example, CKP#TESTCLIENT#EVENTS#ID#12345667 CLIENT#TESTCLIENT#ID#123456

    What do you think ?

    Screen Shot 2021-10-01 at 12 46 14 AM
A simple api built in Go that facilitates directly sending email from your client side html to your inbox

go-email-service A simple api built in Go that facilitates directly sending emai

Dec 28, 2021
Go Client Library for Amazon Product Advertising API

go-amazon-product-advertising-api Go Client Library for Amazon Product Advertising API How to Use go get -u github.com/ngs/go-amazon-product-advertisi

Sep 27, 2022
WhatsAppExpenseTracker - Way to track expenses using whatsapp group

WhatsAppExpenseTracker Way to track expenses using whatsapp group One needs to c

Jan 4, 2022
lambda-go-api-proxy makes it easy to port APIs written with Go frameworks such as Gin to AWS Lambda and Amazon API Gateway.

aws-lambda-go-api-proxy makes it easy to run Golang APIs written with frameworks such as Gin with AWS Lambda and Amazon API Gateway.

Jan 6, 2023
Simple program that uploads large files to Amazon S3 over slow connections.

shrimp is a small program that can reliably upload large files to Amazon S3. My personal use case is to upload large files to S3 over a slow residenti

Nov 30, 2022
A demonstration of the transactional outbox messaging pattern (+ Log Trailing) with Amazon DynamoDB (+ Streams) written in Go.
A demonstration of the transactional outbox messaging pattern (+ Log Trailing) with Amazon DynamoDB (+ Streams) written in Go.

Transactional Outbox Pattern in Amazon DynamoDB A demonstration of the transactional outbox messaging pattern (+ Log Trailing) with Amazon DynamoDB (+

Apr 12, 2022
CoreDNS plugin implementing K8s multi-cluster services DNS spec.

multicluster Name multicluster - implementation of Multicluster DNS Description This plugin implements the Kubernetes DNS-Based Multicluster Service D

Dec 3, 2022
The Fabric Token SDK is a set of API and services that lets developers create token-based distributed application on Hyperledger Fabric.

The Fabric Token SDK is a set of API and services that let developers create token-based distributed application on Hyperledger Fabric.

Dec 14, 2022
Clusterpedia-client - clusterpedia-client supports the use of native client-go mode to call the clusterpedia API

clusterpedia-client supports the use of native client-go mode to call the cluste

Jan 7, 2022
Client-go - Clusterpedia-client supports the use of native client-go mode to call the clusterpedia API

clusterpedia-client supports the use of native client-go mode to call the cluste

Dec 5, 2022
Client for the cloud-iso-client

cloud-iso-client Client for the cloud-iso-client. Register an API token Before using this client library, you need to register an API token under your

Dec 6, 2021
Go-http-client: An enhanced http client for Golang
Go-http-client: An enhanced http client for Golang

go-http-client An enhanced http client for Golang Documentation on go.dev ?? This package provides you a http client package for your http requests. Y

Jan 7, 2023
Nutanix-client-go - Go client for the Nutanix Prism V3 API

nutanix-client-go This repository contains portions of the Nutanix API client code in nutanix/terraform-provider-nutanix. It has been extracted to red

Jan 6, 2022
Aoe4-client - Client library for aoe4 leaderboards etc

AOE4 Client Overview This is a go client used to query AOE4 data from either the

Jan 18, 2022
Balabola-go-client - GO client for Yandex balabola service

Balabola GO Client GO client for Yandex balabola service Yandex warning The neur

Jan 29, 2022
Client-server-golang-sqs - Client Server with SQS and golang

Client Server with SQS and golang Multi-threaded client-server demo with Go What

Feb 14, 2022
A Go client library for the Twitter 1.1 API

Anaconda Anaconda is a simple, transparent Go package for accessing version 1.1 of the Twitter API. Successful API queries return native Go structs th

Jan 1, 2023
Go(lang) client library for Cachet (open source status page system).

cachet Go(lang) client library for Cachet (open source status page system). Features Full API support Components Incidents Metrics Subscribers Various

Sep 27, 2022
Go client library for interacting with Coinpaprika's API

Coinpaprika API Go Client Usage This library provides convenient way to use coinpaprika.com API in Go. Coinpaprika delivers full market data to the wo

Dec 8, 2022