登录至聚石塔首页(http://cloud.tmall.com),选择“产品与服务”下的“消息队列MQ”,点击马上使用申请免费试用。
审核通过后将通过手机短信、邮件、旺旺提醒等方式通知用户完成开通订购ONS服务。
发布管理:可以创建一个TOPIC,即消息队列名称.TOPIC名称只能包含字母,数字,短横线(-)和下划线(_),名称长度限制在3-64个字节之间,一旦创建后不能修改TOPIC名称。
订阅管理:可以订阅已创建的TOPIC消息,即订阅者从哪个TOPIC消费消息,订阅者名称以大写的CID开头,只能包含字母,数字,短横线(-)和下划线(_),长度限制在3-64字节之间, 留空不填的话系统将自动生成一个ID。
支持三个维度的消息查询,Topic,Key,Message ID并且提供每一条消息的trace功能,方便大家对消息消费的过程追踪。
消息消费:可以查询消费者某个topic消息消费的速度。
消息生产:可以查询生产者某个topic消息生产的速度。
publicclass ProducerTest { publicstaticvoid main(String[] args) { Properties properties = new Properties(); properties.put(PropertyKeyConst.ProducerId, "填写pid"); properties.put(PropertyKeyConst.AccessKey, "填写appkey"); properties.put(PropertyKeyConst.SecretKey, "填写secret"); properties.put(PropertyKeyConst.OnsChannel, ONSChannel.CLOUD); Producer producer = ONSFactory.createProducer(properties); // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可 producer.start(); Message msg = new Message( "TopicTestONS", // Message Topic "TagA", // Message Tag "Hello ONS".getBytes()); // Message Body // 发送消息,只要不抛异常就是成功 SendResult sendResult = producer.send(msg); System.out.println(sendResult); producer.shutdown();// 在应用退出前,销毁Producer对象 } }
publicclass ConsumerTest { publicstaticvoid main(String[] args) { Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "您的cid"); properties.put(PropertyKeyConst.AccessKey, "您的appkey"); properties.put(PropertyKeyConst.SecretKey, "您的secret"); properties.put(PropertyKeyConst.OnsChannel, ONSChannel.CLOUD); properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("您的队表名", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }
public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "您的cid"); properties.put(PropertyKeyConst.AccessKey, "您的appkey"); properties.put(PropertyKeyConst.SecretKey, "您的secret"); properties.put(PropertyKeyConst.OnsChannel, ONSChannel.CLOUD); properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.BROADCASTING); properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.Broadcasting); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicTestONS", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }
因为各种各样的原因,比如因为网络的原因,或者业务处理失败,MQ会重复投递消息,应用需要保持业务处理相同消息的幂等性. 即收到相同的消息,业务处理的结果仍然相同。
保存3天,超过3天未消费的消息会被删除掉。
暂时只支持JAVA语言,c++\.net\php将来会支持。
集群消费方式下的策略:
/**消息监听器,Consumer注册消息监听器来订阅消息*/ publicinterface MessageListener { /**消费消息接口,由应用来实现 * @return消费结果,如果应用抛出异常或者返回Null等价于返回Action.ReconsumeLater*/ public Action consume(final Message message, final ConsumeContext context); }
消费业务逻辑代码如果返回Action.ReconsumerLater,或者NULL,或者抛出异常,消息都会走重试流程,至多重试16次,如果重试16次后,仍然失败,则消息丢弃,
每次重试的间隔时间如下,最后第16次间隔时间为2小时:
第几次重试 | 每次重试间隔时间 | 第几次重试 | 每次重试间隔时间 | 第几次重试 | 每次重试间隔时间 |
1 | 10秒 | 2 | 30秒 | 3 | 1分钟 |
4 | 2分钟 | 5 | 3分钟 | 6 | 4分钟 |
7 | 5分钟 | 8 | 6分钟 | 9 | 7分钟 |
10 | 8分钟 | 11 | 9分钟 | 12 | 10分钟 |
13 | 20分钟 | 14 | 30分钟 | 15 | 1小时 |
在使用MQ的过程中,为了大家更加准确的知道遇到的错误,我们提供如下的错误编码供大家查询:
ONS错误编码 | 错误描述 |
ERROR_CODE:1 | 客户端地址在黑名单中拒绝访问,请联系MQ技术支持 |
ERROR_CODE:2 | 请求码非法,请联系MQ技术支持 |
ERROR_CODE:3 | 客户端ID(Consumer_ID或Producer_ID)不正确,请检查值是否与申请一致 |
ERROR_CODE:4 | 客户端ID(Consumer_ID或Producer_ID)和阿里云账号不匹配,请检查值是否与申请一致 |
ERROR_CODE:5 | 签名校验失败,请求非法,请联系MQ技术支持 |
ERROR_CODE:6 | 请求码非法,请联系MQ技术支持 |
ERROR_CODE:7 | 资源所有者不存在,请检查topic或者客户端ID(Consumer_ID或Producer_ID)是否与申请一致 |
ERROR_CODE:8 | 没权限,请到控制台申请订阅或发送消息 |
ERROR_CODE:9 | 受限操作,请联系MQ技术支持 |