Creating Kafka with Docker (bitnami/kafka:latest) with SASL_PLAINTEXT password authentication
1. Edit client.properties
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="cloud" \
        password="cloudcpp";
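The same credentials are needed again by the Python clients in steps 5 and 6. As a rough sketch of how the two relate, the snippet below reads the properties text and maps it to the keyword arguments that kafka-python accepts. The helper names `parse_properties` and `kafka_python_kwargs` are hypothetical, and the parser is a simplified one that only handles `key=value` lines and backslash continuations, not the full Java properties syntax.

```python
import re


def parse_properties(text):
    """Simplified .properties parser: key=value lines, '\\' joins the next line."""
    props = {}
    pending = ""
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments unless we are inside a continued value
        if not pending and (not line or line.startswith(("#", "!"))):
            continue
        if line.endswith("\\"):
            pending += line[:-1].rstrip() + " "
            continue
        key, _, value = (pending + line).partition("=")
        props[key.strip()] = value.strip()
        pending = ""
    return props


def kafka_python_kwargs(props):
    """Map the properties to kafka-python keyword arguments (hypothetical helper)."""
    jaas = props.get("sasl.jaas.config", "")
    user = re.search(r'username="([^"]*)"', jaas)
    password = re.search(r'password="([^"]*)"', jaas)
    if not (user and password):
        raise ValueError("username/password not found in sasl.jaas.config")
    return {
        "security_protocol": props["security.protocol"],
        "sasl_mechanism": props["sasl.mechanism"],
        "sasl_plain_username": user.group(1),
        "sasl_plain_password": password.group(1),
    }


# Same content as client.properties above
CLIENT_PROPERTIES = r'''
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="cloud" \
    password="cloudcpp";
'''

kwargs = kafka_python_kwargs(parse_properties(CLIENT_PROPERTIES))
print(kwargs)
```

The resulting dictionary can be passed as `KafkaProducer(bootstrap_servers=..., **kwargs)`, so the password lives in one file instead of being repeated in each script.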
2. Edit docker-compose.yml
    version: '3.8'

    networks:
      kafka-net:

    services:
      kafka:
        image: bitnami/kafka:latest
        container_name: kafka
        user: root
        networks:
          - kafka-net
        ports:
          - "9092:9092"
        volumes:
          - ./kafka_data:/bitnami/kafka
          - ./client.properties:/tmp/client.properties
        environment:
          # Single-node KRaft: this node acts as both broker and controller
          - KAFKA_KRAFT_MODE=true
          - KAFKA_CFG_NODE_ID=1
          - KAFKA_CFG_PROCESS_ROLES=broker,controller
          - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093
          - KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
          # Address that clients use to reach the broker; replace with your host's IP
          - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://158.178.237.1:9092
          - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT
          - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
          - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
          - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
          - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
          - ALLOW_PLAINTEXT_LISTENER=yes
          # Must match the username and password in client.properties
          - KAFKA_CLIENT_USERS=cloud
          - KAFKA_CLIENT_PASSWORDS=cloudcpp
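The listener variables above have to agree with each other, and a typo in one of them usually only shows up when the container fails to start. The following is a minimal sketch of a consistency check over those values. The function names are hypothetical, and the rules it checks (every listener has a protocol mapping, every advertised, inter-broker and controller listener is declared) are a small subset chosen for illustration, not the full validation that Kafka performs.

```python
def parse_listeners(value):
    """Turn 'NAME://host:port,...' into {NAME: (host, port)}."""
    result = {}
    for item in value.split(","):
        name, _, address = item.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


def check_listener_config(env):
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    listeners = parse_listeners(env["KAFKA_CFG_LISTENERS"])
    advertised = parse_listeners(env["KAFKA_CFG_ADVERTISED_LISTENERS"])
    protocol_map = dict(
        pair.split(":", 1)
        for pair in env["KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP"].split(",")
    )
    controllers = env["KAFKA_CFG_CONTROLLER_LISTENER_NAMES"].split(",")
    inter_broker = env["KAFKA_CFG_INTER_BROKER_LISTENER_NAME"]

    for name in listeners:
        if name not in protocol_map:
            problems.append(f"listener {name} has no security protocol mapping")
    for name in advertised:
        if name not in listeners:
            problems.append(f"advertised listener {name} is not declared in LISTENERS")
    if inter_broker not in listeners:
        problems.append(f"inter-broker listener {inter_broker} is not declared")
    for name in controllers:
        if name not in listeners:
            problems.append(f"controller listener {name} is not declared")
    return problems


# Values copied from the environment section of docker-compose.yml
ENV = {
    "KAFKA_CFG_LISTENERS": "SASL_PLAINTEXT://:9092,CONTROLLER://:9093",
    "KAFKA_CFG_ADVERTISED_LISTENERS": "SASL_PLAINTEXT://158.178.237.1:9092",
    "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP": "SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT",
    "KAFKA_CFG_INTER_BROKER_LISTENER_NAME": "SASL_PLAINTEXT",
    "KAFKA_CFG_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
}

problems = check_listener_config(ENV)
print(problems or "listener settings are consistent")
```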
3. Start Kafka
    # Start
    docker-compose up -d
    # Stop and remove the container
    docker-compose down
4. Create topics and list topics
    # Create the topic "test"
    docker exec -ti kafka kafka-topics.sh --bootstrap-server 158.178.237.1:9092 --command-config /tmp/client.properties --create --topic test --partitions 1 --replication-factor 1
    # Create the topic "CPPLA"
    docker exec -ti kafka kafka-topics.sh --bootstrap-server 158.178.237.1:9092 --command-config /tmp/client.properties --create --topic CPPLA --partitions 1 --replication-factor 1
    # List topics
    docker exec -ti kafka kafka-topics.sh --list --bootstrap-server 158.178.237.1:9092 --command-config /tmp/client.properties
5. Test producing messages with a producer
    from kafka import KafkaProducer

    # Connect with the same SASL/PLAIN credentials as client.properties
    producer = KafkaProducer(bootstrap_servers='158.178.237.1:9092',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='cloud',
                             sasl_plain_password='cloudcpp')

    for i in range(10):
        message = f'Hello {i} from Kafka.'.encode('utf-8')
        producer.send(topic='test', value=message)

    # send() is asynchronous; flush so buffered messages are delivered before closing
    producer.flush()
    producer.close()
6. Test consuming messages with a consumer
    from kafka import KafkaConsumer

    # With the default auto_offset_reset='latest', only messages produced
    # after this consumer starts are printed, so start it before the producer.
    consumer = KafkaConsumer(
        'test',
        bootstrap_servers='158.178.237.1:9092',
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism='PLAIN',
        sasl_plain_username='cloud',
        sasl_plain_password='cloudcpp'
    )

    for message in consumer:
        print(f'Received message: {message.value.decode("utf-8")}')
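If the consumer is started after the producer has already finished, it may print nothing, because kafka-python starts reading from the latest offset by default. One possible variation, sketched below, reads the topic from the beginning and stops after a period with no new messages. `consumer_kwargs` and `read_topic` are hypothetical helper names, and the 5-second idle timeout is an arbitrary choice; `auto_offset_reset` and `consumer_timeout_ms` are existing `KafkaConsumer` options.

```python
def consumer_kwargs(bootstrap, username, password,
                    from_beginning=True, idle_timeout_ms=5000):
    """Build KafkaConsumer keyword arguments for SASL/PLAIN (hypothetical helper)."""
    return {
        "bootstrap_servers": bootstrap,
        "security_protocol": "SASL_PLAINTEXT",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
        # 'earliest' replays existing messages when no offset has been committed
        "auto_offset_reset": "earliest" if from_beginning else "latest",
        # Stop iterating after this many milliseconds without a new message
        "consumer_timeout_ms": idle_timeout_ms,
    }


def read_topic(topic, **kwargs):
    """Return all currently available messages in the topic as strings."""
    # Imported here so the helper above can be used without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **kwargs)
    try:
        return [record.value.decode("utf-8") for record in consumer]
    finally:
        consumer.close()


settings = consumer_kwargs("158.178.237.1:9092", "cloud", "cloudcpp")
print(settings["auto_offset_reset"])

# Requires the broker from step 3 to be reachable:
# for text in read_topic("test", **settings):
#     print(f"Received message: {text}")
```

Because no `group_id` is set, no offsets are committed, so every run of `read_topic` replays the topic from the start.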