Stream data into Google BigQuery concurrently using InsertAll()

Kik and I (@oryband) are no longer maintaining this repository. Thanks for all the contributions. You are welcome to fork and continue development.


Stream insert data into BigQuery fast and concurrently, using InsertAll().

Features

  • Insert rows from multiple tables, datasets, and projects, and insert them in bulk. No need to manage data structures and sort rows by table - bqstreamer does it for you.
  • Multiple background workers (i.e. goroutines) to enqueue and insert rows.
  • Inserts can be performed in a blocking manner, or in the background (asynchronously).
  • Perform insert operations in predefined batch sizes, according to BigQuery's quota policy.
  • Handle and retry BigQuery server errors.
  • Backoff interval between failed insert operations.
  • Error reporting.
  • Production ready, and thoroughly tested. We - at Rounds (now acquired by Kik) - are using it in our data gathering workflow.
  • Thorough testing and documentation for great good!

Getting Started

  1. Install Go, version 1.5 or newer.
  2. Clone this repository and download dependencies:
     • Version v2: go get gopkg.in/kikinteractive/go-bqstreamer.v2
     • Version v1: go get gopkg.in/kikinteractive/go-bqstreamer.v1
  3. Acquire Google OAuth2/JWT credentials, so you can authenticate with BigQuery (a minimal sketch follows this list).
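
For illustration, a minimal sketch of loading service-account credentials for the v2 API. The NewJWTConfig helper is recalled from the package's GoDoc; verify the name and signature there before relying on it.

package main

import (
    "log"

    bqstreamer "gopkg.in/kikinteractive/go-bqstreamer.v2"
)

func main() {
    // Parse a Google service-account JSON key file into a JWT config.
    // The workers use this config to authenticate against BigQuery.
    jwtConfig, err := bqstreamer.NewJWTConfig("my-key.json")
    if err != nil {
        log.Fatal(err)
    }
    _ = jwtConfig // pass this to NewSyncWorker or NewAsyncWorkerGroup
}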

How Does It Work?

There are two types of inserters you can use:

  1. SyncWorker
     • A single, blocking (synchronous) worker.
     • It enqueues rows and performs insert operations in a blocking manner.
  2. AsyncWorkerGroup
     • Employs multiple background SyncWorkers.
     • The AsyncWorkerGroup enqueues rows, and its background workers pull and insert them in a fan-out model.
     • An insert operation is executed according to row amount or time thresholds for each background worker.
     • Errors are reported to an error channel for processing by the user.
     • This provides higher insert throughput for larger scale scenarios.
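
For illustration, a minimal sketch of the asynchronous flow, assuming the v2 API. Constructor and option names (NewAsyncWorkerGroup, SetAsyncNumWorkers, NewRowWithID, and the rest) are recalled from the GoDoc; verify them against the GoDoc examples before use.

package main

import (
    "log"
    "time"

    bigquery "google.golang.org/api/bigquery/v2"
    bqstreamer "gopkg.in/kikinteractive/go-bqstreamer.v2"
)

func main() {
    // Authenticate with a service-account JSON key.
    jwtConfig, err := bqstreamer.NewJWTConfig("my-key.json")
    if err != nil {
        log.Fatal(err)
    }

    // Insert errors from the background workers are reported here.
    errChan := make(chan *bqstreamer.InsertErrors)

    g, err := bqstreamer.NewAsyncWorkerGroup(
        jwtConfig,
        bqstreamer.SetAsyncNumWorkers(8),           // number of background workers
        bqstreamer.SetAsyncMaxRows(500),            // flush once 500 rows are enqueued...
        bqstreamer.SetAsyncMaxDelay(1*time.Second), // ...or after 1s, whichever comes first
        bqstreamer.SetAsyncErrorChannel(errChan),
    )
    if err != nil {
        log.Fatal(err)
    }
    g.Start()
    defer g.Close()

    // Drain insert errors in the background.
    go func() {
        for insertErrs := range errChan {
            log.Println(insertErrs)
        }
    }()

    // Enqueue a single row; the background workers batch and insert it.
    g.Enqueue(bqstreamer.NewRowWithID(
        "my-project", "my-dataset", "my-table", "unique-row-id",
        map[string]bigquery.JsonValue{"key": "value"},
    ))
}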

Examples

Check the GoDoc examples section.

Contribute

  1. Please check the issues page.
  2. File new bugs and ask for improvements.
  3. Pull requests welcome!

Test

# Run unit tests and check coverage.
$ make test

# Run integration tests.
# This requires an active project, dataset and pem key.
$ export BQSTREAMER_PROJECT=my-project
$ export BQSTREAMER_DATASET=my-dataset
$ export BQSTREAMER_TABLE=my-table
$ export BQSTREAMER_KEY=my-key.json
$ make testintegration

Comments
  • Fix test cases

    This pull request fixes the execution of Travis tests which failed for two distinct reasons:

    1. Recently, a NullFields field was added to the TableDataInsertAllRequestRows struct of google-api-go-client (see https://github.com/google/google-api-go-client/commit/1ba8c3c8628c255599cf16f4dbbbb79499f4e307 ). This pull request updates all instantiations of TableDataInsertAllRequestRows in test cases to match the new format.

    2. grpc, which is a dependency of this project, no longer supports Go 1.5 (see https://github.com/grpc/grpc-go/pull/1132), hence all Travis builds with Go 1.5.x fail. This pull request removes those targets.

    This pull request also adds Go 1.8 to the Travis build matrix, since it seems to be supported.

  • Question: AsyncWorkerGroup.Close()

    Hello,

    I was wondering how this function makes sure that all remaining messages are flushed. When I send a batch of messages via workerGroup.Enqueue() and then close the workerGroup, it seems that some messages do not get sent. When the main thread sleeps for a while, the messages do get sent. Could it be that, because rowChan is a buffered channel, all workers stop while some messages are still buffered in rowChan (see the sketch below)?

    See https://github.com/rounds/go-bqstreamer/blob/master/async_worker_group.go#L76 and https://github.com/rounds/go-bqstreamer/blob/master/async_worker.go#L47
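
    The race described here can occur with any buffered channel: if workers return on a stop signal without draining the buffer, buffered items are lost. A generic sketch (not the package's actual code) of the drain-before-exit pattern:

    package main

    import "fmt"

    // worker drains its buffered input channel completely before exiting:
    // range returns only once rowChan is closed AND empty, so no enqueued
    // row is lost on shutdown.
    func worker(rowChan <-chan string, done chan<- struct{}) {
        for row := range rowChan {
            fmt.Println("insert:", row)
        }
        done <- struct{}{}
    }

    func main() {
        rowChan := make(chan string, 10) // buffered, like the package's rowChan
        done := make(chan struct{})
        go worker(rowChan, done)

        for i := 0; i < 5; i++ {
            rowChan <- fmt.Sprintf("row-%d", i)
        }
        close(rowChan) // stop signal; buffered rows are still delivered
        <-done         // wait for the worker to finish flushing
    }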

  • Fix async worker insert at max delay expiration

    The async worker is supposed to insert all enqueued rows if maxDelay time has elapsed since the last insertion and fewer than maxRows have been enqueued in the meanwhile.

    However, the maxDelay timer is reset each time a new row is enqueued, rather than each time a batch of rows is inserted, causing the insertion at maxDelay expiration to never occur (see the sketch below).

    As an example, if maxDelay is set to 10 sec., maxRows is set to 500, and a row is enqueued every 9 sec., then insertions will always occur every 500*9 sec. rather than every 10 sec. as expected.

    This pull request fixes this issue.

    Travis tests currently fail because of the reasons I explained in https://github.com/kikinteractive/go-bqstreamer/pull/24.
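
    To illustrate the fix in generic form (not the package's actual code; insert() is a hypothetical stand-in): the flush timer must be reset when a batch is inserted, not on every enqueue, otherwise steady traffic keeps deferring the deadline.

    package main

    import (
        "fmt"
        "time"
    )

    // insert is a hypothetical stand-in for the real insert operation.
    func insert(batch []string) { fmt.Println("inserting", len(batch), "rows") }

    func run(rows <-chan string, maxDelay time.Duration, maxRows int) {
        batch := make([]string, 0, maxRows)
        timer := time.NewTimer(maxDelay)
        flush := func() {
            if len(batch) > 0 {
                insert(batch)
                batch = batch[:0]
            }
            // Reset on flush, NOT on enqueue; drain a possibly pending tick first.
            if !timer.Stop() {
                select {
                case <-timer.C:
                default:
                }
            }
            timer.Reset(maxDelay)
        }
        for {
            select {
            case row, ok := <-rows:
                if !ok {
                    flush() // flush leftovers on shutdown
                    return
                }
                batch = append(batch, row)
                if len(batch) >= maxRows {
                    flush() // size-triggered flush
                }
            case <-timer.C:
                flush() // time-triggered flush: at most maxDelay after the last one
            }
        }
    }

    func main() {
        rows := make(chan string)
        go func() {
            for i := 0; i < 3; i++ {
                rows <- fmt.Sprintf("row-%d", i)
            }
            close(rows)
        }()
        run(rows, 10*time.Second, 500)
    }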

  • Getting queue length

    At the moment, rows is not exported, so I can't run len() on it. I don't mind implementing it myself, but would you prefer it implemented as an interface, or to export rows?

  • Consider closing error channel as signal to stop

    Thanks for the library, guys! Found it tonight and it's just about exactly what I'm looking for, though I did run into a couple of issues while trying to use it that I thought I'd ask about addressing before getting too much deeper.

    The first is that it doesn't seem possible to know -- after calling stop() -- that the queue is flushed. This is because the errors channel is never closed, so I assume that clients are just supposed to wait some reasonable amount of time after calling stop?

    I also wonder whether there's value in providing the two different interfaces (streamer and multi-streamer). It seems like all you really need is 1 "producer" and N "consumers", and the single streamer is just the N=1 case? I ran into this because I was trying to send to a streamer from multiple goroutines, but that requires me to do my own synchronization around the streamer. I finally saw the note that you "probably shouldn't use streamer", but that got me wondering whether both of these issues could be addressed by just changing the interface to always take a "numWorkers" param, letting the client close the requests channel as a stop signal, and then iterating over the errors channel to know that the queue is drained.

    Not sure if that makes any sense (it's past my bedtime and my brain is fried!) but I've tried to lay it out in code here: http://play.golang.org/p/qBJ5nofG39

    Though I obviously understand if you think that deviates too far from your intended use-cases -- just thought I'd float the idea. Thanks anyway -- this looks very useful!
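
    The pattern proposed here, sketched generically (stream and doInsert are made-up names for illustration): the client closes the requests channel as the stop signal, N workers drain it, and the errors channel is closed once all workers finish, so ranging over errors doubles as waiting for the queue to be flushed.

    package main

    import (
        "fmt"
        "sync"
    )

    // doInsert is a hypothetical stand-in for an insert operation.
    func doInsert(req string) error { fmt.Println("insert:", req); return nil }

    func stream(requests <-chan string, numWorkers int) <-chan error {
        errs := make(chan error)
        var wg sync.WaitGroup
        wg.Add(numWorkers)
        for i := 0; i < numWorkers; i++ {
            go func() {
                defer wg.Done()
                // Exits only when requests is closed and fully drained.
                for req := range requests {
                    if err := doInsert(req); err != nil {
                        errs <- err
                    }
                }
            }()
        }
        go func() {
            wg.Wait()
            close(errs) // tells the client that all rows were flushed
        }()
        return errs
    }

    func main() {
        requests := make(chan string, 10)
        errs := stream(requests, 3)
        for i := 0; i < 5; i++ {
            requests <- fmt.Sprintf("row-%d", i)
        }
        close(requests)         // stop signal
        for err := range errs { // queue is drained once this loop ends
            fmt.Println("error:", err)
        }
    }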

  • complete refactor of entire package

    • easier to grasp (and use) types:
      • add SyncWorker type, which provides blocking streaming. This replaces the Streamer type.
      • asyncWorker, which wraps SyncWorker and provides asynchronous streaming.
      • AsyncWorkerGroup, which manages multiple asyncWorkers. This replaces the MultiStreamer type.
    • add an insert error type hierarchy. This provides a way to know when an insert operation has occurred, how many retries have been made, and whether the insert has succeeded or not. Fixes #8.
    • enqueuing rows now supports passing insert IDs. Fixes #7, fixes #11.
    • add support for SkipInvalidRows and IgnoreUnknownValues: https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll#request-body
    • switch to functional options when passing arguments to New() constructor functions (see the sketch after this list)
    • add option to automatically retry/not retry (with backoff) when executing insert operations
    • update, add examples
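
    A generic illustration of the functional-options pattern this refactor adopts (type and option names here are invented, not the package's actual API):

    package main

    import (
        "fmt"
        "time"
    )

    type config struct {
        maxRows  int
        maxDelay time.Duration
    }

    // OptionFunc mutates the config; callers pass any subset of options.
    type OptionFunc func(*config)

    func SetMaxRows(n int) OptionFunc            { return func(c *config) { c.maxRows = n } }
    func SetMaxDelay(d time.Duration) OptionFunc { return func(c *config) { c.maxDelay = d } }

    func New(options ...OptionFunc) *config {
        c := &config{maxRows: 500, maxDelay: 5 * time.Second} // defaults
        for _, opt := range options {
            opt(c)
        }
        return c
    }

    func main() {
        c := New(SetMaxRows(100), SetMaxDelay(time.Second))
        fmt.Printf("%+v\n", c)
    }
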
  • tests use testify; "stop" channel uses empty struct;

    • Until now, unit tests used the built-in "testing" package; testify makes tests much more concise. Less code for great good!
    • The "stop" channel used to pass a meaningless boolean; now it passes an empty struct{}.
    • Fixed some readme typos.
  • How to know when all inserts are done

    Hi, first of all, amazing library.

    I have been working with it for a few days, and I have seen that MultiStreamer.Stop is not synchronous, leading to the possibility that rows are not inserted when the program finishes.

    So, is there a way to know when all rows have been inserted? Right now I have a dirty patch to make Stop sync but I'd like to know if there is an official way to do this or is planned to be implemented.

    Cheers!

  • Use default application credentials if no JSON config is available.

    This is quite handy if you are running on GCE or GAE. When the JWT config is nil, it will search for credentials on the machine: https://developers.google.com/identity/protocols/application-default-credentials
