博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka使用实例
阅读量:4653 次
发布时间:2019-06-09

本文共 6028 字,大约阅读时间需要 20 分钟。

 

定义一个producer

package cn.vko.common.kafka;

import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.vko.common.utils.mybatis.GenCreateInterceptor;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Thin wrapper around the legacy Kafka 0.8.x producer API that publishes
 * String key/value messages to a broker cluster.
 */
public class VkoProducer {

    private Logger log = LoggerFactory.getLogger(VkoProducer.class);

    private String metadataBrokerList;

    // NOTE(review): the generic parameters were lost in the original paste;
    // restored as <String, String> to match the StringEncoder serializers
    // configured below.
    private Producer<String, String> producer;

    /**
     * Creates a producer connected to the given broker list.
     *
     * @param metadataBrokerList comma-separated host:port broker list; must not be empty
     * @throws RuntimeException if {@code metadataBrokerList} is empty
     */
    public VkoProducer(String metadataBrokerList) {
        super();
        if (StringUtils.isEmpty(metadataBrokerList)) {
            String message = "metadataBrokerList 不可以为空";
            log.error(message);
            throw new RuntimeException(message);
        }
        this.metadataBrokerList = metadataBrokerList;
        // Producer configuration (Kafka 0.8 property names).
        Properties props = new Properties();
        props.put("metadata.broker.list", metadataBrokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Acknowledgement mode: 0 = fire-and-forget (may lose data),
        // 1 = leader ack only, -1 = full ISR ack.
        // See http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        //props.put("producer.type", "async");
        props.put("queue.buffering.max.ms", "5000");
        props.put("queue.buffering.max.messages", "30000");
        props.put("queue.enqueue.timeout.ms", "-1");
        props.put("batch.num.messages", "1");
        // Optional: custom partitioner; the default partitioner is used when absent.
        //props.put("partitioner.class", "cn.vko.kafka.PartitionerDemo");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    /**
     * Sends a single message to the given topic.
     *
     * @param topic topic name
     * @param msg   message payload
     * @return always {@code "ok"}; send failures surface as runtime exceptions
     */
    public String send(String topic, String msg) {
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
        producer.send(data);
        return "ok";
    }
}

 

定义一个receiver

 

package cn.vko.common.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.vko.common.utils.mybatis.GenCreateInterceptor;
import cn.vko.component.pageframework.util.StringUtil;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * High-level (ZooKeeper based) Kafka 0.8.x consumer wrapper. On construction it
 * validates its arguments, connects to the cluster and immediately starts one
 * consumer thread per stream, dispatching each message to the supplied
 * {@link VkoConsumer}.
 */
public class Receiver {

    private Logger log = LoggerFactory.getLogger(Receiver.class);

    private String zookeeperConnect;

    private String groupId;

    private String topic;

    private VkoConsumer vkoConsumer;

    /**
     * Creates the receiver and starts consuming immediately.
     *
     * @param zookeeperConnect ZooKeeper cluster address list, comma separated
     * @param groupId          consumer group id
     * @param topic            topic to subscribe to
     * @param vkoConsumer      message handler invoked for each received message
     * @throws RuntimeException if any argument is empty/null
     */
    public Receiver(String zookeeperConnect, String groupId, String topic, VkoConsumer vkoConsumer) {
        super();
        if (StringUtil.isEmpty(zookeeperConnect)) {
            String message = "zookeeperConnect 不可以为空";
            log.error(message);
            throw new RuntimeException(message);
        }
        if (StringUtil.isEmpty(groupId)) {
            String message = "groupId 不可以为空";
            log.error(message);
            throw new RuntimeException(message);
        }
        if (StringUtil.isEmpty(topic)) {
            String message = "topic 不可以为空";
            log.error(message);
            throw new RuntimeException(message);
        }
        if (vkoConsumer == null) {
            String message = "vkoConsumer 不可以为空";
            log.error(message);
            throw new RuntimeException(message);
        }
        this.zookeeperConnect = zookeeperConnect;
        this.groupId = groupId;
        this.topic = topic;
        this.vkoConsumer = vkoConsumer;
        log.info("kafka vkoConsumer 创建完成:groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);
        receive();
    }

    /**
     * Connects to Kafka and spawns one worker thread per stream; each thread
     * blocks on the stream iterator and forwards messages to the handler.
     * Handler exceptions are logged and do not stop consumption.
     */
    private void receive() {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "14000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig conf = new ConsumerConfig(props);
        ConsumerConnector cc = Consumer.createJavaConsumerConnector(conf);
        // NOTE(review): generic parameters below were lost in the original
        // paste; restored to the types the 0.8 high-level consumer API declares.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Currently every topic has 2 partitions, hence 2 streams.
        topicCountMap.put(topic, 2);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            new Thread() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        // NOTE(review): decodes with the platform default charset;
                        // presumably producers send UTF-8 — confirm and pass an
                        // explicit charset if so.
                        String msg = new String(it.next().message());
                        try {
                            vkoConsumer.dealMsg(msg);
                        } catch (Exception e) {
                            log.error("kafka vkoConsumer topic:{} 收到消息:{} 消费异常 xxxxxxxxxxxxxxxxxx", topic, msg, e);
                        }
                        log.info("kafka vkoConsumer topic:{} 收到消息:{}", topic, msg);
                    }
                }
            }.start();
            log.info("kafka vkoConsumer 启动完成:groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);
        }
        log.info("kafka vkoConsumer 准备接收消息:groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);
    }
}

 

package cn.vko.common.kafka;

/**
 * Callback contract for Kafka message handlers: an implementation receives
 * each consumed message payload as a String and processes it. Implementations
 * are injected into {@code Receiver}, which dispatches messages to them from
 * its consumer threads.
 */
public interface VkoConsumer {

    /**
     * Handles one message pulled from the subscribed topic.
     *
     * @param strings raw message payload
     */
    void dealMsg(String strings);
}

 

在需要consumer的程序中定义一个消费实现类,并注入到receiver中,这样spring容器启动时会自动创建一个receiver,进行对特定的topic消费

 

转载于:https://www.cnblogs.com/wzb0228/p/10974871.html

你可能感兴趣的文章
雇佣K个工人的最小费用 Minimum Cost to Hire K Workers
查看>>
mysql优化方法
查看>>
[转]【HttpServlet】HttpServletResponse接口 案例:完成文件下载
查看>>
Eclipse配置默认的编码集为utf-8
查看>>
初学Python
查看>>
rman 脚本备份全过程
查看>>
Python小技巧
查看>>
fragment Activity之间传值的方法 之------------接口回调
查看>>
OSS研究
查看>>
Leetcode 116 Populating Next Right Pointers in Each Node
查看>>
Angular 1.63 双向数据绑定 通过 $http 发送数据
查看>>
php以及前端的一些小小的技术要点
查看>>
【精解】EOS标准货币体系与源码实现分析
查看>>
AFore.NET 翻译
查看>>
[大牛翻译系列]Hadoop(8)MapReduce 性能调优:性能测量(Measuring)
查看>>
SQLYog快捷键大全
查看>>
ASP.NET ACCESS 分页
查看>>
HashMap
查看>>
Android广播机制概述
查看>>
我是怎么让全国最大的儿童失踪预警平台流量掉底的
查看>>