kafka相关使用

1、安装

  1. 下载
    地址:http://kafka.apache.org/downloads
    注意要下载二进制版本 Binary downloads
  2. 解压
    我这里安装在/usr/local/kafka下面
  3. 修改配置文件(也可以默认运行,6. Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181)
    1>进入config目录找到文件server.properties并打开
    找到并编辑log.dirs=log日志位置
    找到并编辑zookeeper.connect=localhost:2181 设置zookeeperip端口号
    2>进入zookeeper.properties并打开编辑
    修改clientPort=2188 默认2181
  4. 运行zookeeper组件
        ./zookeeper-server-start.sh  -daemon ../config/zookeeper.properties
  1. 运行kafka组件
        ./kafka-server-start.sh -daemon ../config/server.properties

2、使用

1.创建主题
        ./kafka-topics.sh --create --zookeeper localhost:2188 --replication-factor 1 --partitions 1 --topic test
2.查看主题列表
        ./kafka-topics.sh --list --zookeeper localhost:2188
3.控制台测试发送接收
    # 生产:
        ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
    # 消费:
        ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

3、自动创建主题配置

1.打开service.properties
        num.partitions=3
        auto.create.topics.enable=true
        default.replication.factor=3

4.kafka-web-console

  1. 下载地址:https://github.com/claudemamo/kafka-web-console/releases
    目前最新是2.0.0版本
  2. 编译安装包
        kafka web console是用scala 开发的,所有编译打包需要安装scala的构建工具sbt
        curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
        sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
        //安装sbt
        sudo yum install sbt
        wget https://codeload.github.com/claudemamo/kafka-web-console/tar.gz/v2.0.0
        tar -zvxf 
        编辑文件vim build.sbt 
        增加mysql配置:
            libraryDependencies ++= Seq(
            jdbc,
            cache,
            "org.squeryl" % "squeryl_2.10" % "0.9.5-6",
            "com.twitter" % "util-zk_2.10" % "6.11.0",
            "com.twitter" % "finagle-core_2.10" % "6.15.0",
            "org.quartz-scheduler" % "quartz" % "2.2.1",
            "mysql" % "mysql-connector-java" % "5.1.9",      #增加mysql配置
            "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
            exclude("javax.jms", "jms")
            exclude("com.sun.jdmk", "jmxtools")
            exclude("com.sun.jmx", "jmxri")
            )
        进入当前kafka目录:cd  /usr/local/soft/kafka-web-console-master
    
        配置mysql的jdbc驱动
        vim application.conf
        增加代码如下:
        .......
        db.default.driver=com.mysql.jdbc.Driver
        db.default.url="jdbc:mysql://xxx:3306/xiangcheng?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false"
        db.default.user=root
        db.default.password=xxx
        .......
3 、编译
    sbt package 
4、运行
        第一次启动用这个命令: ./kafka-web-console  -DapplyEvolutions.default=true 
        第二次以后启动用这个命令:./kafka-web-console  -Dhttp.port=9001
        nohup ./kafka-web-console  -DapplyEvolutions.default=true -Dhttp.port=9093  >../logs/application.log 2>&1 &
        sbt run
5、浏览器访问
访问地址: http://ip:9001/

5.使用上的问题

代码没问题,但是每次运行就会抛一个time out 异常,总是连接失败。
这里需要注意的是,因为是远程连接服务器,所以要看服务器的防火墙是否针对端口9092(默认端口)打开的,刚开始弄了很长时间,我一直没弄好的原因是因为中午我重启了服务器,导致防火墙又打开了。
如果防火墙是正常的,就需要改变Kafka的配置:在/config/service.properties中,添加上一句host.name=192.168.0.11
重启好使了
# kafka   mq   集群  

评论

企鹅群:39438021

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×