登录至聚石塔控制台(https://console.cloud.tmall.com),切换到业务域“聚石塔”,在左上角进入“组件中心”,选择启用“消息同步”。
A.开发者启动消息同步服务,订阅消息后,平台会把开发者订阅的消息写到入MQ消息队列,初次启用,需要先开通MQ服务。
B.消息队列MQ为按量付费产品,立即购买不会产生任何费用,按实际使用量进行后付费。
进入控制台“组件中心”下的“消息同步服务”,根据业务需要,选择需要订购消息的应用APPKEY。
订阅消息成功,系统将自动在消息队列MQ服务中创建系统级topic。开发者可登陆聚石塔控制台->应用列表->选择你的应用,在互联网中间件关联MQ实例。
注意:聚石塔控制台是打造以应用为中心的PAAS平台,应用作为开发者最基本的业务单元,因此需要先创建应用对云产品进行关联授权,聚石塔控制台应用介绍可详见文档。
关联成功后,可以在消息队列RocketMQ中查看当前消息队列TOPIC。
业务类型 |
业务场景 |
业务描述 |
淘宝物流 |
物流详情跟踪 |
实现跟踪物流订单流转状态,实现客户关怀及异常订单监控 |
淘宝交易评价 |
负面印象评价消息 |
智能分析订单评价信息,提取用户对订单的负面印象关键字,基于核心关键字实现对商品、物流、客服等维度的优化解决方案 |
淘宝交易 |
订单数据管理 |
订单数据实时推送到消息队列,可实现按用户分组从队列中过滤消息,灵活实现在大促场景下大客户的资源保障方案 |
淘宝商品 |
商品数据管理 |
实现商品生命周期管理,及时监控商品变更记录 |
淘宝分销 |
淘宝分销订单管理 |
实现品牌商全渠道订单统一管理,及时获取分销商店铺经营情况 |
淘宝退款 |
退款退货数据管理 |
订单退款退货管理,实现发货订单退款快速拦截以及oa协同退款财务审批 |
A.下载API SDK,登陆聚石塔控制台->“应用管理”->“应用列表”,选择开通消息同步服务的应用进入应用证书,下载最新的SDK进行开发。
B.下载MQ SDK,访问阿里云帮助中心,下载MQ对应开发语言SDK。
访问地址:点击访问。
C.订阅消息同步服务成功后,将在MQ控制台发布管理中生成一个系统级topic.用户需要在订阅管理中订阅系统级topic获取消息。
注意事项:
使用MQ服务的应用程序需要部署在ECS上,如果只是测试,可以创建一个公网测试环境下的topic,这个topic可以在外网收发消息,进行一些简单的测试。
订阅开通消息同步服务后,系统将默认为您的应用增加消息同步服务权限包,可通过api为特定的用户开通消息实时推送。
调用taobao.jushita.jms.user.add 添加MQ消息同步用户.此api调用需要传入sessionkey【即授权码】。
调用taobao.jushita.jms.user.delete 删除MQ消息同步用户,删除后用户的消息将不会推送到聚石塔的MQ中.入参必须输入user_nick来判断删除用户是否成功。
调用taobao.jushita.jms.user.get 查询某个用户是否同步消息.入参必须输入user_nick来判断该用户是否开通消息推送。
JAVA示例代码
TaobaoClient client=new DefaultTaobaoClient(url, appkey, secret); JushitaJmsUserAddRequest req=new JushitaJmsUserAddRequest(); JushitaJmsUserAddResponse response = client.execute(req , sessionKey);
接收消息的前提条件
1)应用appkey订阅消息需要推送的消息。
2)调用taobao.jushita.jms.user.add添加需要推送的用户。
3)确保应用appkey拥有同步MQ消息权限包。
Java代码:
public class ConsumerTest { public static void main(String[] args) { Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "CID_001");// ons控制台订阅管理中获取ConsumerID properties.put(PropertyKeyConst.AccessKey, "appkey");// 应用appkey,根据用户实际参数修改 properties.put(PropertyKeyConst.SecretKey, "secretkey");// 应用密钥,根据用户实际参数修改 properties.put(PropertyKeyConst.OnsChannel, "CLOUD"); // cloud为聚石塔标识 properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);//集群消费,默认为集群消费模式 // properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.BROADCASTING);广播消费模式 Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicTestONS", "*", new MessageListener() {// 消息队列名称,根据用户实际参数修改 public Action consume(Message message, ConsumeContext context) { String msg_body=new String(message.getBody(),Charsets.UTF-8); JSONObject messageobject=JSONObject.fromObject(msg_body); String usernick=messageobject.getString("user_nick"); System.out.println("usernick"+ usernick); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); } }
.NET代码:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.InteropServices; using ons; using std; namespace ons { // pushconsumer拉取到消息后,会主动调用该实例的consume函数 public class MyMsgListener : MessageListener { public MyMsgListener(){} ~MyMsgListener(){} public override Action consume(ref Message value){ // 形参value是返回的消息实例,可以根据业务逻辑提取message的各个字段 Console.WriteLine("\nCallback topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}",value.getTopic(),value.getTag(),value.getKey(),value.getMsgID(),value.getBody()); return ons.Action.CommitMessage; } } class onscsharp{ static void Main(string[] args){ // pushconsumer创建和工作需要的参数,必须输入 ONSFactoryProperty factoryInfo = new ONSFactoryProperty(); factoryInfo.setFactoryProperty(factoryInfo.getConsumerIdName(), "CID_xxx");// ons控制台订阅管理中获取ConsumerID factoryInfo.setFactoryProperty(factoryInfo.getPublishTopicsName(), "xxxxx");//cid_xxx,从xxx地方获取 factoryInfo.setFactoryProperty(factoryInfo.getAccessKeyName(),"xxx");// 应用appkey factoryInfo.setFactoryProperty(factoryInfo.getSecretKeyName(), "xxxx");// 应用密钥 // create consumerONS ONSFactory onsfactory = new ONSFactory(); PushConsumer pConsumer = onsfactory.getInstance().createPushConsumer(factoryInfo); // register msg listener and subscribe msg topic MessageListener msgListener = new MyMsgListener(); pConsumer.subscribe(factoryInfo.getPublishTopics(), "*", ref msgListener); // start consumer pConsumer.start(); // consumer启动后,会自动拉取消息,拉取到消息后,会自动调用mymsglistener实例的consume函数 // 确定消费完成后,调用shutdown函数,在应用退出前,必须销毁consumer对象,否则会导致内存泄露等问题 pConsumer.shutdown(); } } }