Kafka Producer 阻拦器

作者 : 开心源码 本文共3373个字,预计阅读时间需要9分钟 发布时间: 2022-05-12 共193人阅读

Kafka中的阻拦器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Producer端的阻拦器和Kafka Consumer端的阻拦器。本篇主要讲述的是Kafka Producer端的阻拦器,它主要用来对消息进行阻拦或者者修改,也可以用于Producer的Callback回调之前进行相应的预解决。

使用Kafka Producer端的阻拦器非常简单,主要是实现ProducerInterceptor接口,此接口包含4个方法:

    1. ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在将消息序列化和分配分区之前会调用阻拦器的这个方法来对消息进行相应的操作。一般来说最好不要修改消息ProducerRecord的topic、key以及partition等信息,假如要修改,也需确保对其有精确的判断,否则会与料想的效果出现偏差。比方修改key不仅会影响分区的计算,同样也会影响Broker端日志压缩(Log Compaction)的功能。
    1. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被应答(Acknowledgement)之前或者者消息发送失败时调用,优先于客户设定的Callback之前执行。这个方法运行在Producer的IO线程中,所以这个方法里实现的代码逻辑越简单越好,否则会影响消息的发送速率。
    1. void close():关闭当前的阻拦器,此方法主要用于执行少量资源的清除工作。
    1. configure(Map<String, ?> configs):用来初始化此类的方法,这个是ProducerInterceptor接口的父接口Configurable中的方法。

一般情况下只要要关注并实现onSend或者者onAcknowledgement方法就可。下面我们来举个案例,通过onSend方法来过滤消息体为空的消息以及通过onAcknowledgement方法来计算发送消息的成功率。

public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {    private volatile long sendSuccess = 0;    private volatile long sendFailure = 0;    @Override    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {        if(record.value().length()<=0)            return null;        return record;    }    @Override    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {        if (exception == null) {            sendSuccess++;        } else {            sendFailure ++;        }    }    @Override    public void close() {        double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);        System.out.println("[INFO] 发送成功率="+String.format("%f", successRatio * 100)+"%");    }    @Override    public void configure(Map<String, ?> configs) {}}

自己设置的ProducerInterceptorDemo类实现之后即可以在Kafka Producer的主程序中指定,示例代码如下:

public class ProducerMain {    public static final String brokerList = "localhost:9092";    public static final String topic = "hidden-topic";    public static void main(String[] args) throws ExecutionException, InterruptedException {        Properties properties = new Properties();        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        properties.put("bootstrap.servers", brokerList);        properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");        Producer<String, String> producer = new KafkaProducer<String, String>(properties);        for(int i=0;i<100;i++) {            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);            producer.send(producerRecord).get();        }        producer.close();    }}

Kafka Producer不仅可以指定一个阻拦器,还可以指定多个阻拦器以形成阻拦链,这个阻拦链会按照其中的阻拦器的加入顺序逐个执行。比方上面的程序多增加一个阻拦器,示例如下:

properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");1

这样Kafka Producer会先执行阻拦器ProducerInterceptorDemo,之后再执行ProducerInterceptorDemoPlus。

有关interceptor.classes参数,在kafka 1.0.0版本中的定义如下:

NAMEDESCRIPTIONTYPEDEFAULTVALID VALUESIMPORTANCE
interceptor.calsssesA list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors.listnulllow

本文的重点是你有没有收获与成长,其他的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点高级进阶干货,希望对想成为架构师的朋友有肯定的参考和帮助

喜欢这篇文章的朋友可以点个喜欢,也可以关注一下我的个人专题:Java成长之路

需要更详细思维导图和以下资料的可以加一下技术交流分享群:“708 701 457”免费获取




说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » Kafka Producer 阻拦器

发表回复