跳转至

12 客户端都有哪些不常见但是很高级的功能?

你好,我是胡夕。今天我要和你分享的主题是:客户端都有哪些不常见但是很高级的功能。

既然是不常见,那就说明在实际场景中并没有太高的出场率,但它们依然是很高级很实用的。下面就有请今天的主角登场:Kafka拦截器。

什么是拦截器?

如果你用过Spring Interceptor或是Apache Flume,那么应该不会对拦截器这个概念感到陌生,其基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。下面这张图展示了Spring MVC拦截器的工作原理:

图片来源

拦截器1和拦截器2分别在请求发送之前、发送之后以及完成之后三个地方插入了对应的处理逻辑。而Flume中的拦截器也是同理,它们插入的逻辑可以是修改待发送的消息,也可以是创建新的消息,甚至是丢弃消息。这些功能都是以配置拦截器类的方式动态插入到应用程序中的,故可以快速地切换不同的拦截器而不影响主程序逻辑。

Kafka拦截器借鉴了这样的设计思路。你可以在消息处理的前后多个时点动态植入不同的处理逻辑,比如在消息发送前或者在消息被消费后。

作为一个非常小众的功能,Kafka拦截器自0.10.0.0版本被引入后并未得到太多的实际应用,我也从未在任何Kafka技术峰会上看到有公司分享其使用拦截器的成功案例。但即便如此,在自己的Kafka工具箱中放入这么一个有用的东西依然是值得的。今天我们就让它来发挥威力,展示一些非常酷炫的功能。

Kafka拦截器

Kafka拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka会按照添加顺序依次执行拦截器逻辑。

举个例子,假设你想在生产消息前执行两个“前置动作”:第一个是为消息增加一个头信息,封装发送该消息的时间,第二个是更新发送消息数字段,那么当你将这两个拦截器串联在一起统一指定给Producer后,Producer会按顺序执行上面的动作,然后再发送消息。

当前Kafka拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。拿上面的例子来说,假设第一个拦截器的完整类路径是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在Producer端指定拦截器:

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……

现在问题来了,我们应该怎么编写AddTimeStampInterceptor和UpdateCounterInterceptor类呢?其实很简单,这两个类以及你自己编写的所有Producer端拦截器实现类都要继承org.apache.kafka.clients.producer.ProducerInterceptor接口。该接口是Kafka提供的,里面有两个核心的方法。

  1. onSend:该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。
  2. onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知callback吗?onAcknowledgement的调用要早于callback的调用。值得注意的是,这个方法和onSend不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在Producer发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的Producer TPS直线下降。

同理,指定消费者拦截器也是同样的方法,只是具体的实现类要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口,这里面也有两个核心方法。

  1. onConsume:该方法在消息返回给Consumer程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。
  2. onCommit:Consumer在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。

一定要注意的是,指定拦截器类时要指定它们的全限定名,即full qualified name。通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的Producer程序能够正确加载你的拦截器类。

典型使用场景

Kafka拦截器都能用在哪些地方呢?其实,跟很多拦截器的用法相同,Kafka拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景

我以端到端系统性能检测和消息审计为例来展开介绍下。

今天Kafka默认提供的监控指标都是针对单个客户端或Broker的,你很难从具体的消息维度去追踪集群间消息的流转路径。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多Kafka用户迫切需要解决的问题。

从技术上来说,我们可以在客户端程序中增加这样的统计逻辑,但是对于那些将Kafka作为企业级基础架构的公司来说,在应用代码中编写统一的监控逻辑其实是很难的,毕竟这东西非常灵活,不太可能提前确定好所有的计算逻辑。另外,将监控逻辑与主业务逻辑耦合也是软件工程中不提倡的做法。

现在,通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据。这就是Kafka拦截器的一个非常典型的使用场景。

我们再来看看消息审计(message audit)的场景。设想你的公司把Kafka作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能。

作为私有云的PaaS提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的Kafka服务的客户端程序必须设置该拦截器。

案例分享

下面我以一个具体的案例来说明一下拦截器的使用。在这个案例中,我们通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中。

我曾经给一个公司做Kafka培训,在培训过程中,那个公司的人提出了一个诉求。他们的场景很简单,某个业务只有一个Producer和一个Consumer,他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少,但是目前Kafka并没有提供这种端到端的延时统计。

学习了拦截器之后,我们现在知道可以用拦截器来满足这个需求。既然是要计算总延时,那么一定要有个公共的地方来保存它,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在Redis中。

Okay,这个需求显然要实现生产者拦截器,也要实现消费者拦截器。我们先来实现前者:

public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {


    private Jedis jedis; // 省略Jedis初始化


    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalSentMessage");
        return record;
    }


    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<java.lang.String, ?> configs) {
    }

上面的代码比较关键的是在发送消息前更新总的已发送消息数。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能导致总发送数不准确。不过好在处理思路是相同的,你可以有针对性地调整下代码逻辑。

下面是消费者端的拦截器实现,代码如下:

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {


    private Jedis jedis; //省略Jedis初始化


    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }


    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<String, ?> configs) {

在上面的消费者拦截器中,我们在真正消费一批消息前首先更新了它们的总延时,方法就是用当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到Redis中。之后的逻辑就很简单了,我们分别从Redis中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。

创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的Producer和Consumer程序中,这样就能计算消息从Producer端到Consumer端平均的处理延时了。这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的SLA目标。

小结

今天我们花了一些时间讨论Kafka提供的冷门功能:拦截器。如之前所说,拦截器的出场率极低,以至于我从未看到过国内大厂实际应用Kafka拦截器的报道。但冷门不代表没用。事实上,我们可以利用拦截器满足实际的需求,比如端到端系统性能检测、消息审计等。

从这一期开始,我们将逐渐接触到更多的实际代码。看完了今天的分享,我希望你能够亲自动手编写一些代码,去实现一个拦截器,体会一下Kafka拦截器的功能。要知道,“纸上得来终觉浅,绝知此事要躬行”。也许你在敲代码的同时,就会想到一个使用拦截器的绝妙点子,让我们拭目以待吧。

开放讨论

思考这样一个问题:Producer拦截器onSend方法的签名如下:

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)

如果我实现的逻辑仅仅是return null,你觉得Kafka会丢弃该消息,还是原封不动地发送消息?请动手试验一下,看看结果是否符合你的预期。

欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。

精选留言(15)
  • 风中花 👍(90) 💬(19)

    胡老师您好! 我公司现在就我一个人懂点kafka,但是公司线下却有使用kafka,现在知道我学习这个就交给我了,我现在遇到一个线上问题:消息经常堆积起来,不能消费了,重启服务就能继续消费了。我目前得能力还搞不定,望老师能给指点一二 。谢谢。谢谢

    2019-07-04

  • 落霞与孤鹜 👍(23) 💬(3)

    这个问题和这期的内容没关系😓。 如果一个主题,由一个应用的名为A的消费组消费,然后把消费组名改为B,重新发布应用,这个时候是不是从主题的分区头开始消费?如何保证从上次A消费组的最新偏移量处开始消费?

    2019-06-30

  • 李先生 👍(13) 💬(1)

    胡哥: consumer消费:比如异步发积分,发积分的消息进入kafka,加积分服务监听kafka的topic,为了避免重复消费,加积分服务获取到消息后先写入mysql,并利用mysql的唯一索引的能力来避免重复消费,然后加积分服务异步去执行mysql中的信息去实现加积分。这种实现方案会导致消费性能低下,但是写入mysql一是避免重复消费,二是做一条成功的记录(便于后期查询)。这种如何优化呢

    2020-04-10

  • Lei@时速云 👍(8) 💬(1)

    👍 胡总出专栏了

    2019-06-29

  • 👍(7) 💬(3)

    胡老师好,请教两个小问题 1:broker通过网络拿到消息后,落盘的规则是什么?来一条消息就落盘一条?还是积攒够一定的量再落盘? 2:从追主,新的消息是主主动推给各个从的?还是从主动拉取的?如果是主动拉取,从怎么知道主有新的消息的?另外,同步的时候是只要有一条新消息就同步嘛? 如果其他同学清楚,也请帮忙解答一下,多谢。

    2019-08-14

  • 打码的土豆 👍(7) 💬(2)

    老师你好 最近在看kafka官方文档时有一处没看懂还望老师指教 All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel's pagecache 这上面说的文件系统上的持久日志为什么会是pagecache pagecache不是内存的一部分吗

    2019-07-01

  • 周曙光爱学习 👍(6) 💬(1)

    拦截器的确是很有用,我们在grpc的拦截器中做限流处理。同理,由于下游存储TPS能力有限,也完全可以在kafka消费的拦截器中做消费限流处理,防止把存储打挂

    2020-04-09

  • IT小僧 👍(6) 💬(1)

    老师好!这个消费者段onConsume是在消费数据之前执行的方法,并不能真正统计实际处理该消费耗费时间的吧。我觉得理应放在onCommit里面。

    2020-01-13

  • 小马 👍(6) 💬(4)

    老师有两个疑问请教下: 1、时间的一致性问题。System.currentTimeMillis() - record.timestamp()发送和接收的时间实际上可能来自两台机器,有可能时间不一致,会导致统计结果偏差很大; 2、消费端代码计算时间差是在循环里面进行的,把System.currentTimeMillis()提到循环外面应该会好一点吧,毕竟这一批消息应该算是同时接收的; 3、消息总数是在生产端统计的,时延是在消费端统计的,但是如果在消息传输过程中出现部分消息丢失是不是会影响统计的准确性。

    2020-01-09

  • 姬广龙 👍(4) 💬(1)

    Interceptor处理数据是单条的吗,还是多条数据作为一个集合

    2019-11-23

  • James 👍(4) 💬(7)

    请问老师无法完成提交,是因为重新平衡,是什么原因才会导致. 刚接触不久,就要修改线上环境问题.但是一直跑了一小时就会下面问题,然后oom Group coordinator rl-hadoop5.gci.com:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery Offset commit failed on partition dispatch_bus-1 at offset 28978632: The coordinator is not aware of this member. Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

    2019-11-13

  • Roy Liang 👍(3) 💬(1)

    老师你好,onAcknowledgement不应该是消息ACK后吗?怎么是提交成功后呢?提交成功后感觉应该属于callback的

    2020-04-14

  • 杨陆伟 👍(3) 💬(1)

    System.currentTimeMillis() - record.timestamp() 是否要求生产客户端设置record的timestamp字段?还是Producer Client会自动生成?对于Kafka中的timestamp还搞不太清楚,这对监控比较关键,不知道后面有没有介绍,谢谢

    2019-07-19

  • Liam 👍(3) 💬(1)

    请问下老师。onsend或onconsumer的执行线程和发送或消费消息的线程是否是同一个线程?加入太多耗时逻辑是否会影响主逻辑?

    2019-06-29

  • Lvjsky 👍(1) 💬(1)

    老师,是否可以使用消费者过滤器,在使用Spark Streaming或Flink消费数据前,通过消费者拦截器过滤掉不需要的数据呢?这样可以减少下游消费者计算量

    2020-12-03