Kafka 消息序列化和反序列化(上)

作者 : 开心源码 本文共4430个字,预计阅读时间需要12分钟 发布时间: 2022-05-12 共198人阅读

Kafka Producer在发送消息时必需配置的参数为:bootstrap.servers、key.serializer、value.serializer。序列化操作是在阻拦器(Interceptor)执行之后并且在分配分区(partitions)之前执行的。

首先我们通过一段示例代码来看下普通情况下Kafka Producer如何编写:

public class ProducerJavaDemo {    public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092";    public static final String topic = "hidden-topic";    public static void main(String[] args) {        Properties properties = new Properties();        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        properties.put("client.id", "hidden-producer-client-id-1");        properties.put("bootstrap.servers", brokerList);        Producer<String,String> producer = new KafkaProducer<String,String>(properties);        while (true) {            String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu";            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);            try {                Future<RecordMetadata> future =  producer.send(producerRecord, new Callback() {                    public void onCompletion(RecordMetadata metadata, Exception exception) {                        System.out.print(metadata.offset()+"    ");                        System.out.print(metadata.topic()+"    ");                        System.out.println(metadata.partition());                    }                });            } catch (Exception e) {                e.printStackTrace();            }            try {                TimeUnit.MILLISECONDS.sleep(10);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}

这里采用的用户端不是0.8.x.x时代的Scala版本,而是Java编写的新Kafka Producer, 相应的Maven依赖如下:

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>1.0.0</version></dependency>

上面的程序中使用的是Kafka用户端自带的org.apache.kafka.common.serialization.StringSerializer,除了用于String类型的序列化器之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有三种方法:

public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
public byte[] serialize(String topic, T data):用来执行序列化。
public void close():用来关闭当前序列化器。一般情况下这个方法都是个空方法,假如实现了此方法,必需确保此方法的幂等性,由于这个方法很可能会被KafkaProducer调用屡次。
下面我们来看看Kafka中org.apache.kafka.common.serialization.StringSerializer的具体实现,源码如下:

public class StringSerializer implements Serializer<String> {    private String encoding = "UTF8";    @Override    public void configure(Map<String, ?> configs, boolean isKey) {        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";        Object encodingValue = configs.get(propertyName);        if (encodingValue == null)            encodingValue = configs.get("serializer.encoding");        if (encodingValue != null && encodingValue instanceof String)            encoding = (String) encodingValue;    }    @Override    public byte[] serialize(String topic, String data) {        try {            if (data == null)                return null;            else                return data.getBytes(encoding);        } catch (UnsupportedEncodingException e) {            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);        }    }    @Override    public void close() {        // nothing to do    }}

首先看下StringSerializer中的configure(Map)

public class Company {    private String name;    private String address;    //省略Getter, Setter, Constructor & toString方法}

接下去我们来实现Company类型的Serializer,即下面代码示例中的DemoSerializer。

package com.hidden.client;public class DemoSerializer implements Serializer<Company> {    public void configure(Map<String, ?> configs, boolean isKey) {}    public byte[] serialize(String topic, Company data) {        if (data == null) {            return null;        }        byte[] name, address;        try {            if (data.getName() != null) {                name = data.getName().getBytes("UTF-8");            } else {                name = new byte[0];            }            if (data.getAddress() != null) {                address = data.getAddress().getBytes("UTF-8");            } else {                address = new byte[0];            }            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);            buffer.putInt(name.length);            buffer.put(name);            buffer.putInt(address.length);            buffer.put(address);            return buffer.array();        } catch (UnsupportedEncodingException e) {            e.printStackTrace();        }        return new byte[0];    }    public void close() {}}

使用时只要要在Kafka Producer的config中修改value.serializer属性就可,示例如下:

properties.put("value.serializer", "com.hidden.client.DemoSerializer");//记得也要将相应的String类型改为Company类型,如://Producer<String,Company> producer = new KafkaProducer<String,Company>(properties);//Company company = new Company();//company.setName("hidden.cooperation-" + new Date().getTime());//company.setAddress("Shanghai, China");//ProducerRecord<String, Company> producerRecord = new ProducerRecord<String, Company>(topic,company);1234567

本文的重点是你有没有收获与成长,其他的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点高级进阶干货,希望对想成为架构师的朋友有肯定的参考和帮助

喜欢这篇文章的朋友可以点个喜欢,也可以关注一下我的个人专题:Java成长之路

需要更详细架构师技能思维导图和以下资料的可以加一下技术交流分享群:“708 701 457”免费获取




说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » Kafka 消息序列化和反序列化(上)

发表回复