本文共 3928 字,大约阅读时间需要 13 分钟。
Producer.java
package com.favccxx.isoft.favkafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * Hello Producer * * @author favccxx * */public class Producers { private static Logger logger = LoggerFactory.getLogger(Producers.class); public static void main(String[] args) { try { // Get the Producer Properties props = new Properties(); props.put("bootstrap.servers", "10.0.10.5:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer (props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord ("fav-topic", "boy-" + i, "gril-" + i)); producer.close(); } catch (Exception e) { e.printStackTrace(); } }}
输出内容
[TRACE~16:30][main] Sending record ProducerRecord(topic=fav-topic, partition=null, key=boy-1, value=gril-1 with callback null to topic fav-topic partition 0 [org.apache.kafka.clients.producer.KafkaProducer][2016-03-31] [TRACE~16:30][main] Sending record ProducerRecord(topic=fav-topic, partition=null, key=boy-2, value=gril-2 with callback null to topic fav-topic partition 0 [org.apache.kafka.clients.producer.KafkaProducer][2016-03-31] [TRACE~16:30][main] Sending record ProducerRecord(topic=fav-topic, partition=null, key=boy-3, value=gril-3 with callback null to topic fav-topic partition 0 [org.apache.kafka.clients.producer.KafkaProducer][2016-03-31] ... |
Consumer.java
package com.favccxx.isoft.favkafka;import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Consumer { static Logger logger = LoggerFactory.getLogger(Consumer.class); private static KafkaConsumerconsumer; public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "10.0.10.5:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer (props); consumer.subscribe(Arrays.asList("fav-topic")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) logger.debug("@@@@@@@@@@@@@@@@@@" + record.key() + ":" + record.value()); } }}
输出内容
[DEBUG~16:30][main] @@@@@@@@@@@@@@@@@@boy-0:gril-0 [com.favccxx.isoft.favkafka.Consumer][2016-03-31] [DEBUG~16:30][main] @@@@@@@@@@@@@@@@@@boy-1:gril-1 [com.favccxx.isoft.favkafka.Consumer][2016-03-31] [DEBUG~16:30][main] @@@@@@@@@@@@@@@@@@boy-2:gril-2 [com.favccxx.isoft.favkafka.Consumer][2016-03-31] ... |
备注:如果在练习过程中出现java.net.ConnectException: Connection timed out: no further information这样的信息,一个是服务器地址不正确,另外可能就是启动Kafka服务器时没有指定host.name和advertised.host.name,具体操作在上篇文章中有详细描述。
转载地址:http://mvwul.baihongyu.com/