博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
创建Kafka0.8.2生产者与消费者
阅读量:6237 次
发布时间:2019-06-22

本文共 4393 字,大约阅读时间需要 14 分钟。

一、下载安装Kafka0.8.2

二、vi config/server.properties

三、修改为advertised.host.name=192.168.1.76

四、rm -rf  /tmp *移除临时目录下的文件

五、修改vi /etc/hosts中的127.0.0.1为192.168.1.76

六、开启zookeeper 

[html]   
 
  1. bin/zookeeper-server-start.sh config/zookeeper.properties  

七、开启kafka

bin/kafka-server-start.sh config/server.properties

八、创建主题

bin/kafka-topics.sh --create --zookeeper 192.168.1.76:2181 --replication-factor 1 --partitions 1 --topic mytesttopic

九、开启消费者

bin/kafka-console-consumer.sh --zookeeper 192.168.1.76:2181 --topic mytesttopic --from-beginning 回车

十、生产者代码(0.8.2.1的jar包)

[java]   
 
  1. import java.util.*;  
  2.   
  3. import org.apache.kafka.clients.producer.KafkaProducer;  
  4. import org.apache.kafka.clients.producer.ProducerRecord;  
  5.   
  6. public class SimpleProducer {  
  7.     public static void main(String[] args) {  
  8.         Properties properties = new Properties();  
  9.         properties.put("bootstrap.servers", "192.168.1.76:9092");  
  10.         properties.put("metadata.broker.list", "192.168.1.76:9092");  
  11.         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  12.         properties.put("serializer.class", "kafka.serializer.StringEncoder");  
  13.         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  14.         properties.put("request.required.acks", "1");  
  15.   
  16.         KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);  
  17.         for (int iCount = 0; iCount < 100; iCount++) {  
  18.             String message = "My Test Message No " + iCount;  
  19.             ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("mytesttopic", message);  
  20.             producer.send(record);  
  21.         }  
  22.         producer.close();  
  23.     }  
  24. }  

十一、查看结果

 

 

[html]   
 
  1. My Test Message No 0  
  2. My Test Message No 1  
  3. My Test Message No 2  
  4. My Test Message No 3  
  5. My Test Message No 4  
  6. My Test Message No 5  
  7. My Test Message No 6  
  8. My Test Message No 7  
  9. My Test Message No 8  
  10. My Test Message No 9  
  11. My Test Message No 10  
[html]   
 
  1. ...................  
[html]   
 
  1. ..................  

十、消费者代码(0.8.2.1的jar包)

 

[java]   
 
  1. import kafka.consumer.ConsumerConfig;  
  2. import kafka.consumer.ConsumerIterator;  
  3. import kafka.consumer.KafkaStream;  
  4. import kafka.serializer.StringDecoder;  
  5. import kafka.utils.VerifiableProperties;  
  6. import java.util.*;  
  7. public class SimpleConsumerExample {  
  8.   
  9.     private static kafka.javaapi.consumer.ConsumerConnector consumer;  
  10.   
  11.     public static void consume() {  
  12.   
  13.         Properties props = new Properties();  
  14.         // zookeeper 配置  
  15.         props.put("zookeeper.connect", "192.168.1.76:2181");  
  16.   
  17.         // group 代表一个消费组  
  18.         props.put("group.id", "jd-group");  
  19.   
  20.         // zk连接超时  
  21.         props.put("zookeeper.session.timeout.ms", "4000");  
  22.         props.put("zookeeper.sync.time.ms", "200");  
  23.         props.put("auto.commit.interval.ms", "1000");  
  24.         props.put("auto.offset.reset", "smallest");  
  25.         // 序列化类  
  26.         props.put("serializer.class", "kafka.serializer.StringEncoder");  
  27.   
  28.         ConsumerConfig config = new ConsumerConfig(props);  
  29.   
  30.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
  31.   
  32.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  33.         topicCountMap.put("mytesttopic", new Integer(1));  
  34.   
  35.         StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());  
  36.         StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());  
  37.   
  38.         Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,  
  39.                 keyDecoder, valueDecoder);  
  40.         KafkaStream<String, String> stream = consumerMap.get("mytesttopic").get(0);  
  41.         ConsumerIterator<String, String> it = stream.iterator();  
  42.         while (it.hasNext())  
  43.             System.out.println(it.next().message());  
  44.     }  
  45.   
  46.     public static void main(String[] args) {  
  47.         consume();  
  48.     }  
  49. }  

十一、提供下C#版的代码

 

[csharp]   
 
    1. static void Main(string[] args)  
    2. {  
    3.     //https://github.com/Jroland/kafka-net  
    4.     //生产者  
    5.     //var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));  
    6.     //var router = new BrokerRouter(options);  
    7.     //var client = new Producer(router);  
    8.   
    9.     //client.SendMessageAsync("mytesttopic", new[] { new Message("hello world") }).Wait();  
    10.   
    11.     //using (client) { }  
    12.   
    13.     //消费者  
    14.     var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));  
    15.     var router = new BrokerRouter(options);  
    16.     var consumer = new Consumer(new ConsumerOptions("mytesttopic", router));  
    17.   
    18.     //Consume returns a blocking IEnumerable (ie: never ending stream)  
    19.     foreach (var message in consumer.Consume())  
    20.     {  
    21.         Console.WriteLine("Response: P{0},O{1} : {2}",  
    22.             message.Meta.PartitionId, message.Meta.Offset,System.Text.Encoding.ASCII.GetString(message.Value));  
    23.     }  
    24.     Console.ReadLine();  
    25. }  

转载于:https://www.cnblogs.com/heidsoft/p/7697979.html

你可能感兴趣的文章
Maven仓库的布局
查看>>
PHP定界符<<<的使用方法
查看>>
LeetCode--176--第二高的薪水
查看>>
如何解决开机出现Missing operating system的故障
查看>>
【C#学习笔记】函数重载
查看>>
解除映射错误
查看>>
TOJ 假题之 Cow Brainiacs
查看>>
命令模式(Command Pattern)
查看>>
升级到Ubuntu 11.10遇到的问题
查看>>
二十年后的回眸(8)——晋级的炒更之旅
查看>>
Oracle dataGuard专题:利用冷备创建standby
查看>>
运维工程师的职责和前景
查看>>
小议安全测试【测试帮日记公开课】
查看>>
Red Hat Enterprise Linux 8 Beta 抢先体验
查看>>
objectC 数据类型转换
查看>>
阿里退市,投资人实在应该鼓掌欢迎
查看>>
zabbix企业应用之从数据库提取centos 6.2系统在线天数
查看>>
大学生抄袭,病根在哪?
查看>>
3.VMware vsphere 5.0新体验-安装VMware Center
查看>>
Windows 7的预备知识系列之二:认识Windows 7中的窗口
查看>>