In production, the producer reads raw data in real time, applies the business logic, and then calls Kafka's producer API to write the processed message records to the Kafka cluster.

Operating the producer with scripts
Publishing messages with the producer
```bash
./kafka-console-producer.sh --broker-list broker_name:port --topic topic_name
```
Viewing messages with a consumer
```bash
./kafka-console-consumer.sh --bootstrap-server broker_name:port --topic topic_name
```
Operating the producer with the Java API
Importing the dependency
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
```
Using the producer API
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");             // wait for all in-sync replicas to acknowledge
        props.put("retries", 0);              // do not retry failed sends
        props.put("batch.size", 16384);       // per-partition batch size in bytes
        props.put("linger.ms", 1);            // wait up to 1 ms to fill a batch
        props.put("buffer.memory", 33554432); // total memory the producer may buffer (32 MB)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("my-topic",
                    Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}
```
Calling the producer API from multiple threads
Because the Kafka producer object is thread-safe, a single producer instance can be shared and called from multiple threads.
Creating the producer thread
```java
package kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerThread implements Runnable {

    private KafkaProducer<String, String> producer = null;
    private ProducerRecord<String, String> record = null;

    public ProducerThread(KafkaProducer<String, String> producer,
                          ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        // Send asynchronously; the callback is invoked on the producer's I/O thread.
        producer.send(record, (metadata, e) -> {
            if (null != e) {
                e.printStackTrace();
            }
            if (null != metadata) {
                System.out.println("Message sent successfully: " + String.format(
                        "offset: %s, partition: %s, topic: %s, timestamp: %s",
                        metadata.offset(), metadata.partition(),
                        metadata.topic(), metadata.timestamp()));
            }
        });
    }
}
```
Sending messages from multiple threads through the same Producer object
```java
package kafka.producer;

import kafka.OrderMessage;
import kafka.partition.PartitionUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.*;

public class ProducerDemo {

    static Properties properties = new Properties();
    static String topic = "test";
    static KafkaProducer<String, String> producer = null;

    // Thread pool configuration
    static int corePoolSize = 5;
    static int maximumPoolSize = 20;
    static long keepAliveTime = 60;
    static TimeUnit timeUnit = TimeUnit.SECONDS;
    static BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();
    static ExecutorService service = null;

    static {
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionUtil.class.getName());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // A single producer instance shared by all worker threads
        producer = new KafkaProducer<>(properties);
        service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                keepAliveTime, timeUnit, blockingQueue);
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 6; i++) {
            service.submit(createMsgTask());
        }
        // Shut down the pool and close the producer so the JVM can exit cleanly
        service.shutdown();
        service.awaitTermination(10, TimeUnit.SECONDS);
        producer.close();
    }

    public static ProducerThread createMsgTask() {
        OrderMessage orderMessage = new OrderMessage();
        orderMessage.setId(UUID.randomUUID().toString());
        long timestamp = System.nanoTime();
        orderMessage.setCreateTime(timestamp);
        orderMessage.setRemake("rem");
        orderMessage.setsName("test");
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(topic, timestamp + "", orderMessage.toString());
        return new ProducerThread(producer, record);
    }
}
```
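The demo above references two classes the post does not show: OrderMessage, presumably a plain POJO with the setters and toString() used in createMsgTask(), and the custom partitioner PartitionUtil registered via ProducerConfig.PARTITIONER_CLASS_CONFIG. As a rough sketch of what PartitionUtil could look like, here is a minimal Partitioner implementation; the route-by-key-hash strategy is an assumption for illustration, not the original class:

```java
package kafka.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class PartitionUtil implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Hypothetical strategy: records without a key go to partition 0,
        // otherwise route by the key's hash (masked to stay non-negative).
        if (keyBytes == null) {
            return 0;
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```

Whatever strategy the real PartitionUtil uses, registering it through PARTITIONER_CLASS_CONFIG as above replaces Kafka's default partitioner for every record this producer sends.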
Serializers
Kafka ships with a number of serializers, which can be found in the org.apache.kafka.common.serialization package.
We can also write a custom serializer by implementing the org.apache.kafka.common.serialization.Serializer<T> interface and overriding its byte[] serialize(String topic, T data) method, as in the sketch below.
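For example, a minimal custom serializer for the hypothetical OrderMessage class used earlier might look like the following; encoding the object as the UTF-8 bytes of its toString() output is an assumption for illustration, and a real implementation would more likely emit JSON or Avro:

```java
package kafka.serialization;

import kafka.OrderMessage;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

public class OrderMessageSerializer implements Serializer<OrderMessage> {

    @Override
    public byte[] serialize(String topic, OrderMessage data) {
        if (data == null) {
            return null;
        }
        // Illustrative encoding only: the UTF-8 bytes of the object's string form.
        return data.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

It would then be registered in the producer configuration, e.g. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderMessageSerializer.class.getName()), with the producer typed as KafkaProducer<String, OrderMessage>.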