Kafka

1.阻塞队列

image-20220331195804680

2.Kafka入门

image-20220331201508377

  • Topic:指存放数据的空间
  • Partition:指存放消息的分区
  • Offset:消息存放的索引
  • Leader Replica:主副本,消费者从主副本获取数据
  • Follower Replica:从主副本备份数据,不负责响应。如果某个时候主副本挂了,就会从从副本中选择一个新的。
  1. 启动zookeeper服务
zookeeper-server-start.bat C:\kafka_2.12-2.2.0\config\zookeeper.properties
  1. 启动kafka
kafka-server-start.bat C:\kafka_2.12-2.2.0\config\server.properties
  1. 创建topic主体名称为test(此步无返回说明成功)
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
  1. 查看创建的topic主题
kafka-topics.bat --list --bootstrap-server localhost:9092
  1. 调用生产者,发送消息(broker指代服务器)
kafka-console-producer.bat --broker-list localhost:9092 --topic test

image-20220331220631466

  1. 再启动一个cmd窗口,调用消费者,读取consumer发送的消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

image-20220331220820331

3.Springboot整合Kafka

image-20220331220909944

  1. 引入依赖

    <dependency>
    			<groupId>org.springframework.kafka</groupId>
    			<artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. 配置kafka

    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=community-consumer-group
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=3000
    
  3. 在kafka的config里要配置consumer.properties与上面一致

    # consumer group id
    group.id=community-consumer-group
    
  4. 编写生产者和消费者代码

    @RunWith(SpringRunner.class)
    @SpringBootTest
    @ContextConfiguration(classes = CommunityApplication.class)
    public class KafkaTests {
    
        @Autowired
        private KafkaProducer kafkaProducer;
    
        @Test
        public void testKafka(){
            kafkaProducer.sendMessage("test","你好");
            kafkaProducer.sendMessage("test","在吗");
    
            try {
                Thread.sleep(1000*10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    }
    
    @Component
    //生产者
    class KafkaProducer{
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        public void sendMessage(String topic,String content){
            kafkaTemplate.send(topic,content);
        }
    }
    
    @Component
    //消费者
    class KafkaConsumer{
        @KafkaListener(topics = ("test"))
        public void handleMessage(ConsumerRecord record){
            System.out.println(record.value());
        }
    }
    

    4.发送系统通知

    image-20220331234635332

    5.显示系统通知

Q.E.D.


窝似嫩叠