[Kafka] 設定Listener / Advertised Listener:讓Client遠端連線到AWS EC2上的Kafka
最近side project需要使用AWS Lambda 當作producer,寫資料到EC2上的Kafka broker,這看似簡單的事情在一連串通靈的過程後才發現是有一些眉角的。
先說結論
如果只是要趕快讓你的遠端client連上AWS EC2 docker的kafka broker,只要在docker-compose加上這段environment:
kafka:
...(略)
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://你的EC2 IP或External Hostname:9092
並且確定有把docker port, EC2 security group開放9092 port,一切搞定。
原理
Google過以後才發現,這個問題在Stack overflow也是超多人問,多到Confluent自己開了一篇文章在說明其中的原理:
自己大概花了一些時間搞懂來龍去脈,所以在這篇整理一下筆記,從不同的遠端Client上,用producer / consumer傳送/讀取message到AWS EC2上的kafka broker的原理,和需要做的設定。
Note
1.使用Confluent cloud不用設定listener,這邊只整理一般使用docker 運作kafka的情況
2.docker hub寫的KAFKA_CFG.*開頭的環境變數需要加在extraEnvVars內才會生效(ex: extraEnvVars: {KAFKA_CFG_EXTRA_VAR: …})。
一般要改環境變數時,在environment中加入KAFKA_.*開頭的變數即可
see: https://github.com/bitnami/charts/issues/3958
實作上需要設定…
需要在Kafka broke 設定監聽0.0.0.0(監聽機器上所有interface)的Listener,
以及配對的Advertised Listener。
下面從Client連線到Kafka時,如何尋找Leader partition所在的broker開始說明流程。
先備知識: Leader partition / Follower partition
當Client連線到kafak時,實際上是透過Leader partition讀/寫資料。
在kafka中,當replication factor >1時,每個partition會有多個複製品(replica,備援用),這些partition replica分成Leader / Follower partition兩種角色。
其中Leader partition只會有一個,他負責處理所有對這個partition的讀取(Consumer)/寫入(Producer)。而Leader partition可能會在Broker Cluster中的任何一個Broker上面。
除了Leader partition以外的replica都是Follower partition,他們都不負責服務Consumer,他們只會從Leader讀取Message。
當Client端連到kafka時,首先要取得的,就是Leader partition所在的broker的連線資訊。
詳細機制跟一些Citation可參考Stack overflow這篇: https://stackoverflow.com/questions/60835817/what-is-a-partition-leader-in-apache-kafka
遠端Client連到Kafka的程序
以kafkaJS實作的Producer為例,步驟如下:
1.Initial connection: Client Request (Connect by Host IP)
Client問Broker Cluster(18.134.151.199:9092)可以連到Leader partition的Broker的連線方式
2.Initial connection: Server Response(Response: meta data)
Broker回應meta data,裡面包含了目標Broker的連線方式(18.134.151.198:9093)
3.Send message:
Client會從Broker給的連線方式,再連到目標Broker實際讀寫資料
這邊要注意的是,我們在Client端設定的Kafka host,只是告訴Client哪裡可以拿到meta data的位址。這讓Client可以透過步驟1~2 (Initial connection),去Broker Cluster取得meta data, 裡面包含了實際要讀寫的目標broker連線位址。
而這個目標Broker的連線位址,要在Kafka Broker(server端)上透過advertised.listeners 或者KAFKA_ADVERTISED_LISTENERS設定。 如果沒設定的話,預設是回應broker所在的host name。
所以如果要讓Client正確produce / consumer遠端broker資料,需要在Kafka Broker config設定好連線方式,不然就會看到明明Client端可以成功connect到Broker Cluster,卻又出現連線到localhost:9092失敗 (因為沒設定目標broker連線方式, 使用預設回應localhost:9092)這種奇怪的事情。
Listeners, Advertised Listeners
- Listeners
Kafka 綁定的HostName / IP:port,如果完全不設定Listener的話,Kafka Default會有一個綁定0.0.0.0的Listener (監聽機器上所有interface)
- Advertised Listeners
是客戶端連接的方式,也是HostName / IP: port
一個綁定某Interface的Listener,可以設定他對應的Advertised Listener,當有Client連到這個Listener綁定的HostName / Ip:port時,回應的end point(meta data 內),就是他對應的Advertised Listener的值
設定Listener以及他對應的Advertised Listener
直接以一個例子做說明:
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:9092,EXTERNAL://ec2–18–133–151–199.ap-southeast-1.compute.amazonaws.com:9093
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
這邊設定了兩個Listener,名字分別叫做INTERNAL以及EXTERNAL,他們都使用PLAINTEXT(表示沒有加密)做連線的protocol,並監聽所有的interface(0.0.0.0)。
設定不同的Listener
INTERNAL 這個Listener因為有設定同樣名稱的ADVERTISED_LISTENER, 所以當Client對broker的9092 port做initial connection時,broker會回應我們設定的localhost:9092。
而當Client對9093 port做initial connection時,則會回應EXTERNAL ADVERTISED_LISTENER的url: ec2–18–133–151–199.ap-southeast-1.compute.amazonaws.com:9093
最後我們在KAFKA_INTER_BROKER_LISTENER_NAME指定使用INTERNAL作為broker之間溝通的Listener
docker環境變數說明
KAFKA_LISTENERS
- 格式: {LISTENER_NAME}://{hostname}:{port}
- 定義: 設定Listener name, ip/hostname, 以及port。
- 其中Listener name是為了指定Client端連到kafka server端時,使用的加密protocol。可以直接用Kafka 支援的protocal,也可以給一個自訂的名稱,然後在KAFKA_LISTENER_SECURITY_PROTOCOL_MAP指定這個名稱所使用的protocol
KAFKA_ADVERTISED_LISTENERS
- 格式(同Listener): {LISTENER_NAME}://{hostname}:{port}
- 代表當Client端連到某個Listener時,在initial connection會回應給Client的位址,以LISTENER_NAME當作配對的依據
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
- 把Listener name 對應到kafka 支援的四種加密protocol: PLAINTEXT(沒加密, 僅建議測試用), SSL, SASL_PLAINTEXT, SASL_SSL
KAFKA_INTER_BROKER_LISTENER_NAME
- 指定要使用哪一個Listner來讓Broker彼此溝通
其他眉角
- 若未設定任何Advertised Listener,Listener預設回應的meta data endpoint會是機器的Hos tName。
- 若只設定Advertised Listener,而未設定任何Listeners,該Advertised Listener將對應到kafka的預設Listener (0.0.0.0)。
- Advertised Listener不可設定為0.0.0.0。
在AWS EC2上設定Advertised Listeners
需注意下面的原理以及限制:
- 如上所述,如果完全不設定Advertised Listener,則在Initial connection時,Listener預設會回給Client 這台(AWS EC2)的host name。
- 由於EC2的hostname是只有AWS內網可以resolved的internal hostname,會造成Client在Initial connection後,無法透過拿到的hostname 連到目標kafka broker
docker-compose範例:在AWS EC2上設定Advertised Listeners: (單一Listener)
version: "2"
services:
zookeeper:
image: zookeeper:3.4
mem_limit: 104857600
container_name: zookeeper
restart: unless-stopped
ports:
# exposing for debug reason
- 2181:2181
volumes:
- "./zookeeper/data:/data"
- "./zookeeper/datalog:/datalog"
- "./zookeeper/zoo.cfg:/conf/zoo.cfg"
kafka:
image: 'bitnami/kafka:2.1.1'
hostname: localhost
container_name: kafka
ports:
- '9092:9092'
# expose the external port for external clients
- '9093:9093'
environment:
# Kafka defaults to the following jvm memory parameters which mean that kafka will allocate 1GB at startup and use a maximum of 1GB of memory
- KAFKA_HEAP_OPTS=-Xmx512m -Xms256m
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
# set advertised listeners of default listener to the hostname which can be resolved both internally and externally
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://EC2的external hostname或IP:9092
volumes:
- KAFKA_VOLUMES:/bitnami/kafka
depends_on:
- zookeeper
volumes:
KAFKA_VOLUMES:
- 如果不需區分外網/內網Listener時:
- 不用額外設定Listener, kafka 會有一個Default的Listener
- 只需要新增defult Listener的 Advertised Listener,把值設定為EC2的對外host name。這樣就可以在Client端也被resolve
- 內部外部都會使用這個 Advertised Listener來連線
docker-compose範例: 在AWS EC2 上設定Advertised Listeners (外網/內網Listener)
version: "2"
services:
zookeeper:
image: zookeeper:3.4
mem_limit: 104857600
container_name: zookeeper
restart: unless-stopped
ports:
# exposing for debug reason
- 2181:2181
volumes:
- "./zookeeper/data:/data"
- "./zookeeper/datalog:/datalog"
- "./zookeeper/zoo.cfg:/conf/zoo.cfg"
kafka:
image: 'bitnami/kafka:2.1.1'
hostname: localhost
container_name: kafka
ports:
- '9092:9092'
# expose the external port for external clients
- '9093:9093'
environment:
# Kafka defaults to the following jvm memory parameters which mean that kafka will allocate 1GB at startup and use a maximum of 1GB of memory
- KAFKA_HEAP_OPTS=-Xmx512m -Xms256m
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
# set listeners and advertised listeners for external (9093) and internal(9092)
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:9092,EXTERNAL://EC2的external hostname或IP:9093
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
volumes:
- KAFKA_VOLUMES:/bitnami/kafka
depends_on:
- zookeeper
volumes:
KAFKA_VOLUMES:
這邊要注意的是,只要有設定了KAFKA_LISTENERS,除了要設定對應的KAFKA_ADVERTISED_LISTENERS以外,
還必須要設定KAFKA_INTER_BROKER_LISTENER_NAME,來告訴kafka broker之間的通訊要用哪一個listener。否則會出現下面的錯誤:
ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are INTERNAL,EXTERNAL
即便你只設定了一個Listener也一樣 。
使用kafka console Producer / Consumer 來對9092, 9093 port做測試
Consumer
docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-consumer.sh -bootstrap-server kafka_broker_ip:9092 --from-beginning -topic TEST_TOPIC
Producer: port 9093 (可正常連線)
docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list kafka_broker_ip:9093 -topic TEST_TOPIC
Producer: 9092 (會連不到)
docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-producer.sh - broker-list kafka broker ip:9092 - topic TEST_TOPIC
...ERROR Error when sending message to topic TEST_TOPIC with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.