问答1 问答5 问答50 问答500 问答1000
网友互助专业问答平台

kafka consumer重新连接后如何获取当前最新数据

提问网友 发布时间:2022-04-22 06:42
声明:本网页内容为用户发布,旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。
E-MAIL:1656858193@qq.com
1个回答
热心网友 回答时间:2022-04-14 06:57
不过要注意一些注意事项,对于多个partition和多个consumer
1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
5. High-level接口中获取不到数据的时候是会block的
简单版,
简单的坑,如果测试流程是,先proce一些数据,然后再用consumer读的话,记得加上第一句设置
因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前proce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset

Properties props = new Properties();
props.put("auto.offset.reset", "smallest"); //必须要加,如果要读旧数据
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "pv");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
String topic = "page_visits";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

KafkaStream<byte[], byte[]> stream = streams.get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()){
System.out.println("message: " + new String(it.next().message()));
}

if (consumer != null) consumer.shutdown(); //其实执行不到,因为上面的hasNext会block

在用high-level的consumer时,两个给力的工具,
1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
可以看到当前group offset的状况,比如这里看pv的状况,3个partition
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键就是offset,logSize和Lag
这里以前读完了,所以offset=logSize,并且Lag=0
2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
3个参数,
[earliest | latest],表示将offset置到哪里
consumer.properties ,这里是配置文件的路径
topic,topic名,这里是page_visits
我们对上面的pv group执行完这个操作后,再去check group offset状况,结果如下,
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
可以看到offset已经被清0,Lag=logSize

底下给出原文中多线程consumer的完整代码

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector,注意下面对conf的配置
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}

public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
}

public void run(int a_numThreads) { // 创建并发的consumers
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // 每个线程对应于一个KafkaStream

// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);

// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread
threadNumber++;
}
}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);
}

public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);

try {
Thread.sleep(10000);
} catch (InterruptedException ie) {

}
example.shutdown();
}
}

SimpleConsumer
另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口
参考,https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
什么时候用这个接口?
Read a message multiple times
Consume only a subset of the partitions in a topic in a process
Manage transactions to make sure a message is processed once and only once

当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦
所以不是一定要用,最好别用
You must keep track of the offsets in your application to know where you left off consuming.
You must figure out which Broker is the lead Broker for a topic and partition
You must handle Broker leader changes
使用SimpleConsumer的步骤:
Find an active Broker and find out which Broker is the leader for your topic and partition
Determine who the replica Brokers are for your topic and partition
Build the request defining what data you are interested in
Fetch the data
Identify and recover from leader changes
首先,你必须知道读哪个topic的哪个partition
然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
再者,自己去写request并fetch数据
最终,还要注意需要识别和处理broker leader的改变

本文如未解决您的问题请添加抖音号:51dongshi(抖音搜索懂视),直接咨询即可。

相关推荐
  • Kafka0.8.1常用操作

    Kafka0.8.1常用操作

    Kafka0.8.1常用操作:Kafka的版本间差异较大,下面是0.8.1的操作方法 首先cd到kafaka的bin目录下,操作kafka的工具都在这里呢。如果发现找不到,或者名字不对,说明kafka版本不对。 以topic是test为例子 –zookeeper 后的ip改成你们自己的 创建topic ./kafka-to
    查看详情
便秘的穴位在哪儿,我要刮痧 手机上什么软件可以打开pP丅功画图片? 印光大师十念法(怎样念佛不伤气 便秘质地不干刮痧什么穴位 念佛方法:念佛怎么念,念多少,十念记数法和十念法有 怎样用刮痧疗法治疗便秘? 十字念佛法是先想数字还是先念佛 经常便秘,去按摩,刮痧可以排毒吗? 多个消费者要消费kafka相同数据怎么办 vivo手机自动保存pp的图片怎么办 十念记数念佛法和十念法门的区别 kafka 的实现依赖了哪些东西 顽固性便秘刮痧刮哪些穴位 pp助手里面的图片怎么在手机自带图库看到 几种最有效的念佛方法 问便秘的人 刮痧怎么刮 在什么部位 怎么换手机pp资料背景图片 kafka 生产者速度快于消费者那部分多的数据怎么处理 用十念法念佛,呼吸怎么调 如何判断kafka消费者是否关闭是否关闭 如何念佛 手机pp助手为什么将导入的图片导出时总是提示存储... 刮痧对治疗便秘有用吗 有哪些念佛方法? 按揉哪个穴位治便秘呢 我用印光大师的记数十念法 念佛 记不住数怎么办 什么是手机PP 刮痧治疗便秘什么手法 怎样利用十念计数念佛法摄心念佛 iPhone5用PP助手给手机导入不进去照片 刮痧穴位,及其所在位置.以及治疗什么?越详细越好 广慧老和尚念佛十念法法门论义疏? 怎样利用pp恃将手机文件图片快速传到电脑桌面 有谁知道刮痧? pp助手里怎么删除手机里的照片 常智居士晨朝十念佛法门不会可以入定吗? 老婆身上的痧怎么刮?刮什么位置? 印光法师三三四念佛如何换气,请念佛师兄指教 刮痧的有哪些好处 不同部位好处不同 乳胶床垫有味道怎么办
Top