文档中心 > 聚石塔

消息同步服务组件使用

更新时间:2024/07/31 访问次数:66527

一、开通消息同步服务


登录至聚石塔控制台https://console.cloud.tmall.com),切换到业务域“聚石塔”,在左上角进入“组件中心”,选择启用“消息同步”。


1. 开通MQ服务


A.开发者启动消息同步服务,订阅消息后,平台会把开发者订阅的消息写到入MQ消息队列,初次启用,需要先开通MQ服务。


B.消息队列MQ为按量付费产品,立即购买不会产生任何费用,按实际使用量进行后付费。


2. 订购消息类型


进入控制台“组件中心”下的“消息同步服务”,根据业务需要,选择需要订购消息的应用APPKEY。

 

订阅消息成功,系统将自动在消息队列MQ服务中创建系统级topic。开发者可登陆聚石塔控制台->应用列表->选择你的应用,在互联网中间件关联MQ实例。

注意:聚石塔控制台是打造以应用为中心的PAAS平台,应用作为开发者最基本的业务单元,因此需要先创建应用对云产品进行关联授权,聚石塔控制台应用介绍可详见文档

 

关联成功后,可以在消息队列RocketMQ中查看当前消息队列TOPIC。

 

二、消息同步服务类型及场景


 业务类型

 业务场景

 业务描述

淘宝物流

物流详情跟踪

实现跟踪物流订单流转状态,实现客户关怀及异常订单监控

淘宝交易评价

负面印象评价消息

智能分析订单评价信息,提取用户对订单的负面印象关键字,基于核心关键字实现对商品、物流、客服等维度的优化解决方案

淘宝交易

订单数据管理

订单数据实时推送到消息队列,可实现按用户分组从队列中过滤消息,灵活实现在大促场景下大客户的资源保障方案

淘宝商品

商品数据管理

实现商品生命周期管理,及时监控商品变更记录

淘宝分销

淘宝分销订单管理

实现品牌商全渠道订单统一管理,及时获取分销商店铺经营情况

淘宝退款

退款退货数据管理

订单退款退货管理,实现发货订单退款快速拦截以及oa协同退款财务审批


三、消息同步服务使用指南

1. 环境准备


A.下载API SDK,登陆聚石塔控制台->“应用管理”->“应用列表”,选择开通消息同步服务的应用进入应用证书,下载最新的SDK进行开发。


B.下载MQ SDK,访问阿里云帮助中心,下载MQ对应开发语言SDK。

访问地址:点击访问

C.订阅消息同步服务成功后,将在MQ控制台发布管理中生成一个系统级topic.用户需要在订阅管理中订阅系统级topic获取消息。


注意事项:

使用MQ服务的应用程序需要部署在ECS上,如果只是测试,可以创建一个公网测试环境下的topic,这个topic可以在外网收发消息,进行一些简单的测试。

2. 开启用户消息推送


订阅开通消息同步服务后,系统将默认为您的应用增加消息同步服务权限包,可通过api为特定的用户开通消息实时推送。

1)添加消息同步用户

调用taobao.jushita.jms.user.add 添加MQ消息同步用户.此api调用需要传入sessionkey【即授权码】。

2)删除消息同步用户

调用taobao.jushita.jms.user.delete 删除MQ消息同步用户,删除后用户的消息将不会推送到聚石塔的MQ中.入参必须输入user_nick来判断删除用户是否成功。

3)查询消息同步用户

调用taobao.jushita.jms.user.get 查询某个用户是否同步消息.入参必须输入user_nick来判断该用户是否开通消息推送。

 

JAVA示例代码

TaobaoClient client=new DefaultTaobaoClient(url, appkey, secret);
JushitaJmsUserAddRequest req=new JushitaJmsUserAddRequest();
JushitaJmsUserAddResponse response = client.execute(req , sessionKey);


3. 接收消息

接收消息的前提条件

1)应用appkey订阅消息需要推送的消息。

2)调用taobao.jushita.jms.user.add添加需要推送的用户。

3)确保应用appkey拥有同步MQ消息权限包。

集群方式消费消息

Java代码:


public class ConsumerTest {  
  public static void main(String[] args) { 
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, "CID_001");// ons控制台订阅管理中获取ConsumerID
    properties.put(PropertyKeyConst.AccessKey, "appkey");// 应用appkey,根据用户实际参数修改
    properties.put(PropertyKeyConst.SecretKey, "secretkey");// 应用密钥,根据用户实际参数修改
    properties.put(PropertyKeyConst.OnsChannel,  "CLOUD"); // cloud为聚石塔标识
    properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);//集群消费,默认为集群消费模式
   // properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.BROADCASTING);广播消费模式
    Consumer consumer = ONSFactory.createConsumer(properties); 
    consumer.subscribe("TopicTestONS", "*", new MessageListener() {// 消息队列名称,根据用户实际参数修改
       public Action consume(Message message, ConsumeContext context) {
           String msg_body=new String(message.getBody(),Charsets.UTF-8);
           JSONObject messageobject=JSONObject.fromObject(msg_body);
           String usernick=messageobject.getString("user_nick");
           System.out.println("usernick"+ usernick);
          return Action.CommitMessage;
       }
    });
    consumer.start();
    System.out.println("Consumer Started");
  }
}

 

.NET代码:


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;
using std;
namespace ons {
  // pushconsumer拉取到消息后,会主动调用该实例的consume函数
  public class MyMsgListener : MessageListener {
     public MyMsgListener(){}
     ~MyMsgListener(){}
     public override Action consume(ref Message value){
       // 形参value是返回的消息实例,可以根据业务逻辑提取message的各个字段
       Console.WriteLine("\nCallback topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}",value.getTopic(),value.getTag(),value.getKey(),value.getMsgID(),value.getBody());
       return ons.Action.CommitMessage;
     }
  }
 class onscsharp{
   static void Main(string[] args){
     // pushconsumer创建和工作需要的参数,必须输入
     ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
       factoryInfo.setFactoryProperty(factoryInfo.getConsumerIdName(), "CID_xxx");// ons控制台订阅管理中获取ConsumerID
       factoryInfo.setFactoryProperty(factoryInfo.getPublishTopicsName(), "xxxxx");//cid_xxx,从xxx地方获取
       factoryInfo.setFactoryProperty(factoryInfo.getAccessKeyName(),"xxx");// 应用appkey
       factoryInfo.setFactoryProperty(factoryInfo.getSecretKeyName(), "xxxx");// 应用密钥
       // create consumerONS
    ONSFactory onsfactory = new ONSFactory();
    PushConsumer pConsumer = onsfactory.getInstance().createPushConsumer(factoryInfo);
    // register msg listener and subscribe msg topic
    MessageListener msgListener = new MyMsgListener();
    pConsumer.subscribe(factoryInfo.getPublishTopics(), "*", ref msgListener);
    // start consumer
    pConsumer.start();
    // consumer启动后,会自动拉取消息,拉取到消息后,会自动调用mymsglistener实例的consume函数
    // 确定消费完成后,调用shutdown函数,在应用退出前,必须销毁consumer对象,否则会导致内存泄露等问题
    pConsumer.shutdown();
   }
 }
}


 

FAQ

关于此文档暂时还没有FAQ
返回
顶部