We are using the fleet-of-m3-coordinators-and-m3-aggregators topology to aggregate metrics before sending them to downstream Grafana remote storage:
https://m3db.io/docs/how_to/any_remote_storage/#fleet-of-m3-coordinators-and-m3-aggregators
Apps -> OTel Collector (Prometheus Receiver) -> M3 Coordinator -> M3 Aggregators -> M3 Coordinator (aggregated metrics) -> Prometheus Remote Write to Grafana
We are observing a sudden surge in M3 aggregation errors of the type "too far in past" after around ~30 hours (roughly 1.5 days) of prod traffic. I am not sure it's just caused by load, since 1) the cluster looks very stable for the first full day of prod traffic, and 2) the increase in these errors is not gradual. It really is a sudden spike, and it eventually causes the M3 Coordinators to OOM and crash.
The surge in the above errors is accompanied by an increase in the ingest latency metric.
Topology Details:
25 OTel Collector pods -> 25 M3 Coordinator nodes -> 24 M3 Agg nodes (512 shards, RF 2)
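For context, the hand-off from the OTel Collectors to the M3 Coordinators is plain Prometheus remote write. A minimal sketch of that exporter wiring (the coordinator service name is a placeholder rather than our exact config; port 7201 matches the coordinator listenAddress below, and /api/v1/prom/remote/write is the coordinator's standard remote write path):

exporters:
  prometheusremotewrite:
    # placeholder service name; points at the M3 Coordinator remote write endpoint
    endpoint: http://m3coordinator.monitoring.svc.cluster.local:7201/api/v1/prom/remote/write
service:
  pipelines:
    metrics:
      receivers: [prometheus]
      exporters: [prometheusremotewrite]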
M3 Agg End to End details dashboard snapshot.pdf
M3 Coordinator Dashboard snapshot.pdf
I suspect something might be wrong with my configs
M3 Agg Log example:
{"level":"error","ts":1672883758.452722,"msg":"could not process message","error":"datapoint for aggregation too far in past: off_by=28m19.550704118s, timestamp=2023-01-05T01:22:23Z, past_limit=2023-01-05T01:50: 43Z, timestamp_unix_nanos=1672881743902000000, past_limit_unix_nanos=1672883443452704118","errorCauses":[{"error":"datapoint for aggregation too far in past: off_by=28m19.550704118s, timestamp=2023-01-05T01:22: 23Z, past_limit=2023-01-05T01:50:43Z, timestamp_unix_nanos=1672881743902000000, past_limit_unix_nanos=1672883443452704118"}],"shard":854,"proto":"type:TIMED_METRIC_WITH_METADATAS timed_metric_with_metadatas: <metric:<type:GAUGE id:\"u'\\n\\000\\010\\000__name__\\032\\000varz_disconnect_end_bucket\\n\\000__rollup__\\004\\000true\\007\\000appname\\n\\000uber- voice\\013\\000client_type\\007\\000dialpad\\007\\000country\\002\\000mx\\010\\000instance\\014\\0000.0.0.0:8080\\003\\000job\\r\\000log-processor\\002\\000le\\005\\0000. 025\\006\\000module\\006\\000answer\\007\\000version\\n\\0002212-02-49\" time_nanos:1672881743902000000 > metadatas:<metadatas:<cutover_nanos:1672771058627771735 metadata:<pipelines:<aggregation_id:<id:128 > storage_policies:<resolution:<window_size:15000000000 precision:1000000000 > retention:<period:7200000000000 > > pipeline:<> > > > > > "}
Configs:
M3 Coordinator
listenAddress: 0.0.0.0:7201
logging:
  level: info
metrics:
  scope:
    prefix: "m3coordinator"
  prometheus:
    handlerPath: /metrics
    listenAddress: 0.0.0.0:3030
  sanitization: prometheus
  samplingRate: 1.0
backend: prom-remote
prometheusRemoteBackend:
  endpoints:
    # There must be one endpoint for unaggregated metrics (retention=0, resolution=0), otherwise m3 throws an error.
    # We have a mapping rule that drops all unaggregated metrics (see downsample -> mappingRules below),
    # so this endpoint does NOT actually receive unaggregated metrics.
    - name: unaggregated
      address: http://nginx-reverse-proxy.monitoring.svc.cluster.local:9092/
      # Use the following endpoint to direct pre-aggregated metrics to a self-hosted prometheus instance for testing.
      # address: http://prometheus-m3-pre-agg.m3aggregation.svc.cluster.local:9090/api/v1/write
    - name: nginx-sidecar
      address: http://nginx-reverse-proxy.monitoring.svc.cluster.local:9092/
      # Use the following endpoint to direct aggregated metrics to a self-hosted prometheus instance for testing.
      # address: http://prometheus-agg.m3aggregation.svc.cluster.local:9090/api/v1/write
      storagePolicy:
        retention: 2h
        resolution: 15s
        downsample:
          all: false
          # all: false means that not every metric is downsampled, only those matching the rules below.
  connectTimeout: 15s # Default is 5s, increase to 15s
  maxIdleConns: 500 # Default is 100, increase to 500
clusterManagement:
  etcd:
    env: m3aggregation
    zone: embedded
    service: m3db
    cacheDir: /var/lib/m3kv
    etcdClusters:
      - zone: embedded
        endpoints:
          - etcd-0.etcd:2379
          - etcd-1.etcd:2379
          - etcd-2.etcd:2379
downsample:
  rules:
    # Mapping rule to drop unaggregated metrics.
    mappingRules:
      - name: "Drop unaggregate metric"
        filter: "__name__:*"
        drop: True
    rollupRules:
      # Exclude instance label for non VARZ metrics.
      # eg. web client metric includes a device id in its 'instance' label.
      # eg. K8s metrics includes a pod id in its 'instance' label.
      - name: "Exclude instance for _count"
        filter: "__name__:[!v]??[!z]*_count instance:*"
        transforms:
          - rollup:
              metricName: "{{ .MetricName }}"
              excludeBy: ["instance"]
              aggregations: ["Sum"]
        storagePolicies:
          - resolution: 15s
            retention: 2h
      - name: "Exclude instance for _sum"
        filter: "__name__:[!v]??[!z]*_sum instance:*"
        transforms:
          - rollup:
              metricName: "{{ .MetricName }}"
              excludeBy: ["instance"]
              aggregations: ["Sum"]
        storagePolicies:
          - resolution: 15s
            retention: 2h
      - name: "Exclude instance for _bucket"
        filter: "__name__:[!v]??[!z]*_bucket instance:*"
        transforms:
          - rollup:
              metricName: "{{ .MetricName }}"
              excludeBy: ["instance"]
              aggregations: ["Sum"]
        storagePolicies:
          - resolution: 15s
            retention: 2h
      - name: "Exclude instance for _total"
        filter: "__name__:[!v]??[!z]*_total instance:*"
        transforms:
          - rollup:
              metricName: "{{ .MetricName }}"
              excludeBy: ["instance"]
              aggregations: ["Sum"]
        storagePolicies:
          - resolution: 15s
            retention: 2h
      # todo: figure out how to filter out varz metrics
      # We still need this for self monitoring gauge metrics.
      - name: "Exclude instance for gauge"
        filter: "__name__:!*{_count,_sum,_bucket,_total} instance:*"
        transforms:
          - rollup:
              metricName: "{{ .MetricName }}"
              excludeBy: ["instance"]
              aggregations: ["Last"]
        storagePolicies:
          - resolution: 15s
            retention: 2h
      # VARZ's target id is log_processor_instance.
      # Here we use another set of rollup rules to exclude two labels.
- name: "Exclude log_processor_instance for _count"
filter: "__name__:varz_*_count log_processor_instance:*"
transforms:
- rollup:
metricName: "{{ .MetricName }}"
excludeBy: ["log_processor_instance"]
aggregations: ["Sum"]
storagePolicies:
- resolution: 15s
retention: 2h
- name: "Exclude log_processor_instance for _sum"
filter: "__name__:varz_*_sum log_processor_instance:*"
transforms:
- rollup:
metricName: "{{ .MetricName }}"
excludeBy: ["log_processor_instance"]
aggregations: ["Sum"]
storagePolicies:
- resolution: 15s
retention: 2h
- name: "Exclude log_processor_instance for _bucket"
filter: "__name__:varz_*_bucket log_processor_instance:*"
transforms:
- rollup:
metricName: "{{ .MetricName }}"
excludeBy: ["log_processor_instance"]
aggregations: ["Sum"]
storagePolicies:
- resolution: 15s
retention: 2h
- name: "Exclude log_processor_instance for _total"
filter: "__name__:varz_*_total log_processor_instance:*"
transforms:
- rollup:
metricName: "{{ .MetricName }}"
excludeBy: ["log_processor_instance"]
aggregations: ["Sum"]
storagePolicies:
- resolution: 15s
retention: 2h
# todo: figure out how to apply this for varz metrics
# - name: "Exclude instance & log_processor_instance for gauge"
# filter: "__name__:!*{_count,_sum,_bucket,_total} instance:* log_processor_instance:*"
# transforms:
# - rollup:
# metricName: "{{ .MetricName }}"
# excludeBy: ["instance", "log_processor_instance"]
# aggregations: ["Last"]
# storagePolicies:
# - resolution: 15s
# retention: 2h
  matcher:
    requireNamespaceWatchOnInit: false
  remoteAggregator:
    client:
      type: m3msg
      m3msg:
        producer:
          writer:
            topicName: aggregator_ingest
            topicServiceOverride:
              zone: embedded
              environment: m3aggregation
            placement:
              isStaged: true
            placementServiceOverride:
              namespaces:
                placement: /placement
            connection:
              numConnections: 64
            messagePool:
              size: 16384
              watermark:
                low: 0.7
                high: 1.0
# This is for configuring the ingestion server that will receive metrics from the m3aggregators on port 7507
ingest:
  ingester:
    workerPoolSize: 10000
    opPool:
      size: 10000
    retry:
      maxRetries: 1
      jitter: true
    logSampleRate: 0.01
  m3msg:
    server:
      listenAddress: "0.0.0.0:7507"
      retry:
        maxBackoff: 10s
        jitter: true
M3 Agg
logging:
  level: info
metrics:
  scope:
    prefix: m3aggregator
  prometheus:
    onError: none
    handlerPath: /metrics
    listenAddress: 0.0.0.0:6002
    timerType: histogram
  sanitization: prometheus
  samplingRate: 1.0
  extended: none
http:
  listenAddress: 0.0.0.0:6001
  readTimeout: 60s
  writeTimeout: 60s
m3msg:
  server:
    listenAddress: 0.0.0.0:6000
    retry:
      maxBackoff: 30s
  consumer:
    messagePool:
      size: 16384
kvClient:
  etcd:
    env: m3aggregation
    zone: embedded
    service: m3aggregator
    cacheDir: /var/lib/m3kv
    etcdClusters:
      - zone: embedded
        endpoints:
          - etcd-0.etcd:2379
          - etcd-1.etcd:2379
          - etcd-2.etcd:2379
runtimeOptions:
  kvConfig:
    environment: m3aggregation
    zone: embedded
  writeValuesPerMetricLimitPerSecondKey: write-values-per-metric-limit-per-second
  writeValuesPerMetricLimitPerSecond: 0
  writeNewMetricLimitClusterPerSecondKey: write-new-metric-limit-cluster-per-second
  writeNewMetricLimitClusterPerSecond: 0
  writeNewMetricNoLimitWarmupDuration: 0
aggregator:
  hostID:
    resolver: environment
    envVarName: M3AGGREGATOR_HOST_ID
  instanceID:
    type: host_id
  verboseErrors: true
  metricPrefix: ""
  counterPrefix: ""
  timerPrefix: ""
  gaugePrefix: ""
  aggregationTypes:
    counterTransformFnType: empty
    timerTransformFnType: suffix
    gaugeTransformFnType: empty
    aggregationTypesPool:
      size: 1024
    quantilesPool:
      buckets:
        - count: 256
          capacity: 4
        - count: 128
          capacity: 8
  stream:
    eps: 0.001
    capacity: 32
    streamPool:
      size: 4096
    samplePool:
      size: 4096
    floatsPool:
      buckets:
        - count: 4096
          capacity: 16
        - count: 2048
          capacity: 32
        - count: 1024
          capacity: 64
  client:
    type: m3msg
    m3msg:
      producer:
        writer:
          topicName: aggregator_ingest
          topicServiceOverride:
            zone: embedded
            environment: m3aggregation
          placement:
            isStaged: true
          placementServiceOverride:
            namespaces:
              placement: /placement
          messagePool:
            size: 16384
            watermark:
              low: 0.7
              high: 1
          messageRetry:
            initialBackoff: 5s # Chronosphere setting.
            maxBackoff: 5s # Chronosphere setting.
  placementManager:
    kvConfig:
      namespace: /placement
      environment: m3aggregation
      zone: embedded
    placementWatcher:
      key: m3aggregator
      initWatchTimeout: 10s
  hashType: murmur32
  bufferDurationBeforeShardCutover: 10m
  bufferDurationAfterShardCutoff: 10m
  bufferDurationForFutureTimedMetric: 10m # Allow tests to write into the future.
  bufferDurationForPastTimedMetric: 5m # Don't wait too long for timed metrics to flush.
  resignTimeout: 1m
  flushTimesManager:
    kvConfig:
      environment: m3aggregation
      zone: embedded
    flushTimesKeyFmt: shardset/%d/flush
    flushTimesPersistRetrier:
      initialBackoff: 100ms
      backoffFactor: 2.0
      maxBackoff: 30s
      maxRetries: 0
  electionManager:
    election:
      leaderTimeout: 10s
      resignTimeout: 10s
      ttlSeconds: 10
    serviceID:
      name: m3aggregator
      environment: m3aggregation
      zone: embedded
    electionKeyFmt: shardset/%d/lock
    campaignRetrier:
      initialBackoff: 100ms
      backoffFactor: 2.0
      maxBackoff: 2s
      forever: true
      jitter: true
    changeRetrier:
      initialBackoff: 100ms
      backoffFactor: 2.0
      maxBackoff: 5s
      forever: true
      jitter: true
    resignRetrier:
      initialBackoff: 100ms
      backoffFactor: 2.0
      maxBackoff: 5s
      forever: true
      jitter: true
    campaignStateCheckInterval: 1s
    shardCutoffCheckOffset: 30s
  flushManager:
    checkEvery: 1s
    jitterEnabled: true
    maxJitters:
      - flushInterval: 5s
        maxJitterPercent: 1.0
      - flushInterval: 10s
        maxJitterPercent: 0.5
      - flushInterval: 1m
        maxJitterPercent: 0.5
      - flushInterval: 10m
        maxJitterPercent: 0.5
      - flushInterval: 1h
        maxJitterPercent: 0.25
    numWorkersPerCPU: 0.5
    flushTimesPersistEvery: 10s
    maxBufferSize: 5m
    forcedFlushWindowSize: 10s
  flush:
    handlers:
      - dynamicBackend:
          name: m3msg
          hashType: murmur32
          producer:
            buffer:
              maxBufferSize: 1000000000 # max buffer before m3msg starts dropping data.
            writer:
              topicName: aggregated_metrics
              topicServiceOverride:
                zone: embedded
                environment: m3aggregation
              messagePool:
                size: 16384
                watermark:
                  low: 0.7
                  high: 1
  passthrough:
    enabled: true
  forwarding:
    maxSingleDelay: 30s # Chronosphere setting.
    maxConstDelay: 5m # Need to add some buffer window, since timed metrics by default are delayed by 1min.
  entryTTL: 11h
  entryCheckInterval: 5m
  maxTimerBatchSizePerWrite: 140
  defaultStoragePolicies: []
  maxNumCachedSourceSets: 2
  discardNaNAggregatedValues: true
  entryPool:
    size: 4096
  counterElemPool:
    size: 4096
  timerElemPool:
    size: 4096
  gaugeElemPool:
    size: 0
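For completeness, the aggregator placement and the aggregator_ingest / aggregated_metrics topics referenced in the configs above can be inspected through the coordinator admin API (commands adapted from the how-to linked at the top; localhost:7201 is a placeholder for any coordinator, and the Cluster-Environment-Name header carries the same m3aggregation environment used in the configs):

curl -s -H "Cluster-Environment-Name: m3aggregation" localhost:7201/api/v1/services/m3aggregator/placement
curl -s -H "Cluster-Environment-Name: m3aggregation" -H "Topic-Name: aggregator_ingest" localhost:7201/api/v1/topic
curl -s -H "Cluster-Environment-Name: m3aggregation" -H "Topic-Name: aggregated_metrics" localhost:7201/api/v1/topic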
Performance issues
If the issue is performance related, please provide the following information along with a description of the issue that you're experiencing:
- What service is experiencing the performance issue? (M3Coordinator, M3DB, M3Aggregator, etc)
- Approximately how many datapoints per second is the service handling?
- What is the approximate series cardinality that the service is handling in a given time window? I.e., how many unique time series are being measured?
- What is the hardware configuration (number CPU cores, amount of RAM, disk size and types, etc) that the service is running on? Is the service the only process running on the host or is it colocated with other software?
- What is the configuration of the service? Please include any YAML files, as well as namespace / placement configuration (with any sensitive information anonymized if necessary).
- How are you using the service? For example, are you performing read/writes to the service via Prometheus, or are you using a custom script?
In addition to the above information, CPU and heap profiles are always greatly appreciated.
CPU / Heap Profiles
CPU and heap profiles are critical to helping us debug performance issues. All our services run with the net/http/pprof server enabled by default.
Instructions for obtaining CPU / heap profiles for various services are below, please attach these profiles to the issue whenever possible.
M3Coordinator
CPU
curl <HOST_NAME>:<PORT(default 7201)>/debug/pprof/profile?seconds=5 > m3coord_cpu.out
Heap
curl <HOST_NAME>:<PORT(default 7201)>/debug/pprof/heap > m3coord_heap.out
M3DB
CPU
curl <HOST_NAME>:<PORT(default 9004)>/debug/pprof/profile?seconds=5 > m3db_cpu.out
Heap
curl <HOST_NAME>:<PORT(default 9004)>/debug/pprof/heap > m3db_heap.out
M3DB Grafana Dashboard Screenshots
If the service experiencing performance issues is M3DB and you're monitoring it using Prometheus, any screenshots you could provide using this dashboard would be helpful.