
ClickHouse Data Synchromesh

Data syncing in golang for ClickHouse.

based on go-zero

ARCH

A typical data warehouse architecture


design of data sync

Automatically synchronizes data from MySQL/MongoDB sources to a ClickHouse cluster in real time.


quick start

git clone https://github.com/tal-tech/cds.git
cd cds
make up

demo by docker

data model in clickhouse

The table-creation scheme CDS uses for ClickHouse

help

How To Ask Questions The Smart Way

How to Report Bugs Effectively


If you like this project and want to support it, please star it 🤝

Owner
好未来技术 (TAL Technology)
Comments
  • Would you consider supporting ReplicatedCollapsingMergeTree?

    Hello, development team. While studying this project, a question came up that I hope you can answer. Background: in the ClickHouse DDL generated by Galaxy, a "ck_is_delete" column is created for every table, and ReplicatedMergeTree is chosen as the replicated-table engine. Question: why wasn't ReplicatedCollapsingMergeTree chosen as the replicated-table engine here, with a "sign" column in place of "ck_is_delete"?

    Looking forward to an answer.

  • make up mongo error

    mongodb test_mongo.example inserted 10000 lines

    Traceback (most recent call last):
      File "/tmp/init_db.py", line 74, in <module>
        init_mongo()
      File "/tmp/init_db.py", line 69, in init_mongo
        collection.insert_many(result)
      File "/usr/local/lib/python3.10/site-packages/pymongo/collection.py", line 615, in insert_many
        blk.execute(write_concern, session=session)
      File "/usr/local/lib/python3.10/site-packages/pymongo/bulk.py", line 459, in execute
        return self.execute_command(generator, write_concern, session)
      File "/usr/local/lib/python3.10/site-packages/pymongo/bulk.py", line 351, in execute_command
        with client._tmp_session(session) as s:
      File "/usr/local/lib/python3.10/contextlib.py", line 135, in __enter__
        return next(self.gen)
      File "/usr/local/lib/python3.10/site-packages/pymongo/mongo_client.py", line 1656, in _tmp_session
        s = self._ensure_session(session)
      File "/usr/local/lib/python3.10/site-packages/pymongo/mongo_client.py", line 1643, in _ensure_session
        return self.__start_session(True, causal_consistency=False)
      File "/usr/local/lib/python3.10/site-packages/pymongo/mongo_client.py", line 1594, in __start_session
        server_session = self._get_server_session()
      File "/usr/local/lib/python3.10/site-packages/pymongo/mongo_client.py", line 1629, in _get_server_session
        return self._topology.get_server_session()
      File "/usr/local/lib/python3.10/site-packages/pymongo/topology.py", line 534, in get_server_session
        session_timeout = self._check_session_support()
      File "/usr/local/lib/python3.10/site-packages/pymongo/topology.py", line 520, in _check_session_support
        self._select_servers_loop(
      File "/usr/local/lib/python3.10/site-packages/pymongo/topology.py", line 223, in _select_servers_loop
        raise ServerSelectionTimeoutError(
    pymongo.errors.ServerSelectionTimeoutError: mongo1:30001: [Errno -2] Name or service not known,mongo2:30002: [Errno -2] Name or service not known,mongo3:30003: [Errno -2] Name or service not known, Timeout: 30s, Topology Description: <TopologyDescription id: 624a83e9fd437d56bbaaf7a3, topology_type: ReplicaSetNoPrimary, servers: [<ServerDescription ('mongo1', 30001) server_type: Unknown, rtt: None, error=AutoReconnect('mongo1:30001: [Errno -2] Name or service not known')>, <ServerDescription ('mongo2', 30002) server_type: Unknown, rtt: None, error=AutoReconnect('mongo2:30002: [Errno -2] Name or service not known')>, <ServerDescription ('mongo3', 30003) server_type: Unknown, rtt: None, error=AutoReconnect('mongo3:30003: [Errno -2] Name or service not known')>]>
    make: *** [init] Error 1

  • Error when running make up

    Traceback (most recent call last):
      File "/tmp/init_db.py", line 74, in <module>
        init_mongo()
      File "/tmp/init_db.py", line 69, in init_mongo
        collection.insert_many(result)
      File "/usr/local/lib/python3.9/site-packages/pymongo/collection.py", line 761, in insert_many
        blk.execute(write_concern, session=session)
      File "/usr/local/lib/python3.9/site-packages/pymongo/bulk.py", line 528, in execute
        return self.execute_command(generator, write_concern, session)
      File "/usr/local/lib/python3.9/site-packages/pymongo/bulk.py", line 359, in execute_command
        client._retry_with_session(
      File "/usr/local/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1384, in _retry_with_session
        return self._retry_internal(retryable, func, session, bulk)
      File "/usr/local/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1414, in _retry_internal
        raise last_error
      File "/usr/local/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1416, in _retry_internal
        return func(session, sock_info, retryable)
      File "/usr/local/lib/python3.9/site-packages/pymongo/bulk.py", line 353, in retryable_bulk
        self._execute_command(
      File "/usr/local/lib/python3.9/site-packages/pymongo/bulk.py", line 309, in _execute_command
        result, to_send = bwc.execute(ops, client)
      File "/usr/local/lib/python3.9/site-packages/pymongo/message.py", line 907, in execute
        result = self.write_command(request_id, msg, to_send)
      File "/usr/local/lib/python3.9/site-packages/pymongo/message.py", line 999, in write_command
        reply = self.sock_info.write_command(request_id, msg)
      File "/usr/local/lib/python3.9/site-packages/pymongo/pool.py", line 771, in write_command
        helpers._check_command_response(result, self.max_wire_version)
      File "/usr/local/lib/python3.9/site-packages/pymongo/helpers.py", line 151, in _check_command_response
        raise NotMasterError(errmsg, response)
    pymongo.errors.NotMasterError: operation was interrupted, full error: {'errorLabels': ['RetryableWriteError'], 'topologyVersion': {'processId': ObjectId('6071d9eecc09c5721c8f196f'), 'counter': 6}, 'operationTime': Timestamp(1618074848, 500), 'ok': 0.0, 'errmsg': 'operation was interrupted', 'code': 11602, 'codeName': 'InterruptedDueToReplStateChange', '$clusterTime': {'clusterTime': Timestamp(1618074848, 500), 'signature': {'hash': b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 'keyId': 0}}}
    make: *** [init] Error 1

  • Add Nullable for nullable columns, otherwise syncing from MySQL reports a nil error; add Go handling for the BIT type

    While syncing MySQL data, we found that when a column is nullable and its value is NULL, the dm sync job reports a nil error:

    {"@timestamp":"2021-01-04T17:03:44.268+08","level":"error","content":"mysqltypeconv.go:59 sql: Scan error on column index 4, name \"Default\": converting NULL to string is unsupported"}

    This can be solved with the following code.
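    The patch itself did not survive in this copy of the thread. The sketch below is an assumed reconstruction of the usual Go-side fix, not CDS's actual code: scan nullable columns through sql.NullString instead of string, and decode the raw bytes the MySQL driver returns for BIT(n) columns by hand. The function names are illustrative.

    ```go
    package main

    import (
    	"database/sql"
    	"fmt"
    )

    // nullableToString maps a scanned sql.NullString to a plain string,
    // turning SQL NULL into "" so a downstream ClickHouse column that is
    // not declared Nullable(...) still receives a value instead of a nil.
    func nullableToString(v sql.NullString) string {
    	if v.Valid {
    		return v.String
    	}
    	return ""
    }

    // bitToUint64 decodes the raw big-endian bytes the MySQL driver
    // returns for a BIT(n) column (n <= 64) into a uint64.
    func bitToUint64(b []byte) uint64 {
    	var n uint64
    	for _, c := range b {
    		n = n<<8 | uint64(c)
    	}
    	return n
    }

    func main() {
    	fmt.Println(nullableToString(sql.NullString{String: "utf8", Valid: true})) // utf8
    	fmt.Println(nullableToString(sql.NullString{}))                            // empty string
    	fmt.Println(bitToUint64([]byte{0x01, 0x00}))                               // 256
    }
    ```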

  • ERROR - error during full synchronization

    Environment: CentOS 7 | ClickHouse server version 20.12.5 | latest cds (adding a version number is recommended; commit id: 28c51c5)

    Symptom: with full synchronization configured, running the sync produces the following error:

    {"@timestamp":"2021-01-04T17:03:44.268+08","level":"error","content":"mysqltypeconv.go:59 sql: Scan error on column index 4, name \"Default\":
     converting NULL to string is unsupported"}
    

    Possible cause: the CREATE TABLE statement fixed the column type, so the NULL conversion failed.

    Questions:

    * Could the generated CREATE TABLE statements support Nullable?
    * Does a full-database sync require selecting all tables?
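
    On the first question, one conceivable shape for the fix is to have the DDL generator wrap the ClickHouse type in Nullable(...) whenever the source MySQL column allows NULL. The sketch below is hypothetical; the type table and function name are illustrative, not CDS's actual generator.

    ```go
    package main

    import "fmt"

    // chType maps a MySQL column type to a ClickHouse type string,
    // wrapping it in Nullable(...) when the source column allows NULL.
    // The mapping table here is a small illustrative subset.
    func chType(mysqlType string, nullable bool) string {
    	m := map[string]string{
    		"varchar":  "String",
    		"int":      "Int32",
    		"datetime": "DateTime",
    	}
    	t, ok := m[mysqlType]
    	if !ok {
    		t = "String" // fall back to String for unmapped types
    	}
    	if nullable {
    		return "Nullable(" + t + ")"
    	}
    	return t
    }

    func main() {
    	fmt.Println(chType("varchar", true)) // Nullable(String)
    	fmt.Println(chType("int", false))    // Int32
    }
    ```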
    