本篇博主带来的是Kafka的Producer API操作。
1. 消息发送流程
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到两个线程(main线程和Sender线程)以及一个二者共享的缓冲区RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
2. 无回调参数的API
- 1. 导入依赖
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <!-- TODO: the version was missing from the original post; set it to the kafka-clients release that matches your brokers -->
        <version></version>
    </dependency>
</dependencies>
- 2. 编写代码
- 3. 完整代码
package com.buwenbuhuo.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@author 卜温不火
* @create 2020-05-06 20:21
* com.buwenbuhuo.kafka.producer - the name of the target package where the new class or interface will be created.
* kafka0506 - the name of the current project.
public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop002:9092");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("second", i + "", "my name is bu wen bu huo -" + i)); } producer.close(); }
- 4. 群起zookeeper和kafka
[bigdata@hadoop002 zookeeper-3.4.10]$ bin/start-allzk.sh
[bigdata@hadoop002 kafka]$ bin/start-kafkaall.sh
- 1
- 2
- 5. 启动一个consumer控制台
[bigdata@hadoop002 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop002:9092 --topic second
- 1
- 6. 跑代码进行测试
3. 带回调函数的API
- 1. 代码
package com.buwenbuhuo.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
@author 卜温不火
* @create 2020-05-06 20:21
* com.buwenbuhuo.kafka.producer - the name of the target package where the new class or interface will be created.
* kafka0506 - the name of the current project.
public class CustomProducer { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop002:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); //1.创建1个生产者对象 KafkaProducer<String, String> producer = new KafkaProducer<>(props); //2.调用send方法 for (int i = 0; i < 1000; i++) { producer.send(new ProducerRecord<String, String>("second", i + "", "message-" + i), (metadata,exception)->{ if(exception==null){ System.out.println("success"+metadata.topic()+"-"+metadata.partition()+"-"+metadata.offset()); }else{ exception.printStackTrace(); } }); } //3.关闭生产者 producer.close(); }
- 2. 结果
- 3. 大致流程图
4. 同步发送API(与异步发送相比,只是在send()返回的Future上多调用了一个.get(),阻塞等待本条消息的发送结果)
- 1. 代码:
package com.buwenbuhuo.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@author 卜温不火
* @create 2020-05-06 20:21
* com.buwenbuhuo.kafka.producer - the name of the target package where the new class or interface will be created.
* kafka0506 - the name of the current project.
public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop002:9092");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("second", i + "", "my name is bu wen bu huo -" + i)).get(); } producer.close(); }
- 2. 结果
