博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hello, Kafka World
阅读量:6839 次
发布时间:2019-06-26

本文共 3928 字,大约阅读时间需要 13 分钟。

Producer.java

package com.favccxx.isoft.favkafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * Hello Producer *  * @author favccxx * */public class Producers {    private static Logger logger = LoggerFactory.getLogger(Producers.class);    public static void main(String[] args) {        try {            // Get the Producer            Properties props = new Properties();            props.put("bootstrap.servers", "10.0.10.5:9092");            props.put("acks", "all");            props.put("retries", 0);            props.put("batch.size", 16384);            props.put("linger.ms", 1);            props.put("buffer.memory", 33554432);            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");            Producer
 producer = new KafkaProducer
(props);            for (int i = 0; i < 100; i++)                producer.send(new ProducerRecord
("fav-topic", "boy-" + i, "gril-" + i));            producer.close();        } catch (Exception e) {            e.printStackTrace();        }    }}

输出内容

[TRACE~16:30][main] Sending record ProducerRecord(topic=fav-topic, partition=null, key=boy-1, value=gril-1 with callback null to topic fav-topic partition 0  [org.apache.kafka.clients.producer.KafkaProducer][2016-03-31]

[TRACE~16:30][main] Sending record ProducerRecord(topic=fav-topic, partition=null, key=boy-2, value=gril-2 with callback null to topic fav-topic partition 0  [org.apache.kafka.clients.producer.KafkaProducer][2016-03-31]

[TRACE~16:30][main] Sending record ProducerRecord(topic=fav-topic, partition=null, key=boy-3, value=gril-3 with callback null to topic fav-topic partition 0  [org.apache.kafka.clients.producer.KafkaProducer][2016-03-31]

...

Consumer.java

package com.favccxx.isoft.favkafka;import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Consumer {    static Logger logger = LoggerFactory.getLogger(Consumer.class);    private static KafkaConsumer
 consumer;    public static void main(String[] args) {        Properties props = new Properties();        props.put("bootstrap.servers", "10.0.10.5:9092");        props.put("group.id", "test");        props.put("enable.auto.commit", "true");        props.put("auto.commit.interval.ms", "1000");        props.put("session.timeout.ms", "30000");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        consumer = new KafkaConsumer
(props);        consumer.subscribe(Arrays.asList("fav-topic"));         while (true) {             ConsumerRecords
 records = consumer.poll(100);             for (ConsumerRecord
 record : records)                 logger.debug("@@@@@@@@@@@@@@@@@@" + record.key() + ":" + record.value());         }    }}

输出内容

[DEBUG~16:30][main] @@@@@@@@@@@@@@@@@@boy-0:gril-0  [com.favccxx.isoft.favkafka.Consumer][2016-03-31]

[DEBUG~16:30][main] @@@@@@@@@@@@@@@@@@boy-1:gril-1  [com.favccxx.isoft.favkafka.Consumer][2016-03-31]

[DEBUG~16:30][main] @@@@@@@@@@@@@@@@@@boy-2:gril-2  [com.favccxx.isoft.favkafka.Consumer][2016-03-31]

...

备注:如果在练习过程中出现java.net.ConnectException: Connection timed out: no further information这样的信息,一个是服务器地址不正确,另外可能就是启动Kafka服务器时没有指定host.nameadvertised.host.name,具体操作在上篇文章中有详细描述。

转载地址:http://mvwul.baihongyu.com/

你可能感兴趣的文章
Cocos2d-xna : 横版战略游戏开发实验4 Layer构建丰富的交互
查看>>
给孩子增加学习生物的兴趣,买了个显微镜
查看>>
代码风格 2012/10/12
查看>>
Source Code Pro - 来自 Adobe的最佳编程字体!
查看>>
Uva 11300 Spreading the Wealth
查看>>
深度拷贝
查看>>
远程桌面时自动输入“c“的解决方法
查看>>
谨慎的沉默就是精明的回避
查看>>
音频采样位数问题
查看>>
Response.Clear() Response.ClearContent()和Response.ClearHeaders()之间的区别
查看>>
数字签名、数字证书、对称加密算法、非对称加密算法、单向加密(散列算法)...
查看>>
linux zip
查看>>
一个简单的统计图像主颜色的算法(C#源代码)
查看>>
《你不可不知的50个地球知识》之未来进化
查看>>
动态代理的工作原理图
查看>>
【计算几何】点在多边形内部
查看>>
iOS利用代理实现界面跳转
查看>>
(二十一)状态模式详解(DOTA版)
查看>>
MySQL累积求和
查看>>
第五周 Word注释与交叉引用
查看>>