博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka java api
阅读量:1825 次
发布时间:2019-04-25

本文共 4895 字,大约阅读时间需要 16 分钟。

1、消费者

package com.asiainfo.group.kafka.consumer;import java.io.FileReader;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.clients.consumer.OffsetCommitCallback;import org.apache.kafka.common.TopicPartition;public class ConsumerDemo {	private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/consumer/";	private static KafkaConsumer
consumer; static{ try { Properties p = new Properties(); p.load(new FileReader(PATH+"consumer.properties")); consumer = new KafkaConsumer
(p); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args){ new Thread(new Runnable() { @Override public void run() { List
topics = new ArrayList
(); topics.add("okk"); consumer.subscribe(topics); try { while(true){ ConsumerRecords
records = consumer.poll(3000); System.err.println("收到了"+records.count()+"条消息"); for (ConsumerRecord
record : records) { System.err.println("topic:"+record.topic()); System.err.println("partition:"+record.partition()); System.err.println("offset:"+record.offset()); System.err.println("key:"+record.key()); System.err.println("value:"+record.value()); } //同步提交:会一直尝试直至提交成功,会一直阻塞 //consumer.commitSync(); //异步提交:不会重试,原因是因为重试过程中可能有更大的偏移量提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map
arg0, Exception arg1) { System.err.println("异步提交偏移量成功!"); } }); //同步提交结合异步提交 //如果一切正常,就用异步提交,即使此次提交不成功,下次提交总会成功的 //如果关闭消费者,就没有下一次了,则用同步提交,一直尝试到提交成功为止,类似下面的代码 /*try{ consumer.commitSync(); } finally{ consumer.close(); }*/ } } catch (Exception e) { e.printStackTrace(); } finally{ consumer.close(); } } }).start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } //wakeup方法是唯一能够打断consumer.poll的方法,并使其抛出异常跳出while(true),然后进入finally关闭consumer consumer.wakeup(); }}

 

2、生产者

package com.asiainfo.group.kafka.producer;import java.io.FileReader;import java.util.Properties;import java.util.concurrent.ExecutionException;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class ProductDemo {	private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/producer/";	private static KafkaProducer
producer; static{ try { Properties p = new Properties(); p.load(new FileReader(PATH+"producer.properties")); producer = new KafkaProducer
(p); } catch (Exception e) { e.printStackTrace(); } } /** * 发送并忘记(不关心是否到达) * @throws ExecutionException * @throws InterruptedException */ public void sendAndForget() throws Exception{ for (int i = 0; i < 10; i++) { ProducerRecord
record = new ProducerRecord<>("okk", "key"+i, "sendAndForget"+i); producer.send(record); } producer.close(); } public void syncSend() throws Exception{ long start = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { ProducerRecord
record = new ProducerRecord<>("test1", "key"+i, "syncSend"+i); RecordMetadata recordMetadata = producer.send(record).get(); System.err.println("同步发送成功!"); } System.err.println("同步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } public void asyncSend(){ long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { ProducerRecord
record = new ProducerRecord<>("test1", "key"+i, "asyncSend"+i); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata arg0, Exception arg1) { System.err.println("异步发送成功!"); } }); } System.err.println("异步发送耗时:"+(System.currentTimeMillis()-start)); producer.close(); } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ new ProductDemo().sendAndForget(); //new ProductDemo().syncSend(); //new ProductDemo().asyncSend(); }}

3、生产者配置文件

bootstrap.servers=192.168.0.108:9092key.serializer=org.apache.kafka.common.serialization.StringSerializervalue.serializer=org.apache.kafka.common.serialization.StringSerializer

4、消费者配置文件

bootstrap.servers=192.168.0.108:9092key.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializergroup.id=groupByJava1enable.auto.commit=false

 

转载地址:http://rjpkf.baihongyu.com/

你可能感兴趣的文章
vue的模板的父子组件使用,并且实现数据传递
查看>>
vue父访问子组件中的数据方法,使用$refs
查看>>
vue中子组件访问父组件中的数据方法,使用$parent或者$root
查看>>
vue中的插槽使用slot
查看>>
前端模块化工具webpack
查看>>
安装vue脚手架不成功的解决方法
查看>>
使用visual studio 2019 开发vue项目
查看>>
ES6的箭头函数使用
查看>>
android studio 新版本打开就版本代码出现的问题解决办法
查看>>
android alertdialog的自定义背景设置方法
查看>>
简单的Toast类
查看>>
android studio 新建文件自动增加文件注释说明
查看>>
让activity充满整个屏幕
查看>>
alertdialog点击确定对话框不消失的方法
查看>>
android模拟器使用虚拟串口实现和PC通讯
查看>>
Visual studio 2010 主题设置
查看>>
C# WPF DataGrid 根据某列的值设置行的背景色
查看>>
水晶报表(Crystal Report)通过Visual Studio 2010的WPF实现
查看>>
C# 实现多语言界面
查看>>
限制用户输入 只能是数字,小数,负数。
查看>>