1. Überblick Die Verzögerung der Kafka-Konsumentengruppe ist ein wichtiger Leistungsindikator für jedes Kafka-basierte ereignisgesteuerte System. In diesem Tutorial werden wir eine Analyseanwendung zur Überwachung des Kafka-Consumer-Lags erstellen.

2. Consumer Lag Consumer Lag ist einfach das Delta zwischen dem letzten Commit-Offset des Consumers und dem End-Offset des Producers im Log.

Mit anderen Worten, der Consumer Lag misst die Verzögerung zwischen der Produktion und dem Konsum von Nachrichten in einem Producer-Consumer-System. In diesem Abschnitt wird erläutert, wie die Offset-Werte bestimmt werden können. 2.1. Kafka AdminClient Um die Offset-Werte einer Consumer-Gruppe zu untersuchen, benötigen wir den administrativen Kafka-Client. Schreiben wir also eine Methode in die LagAnalyzerService-Klasse, um eine Instanz der AdminClient-Klasse zu erstellen: private AdminClient getAdminClient(String bootstrapServerConfig) { Properties config = new Properties(); config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig); return AdminClient.create(config); } Beachten Sie die Verwendung der @Value-Anmerkung, um die Bootstrap-Serverliste aus der Property-Datei abzurufen.

Auf dieselbe Weise werden wir diese Annotation verwenden, um andere Werte wie groupId und topicName zu erhalten. 2.2. Verbrauchergruppen-Offset Zunächst können wir die Methode listConsumerGroupOffsets() der AdminClient-Klasse verwenden, um die Offset-Informationen einer bestimmten Verbrauchergruppen-ID abzurufen. Da wir uns hauptsächlich auf die Offset-Werte konzentrieren, können wir die Methode partitionsToOffsetAndMetadata() aufrufen, um eine Karte von TopicPartition vs.

Überwachen des Consumer Lag in Apache Kafka

OffsetAndMetadata zu erhalten. OffsetAndMetadata Werte zu erhalten: private Map getConsumerGrpOffsets(String groupId) throws ExecutionException, InterruptedException { ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId); Map topicPartitionOffsetAndMetadataMap = info.partitionsToOffsetAndMetadata().get(); Map groupOffset = new HashMap<>(); for (Map.Entry entry : topicPartitionOffsetAndMetadataMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndMetadata metadata = entry.getValue(); groupOffset.putIfAbsent(new TopicPartition(key.topic(), key.partition()), metadata.offset()); } return groupOffset; } Schließlich können wir die Iteration über die topicPartitionOffsetAndMetadataMap feststellen, um unsere abgerufenen Ergebnisse auf die Offset-Werte für jedes Thema und jede Partition zu beschränken.

2.3. Producer Offset Das Einzige, was für die Suche nach der Verbrauchergruppenverzögerung übrig bleibt, ist eine Möglichkeit, die Endoffsetwerte zu erhalten. Hierfür können wir die endOffsets()-Methode der KafkaConsumer-Klasse verwenden. Beginnen wir mit der Erstellung einer Instanz der KafkaConsumer-Klasse in der LagAnalyzerService-Klasse: private KafkaConsumer getKafkaConsumer(String bootstrapServerConfig) { Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return new KafkaConsumer<>(properties); } Als Nächstes aggregieren wir alle relevanten TopicPartition-Werte aus den ConsumerGrpOffsets, für die wir die Verzögerung berechnen müssen, so dass wir sie als Argument für die endOffsets()-Methode bereitstellen: private Map getProducerOffsets(Map consumerGrpOffset) { List topicPartitions = new LinkedList<>(); for (Map.Entry entry : consumerGrpOffset.entrySet()) { TopicPartition key = entry.getKey(); topicPartitions.add(new TopicPartition(key.topic(), key.partition())); } return kafkaConsumer.endOffsets(topicPartitions); } Zum Schluss schreiben wir eine Methode, die die Offsets des Konsumenten und die EndOffsets des Produzenten verwendet, um den Lag für jede TopicPartition zu generieren: private Map computeLags( Map consumerGrpOffsets, Map producerOffsets) { Map lags = new HashMap<>(); for (Map.Entry entry : consumerGrpOffsets.entrySet()) { Long producerOffset = producerOffsets.get(entry.getKey()); Long consumerOffset = consumerGrpOffsets.get(entry.getKey()); long lag = Math.abs(producerOffset - consumerOffset); lags.putIfAbsent(entry.getKey(), lag); } return lags; } 3.

Lag-Analyzer Nun wollen wir die Lag-Analyse orchestrieren, indem wir die Methode analyzeLag() in der Klasse LagAnalyzerService schreiben: public void analyzeLag(String groupId) throws ExecutionException, InterruptedException { Map consumerGrpOffsets = getConsumerGrpOffsets(groupId); Map producerOffsets = getProducerOffsets(consumerGrpOffsets); Map lags = computeLags(consumerGrpOffsets, producerOffsets); for (Map.Entry lagEntry : lags.entrySet()) { String topic = lagEntry.getKey().topic(); int partition = lagEntry.getKey().partition(); Long lag = lagEntry.getValue(); System.out.printf(Time=%s | Lag for topic = %s, partition = %s is %d/MonitoringUtil.time(), topic, partition, lag); } } Für die Überwachung der Verzögerungsmetrik benötigen wir jedoch einen Fast-Echtzeitwert der Verzögerung, damit wir administrative Maßnahmen zur Wiederherstellung der Systemleistung ergreifen können.

Eine einfache Möglichkeit, dies zu erreichen, ist die Abfrage des Verzögerungswerts in einem regelmäßigen Zeitintervall. Erstellen wir also einen LiveLagAnalyzerService-Dienst, der die Methode analyzeLag() des LagAnalyzerService aufruft: @Scheduled(fixedDelay = 5000L) public void liveLagAnalysis() throws ExecutionException, InterruptedException { lagAnalyzerService.analyzeLag(groupId); } Für unseren Zweck haben wir die Abfragefrequenz mit der @Scheduled-Annotation auf 5 Sekunden festgelegt.

Für die Überwachung in Echtzeit müssten wir dies jedoch wahrscheinlich über JMX zugänglich machen. 4. Simulation In diesem Abschnitt werden wir Kafka-Producer und -Consumer für ein lokales Kafka-Setup simulieren, damit wir LagAnalyzer in Aktion sehen können, ohne von einem externen Kafka-Producer und -Consumer abhängig zu sein. 4.1. Simulationsmodus Da der Simulationsmodus nur zu Demonstrationszwecken benötigt wird, sollten wir einen Mechanismus haben, um ihn auszuschalten, wenn wir die LagAnalyzer-Anwendung in einem realen Szenario ausführen wollen.

Wir können dies als konfigurierbare Eigenschaft in der Ressourcendatei application.properties festlegen: monitor.producer.simulate=true monitor.consumer.simulate=true Wir werden diese Eigenschaften in den Kafka-Producer und -Consumer einfügen und ihr Verhalten steuern. Zusätzlich definieren wir producer startTime, endTime und eine Hilfsmethode time(), um die aktuelle Zeit während der Überwachung zu erhalten: public static final Date startTime = new Date(); public static final Date endTime = new Date(startTime.getTime() + 30 * 1000); public static String time() { DateTimeFormatter dtf = DateTimeFormatter.ofPattern(yyyy//MM//dd HH:mm:ss); LocalDateTime now = LocalDateTime.now(); String date = dtf.format(now); return date; } 4.2.

Producer-Consumer-Konfigurationen Wir müssen einige Kernkonfigurationswerte für die Instanziierung unserer Kafka-Consumer- und Producer-Simulatoren definieren. Zunächst definieren wir die Konfiguration für den Consumer-Simulator in der Klasse KafkaConsumerConfig: public ConsumerFactory consumerFactory(String groupId) { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); if (enabled) { props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); } else { props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId); } props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); if (enabled) { factory.setConsumerFactory(consumerFactory(groupId)); } else { factory.setConsumerFactory(consumerFactory(simulateGroupId)); } return factory; } Als nächstes können wir die Konfiguration für den Producer-Simulator in der Klasse KafkaProducerConfig definieren: @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } Außerdem verwenden wir die @KafkaListener-Annotation, um den Ziel-Listener zu spezifizieren, der natürlich nur aktiviert ist, wenn monitor.consumer.simulate auf true gesetzt ist: @KafkaListener( topics =containerFactory = kafkaListenerContainerFactory, autoStartup = ) public void listen(String message) throws InterruptedException { Thread.sleep(10L); } Wir haben also eine Schlafzeit von 10 Millisekunden hinzugefügt, um eine künstliche Verzögerung der Verbraucher zu erzeugen.

Schließlich schreiben wir eine sendMessage()-Methode, um den Produzenten zu simulieren: @Scheduled(fixedDelay = 1L, initialDelay = 5L) public void sendMessage() throws ExecutionException, InterruptedException { if (enabled) { if (endTime.after(new Date())) { String message = msg- + time(); SendResult result = kafkaTemplate.send(topicName, message).get(); } } } Wir können feststellen, dass der Producer Nachrichten mit einer Rate von 1 Nachricht//ms erzeugt.

Außerdem Überwachen des Consumer Lag in Apache Kafka er nach der endTime von 30 Sekunden nach der startTime der Simulation auf, Überwachen des Consumer Lag in Apache Kafka zu produzieren. 4.3. Live-Überwachung Führen wir nun die Hauptmethode in unserer LagAnalyzerApplication aus: public static void main(String[] args) { SpringApplication.run(LagAnalyzerApplication.class, args); while (true) ; } Wir werden die aktuelle Verzögerung auf jeder Partition des Themas nach jeweils 30 Sekunden sehen: Time=2021//06//06 11:07:24 | Lag für topic = baeldungTopic, partition = 0 ist 93 Time=2021//06//06 11:07:29 | Lag für topic = baeldungTopic, partition = 0 ist 290 Time=2021//06//06 11:07:34 | Lag für topic = baeldungTopic, partition = 0 ist 776 Time=2021//06//06 11:07:39 | Lag für topic = baeldungTopic, partition = 0 is 1159 Time=2021//06//06 11:07:44 | Lag für topic = baeldungTopic, partition = 0 is 1559 Time=2021//06//06 11:07:49 | Lag für topic = baeldungTopic, partition = 0 is 2015 Time=2021//06//06 11:07:54 | Lag für topic = baeldungTopic, partition = 0 is 1231 Time=2021//06//06 11:07:59 | Lag für topic = baeldungTopic, partition = 0 is 731 Time=2021//06//06 11:08:04 | Lag für topic = baeldungTopic, partition = 0 is 231 Time=2021//06//06 11:08:09 | Lag for topic = baeldungTopic, partition = 0 is 0 Die Rate, mit der der Produzent Nachrichten produziert, ist also 1 Nachricht//ms, was höher ist als die Rate, mit der der Konsument die Nachricht konsumiert.

In den ersten 30 Sekunden baut sich also eine Verzögerung auf, danach stellt der Produzent die Produktion ein, so dass die Verzögerung allmählich auf 0,5 sinkt. Schlussfolgerung In diesem Tutorial haben wir ein Verständnis dafür entwickelt, wie man die Verzögerung der Konsumenten in einem Kafka-Thema ermittelt.

Außerdem haben wir dieses Wissen genutzt, um eine LagAnalyzer-Anwendung in Spring zu erstellen, die den Lag fast in Echtzeit anzeigt. Wie immer ist der komplette Quellcode des Tutorials auf GitHub verfügbar.


8 16 21