Kafka
1.阻塞队列
2.Kafka入门
- Topic:指存放数据的空间
- Partition:指存放消息的分区
- Offset:消息存放的索引
- Leader Replica:主副本,消费者从主副本获取数据
- Follower Replica:从主副本备份数据,不负责响应。如果某个时候主副本挂了,就会从从副本中选择一个新的。
- 启动zookeeper服务
zookeeper-server-start.bat C:\kafka_2.12-2.2.0\config\zookeeper.properties
- 启动kafka
kafka-server-start.bat C:\kafka_2.12-2.2.0\config\server.properties
- 创建topic主体名称为test(此步无返回说明成功)
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
- 查看创建的topic主题
kafka-topics.bat --list --bootstrap-server localhost:9092
- 调用生产者,发送消息(broker指代服务器)
kafka-console-producer.bat --broker-list localhost:9092 --topic test
- 再启动一个cmd窗口,调用消费者,读取consumer发送的消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
3.Springboot整合Kafka
-
引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
-
配置kafka
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=3000
-
在kafka的config里要配置consumer.properties与上面一致
# consumer group id group.id=community-consumer-group
-
编写生产者和消费者代码
@RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class KafkaTests { @Autowired private KafkaProducer kafkaProducer; @Test public void testKafka(){ kafkaProducer.sendMessage("test","你好"); kafkaProducer.sendMessage("test","在吗"); try { Thread.sleep(1000*10); } catch (InterruptedException e) { e.printStackTrace(); } } } @Component //生产者 class KafkaProducer{ @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic,String content){ kafkaTemplate.send(topic,content); } } @Component //消费者 class KafkaConsumer{ @KafkaListener(topics = ("test")) public void handleMessage(ConsumerRecord record){ System.out.println(record.value()); } }
4.发送系统通知
5.显示系统通知
Q.E.D.