Go-bqstreamer - Stream data into Google BigQuery concurrently using InsertAll()

Kik and I (@oryband) are no longer maintaining this repository. Thanks for all the contributions. You are welcome to fork and continue development.

Stream insert data into BigQuery fast and concurrently, using InsertAll().

Features

  • Insert rows from multiple tables, datasets, and projects, and insert them in bulk. No need to manage data structures and sort rows by table - bqstreamer does it for you.
  • Multiple background workers (i.e. goroutines) to enqueue and insert rows.
  • Inserts can be performed synchronously (blocking) or in the background (asynchronously).
  • Perform insert operations in predefined batch sizes, according to BigQuery's quota policy.
  • Handle and retry BigQuery server errors.
  • Backoff interval between failed insert operations.
  • Error reporting.
  • Production ready, and thoroughly tested and documented - for great good! We - at Rounds (now acquired by Kik) - used it in our data gathering workflow.

Getting Started

  1. Install Go, version 1.5 or newer.
  2. Clone this repository and download dependencies:
     • Version v2: go get gopkg.in/kikinteractive/go-bqstreamer.v2
     • Version v1: go get gopkg.in/kikinteractive/go-bqstreamer.v1
  3. Acquire Google OAuth2/JWT credentials, so you can authenticate with BigQuery.

How Does It Work?

There are two types of inserters you can use:

  1. SyncWorker, a single blocking (synchronous) worker.
     • It enqueues rows and performs insert operations in a blocking manner.
  2. AsyncWorkerGroup, which employs multiple background SyncWorkers.
     • The AsyncWorkerGroup enqueues rows, and its background workers pull and insert them in a fan-out model.
     • Each background worker executes an insert operation according to row amount or time thresholds.
     • Errors are reported to an error channel for processing by the user.
     • This provides higher insert throughput for larger scale scenarios.

Examples

Check the GoDoc examples section.

Contribute

  1. Please check the issues page.
  2. File new bugs and ask for improvements.
  3. Pull requests welcome!

Test

# Run unit tests and check coverage.
$ make test

# Run integration tests.
# This requires an active project, dataset and pem key.
$ export BQSTREAMER_PROJECT=my-project
$ export BQSTREAMER_DATASET=my-dataset
$ export BQSTREAMER_TABLE=my-table
$ export BQSTREAMER_KEY=my-key.json
$ make testintegration

Comments

  • Fix test cases

    This pull request fixes the execution of Travis tests which failed for two distinct reasons:

    1. Recently, a NullFields field was added to the TableDataInsertAllRequestRows struct of google-api-go-client (see https://github.com/google/google-api-go-client/commit/1ba8c3c8628c255599cf16f4dbbbb79499f4e307 ). This pull request updates all instantiations of TableDataInsertAllRequestRows in test cases to match the new format.

    2. grpc, which is a dependency of this project, no longer supports Go 1.5 (see https://github.com/grpc/grpc-go/pull/1132), making all Travis builds with Go 1.5.x fail. This pull request removes those targets.

    This pull request also adds Go 1.8 to the Travis build matrix since it seems to be supported.

  • Question: AsyncWorkerGroup.Close()

    Hello,

    I was wondering how this function makes sure that all remaining messages are flushed. When I send a batch of messages via workerGroup.Enqueue(), and then close the workerGroup, it seems that some messages do not get sent. When the main thread sleeps for a while, the messages do get sent. Could it be that because of the use of a buffered channel for rowChan, all workers stop while it can still be that some messages are buffered in rowChan?

    See https://github.com/rounds/go-bqstreamer/blob/master/async_worker_group.go#L76 and https://github.com/rounds/go-bqstreamer/blob/master/async_worker.go#L47

  • Fix async worker insert at max delay expiration

    The async worker is supposed to insert all enqueued rows if maxDelay time has elapsed since the last insertion and fewer than maxRows have been enqueued in the meanwhile.

    However, the maxDelay timer is reset each time a new row is enqueued, rather than each time a batch of rows is inserted, causing the insert at maxDelay expiration never to occur.

    As an example, if maxDelay is set to 10 sec. and maxRows is set to 500 and a row is enqueued every 9 sec., then insertions will always occur every 500*9 sec. rather than every 10 sec. as expected.

    This pull request fixes this issue.

    Travis tests currently fail because of the reasons I explained in https://github.com/kikinteractive/go-bqstreamer/pull/24.

  • Getting queue length

    At the moment, rows is not exported, so I can't access it to run len() on. I don't mind implementing it myself, but would you prefer it implemented as an interface, or to export rows?

  • Consider closing error channel as signal to stop

    Thanks for the library guys! Found it tonight and it's just about exactly what I'm looking for, though I did run into a couple issues while trying to use it that I thought I'd ask about addressing before getting too much deeper.

    The first is that it doesn't seem possible to know -- after calling stop() -- that the queue is flushed. This is because the errors channel is never closed, so I assume that clients are just supposed to wait some reasonable amount of time after calling stop?

    I also wonder whether there's value in providing the two different interfaces (streamer and multi-streamer). It seems like all you really need is 1 "producer", and N "consumers", and the single streamer is just the N=1 case? I ran into this because I was trying to send to a streamer from multiple goroutines, but that requires me to do my own synchronization around streamer. I finally saw the note that you "probably shouldn't use streamer", but that got me wondering whether both of these issues could be addressed by just changing up the interface to always take a "numWorkers" param, and let the client close the requests channel as a stop-signal, then just iterating over the errors channel to know that the queue is drained.

    Not sure if that makes any sense (it's past my bedtime and my brain is fried!) but I've tried to lay it out in code here: http://play.golang.org/p/qBJ5nofG39

    Though I obviously understand if you think that deviates too far from your intended use-cases -- just thought I'd float the idea. Thanks anyway -- this looks very useful!

  • complete refactor of entire package

    • easier to grasp (and use) types:
      • add SyncWorker type, which provides blocking streaming. this is instead of Streamer type
      • asyncWorker, which wraps SyncWorker and provides asynchronous streaming
      • AsyncWorkerGroup, which manages multiple asyncWorkers. this is instead of the MultiStreamer type
    • add insert error type hierarchy. this provides a way to know when an insert operation has occurred, how many retries have been made, and whether the insert has succeeded or not, fixes #8
    • enqueuing rows now supports passing insert ids, fixes #7, fixes #11
    • add support for SkipInvalidRows and IgnoreUnknownValues: https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll#request-body
    • switch to use functional options when passing arguments to New() constructor functions
    • add option to automatically retry/not retry (with backoff) when executing insert operations
    • update, add examples
  • tests use testify; "stop" channel uses empty struct;

    • until now unit tests used the built-in "testing" package, testify makes tests much more concise. less code for great good!
    • "stop" channel used to pass a meaningless boolean, no it passes an empty struct{}
    • fixed some readme typos
  • How to know when all inserts are done

    Hi, first of all, amazing library.

    I have been working with it for a few days and I have seen that MultiStreamer.Stop is not synchronous, leading to the possibility that rows are not imported when the program finishes.

    So, is there a way to know when all rows have been inserted? Right now I have a dirty patch to make Stop sync but I'd like to know if there is an official way to do this or is planned to be implemented.

    Cheers!

  • Use default application credentials if no JSON config is available.

    This is quite handy if you are running on GCE or GAE. When the JWT config is nil it will search for credentials on the machine. https://developers.google.com/identity/protocols/application-default-credentials
