
Kafka

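Kafka is a high-throughput distributed publish-subscribe messaging system.

Cloud Insight collects data over JMX to monitor Kafka metrics such as client and producer request counts and processing times, and visualizes performance.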


Performance metrics

Cloud Insight collects the following Kafka performance metrics:

| Metric | Unit | Description |
|---|---|---|
| kafka.broker_offset | offsets | Current message offset on the broker |
| kafka.consumer.bytes_in | bytes/second | Consumer bytes-in rate |
| kafka.consumer.delayed_requests | requests | Number of delayed consumer requests |
| kafka.consumer.expires_per_second | evictions/second | Rate of delayed consumer request expiration |
| kafka.consumer.fetch_rate | requests | Minimum rate at which the consumer sends fetch requests to a broker |
| kafka.consumer.kafka_commits | writes/second | Rate of offset commits to Kafka |
| kafka.consumer.max_lag | offsets | Maximum consumer lag |
| kafka.consumer.messages_in | messages/second | Rate of consumer message consumption |
| kafka.consumer.zookeeper_commits | writes/second | Rate of offset commits to ZooKeeper |
| kafka.consumer_lag | offsets | Message lag between consumer and broker |
| kafka.consumer_offset | offsets | Current message offset of the consumer |
| kafka.expires_sec | evictions/second | Rate of delayed producer request expiration |
| kafka.follower.expires_per_second | evictions/second | Rate of request expiration on followers |
| kafka.log.flush_rate | flushes/second | Log flush rate |
| kafka.messages_in | messages | Incoming message rate |
| kafka.net.bytes_in | bytes/second | Incoming byte rate |
| kafka.net.bytes_out | bytes/second | Outgoing byte rate |
| kafka.net.bytes_rejected | bytes/second | Rejected byte rate |
| kafka.producer.bytes_out | bytes/second | Producer bytes-out rate |
| kafka.producer.delayed_requests | requests | Number of delayed producer requests |
| kafka.producer.expires_per_seconds | evictions/second | Rate of producer request expiration |
| kafka.producer.io_wait | nanoseconds | Producer I/O wait time |
| kafka.producer.message_rate | messages/second | Producer message rate |
| kafka.producer.request_latency_avg | milliseconds | Average producer request latency |
| kafka.producer.request_rate | requests/second | Producer requests per second |
| kafka.producer.response_rate | responses/second | Producer responses per second |
| kafka.replication.isr_expands | nodes/second | Rate at which replicas join the ISR pool |
| kafka.replication.isr_shrinks | nodes/second | Rate at which replicas leave the ISR pool |
| kafka.replication.leader_elections | events/second | Leader election rate |
| kafka.replication.unclean_leader_elections | events/second | Unclean leader election rate |
| kafka.replication.under_replicated_partitions | - | Number of under-replicated partitions |
| kafka.request.fetch.failed | requests | Number of failed client fetch requests |
| kafka.request.fetch.failed_per_second | requests/second | Rate of failed client fetch requests per second |
| kafka.request.fetch.time.99percentile | milliseconds | 99th percentile of fetch request time |
| kafka.request.fetch.time.avg | milliseconds | Average fetch request time |
| kafka.request.handler.avg.idle.pct | fractions | Average fraction of time the request handler threads are idle |
| kafka.request.metadata.time.99percentile | milliseconds | 99th percentile of metadata request time |
| kafka.request.metadata.time.avg | milliseconds | Average metadata request time |
| kafka.request.offsets.time.99percentile | milliseconds | 99th percentile of offset request time |
| kafka.request.offsets.time.avg | milliseconds | Average offset request time |
| kafka.request.produce.failed | requests | Number of failed produce requests |
| kafka.request.produce.failed_per_second | requests/second | Rate of failed produce requests per second |
| kafka.request.produce.time.99percentile | milliseconds | 99th percentile of produce request time |
| kafka.request.produce.time.avg | milliseconds | Average produce request time |
| kafka.request.update_metadata.time.99percentile | milliseconds | 99th percentile of update metadata request time |
| kafka.request.update_metadata.time.avg | milliseconds | Average update metadata request time |
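
As a rough illustration of how the offset metrics relate to each other, consumer lag for a partition is the broker's latest offset minus the consumer's current offset. The sketch below computes it from per-partition offsets. The function name `consumer_lag` and the sample numbers are hypothetical and chosen only for illustration; this is not Cloud Insight code.

```python
def consumer_lag(broker_offsets, consumer_offsets):
    """Return per-partition lag: broker offset minus consumer offset.

    Both arguments map partition id -> offset. Partitions with no consumer
    offset are treated as offset 0 (an assumption of this sketch).
    """
    lag = {}
    for partition, broker_offset in broker_offsets.items():
        consumed = consumer_offsets.get(partition, 0)
        # Clamp at zero: the two offsets are sampled at different moments,
        # so the consumer value can briefly appear ahead of the broker value.
        lag[partition] = max(broker_offset - consumed, 0)
    return lag


# Hypothetical sample values for one topic with three partitions.
broker = {0: 1500, 1: 1200, 2: 900}
consumer = {0: 1450, 1: 1200, 2: 700}

lag = consumer_lag(broker, consumer)
print(lag)                # {0: 50, 1: 0, 2: 200}
print(max(lag.values()))  # 200
```

The largest value across partitions corresponds to the idea behind a "maximum lag" figure; how the Agent aggregates it internally is not specified here.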

Configuring Kafka monitoring

JMX

To enable JMX, set the environment variable JMX_PORT=9999 and edit the startup script so that remote JMX connections use a fixed RMI port. In Kafka's kafka-run-class.sh, find:

if [  $JMX_PORT ]; then
    KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
fi

Change it to:

if [  $JMX_PORT ]; then
    KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
fi

Then restart Kafka.
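
After the restart, a quick way to confirm that something is listening on the JMX port is a plain TCP connection test. The sketch below is a minimal check, assuming the broker runs on localhost with JMX_PORT=9999 as above; the helper name `jmx_port_open` is hypothetical.

```python
import socket


def jmx_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


if __name__ == "__main__":
    # 9999 matches the JMX_PORT value used above; adjust as needed.
    print(jmx_port_open("localhost", 9999))
```

A successful connection only shows that the port is open. It does not verify JMX authentication or that the expected beans are exposed.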

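Cloud Insight Agent obtains Kafka performance metrics over JMX.

Each instance can monitor at most 350 metrics, so use the configuration steps below to choose the metrics you need.

For details on JMX collection, see JMX remote monitoring.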


Edit the configuration file

Edit the configuration file conf.d/kafka.yaml so that Cloud Insight Agent can communicate with Kafka.

##########
# WARNING
# This sample works only for Kafka >= 0.8.2.

instances:
#  - host: localhost
#    port: 9999
#    name: jmx_instance
#    user: username
#    password: password
#    #java_bin_path: /path/to/java #Optional, should be set if the agent cannot find your java executable
#    #trust_store_path: /path/to/trustStore.jks # Optional, should be set if ssl is enabled
#    #trust_store_password: password

init_config:
  is_jmx: true

  # Metrics collected by this check. You should not have to modify this.
  conf:
      #
      # Aggregate cluster stats
      #
      - include:
          domain: 'kafka.server'
          bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesOutPerSec'
          attribute:
            MeanRate:
              metric_type: counter
              alias: kafka.net.bytes_out
      - include:
          domain: 'kafka.server'
          bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec'
          attribute:
            MeanRate:
              metric_type: counter
              alias: kafka.net.bytes_in
      - include:
          domain: 'kafka.server'
          bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsMessagesInPerSec'
          attribute:
            MeanRate:
              metric_type: gauge
              alias: kafka.messages_in

      #
      # Request timings
      #
      - include:
          domain: 'kafka.server'
          bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsFailedFetchRequestsPerSec'
          attribute:
            MeanRate:
              metric_type: gauge
              alias: kafka.request.fetch.failed
      - include:
          domain: 'kafka.server'
          bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsFailedProduceRequestsPerSec'
          attribute:
            MeanRate:
              metric_type: gauge
              alias: kafka.request.produce.failed
      - include:
          domain: 'kafka.network'
          bean: 'kafka.network:type=RequestMetrics,name=Produce-TotalTimeMs'
          attribute:
            Mean:
              metric_type: counter
              alias: kafka.request.produce.time.avg
            99thPercentile:
              metric_type: counter
              alias: kafka.request.produce.time.99percentile
      - include:
          domain: 'kafka.network'
          bean: 'kafka.network:type=RequestMetrics,name=Fetch-TotalTimeMs'
          attribute:
            Mean:
              metric_type: counter
              alias: kafka.request.fetch.time.avg
            99thPercentile:
              metric_type: counter
              alias: kafka.request.fetch.time.99percentile
      - include:
          domain: 'kafka.network'
          bean: 'kafka.network:type=RequestMetrics,name=UpdateMetadata-TotalTimeMs'
          attribute:
            Mean:
              metric_type: counter
              alias: kafka.request.update_metadata.time.avg
            99thPercentile:
              metric_type: counter
              alias: kafka.request.update_metadata.time.99percentile
      - include:
          domain: 'kafka.network'
          bean: 'kafka.network:type=RequestMetrics,name=Metadata-TotalTimeMs'
          attribute:
            Mean:
              metric_type: counter
              alias: kafka.request.metadata.time.avg
            99thPercentile:
              metric_type: counter
              alias: kafka.request.metadata.time.99percentile
      - include:
          domain: 'kafka.network'
          bean: 'kafka.network:type=RequestMetrics,name=Offsets-TotalTimeMs'
          attribute:
            Mean:
              metric_type: counter
              alias: kafka.request.offsets.time.avg
            99thPercentile:
              metric_type: counter
              alias: kafka.request.offsets.time.99percentile

      #
      # Replication stats
      #
      - include:
          domain: 'kafka.server'
          bean: 'kafka.server:type=ReplicaManager,name=ISRShrinksPerSec'
          attribute:
            MeanRate:
              metric_type: counter
              alias: kafka.replication.isr_shrinks
      - include:
          domain: 'kafka.server'
          bean: 'kafka.server:type=ReplicaManager,name=ISRExpandsPerSec'
          attribute:
            MeanRate:
              metric_type: counter
              alias: kafka.replication.isr_expands
      - include:
          domain: 'kafka.server'
          bean: 'kafka.server:type=ControllerStats,name=LeaderElectionRateAndTimeMs'
          attribute:
            MeanRate:
              metric_type: counter
              alias: kafka.replication.leader_elections
      - include:
          domain: 'kafka.server'
          bean: 'kafka.server:type=ControllerStats,name=UncleanLeaderElectionsPerSec'
          attribute:
            MeanRate:
              metric_type: counter
              alias: kafka.replication.unclean_leader_elections

      #
      # Log flush stats
      #
      - include:
          domain: 'kafka.log'
          bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
          attribute:
            MeanRate:
              metric_type: counter
              alias: kafka.log.flush_rate
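
Since each instance is limited to 350 metrics, it can help to count how many aliases a configuration declares before deploying it. The sketch below is one way to do that with the standard library only. It assumes that every uncommented `alias:` line corresponds to one metric, which holds for a config shaped like the one above but may undercount if a bean pattern matches several beans. The names `configured_metrics` and `METRIC_LIMIT` are hypothetical.

```python
import re

METRIC_LIMIT = 350  # per-instance limit stated in this document

# Matches uncommented "alias: <metric name>" lines.
ALIAS_RE = re.compile(r"^\s*alias:\s*(\S+)\s*$", re.MULTILINE)


def configured_metrics(yaml_text):
    """Return the metric aliases declared in a kafka.yaml-style text."""
    return ALIAS_RE.findall(yaml_text)


# Shortened sample in the same shape as the configuration above.
sample = """
conf:
  - include:
      domain: 'kafka.server'
      bean: 'kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec'
      attribute:
        MeanRate:
          metric_type: counter
          alias: kafka.net.bytes_in
  - include:
      domain: 'kafka.log'
      bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
      attribute:
        MeanRate:
          metric_type: counter
          alias: kafka.log.flush_rate
          # alias: kafka.commented.out
"""

aliases = configured_metrics(sample)
print(len(aliases), "metrics configured; limit is", METRIC_LIMIT)
```

To check a real file, read its contents with `open(path).read()` and pass the text to `configured_metrics`.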

Edit the consumer configuration file

Edit the consumer configuration file conf.d/kafka_consumer.yaml:

init_config:

instances:
#  - kafka_connect_str: localhost:19092
#    zk_connect_str: localhost:2181
#    zk_prefix: /0.8
#    consumer_groups:
#      my_consumer:
#        my_topic: [0, 1, 4, 12]
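
The `consumer_groups` setting nests consumer group, topic, and partition list. The sketch below shows that structure as a Python dictionary and expands it into the individual (group, topic, partition) combinations the sample would cover. The helper name `monitored_partitions` is hypothetical; the values are taken from the commented sample above.

```python
def monitored_partitions(consumer_groups):
    """Yield (group, topic, partition) for every partition listed."""
    for group, topics in consumer_groups.items():
        for topic, partitions in topics.items():
            for partition in partitions:
                yield group, topic, partition


# Same structure as the commented sample: group -> topic -> partition list.
consumer_groups = {"my_consumer": {"my_topic": [0, 1, 4, 12]}}

for triple in monitored_partitions(consumer_groups):
    print(triple)
```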

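Restart the Agent

Restart Cloud Insight Agent for the configuration to take effect.

You can also check the Agent Info output to verify the configuration. If output like the following appears, the setup succeeded.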

Checks
======

[...]

kafka-localhost-9999
--------------------
  - instance #0 [OK]
  - Collected 8 metrics & 0 events

For how to view Agent Info, visit the help center and see Cloud Insight Agent common operations.
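
If you want to verify this output from a script, a small text check is enough. The sketch below parses a check section like the one shown above and reports whether every instance is marked OK and how many metrics were collected. It assumes the output format matches the sample exactly; the function name `check_status` is hypothetical, and how the Agent Info text is obtained is left out.

```python
import re

# Sample copied from the Agent Info output shown in this section.
INFO_SAMPLE = """\
kafka-localhost-9999
--------------------
  - instance #0 [OK]
  - Collected 8 metrics & 0 events
"""


def check_status(info_text):
    """Return (all_instances_ok, collected_metrics) for one check section."""
    statuses = re.findall(r"instance #\d+ \[(\w+)\]", info_text)
    collected = re.search(r"Collected (\d+) metrics", info_text)
    all_ok = bool(statuses) and all(s == "OK" for s in statuses)
    return all_ok, int(collected.group(1)) if collected else 0


print(check_status(INFO_SAMPLE))  # (True, 8)
```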


Default tags

Cloud Insight collects the following default host tags for Kafka:

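| Tag | Description |
|---|---|
| instance | Instance name (for example "kafka_demo") |
| jmx_domain | JMX domain (for example "kafka.server") |
| type | Metric type (for example BrokerTopicMetrics, RequestMetrics, ReplicaManager) |
| name | Metric name (for example "BytesInPerSec") |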
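
These tags line up with the parts of a JMX bean name. The sketch below splits a bean string from the configuration into a tag dictionary. It is an illustration only: it assumes the tags are derived directly from the bean's domain and key properties, which this document does not state explicitly, and the helper name `bean_tags` is hypothetical.

```python
def bean_tags(bean, instance):
    """Split 'domain:key=value,...' into a tag dictionary."""
    domain, _, properties = bean.partition(":")
    tags = {"instance": instance, "jmx_domain": domain}
    for pair in properties.split(","):
        key, _, value = pair.partition("=")
        tags[key] = value
    return tags


tags = bean_tags(
    "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec",
    instance="kafka_demo",
)
print(tags)
```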



FAQ