[Kafka] 設定Listener / Advertised Listener:讓Client遠端連線到AWS EC2上的Kafka

Cancerpio
17 min readMar 26, 2023

--

最近side project需要使用AWS Lambda 當作producer,寫資料到EC2上的Kafka broker,這看似簡單的事情在一連串通靈的過程後才發現是有一些眉角的。

先說結論

如果只是要趕快讓你的遠端client連上AWS EC2 docker的kafka broker,只要在docker-compose加上這段environment:

kafka:
...(略)
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://你的EC2 IP或External Hostname:9092

並且確定有把docker port, EC2 security group開放9092 port,一切搞定。

原理

Google過以後才發現,這個問題在Stack overflow也是超多人問,多到Confluent自己開了一篇文章在說明其中的原理:

自己大概花了一些時間搞懂來龍去脈,所以在這篇整理一下筆記,從不同的遠端Client上,用producer / consumer傳送/讀取message到AWS EC2上的kafka broker的原理,和需要做的設定。

Note

1.使用Confluent cloud不用設定listener,這邊只整理一般使用docker 運作kafka的情況

2.docker hub寫的KAFKA_CFG.*開頭的環境變數需要加在extraEnvVars內才會生效(ex: extraEnvVars: {KAFKA_CFG_EXTRA_VAR: …})。

一般要改環境變數時,在environment中加入KAFKA_.*開頭的變數即可

see: https://github.com/bitnami/charts/issues/3958

實作上需要設定…

需要在Kafka broke 設定監聽0.0.0.0(監聽機器上所有interface)的Listener,

以及配對的Advertised Listener。

下面從Client連線到Kafka時,如何尋找Leader partition所在的broker開始說明流程。

先備知識: Leader partition / Follower partition

當Client連線到kafak時,實際上是透過Leader partition讀/寫資料。

在kafka中,當replication factor >1時,每個partition會有多個複製品(replica,備援用),這些partition replica分成Leader / Follower partition兩種角色。

其中Leader partition只會有一個,他負責處理所有對這個partition的讀取(Consumer)/寫入(Producer)。而Leader partition可能會在Broker Cluster中的任何一個Broker上面。

除了Leader partition以外的replica都是Follower partition,他們都不負責服務Consumer,他們只會從Leader讀取Message。

當Client端連到kafka時,首先要取得的,就是Leader partition所在的broker的連線資訊。

詳細機制跟一些Citation可參考Stack overflow這篇: https://stackoverflow.com/questions/60835817/what-is-a-partition-leader-in-apache-kafka

遠端Client連到Kafka的程序

以kafkaJS實作的Producer為例,步驟如下:

Client端先透過Initial connection跟broker拿到真正的連線方式,再透過這個拿到的url去對kafka send message

1.Initial connection: Client Request (Connect by Host IP)

Client問Broker Cluster(18.134.151.199:9092)可以連到Leader partition的Broker的連線方式

2.Initial connection: Server Response(Response: meta data)

Broker回應meta data,裡面包含了目標Broker的連線方式(18.134.151.198:9093)

3.Send message:

Client會從Broker給的連線方式,再連到目標Broker實際讀寫資料

這邊要注意的是,我們在Client端設定的Kafka host,只是告訴Client哪裡可以拿到meta data的位址。這讓Client可以透過步驟1~2 (Initial connection),去Broker Cluster取得meta data, 裡面包含了實際要讀寫的目標broker連線位址。

而這個目標Broker的連線位址,要在Kafka Broker(server端)上透過advertised.listeners 或者KAFKA_ADVERTISED_LISTENERS設定。 如果沒設定的話,預設是回應broker所在的host name。

所以如果要讓Client正確produce / consumer遠端broker資料,需要在Kafka Broker config設定好連線方式,不然就會看到明明Client端可以成功connect到Broker Cluster,卻又出現連線到localhost:9092失敗 (因為沒設定目標broker連線方式, 使用預設回應localhost:9092)這種奇怪的事情。

Listeners, Advertised Listeners

  • Listeners

Kafka 綁定的HostName / IP:port,如果完全不設定Listener的話,Kafka Default會有一個綁定0.0.0.0的Listener (監聽機器上所有interface)

  • Advertised Listeners

是客戶端連接的方式,也是HostName / IP: port

一個綁定某Interface的Listener,可以設定他對應的Advertised Listener,當有Client連到這個Listener綁定的HostName / Ip:port時,回應的end point(meta data 內),就是他對應的Advertised Listener的值

設定Listener以及他對應的Advertised Listener

直接以一個例子做說明:

- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:9092,EXTERNAL://ec2–18–133–151–199.ap-southeast-1.compute.amazonaws.com:9093
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL

這邊設定了兩個Listener,名字分別叫做INTERNAL以及EXTERNAL,他們都使用PLAINTEXT(表示沒有加密)做連線的protocol,並監聽所有的interface(0.0.0.0)。

設定不同的Listener

INTERNAL 這個Listener因為有設定同樣名稱的ADVERTISED_LISTENER, 所以當Client對broker的9092 port做initial connection時,broker會回應我們設定的localhost:9092

而當Client對9093 port做initial connection時,則會回應EXTERNAL ADVERTISED_LISTENER的url: ec2–18–133–151–199.ap-southeast-1.compute.amazonaws.com:9093

最後我們在KAFKA_INTER_BROKER_LISTENER_NAME指定使用INTERNAL作為broker之間溝通的Listener

docker環境變數說明

KAFKA_LISTENERS

  • 格式: {LISTENER_NAME}://{hostname}:{port}
  • 定義: 設定Listener name, ip/hostname, 以及port。
  • 其中Listener name是為了指定Client端連到kafka server端時,使用的加密protocol。可以直接用Kafka 支援的protocal,也可以給一個自訂的名稱,然後在KAFKA_LISTENER_SECURITY_PROTOCOL_MAP指定這個名稱所使用的protocol

KAFKA_ADVERTISED_LISTENERS

  • 格式(同Listener): {LISTENER_NAME}://{hostname}:{port}
  • 代表當Client端連到某個Listener時,在initial connection會回應給Client的位址,以LISTENER_NAME當作配對的依據

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP

  • 把Listener name 對應到kafka 支援的四種加密protocol: PLAINTEXT(沒加密, 僅建議測試用), SSL, SASL_PLAINTEXT, SASL_SSL

KAFKA_INTER_BROKER_LISTENER_NAME

  • 指定要使用哪一個Listner來讓Broker彼此溝通

其他眉角

  • 若未設定任何Advertised Listener,Listener預設回應的meta data endpoint會是機器的Hos tName。
  • 若只設定Advertised Listener,而未設定任何Listeners,該Advertised Listener將對應到kafka的預設Listener (0.0.0.0)。
  • Advertised Listener不可設定為0.0.0.0。

在AWS EC2上設定Advertised Listeners

需注意下面的原理以及限制:

  • 如上所述,如果完全不設定Advertised Listener,則在Initial connection時,Listener預設會回給Client 這台(AWS EC2)的host name。
  • 由於EC2的hostname是只有AWS內網可以resolved的internal hostname,會造成Client在Initial connection後,無法透過拿到的hostname 連到目標kafka broker

docker-compose範例:在AWS EC2上設定Advertised Listeners: (單一Listener)

version: "2"
services:
zookeeper:
image: zookeeper:3.4
mem_limit: 104857600
container_name: zookeeper
restart: unless-stopped
ports:
# exposing for debug reason
- 2181:2181
volumes:
- "./zookeeper/data:/data"
- "./zookeeper/datalog:/datalog"
- "./zookeeper/zoo.cfg:/conf/zoo.cfg"
kafka:
image: 'bitnami/kafka:2.1.1'
hostname: localhost
container_name: kafka
ports:
- '9092:9092'
# expose the external port for external clients
- '9093:9093'
environment:
# Kafka defaults to the following jvm memory parameters which mean that kafka will allocate 1GB at startup and use a maximum of 1GB of memory
- KAFKA_HEAP_OPTS=-Xmx512m -Xms256m
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
# set advertised listeners of default listener to the hostname which can be resolved both internally and externally
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://EC2的external hostname或IP:9092
volumes:
- KAFKA_VOLUMES:/bitnami/kafka
depends_on:
- zookeeper
volumes:
KAFKA_VOLUMES:
  • 如果不需區分外網/內網Listener時:
  • 不用額外設定Listener, kafka 會有一個Default的Listener
  • 只需要新增defult Listener的 Advertised Listener,把值設定為EC2的對外host name。這樣就可以在Client端也被resolve
  • 內部外部都會使用這個 Advertised Listener來連線

docker-compose範例: 在AWS EC2 上設定Advertised Listeners (外網/內網Listener)

version: "2"
services:
zookeeper:
image: zookeeper:3.4
mem_limit: 104857600
container_name: zookeeper
restart: unless-stopped
ports:
# exposing for debug reason
- 2181:2181
volumes:
- "./zookeeper/data:/data"
- "./zookeeper/datalog:/datalog"
- "./zookeeper/zoo.cfg:/conf/zoo.cfg"
kafka:
image: 'bitnami/kafka:2.1.1'
hostname: localhost
container_name: kafka
ports:
- '9092:9092'
# expose the external port for external clients
- '9093:9093'
environment:
# Kafka defaults to the following jvm memory parameters which mean that kafka will allocate 1GB at startup and use a maximum of 1GB of memory
- KAFKA_HEAP_OPTS=-Xmx512m -Xms256m
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
# set listeners and advertised listeners for external (9093) and internal(9092)
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:9092,EXTERNAL://EC2的external hostname或IP:9093
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
volumes:
- KAFKA_VOLUMES:/bitnami/kafka
depends_on:
- zookeeper
volumes:
KAFKA_VOLUMES:

這邊要注意的是,只要有設定了KAFKA_LISTENERS,除了要設定對應的KAFKA_ADVERTISED_LISTENERS以外,

還必須要設定KAFKA_INTER_BROKER_LISTENER_NAME,來告訴kafka broker之間的通訊要用哪一個listener。否則會出現下面的錯誤:

ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are INTERNAL,EXTERNAL

即便你只設定了一個Listener也一樣 。

使用kafka console Producer / Consumer 來對9092, 9093 port做測試

Consumer

docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-consumer.sh -bootstrap-server kafka_broker_ip:9092 --from-beginning -topic TEST_TOPIC

Producer: port 9093 (可正常連線)

docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list kafka_broker_ip:9093 -topic TEST_TOPIC

Producer: 9092 (會連不到)

docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-producer.sh - broker-list kafka broker ip:9092 - topic TEST_TOPIC

...ERROR Error when sending message to topic TEST_TOPIC with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

--

--