所有分类
  • 所有分类
  • 实时新闻

KafkaConsumer源码详解

本文首先进行 flink kafka consumer 原理分析,结合 SourceFunction 和 Kafka Client API 详解源码。1.Flink Kafka Consumer 原理本文基于flink-1.11分析 Kafka Consumer 原理。FlinkKafkaConsumer 主要是继承基类 RichParallelSourceFunction,不但可以执行run(…)方法读取数据,而且拥有状态、metric 和多并发等功能。1.1 RichParallelSourceFunction 分析RichParallelSourceFunction 与父类的继承关系,如下图所示。一方面,RichParallelSourceFunction 间接实现接口 SourceFunction,可以执行 run(…)方法读取数据;另一方面,RichParallelSourceFunction 间接实现接口 RichFunction,拥有状态、metric 和多并发等功能。因此,RichParallelSourceFunction 是有状态的和多并发的 Source 基类。ParallelSourceFunction是接口 SourceFunction 的子类。共同点是 Source 的基类,需要实现 run()读取数据。不同点是前者提供多并发的能力,后者的并发度只能为1;AbstractRichFunction是接口 RichFunction 的实现类,可以提供 open()方法获取 RuntimeContext,而RuntimeContext 拥有 metric、subtasks 信息、accumulator、state等功能;RichParallelSourceFunction继承图.jpg1.2 Flink Kafka Consumer 流程分析如下图所示,Flink Kafka Consumer 流程主要分为主线程循环获取缓存数据,发送到下游;消费线程循环消费 Kafka 数据,保存到缓存。Handover.next:Handover 类的 next 属性,即 ConsumerRecords 类型的缓存数据。Handover 的主要作用是协调主线程和消费线程,有序地消费 Kafka 和发送数据到下游算子。Flink Kafka Consumer流程图.JPG(1)主线程主线程获取缓存的 Handover.next 对象即 ConsumerRecords,发送到下游算子。首先创建KafkaFetcher,同时内部创建消费线程 KafkaConsumerThread。然后,调用 KafkaFetcher.runFetchLoop()方法,启动消费线程、循环获取缓存数据;最后,根据分区往下游发送数据。(2)消费线程消费线程 KafkaConsumerThread 主要循环消费 Kafka 数据,保存到缓存。首先,主线程启动消费线程。接着,KafkaConsumer 从 Kafka Broker 循环 poll 数据,同时保持到缓存中。2.Flink Kafka Consumer 源码详解问题1:如何使用 FlinkKafkaConsumer ?如何直接使用 KafkaClient API ?/***示例1: Flink DataStream API 使用 FlinkKafkaConsumer **/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//SimpleStringSchema为数据字段解析类env.addSource(new FlinkKafkaConsumer<>(“eventTopic”, new SimpleStringSchema(), properties)/***示例2: KafkaClient API 直接使用 KafkaConsumer **/KafkaConsumer consumer = new KafkaConsumer<>(properties);consumer.poll(Duration.ofMillis(100));问题2:FlinkKafkaConsumer 内部是如何使用 KafkaClient API ?初始化执行 env.addSource 的时候会创建 StreamSource 算子对象;StreamSource 构造函数中将 function 即 FlinkKafkaConsumer 对象传给父类 AbstractUdfStreamOperator 的 userFunction 变量;StreamExecutionEnvironment源码: publicDataStreamSource addSource(SourceFunction function, String sourceName, TypeInformation typeInfo){ //省略…// function 即 FlinkKafkaConsumer final StreamSource sourceOperator = new StreamSource<>(function);//省略…}AbstractUdfStreamOperator源码:// userFunction 即 FlinkKafkaConsumer public AbstractUdfStreamOperator(F userFunction){ this.userFunction = requireNonNull(userFunction); checkUdfCheckpointingPreconditions();} Task 启动和运行Task 实现 Java多线程接口Runnable。Task 启动后,函数调用链如下 Task.run()-> Task.doRun()->StreamTask.invoke()->StreamTask.runMailboxLoop()->MailboxProcessor.runMailboxLoop()->MailboxProcessor.runMailboxStep()->SourceStreamTask .processInput()。processInput()方法里面启动线程sourceThread.start()。上述的关键源码,如下所示。StreamTask源码如下:@Override public final void invoke() throws Exception {//省略…//调用 MailboxProcessor.runMailboxLoop() runMailboxLoop();//省略…}MailboxProcessor源码如下: public void runMailboxLoop() throws Exception {//省略…//循环执行 runMailboxStep while (runMailboxStep(localMailbox, defaultActionContext)){ }} private boolean runMailboxStep(TaskMailbox localMailbox, MailboxController defaultActionContext) throws Exception { if (processMail(localMailbox)){ //执行 mailboxDefaultAction.runDefaultAction,即执行 SourceStreamTask .processInput() mailboxDefaultAction.runDefaultAction(defaultActionContext);// lock is acquired inside default action as needed return true;} return false;}SourceStreamTask源码如下:@Override protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {//由于目前没有输入,TaskMailbox 先暂停 loop 主线程 controller.suspendDefaultAction(); sourceThread.setTaskDescription(getName()); sourceThread.start();//省略…} private class LegacySourceFunctionThread extends Thread {//省略…@Override public void run(){ try {//执行 source function 的 run()方法 mainOperator.run(lock, getStreamStatusMaintainer(), operatorChain); completionFuture.complete(null);} catch (Throwable t){ completionFuture.completeExceptionally(t);} }//省略…} 消费 KafkaFlinkKafkaConsumerBase 间接实现了 SourceFunction 接口,主要实现 run()方法。然后,在 run()方法创建了一个KafkaFetcher 对象,并主要调用 KafkaFetcher.runFetchLoop()。最终,运行消费线程KafkaConsumerThread,并 while 循环地 poll Kafka 数据。上述的关键源码,如下所示。FlinkKafkaConsumerBase源码如下:@Override public void run(SourceContext sourceContext) throws Exception {//省略…//创建 KafkaFetcher 对象 this.kafkaFetcher = createFetcher( sourceContext, subscribedPartitionsToStartOffsets, watermarkStrategy,(StreamingRuntimeContext) getRuntimeContext(), offsetCommitMode, getRuntimeContext().getMetricGroup().addGroup(KAFKACONSUMERMETRICSGROUP), useMetrics);//省略…// kafkaFetcher 执行 runFetchLoop(),即循环消费数据 kafkaFetcher.runFetchLoop();//省略…}KafkaFetcher源码如下:@Override public void runFetchLoop() throws Exception { try {//启动消费线程 KafkaConsumerThread consumerThread.start(); while (running){ //获取协调者 Handover 的 next 缓存值 final ConsumerRecords records = handover.pollNext();//从partition 获取数据 for (KafkaTopicPartitionState partition : subscribedPartitionStates()){ List<consumerrecord> partitionRecords = records.records(partition.getKafkaPartitionHandle());//向下游发送数据 partitionConsumerRecordsHandler(partitionRecords, partition);} }} finally { consumerThread.shutdown();}KafkaConsumerThread源码如下,run()方法中创建 KafkaClient API 的 KafkaConsumer,并使用 KafkaConsumer.poll()消费数据。@Override public void run(){ //省略…//从主线程获取的 handover 赋值给本地变量… final Handover handover = this.handover;//省略… try {//创建 KafkaConsumer this.consumer = getConsumer(kafkaProperties);} catch (Throwable t){ handover.reportError(t); return;} //省略… ConsumerRecords records = null;// while 循环消费 Kafka while (running){ //省略… if (records == null){ try {// KafkaConsumer poll 数据,即使用 KafkaClient API 的 KafkaConsumer 消费数据 records = consumer.poll(pollTimeout);} catch (WakeupException we){ continue;} } try {//把 Kafka 的数据保存在 Handover 的缓存中 handover.produce(records); records = null;} //省略…} }问题3:Handover 是如何协调消费线程和主线程,使得前者可以及时消费和保存数据,而后者也可以及时获取数据?Handover 的关键方法是produce()保存缓存数据 next、pollNext()获取缓存数据 next,主要作用是在消费线程和主线程下,保证同一个缓存数据 next ,在同一时间内是不能既更新(写),也输出(读),即保证原子性操作 next。Handover源码如下:/*** consumer 线程把 Kafka 数据保存到 next **/ public void produce(final ConsumerRecords element) throws InterruptedException, WakeupException, ClosedException { checkNotNull(element); synchronized (lock){ //循环判断 next 是否为 null while (next != null &&!wakeupProducer){ // lock 会释放当前的锁,该 consumer 线程进入 waiting 状态 lock.wait();} //省略… else if (error == null){ //写 next next = element;//唤醒 lock(使得处于 waiting 状态的 main 线程能够继续执行) lock.notifyAll();} //省略…} }/*** main 线程读取 next **/ public ConsumerRecords pollNext() throws Exception { synchronized (lock){ //循环判断 next 是否为 null while (next == null && error == null){ // lock 会释放当前的锁,该 main 线程进入 waiting 状态 lock.wait();} //读取 next ConsumerRecords n = next; if (n != null){ next = null;//唤醒 lock(使得处于 waiting 状态的 consumer 线程能够继续执行) lock.notifyAll(); return n;} //省略…} }Java 多线程的等待/通知机制:Object wait()、notify/notifyAll()当线程执行 wait()方法的时候,会释放当前的锁,然后让出CPU,进入等待状态。当线程执行 notify/notifyAll()方法的时候,会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到 wait(),再次释放锁。</consumerrecord

原文链接:https://www.w1ym.com/69655/,转载请注明出处~~~
0

评论0

请先

站点公告

【温馨提示】 本站不建议您对本站支付任何费用或开通任何会员本站99%资源为免费资源只提供共享不提供技术支持,本站资源主要以学习开发为主,本站是为个人资源记录学习研究等情况而建立,如特殊原因下载,需在24小时删除相关资源。本站资源均来自互联网收集或网友分享,若有侵权,请联系站长删除,谢谢。立即查看
显示验证码
没有账号?注册  忘记密码?