由于开发过程中使用到了kafka,又不想自己部署kafka,索性采用k8s 部署kafka集群,以求做到随时插拔。
创建命名空间
apiVersion: v1
kind: Namespace
metadata:
name: "kafka"
labels:
name: "kafka"
sudo kubectl apply -f namespace.yaml
sudo kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
sudo kubectl get pods -n kafka
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
namespace: kafka
spec:
kafka:
version: 3.2.0
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
- name: external
port: 9094
type: nodeport
tls: false
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.2"
storage:
type: ephemeral
zookeeper:
replicas: 3
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
sudo kubectl create -f kafka.yaml
sudo kubectl get pods -n kafka
sudo kubectl get services -n kafka
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: movies
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 1
replicas: 1
config:
retention.ms: 7200000
segment.bytes: 1073741824
from confluent_kafka.admin import AdminClient, NewTopic
class Topic:
def __init__(self) -> None:
self.conf = {'bootstrap.servers': f'192.168.214.133:32546'}
self.admin = AdminClient(self.conf)
def topicExist(self,topicname):
topic_metadata = self.admin.list_topics()
if topic_metadata.topics.get(topicname) is None:
self.creataTopic(topicname)
def creataTopic(self,topic):
new_topics = [NewTopic(topic,num_partitions=3, replication_factor=1)]
fs = self.admin.create_topics(new_topics)
for topic, f in fs.items():
try:
f.result() # The result itself is None
print("Topic {} created".format(topic))
return True
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))
return False
Topic().creataTopic('ellis1')
sudo kubectl get kafkatopic -n kafka
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack