一、抽象消费类封装:
package cn.shaker.media.basic.rocketMq.consumer;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.PostConstruct;
import java.util.Map;
/**
* @author :yepk
* @version :1.0
* @apiNote :mq消费者
* @date :2019-03-18-17:08
*/
@Slf4j
public abstract class BaseRocketMQConsumer {
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
/**
* topic单独提出来解析
**/
@Value("${rocketmq.consumer.topics}")
private String topics;
@PostConstruct
public void init() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
try {
Map<String, String> map = getTopic();
// map.put(RocketMqConst.SHAKER_MQ_ADD_COMMENT, RocketMqConst.TAG_SHAKER_ADD_COMMENT);
// map.put(RocketMqConst.SHAKER_MQ_COMMENT_REPLAY, RocketMqConst.TAG_SHAKER_COMMENT_REPLAY);
consumer.setSubscription(map);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeOrderlyContext) -> {
try {
for (MessageExt messageExt : list) {
String tag = messageExt.getTags();
String topic = messageExt.getTopic();
log.info(String.format("mq收到消息,来自标签%s", tag));
System.out.println(tag);
System.out.println(topic);
System.out.println("消费成功");
consume(tag, topic, messageExt);
}
} catch (Exception e) {
e.printStackTrace();
//稍后再试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
consumer.start();
System.out.println("消费者启动完成");
} catch (Exception e) {
e.printStackTrace();
}
}
private Map<String, String> getTopic() {
Map<String, String> map = Maps.newHashMapWithExpectedSize(7);
String[] topicTagsArr = topics.split(";");
for (String topic : topicTagsArr) {
String[] topicTags = topic.split("~");
map.put(topicTags[0], topicTags[1]);
}
return map;
}
protected abstract void consume(String tag, String topic, MessageExt messageExt);
}
二、实际消费者示例:
package cn.shaker.media.comment.center.rocketmq.consumer;
import cn.media.constant.RocketMqConst;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
 * Consumer for comment messages: dispatches "add comment" and "comment reply" messages to
 * the matching handler based on the topic.
 *
 * @author yepk
 * @version 1.0
 * @date 2019-03-13-12:28
 */
@Component
@Slf4j
public class AddCommentConsumer extends BaseRocketMQConsumer {

    /**
     * Decodes the message body and routes it by topic.
     *
     * @param tag        the message tag
     * @param topic      the topic the message was received from
     * @param messageExt the received message
     */
    @Override
    protected void consume(String tag, String topic, MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        if (body == null) {
            log.warn("Ignoring message without body: topic={}, tag={}", topic, tag);
            return;
        }
        // Decode explicitly as UTF-8 instead of the platform default charset.
        // NOTE(review): assumes the producer sends UTF-8 JSON bytes — confirm against the sender.
        String messageBody = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        if (RocketMqConst.SHAKER_MQ_ADD_COMMENT.equals(topic)) {
            // New comment
            this.addComment(messageBody);
        } else if (RocketMqConst.SHAKER_MQ_COMMENT_REPLAY.equals(topic)) {
            // Reply to a comment
            this.addCommentReplay(messageBody);
        } else {
            // Any other topic is not handled by this class; make that visible instead of
            // dropping the message silently.
            log.debug("No handler for topic {} (tag {}), message ignored", topic, tag);
        }
    }
}
三、生产者类封装:
package cn.shaker.media.basic.rocketMq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @author :yepk
* @version :1.0
* @apiNote :mq发送配置
* @date :2019-03-11-13:43
*/
@Slf4j
@Configuration
public class RocketMQProducer {
/**
* 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
*/
@Value("${rocketmq.producer.groupName}")
private String groupName;
@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;
/**
* 消息最大大小,默认4M
*/
@Value("${rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize;
/**
* 消息发送超时时间,默认3秒
*/
@Value("${rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;
/**
* 消息发送失败重试次数,默认2次
*/
@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;
private DefaultMQProducer producer;
@PostConstruct
public void init() {
producer = new DefaultMQProducer(this.groupName);
this.producer.setNamesrvAddr(this.namesrvAddr);
//如果需要同一个jvm中不同的this.producer往不同的mq集群发送消息,需要设置不同的instanceName
// this.producer.setInstanceName(instanceName);
if (this.maxMessageSize != null) {
this.producer.setMaxMessageSize(this.maxMessageSize);
}
if (this.sendMsgTimeout != null) {
this.producer.setSendMsgTimeout(this.sendMsgTimeout);
}
//如果发送消息失败,设置重试次数,默认为2次
if (this.retryTimesWhenSendFailed != null) {
this.producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
}
try {
this.producer.start();
log.info(String.format("this.producer is start ! groupName:[%s],namesrvAddr:[%s]"
, this.groupName, this.namesrvAddr));
} catch (MQClientException e) {
log.error(String.format("this.producer is error {} %s %s"
, e.getMessage(), e));
}
System.out.println("生产者启动完成");
}
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
this.producer.send(msg, sendCallback);
}
}
四、实际生产者示例:
package cn.shaker.media.comment.center.service.impl;
import cn.media.constant.MessageConst;
import cn.media.constant.RocketMqConst;
import cn.media.constant.SystemConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.Reference;
import org.apache.dubbo.config.annotation.Service;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author :yepk
* @version :1.0
* @apiNote :评论相关
* @date :2019-03-11-14:03
*/
@Slf4j
@Service(version = SystemConstant.DUBBO_SERVICE_VERSION,timeout = SystemConstant.DUBBO_TIMEOUT)
public class AppCommentServiceImpl implements AppCommentService {
@Resource
private RocketMQProducer rocketMQProducer;
@Override
public ShakerResponse addComment(ShakerResponse res, AppComment comment) {
log.info("开始发送消息:");
appCommentMapper.insert(comment);
Message sendMsg = new Message(RocketMqConst.SHAKER_MQ_ADD_COMMENT, RocketMqConst.TAG_SHAKER_ADD_COMMENT, JSON.toJSONBytes(comment));
try {
sendMessage(sendMsg);
} catch (RemotingException | MQClientException | InterruptedException e) {
e.printStackTrace();
}
res.putResult("domain", comment);
return res;
}
/**
* @param message 消息体
* @apiNote mq发送消息
* @author yepk
* @date 2019/3/12 8:58
*/
private void sendMessage(Message message) throws RemotingException, MQClientException, InterruptedException {
//默认3秒超时
rocketMQProducer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送完成");
// System.out.println(JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
throwable.printStackTrace();
}
});
}
}
五、启动类配置:
package cn.media.comment.center;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.apache.dubbo.config.spring.context.annotation.EnableDubboConfig;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the comment-center application.
 *
 * <p>Enables Dubbo, registers MyBatis mappers from
 * {@code cn.shaker.media.basic.dao.mapper.*} and component-scans {@code cn.shaker.*}.
 */
@EnableDubbo
@EnableDubboConfig
@MapperScan(basePackages = "cn.shaker.media.basic.dao.mapper.*")
@SpringBootApplication(scanBasePackages = "cn.shaker.*")
public class CommentCenterApplication {

    /**
     * Boots the Spring application with this class as its primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(CommentCenterApplication.class);
        application.run(args);
    }
}
六、配置文件:
#########################################
################rocketMq Config Setting ############
########################################
###producer
#该应用是否启用生产者
rocketmq.producer.isOnOff=on
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
rocketmq.producer.groupName=${dubbo.application.name}
#mq的nameserver地址
rocketmq.producer.namesrvAddr=118.190.209.86:9876
#消息最大长度 默认 (4M)
rocketmq.producer.maxMessageSize=4194304
#发送消息超时时间,默认3000
rocketmq.producer.sendMsgTimeout=3000
#发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2
###consumer
##该应用是否启用消费者
rocketmq.consumer.isOnOff=on
rocketmq.consumer.groupName=${dubbo.application.name}
#mq的nameserver地址
rocketmq.consumer.namesrvAddr=118.190.209.86:9876
#该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
## 评论 回复 topic
rocketmq.consumer.topics=RED_PACKET_ACTIVITY_TOPIC~RED_PACKET_ACTIVITY_GAINER_TAG;SHAKER_MQ_ADD_COMMENT~TAG_SHAKER_ADD_COMMENT;SHAKER_MQ_COMMENT_REPLAY~TAG_SHAKER_COMMENT_REPLAY;BROWSE_TOPIC~TAG_BROWSE;VIDEO_TOPIC~TAG_VIDEO;LIKE_TOPIC~TAG_LIKE;LOG_TOPIC~TAG_LOG;AOP_TOPIC~TAG_AOP
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
#设置一次消费消息的条数,默认为1条
rocketmq.consumer.consumeMessageBatchMaxSize=100