绍圣–kafka之生产者(五)

作者 : 开心源码 本文共2495个字,预计阅读时间需要7分钟 发布时间: 2022-05-13 共270人阅读

在看很多讲kafka的文章里面都会说:kafka只保证单个partition的有序性,那么kafka是怎样保证有序的喃?

使用RecordAccumulator的mutePartition和unmutePartition方法来配合实现有序性

//记录tp能否还有未完成的RecordBatch,保证一个tp的顺序性,当一个tp对应的RecordBatch要开始发送时,就将此tp加入到muted中,tp对应的RecordBatch发送完成后,删除muted中的tp

private final Set muted;

public void mutePartition(TopicPartition tp) { muted.add(tp); }

public void unmutePartition(TopicPartition tp) { muted.remove(tp); }

RecordAccumulator.ready方法中进行判断(伪代码)

public ReadyCheckResult ready(Cluster cluster, long nowMs) {

if (!readyNodes.contains(leader) && !muted.contains(part)) {}

}

if (!readyNodes.contains(leader) && !muted.contains(part)),假如muted中包含了这个tp,那么即便这个tp对应的leader存在,RecordBatch可以发送也不会去发送它,由于它上一个RecordBatch还没有解决完成。

RecordAccumulator.drain方法中进行判断(伪代码)

public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) {

if (!muted.contains(tp)){}

}

if (!muted.contains(tp))在对RecordAccumulator中的记录进行重新组装的时候,仍旧会判断对应的tp能否在muted中。在muted中的仍旧不会选择出来发送。

在Sender中的变量:guaranteeMessageOrder:能否保持单个partition的有序性

在KafkaProducer的构造中

this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);

public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, boolean guaranteeMessageOrder, int maxRequestSize, short acks, int retries, Metrics metrics, Time time, String clientId, int requestTimeout) { this.client = client; this.accumulator = accumulator; this.metadata = metadata; this.guaranteeMessageOrder = guaranteeMessageOrder; this.maxRequestSize = maxRequestSize; this.running = true; this.acks = acks; this.retries = retries; this.time = time; this.clientId = clientId; this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; }

guaranteeMessageOrder=config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1

我们可以在使用的时候设置max.in.flight.requests.per.connection来设置guaranteeMessageOrder的值。

mutePartition和unmutePartition方法都是在Sender中进行调用

mutePartition在Sender.run中调用

if (guaranteeMessageOrder) {

// 记录将要发送的topicPartition到mute中

for (List batchList : batches.values()) {

for (RecordBatch batch : batchList)

this.accumulator.mutePartition(batch.topicPartition);

}

}

发送的时候,把将要提交的RecordBatch的tp加到muted中。下次再需要发送tp里的RecordBatch的时候,假如muted里面包含了此tp,就不会选择出来发送。

在解决服务端响应的时候,清理muted中的tp

if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition);

总结:要保证单partition的有序性,需要配置max.in.flight.requests.per.connection=1。

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 绍圣–kafka之生产者(五)

发表回复