消费者与消费者组
消费者组是若干个消费者组成的集合,一个消费者组包含以下特性:
- 一个消费者组可以有一个或多个消费者实例
- 消费者组名(GroupId)通常由一个一个字符串表示,有唯一性。
- 当一个消费者组订阅了主题,那么该主题中的每个分区职能分配给否一个消费者组中的某一个消费者程序
一个分区对应一个消费者,一个消费者可以负责多个分区。消费者数量尽量不要超过话题的分区数,否则多出的消费者将处于空闲状态。

使用脚本控制消费者
1
| ./kafka-console-consumer.sh --broker-list broker_name:port --topic topic_name
|
使用消费者API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import org.apache.kafka.clients.producer.KafkaConsumer; import org.apache.kafka.clients.producer.ConsumerRecords;
import java.util.Properties;
public class KafkaConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Collections.singletonList("test")); try { while (true) { ConsumerReccords<String, String> records = consumer.poll(100); for (ConsumerReccord<String, String> record : records) { log.debug("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } }
|
KafkaConsumer是非多线程并发安全的:如果多个线程公用一个KafkaConsumer实例,则抛出异常错误信息。