一、下载安装Kafka0.8.2
二、vi config/server.properties
三、修改为advertised.host.name=192.168.1.76
四、rm -rf /tmp/* 移除临时目录下的文件（注意 /tmp/ 与 * 之间不能有空格，否则会删除 /tmp 目录本身以及当前目录下的所有文件）
五、修改vi /etc/hosts中的127.0.0.1为192.168.1.76
六、开启zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
七、开启kafka
bin/kafka-server-start.sh config/server.properties
八、创建主题
bin/kafka-topics.sh --create --zookeeper 192.168.1.76:2181 --replication-factor 1 --partitions 1 --topic mytesttopic
九、开启消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.1.76:2181 --topic mytesttopic --from-beginning（输入后按回车）
十、生产者代码(0.8.2.1的jar包)
- import java.util.*;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- /**
-  * Minimal producer example: sends 100 string messages to the "mytesttopic" topic
-  * using the new Java producer client (org.apache.kafka.clients.producer).
-  */
- public class SimpleProducer {
-     /**
-      * Configures a producer for the broker at 192.168.1.76:9092, sends the
-      * messages "My Test Message No 0" .. "My Test Message No 99" and closes it.
-      *
-      * @param args unused
-      */
-     public static void main(String[] args) {
-         Properties properties = new Properties();
-         properties.put("bootstrap.servers", "192.168.1.76:9092");
-         // Only new-producer settings are listed here. The legacy Scala-producer keys
-         // (metadata.broker.list, serializer.class, request.required.acks) are not
-         // settings of KafkaProducer, so the acknowledgement level is set via "acks".
-         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         properties.put("acks", "1");
-         // The key type is String so that it agrees with the configured StringSerializer.
-         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
-         try {
-             for (int iCount = 0; iCount < 100; iCount++) {
-                 String message = "My Test Message No " + iCount;
-                 // No key is supplied; only the topic and the value are set.
-                 ProducerRecord<String, String> record = new ProducerRecord<String, String>("mytesttopic", message);
-                 producer.send(record);
-             }
-         } finally {
-             // Close the producer even if send() throws.
-             producer.close();
-         }
-     }
- }
十一、查看结果
- My Test Message No 0
- My Test Message No 1
- My Test Message No 2
- My Test Message No 3
- My Test Message No 4
- My Test Message No 5
- My Test Message No 6
- My Test Message No 7
- My Test Message No 8
- My Test Message No 9
- My Test Message No 10
- ...................
- ..................
十二、消费者代码(0.8.2.1的jar包)
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.serializer.StringDecoder;
- import kafka.utils.VerifiableProperties;
- import java.util.*;
- /**
-  * Minimal high-level consumer example: reads string messages from the
-  * "mytesttopic" topic and prints each one to standard output.
-  */
- public class SimpleConsumerExample {
-     private static kafka.javaapi.consumer.ConsumerConnector consumer;
-     /**
-      * Creates a consumer connector for the ZooKeeper at 192.168.1.76:2181, opens one
-      * stream on "mytesttopic" and prints every message read from it. The connector
-      * is shut down when the loop ends or an exception is thrown.
-      */
-     public static void consume() {
-         Properties props = new Properties();
-         // ZooKeeper connection
-         props.put("zookeeper.connect", "192.168.1.76:2181");
-         // group.id names the consumer group
-         props.put("group.id", "jd-group");
-         // ZooKeeper session timeout
-         props.put("zookeeper.session.timeout.ms", "4000");
-         props.put("zookeeper.sync.time.ms", "200");
-         props.put("auto.commit.interval.ms", "1000");
-         props.put("auto.offset.reset", "smallest");
-         // "serializer.class" is a producer setting and is not set here; keys and
-         // values are decoded by the StringDecoder instances passed below.
-         ConsumerConfig config = new ConsumerConfig(props);
-         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
-         try {
-             // Request a single stream for the topic.
-             Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-             topicCountMap.put("mytesttopic", 1);
-             StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
-             StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
-             Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,
-                     keyDecoder, valueDecoder);
-             KafkaStream<String, String> stream = consumerMap.get("mytesttopic").get(0);
-             ConsumerIterator<String, String> it = stream.iterator();
-             while (it.hasNext()) {
-                 System.out.println(it.next().message());
-             }
-         } finally {
-             // Shut the connector down even if consuming fails.
-             consumer.shutdown();
-         }
-     }
-     /**
-      * Entry point; runs {@link #consume()}.
-      *
-      * @param args unused
-      */
-     public static void main(String[] args) {
-         consume();
-     }
- }
十三、提供下C#版的代码
-
- // Console entry point: consumes "mytesttopic" with the kafka-net client and prints
- // the partition id, offset and text of every message received.
- static void Main(string[] args)
- {
-     //https://github.com/Jroland/kafka-net
-     // Producer (commented out; uncomment to send one test message)
-     //var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));
-     //var router = new BrokerRouter(options);
-     //var client = new Producer(router);
-     //client.SendMessageAsync("mytesttopic", new[] { new Message("hello world") }).Wait();
-     //using (client) { }
-     // Consumer
-     var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));
-     var router = new BrokerRouter(options);
-     var consumer = new Consumer(new ConsumerOptions("mytesttopic", router));
-     //Consume returns a blocking IEnumerable (ie: never ending stream)
-     foreach (var message in consumer.Consume())
-     {
-         // Decode as UTF-8 rather than ASCII so non-ASCII payloads are not replaced by '?'.
-         Console.WriteLine("Response: P{0},O{1} : {2}",
-             message.Meta.PartitionId, message.Meta.Offset, System.Text.Encoding.UTF8.GetString(message.Value));
-     }
-     Console.ReadLine();
- }