A lightweight, cloud-native data transfer agent and aggregator

Loggie is a lightweight, high-performance, cloud-native agent and aggregator written in Go. It supports multiple pipelines and pluggable components:

  • One-stack logging solution: supports data transfer, filtering, parsing, alerting, and more
  • Cloud native: native Kubernetes CRD usage
  • Production-grade: a full range of observability, automated operation, and reliability capabilities




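  • How to collect logs written inside a container


    Does the pod YAML need a specific emptyDir or hostPath volume configured before the logs can be collected? Below is our demo YAML: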

            - mountPath: /home/admin/logs
              subPath: logs
        - name:01606d0d6354456aa546dfbb36d8a764-datadir
            claimName: >-
    logconfig reports an Elasticsearch error

    {"level":"warn","time":"2022-03-21T17:26:04Z","caller":"/go/src/loggie.io/loggie/pkg/interceptor/retry/interceptor.go:175","message":"interceptor/retry retry buffer size(2) too large"}
    {"level":"error","time":"2022-03-21T17:26:04Z","caller":"/go/src/loggie.io/loggie/pkg/pipeline/pipeline.go:267","message":"consumer batch fail,err: elasticsearch client not initialized yet"}


    goccy/go-yaml, which was introduced by #242, handles blank strings incorrectly. As a result, I ran into a problem parsing the default containerd log.


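    package main

    import (
    	"fmt"

    	errYaml "github.com/goccy/go-yaml"
    	okYaml "gopkg.in/yaml.v2"
    )

    func main() {
    	// A struct whose only field holds a single space.
    	v := struct {
    		Key string
    	}{
    		Key: " ",
    	}
    	// Marshal the same value with both libraries and compare the output.
    	d1, _ := okYaml.Marshal(v)
    	fmt.Printf("%s\n%s\n", "YES", string(d1))
    	d2, _ := errYaml.Marshal(v)
    	fmt.Printf("%s\n%s\n", "NO", string(d2))
    }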


    key: ' '

    In what area(s)?

    /area interceptor

    What version of Loggie?


    Expected Behavior

    Load interceptor successfully

    Actual Behavior

    Got a warning log: get processor error: Key: 'SplitConfig.Separator' Error:Field validation for 'Separator' failed on the 'required' tag.

    Steps to Reproduce the Problem

    1. Configure the Interceptor CRD
        - type: normalize
          processors:
            - split:
                separator: ' '
                max: 4
                keys: [ "time", "stream", "F", "message" ]
            - drop:
                targets: [ "F", "body" ]
            - rename:
                convert:
                  - from: "message"
                    to: "body"
            - underRoot:
                keys:
                  - kubernetes
    2. kubectl delete pod and kubectl logs pod
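For context, the split step in the config above is meant to cut a containerd log line into four parts. The following is a minimal stdlib-only sketch of that intent, assuming the usual containerd (CRI) layout of timestamp, stream, tag, and message. `splitLine` and the sample line are hypothetical and are not Loggie's split processor.

```go
package main

import (
	"fmt"
	"strings"
)

// splitLine cuts line on sep into at most max parts and pairs them with keys.
// Hypothetical helper for illustration only; not Loggie's implementation.
func splitLine(line, sep string, max int, keys []string) map[string]string {
	parts := strings.SplitN(line, sep, max)
	out := make(map[string]string, len(keys))
	for i, k := range keys {
		if i < len(parts) {
			out[k] = parts[i]
		}
	}
	return out
}

func main() {
	// Sample line in the assumed CRI layout: <time> <stream> <tag> <message>.
	line := "2022-03-21T14:47:44.246358969Z stdout F GET / HTTP/1.1 200"
	fields := splitLine(line, " ", 4, []string{"time", "stream", "F", "message"})
	for _, k := range []string{"time", "stream", "F", "message"} {
		fmt.Printf("%s=%q\n", k, fields[k])
	}
}
```

Because the separator is a single space, a YAML round trip that turns `' '` into an empty value would leave the split step without a usable separator, which is consistent with the `required` validation failure reported above.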
    Proposed Changes:

    • add zinc sink

    Which issue(s) this PR fixes:

    Fixes #

    Additional documentation:

    ZincSearch is the simplest and easiest search system to get up and running. It is an open-source, easy-to-use search engine that addresses your observability needs.

    pipelines:
      - name: local
        sources:
          - type: file
            name: demo
            paths:
              - /tmp/log/*.log
            fields:
              topic: "loggie"
        sink:
          type: zinc
          host: ""
          username: admin
          password: Complexpass#123
          index: "demo"
          codec:
            pretty: false
    Proposed Changes:

    • add support for a pulsar sink

    Which issue(s) this PR fixes:

    Fixes #199

    Additional documentation:


    Use the pulsar sink to send log data to a downstream Pulsar.

    !!! example

      type: pulsar
      url: pulsar://localhost:6650
      topic: persistent://tenant/namespace/topic


    | Field | Type | Required | Default | Description |
    | ---------- | ----------- | ----------- | --------- | -------- |
    | url | string | Yes | none | Pulsar connection URL that logs are sent to |
    | topic | string | Yes | none | Pulsar topic that logs are sent to |
    | operation_timeout_seconds | time.Duration | No | 30s | Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the operation will be marked as failed |
    | connectionTimeout | time.Duration | No | 5s | Timeout for the establishment of a TCP connection |
    | sendTimeout | time.Duration | No | 30s | SendTimeout sets the timeout for a message that is not acknowledged by the server |
    | maxPendingMessages | int | No | none | MaxPendingMessages specifies the max size of the queue holding the messages pending to receive an acknowledgment from the broker |
    | hashingSchema | int | No | 0 | HashingScheme is used to define the partition on where to publish a particular message. 0: JavaStringHash, 1: Murmur3_32Hash |
    | compressionType | int | No | 0 | 0: NoCompression, 1: LZ4, 2: ZLIB, 3: ZSTD |
    | LogLevel | string | No | 0 | Log level: "info", "debug", "error" |
    | batchingMaxSize | int | No | 2048(KB) | BatchingMaxSize specifies the maximum number of bytes permitted in a batch |
    | batchingMaxMessages | int | No | 1000 | BatchingMaxMessages specifies the maximum number of messages permitted in a batch |
    | batchingMaxPublishDelay | time.Duration | No | 10ms | BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched |
    | useTLS | bool | No | false | Whether to use TLS authentication |
    | tlsTrustCertsFilePath | string | No | none | The path to the trusted TLS certificate file |
    | tlsAllowInsecureConnection | bool | No | false | Configure whether the Pulsar client accepts an untrusted TLS certificate from the broker |
    | certificatePath | string | No | none | Path to the TLS certificate |
    | privateKeyPath | string | No | none | Path to the TLS private key |
    | token | string | No | none | Set this if Pulsar is accessed with token authentication |
    | tokenFilePath | string | No | none | Auth token read from a file |

    Environment: Kubernetes 1.21 + Docker

    logconfig

    apiVersion: loggie.io/v1beta1
    kind: LogConfig
    metadata:
      name: nginx
      namespace: default
    spec:
      pipeline:
        interceptorsRef: nginx-interce
        sinkRef: nginx-sink
        sources: |
          - type: file
            name: mylog
            containerName: nginx
            fields:
              topic: "nginx-access"
            matchFields:
              labelKey: [app]
            paths:
              - stdout
      selector:
        labelSelector:
          app: nginx
        type: pod

    Interceptor

    apiVersion: loggie.io/v1beta1
    kind: Interceptor
    metadata:
      name: nginx-interce
    spec:
      interceptors: |
        - type: normalize
          name: stdproc
          belongTo: ["mylog"]
          processors:
            - jsonDecode:
                target: body
            - drop:
                targets: ["stream", "time", "body"]
            - rename:
                convert:
                  - from: "log"
                    to: "message"

    sink

    apiVersion: loggie.io/v1beta1
    kind: Sink
    metadata:
      name: nginx-sink
    spec:
      sink: |
        type: dev
        printEvents: true

    After deployment I sent a request to nginx. The loggie pod log shows:

    {
      "fields": {
        "namespace": "default",
        "nodename": "",
        "podname": "nginx-6799fc88d8-td4sc",
        "containername": "nginx",
        "logconfig": "nginx",
        "topic": "nginx-access"
      },
      "body": "{\"log\":\" - - [21/Mar/2022:14:47:44 +0000] \\\"GET / HTTP/1.1\\\" 200 615 \\\"-\\\" \\\"curl/7.29.0\\\" \\\"-\\\"\\n\",\"stream\":\"stdout\",\"time\":\"2022-03-21T14:47:44.246358969Z\"}"
    }

    The log field in the output was not renamed, and the stream and time fields were not dropped.
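The result this report expects from the jsonDecode, drop, and rename steps can be sketched with the Go standard library. This is only an illustration of the intended transformation on a Docker JSON log line. `normalize` is a hypothetical helper, the sample body is shortened, and none of this is Loggie's own code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// normalize decodes body as JSON, drops "stream" and "time",
// and renames "log" to "message". Hypothetical helper for illustration.
func normalize(body string) (map[string]interface{}, error) {
	event := map[string]interface{}{}
	if err := json.Unmarshal([]byte(body), &event); err != nil {
		return nil, err
	}
	for _, k := range []string{"stream", "time"} {
		delete(event, k)
	}
	if v, ok := event["log"]; ok {
		event["message"] = v
		delete(event, "log")
	}
	return event, nil
}

func main() {
	// Shortened sample in the Docker json-file log layout.
	body := `{"log":"GET / HTTP/1.1 200 615\n","stream":"stdout","time":"2022-03-21T14:47:44.246358969Z"}`
	event, err := normalize(body)
	if err != nil {
		panic(err)
	}
	out, _ := json.Marshal(event)
	fmt.Println(string(out))
}
```

In the reported output the body is still the raw JSON string, which suggests the interceptor was never applied to the event. One thing that may be worth checking: the LogConfig in this report uses `interceptorsRef`, while the other LogConfig examples on this page use `interceptorRef`.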

  • loggie fails to write to Kafka and reports an error


    2022-08-08 19:52:01 ERR pkg/pipeline/pipeline.go:341 > consumer batch failed: write to kafka: kafka write errors (2048/2048)


    Ask your question here:

    Version: loggie-v1.3.0-rc.0, reading the configuration via inclusterconfig


    apiVersion: loggie.io/v1beta1
    kind: Interceptor
    metadata:
      name: jsondecode
    spec:
      interceptors: |
        - type: normalize
          name: json
          processors:
            - jsonDecode: ~
            - drop:
                targets: ["body"]

    apiVersion: loggie.io/v1beta1
    kind: ClusterLogConfig
    metadata:
      name: kubeevent
    spec:
      selector:
        type: cluster
        cluster: aggregator
      pipeline:
        sources: |
          - type: kubeEvent
            name: event
        interceptorRef: jsondecode
        sinkRef: k8sbgy-kube-eventer


    What version of Loggie?


    Expected Behavior

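    1. Logs are collected normally, and multi-line entries are merged according to the specified regular expression and written to the Elasticsearch index.
    2. The loggie backend logs no errors.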

    Actual Behavior

    1. The specified index is not created in Elasticsearch.
    2. The loggie backend logs errors:

    2022-09-03 17:03:28 INF pkg/eventbus/export/logger/logger.go:141 > [metric]: {"filesource":{},"queue":{"public/catalog-channel":{"capacity":2048,"fillPercentage":0,"pipeline":"public/catalog","queueType":"channel","size":0}},"reload":{"ReloadTotal":6},"sink":{}}
    2022-09-03 17:03:39 ERR pkg/pipeline/pipeline.go:353 > consumer batch failed: send events to elasticsearch: request to elasticsearch response error: {"errors":true,"items":[{"index":{"_index":"--2022-09","type":"doc","status":400,"error":{"type":"invalid_index_name_exception","reason":"Invalid index name [--2022-09], must not start with '_', '-', or '+'","index":"--2022-09"}}}]}
    2022-09-03 17:03:39 ERR pkg/pipeline/pipeline.go:353 > consumer batch failed: send events to elasticsearch: request to elasticsearch response error: {"errors":true,"items":[{"index":{"_index":"--2022-09","type":"doc","status":400,"error":{"type":"invalid_index_name_exception","reason":"Invalid index name [--2022-09], must not start with '_', '-', or '+'","index":"--2022-09"}}}]}
    2022-09-03 17:03:45 WRN pkg/interceptor/retry/interceptor.go:191 > interceptor/retry retry buffer size(2) too large
    2022-09-03 17:03:45 INF pkg/interceptor/retry/interceptor.go:214 > next retry duration: 690ms
    2022-09-03 17:03:45 ERR pkg/pipeline/pipeline.go:353 > consumer batch failed: send events to elasticsearch: request to elasticsearch response error: {"errors":true,"items":[{"index":{"_index":"--2022-09","type":"doc","status":400,"error":{"type":"invalid_index_name_exception","reason":"Invalid index name [--2022-09], must not start with '_', '-', or '+'","index":"--2022-09"}}}]}

    Steps to Reproduce the Problem

    1. When deploying loggie, add discovery.kubernetes.typePodFields:

    typePodFields:
      logconfig: "${_k8s.logconfig}"
      namespace: "${_k8s.pod.namespace}"
      workloadkind: "${_k8s.workload.kind}"
      workloadname: "${_k8s.workload.name}"
      nodename: "${_k8s.node.name}"
      nodeip: "${_k8s.node.ip}"
      poduid: "${_k8s.pod.uid}"
      podname: "${_k8s.pod.name}"
      podip: "${_k8s.pod.ip}"
      containerid: "${_k8s.pod.container.id}"
      containername: "${_k8s.pod.container.name}"
      containerimage: "${_k8s.pod.container.image}"

    2. Create a LogConfig whose source uses multi-line collection and whose sink uses a dynamic index name:

    apiVersion: loggie.io/v1beta1
    kind: LogConfig
    metadata:
      name: catalog
      namespace: public
    spec:
      selector:
        type: pod
        labelSelector:
          app: catalog
          release: catalog
      pipeline:
        sources: |
          - type: file
            name: logfile
            addonMeta: true
            paths:
              - /catalog/logs/*.log
            multi:
              active: true
              pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}"
        sink: |
          type: elasticsearch
          hosts: [""]
          index: "${fields.workloadname}-${fields.namespace}-${+YYYY-MM}"
          etype: "doc"
          codec:
            type: json
            beatsFormat: true

    What version of Loggie?


    Expected Behavior

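    A logconfig matches the label workload.user.cattle.io/workloadselector=statefulSet-game-s6-game-core-v4-cluster


    Logs were collected normally before, but the user has since recreated the pod. After the recreation, the original pod's path no longer exists, and error logs show up in loggie:


    Inside the loggie container, the new pod is visible under /var/log/pods, but the new pod's logs cannot be collected either.

    describe logconfig shows that events is none:


    However, the label selector is fine: logs were collected normally before, it has not been changed, and get pod also matches the new pod: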


    Actual Behavior

    Logs from the new pod cannot be collected.

    Steps to Reproduce the Problem

    interceptors: |
      - type: rateLimit
        qps: 100000
      - type: transformer
        actions:
          - action: copy(body, res)
          - action: regex(res)
            pattern: '(?

    I changed the configuration to keep only - action: copy(body, res)



    I found that the res field is base64-encoded after the copy. Is this a bug, or a problem with my configuration?
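One possible explanation, offered as an assumption rather than a confirmed diagnosis: Go's encoding/json writes a `[]byte` value as a base64 string. If the event body is held as `[]byte` and the copy keeps that type, a JSON codec would print the copied field in base64. The sketch below shows only this standard-library behaviour; `encode` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encode marshals a single field named res; the Go type of v decides
// how the value appears in the JSON output.
func encode(v interface{}) string {
	out, err := json.Marshal(map[string]interface{}{"res": v})
	if err != nil {
		panic(err)
	}
	return string(out)
}

func main() {
	body := []byte("hello")
	fmt.Println(encode(body))         // {"res":"aGVsbG8="}
	fmt.Println(encode(string(body))) // {"res":"hello"}
}
```

Under that assumption, the field would need to be converted to a string before encoding for the output to stay readable.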

  • Collecting logs inside a container: is there any example documentation?




    • k8s: 1.24.3
    • containerd: 1.6.4
    • loggie version: v1.4.0-rc.0


    apiVersion: loggie.io/v1beta1
    kind: Sink
    metadata:
      name: dev
    spec:
      sink: |
        type: dev
        printEvents: true
        codec:
          type: raw


    apiVersion: loggie.io/v1beta1
    kind: Interceptor
    metadata:
      name: default
    spec:
      interceptors: |
        - type: rateLimit
          qps: 90000




    apiVersion: loggie.io/v1beta1
    kind: LogConfig
    metadata:
      name: game
    spec:
      selector:
        type: pod
        labelSelector:
          app: game
      pipeline:
        sources: |
          - type: file
            name: game_dayreport
            fields:
              _log_type_: dayreport
            paths:
              - /app/game/logs/dayreport/dayreport_*.log
        sinkRef: dev
        interceptorRef: default
    Proposed Changes:

    • move persistence from /source/file to a single package
    • move db config to loggie.yml
    • replace sqlite with badger

    Which issue(s) this PR fixes:

    Fixes #

    Additional documentation:

    Proposed Changes:

    • journal source (specific image with systemd needed)

    Which issue(s) this PR fixes:

    Fixes #

    Additional documentation:

