1、安装
- 下载
地址:http://kafka.apache.org/downloads
注意要下载二进制版本 Binary downloads
- 解压
我这里安装在/usr/local/kafka下面
- 修改配置文件(也可以默认运行,6. Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181)
1>进入config目录找到文件server.properties并打开
找到并编辑log.dirs=log日志位置
找到并编辑zookeeper.connect=localhost:2181 设置zookeeperip端口号
2>进入zookeeper.properties并打开编辑
修改clientPort=2188 默认2181
- 运行zookeeper组件
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
- 运行kafka组件
./kafka-server-start.sh -daemon ../config/server.properties
2、使用
1.创建主题
./kafka-topics.sh --create --zookeeper localhost:2188 --replication-factor 1 --partitions 1 --topic test
2.查看主题列表
./kafka-topics.sh --list --zookeeper localhost:2188
3.控制台测试发送接收
# 生产:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 消费:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
3、自动创建主题配置
1.打开service.properties
num.partitions=3
auto.create.topics.enable=true
default.replication.factor=3
4.kafka-web-console
- 下载地址:https://github.com/claudemamo/kafka-web-console/releases
目前最新是2.0.0版本
- 编译安装包
kafka web console是用scala 开发的,所有编译打包需要安装scala的构建工具sbt
curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
//安装sbt
sudo yum install sbt
wget https://codeload.github.com/claudemamo/kafka-web-console/tar.gz/v2.0.0
tar -zvxf
编辑文件vim build.sbt
增加mysql配置:
libraryDependencies ++= Seq(
jdbc,
cache,
"org.squeryl" % "squeryl_2.10" % "0.9.5-6",
"com.twitter" % "util-zk_2.10" % "6.11.0",
"com.twitter" % "finagle-core_2.10" % "6.15.0",
"org.quartz-scheduler" % "quartz" % "2.2.1",
"mysql" % "mysql-connector-java" % "5.1.9", #增加mysql配置
"org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
)
进入当前kafka目录:cd /usr/local/soft/kafka-web-console-master
配置mysql的jdbc驱动
vim application.conf
增加代码如下:
.......
db.default.driver=com.mysql.jdbc.Driver
db.default.url="jdbc:mysql://xxx:3306/xiangcheng?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false"
db.default.user=root
db.default.password=xxx
.......
3 、编译
sbt package
4、运行
第一次启动用这个命令: ./kafka-web-console -DapplyEvolutions.default=true
第二次以后启动用这个命令:./kafka-web-console -Dhttp.port=9001
nohup ./kafka-web-console -DapplyEvolutions.default=true -Dhttp.port=9093 >../logs/application.log 2>&1 &
sbt run
5、浏览器访问
访问地址: http://ip:9001/
5.使用上的问题
代码没问题,但是每次运行就会抛一个time out 异常,总是连接失败。
这里需要注意的是,因为是远程连接服务器,所以要看服务器的防火墙是否针对端口9092(默认端口)打开的,刚开始弄了很长时间,我一直没弄好的原因是因为中午我重启了服务器,导致防火墙又打开了。
如果防火墙是正常的,就需要改变Kafka的配置:在/config/service.properties中,添加上一句host.name=192.168.0.11
重启好使了