消息服务是开放平台为提高应用API调用效率而推出的一种主动推送服务( From淘宝 ),推送内容包括(淘宝交易、商品、退款等信息),基于该推送服务,应用获取淘宝数据不需再不停轮询API,仅需在接收到淘宝推送的消息时调用API获取即可,大大提高API调用效率和降低API使用费用。同时还提供消息回流服务( To淘宝 ),应用可将信息回流到淘宝,做商品数源服务等。
From淘宝:即淘宝向外推送淘宝(包括天猫)的交易、商品、退款等官方消息。
To淘宝:即向淘宝回流消息。
那么如何使用消息服务呢? 请看以下是消息服务 From淘宝 和 To淘宝 两种方式的详细使用说明。
消息服务就像一个mq中间件, 对内承接业务消息, 为接入的appkey划分队列/分组, 把消息存储下来。
提供给ISV两种渠道去消费归属自己的mq队列中的消息:
1)tmcClient长连接(websocket)监听消息(被推送消息);
2)api轮询消息。
多种消费方式协作消费同一个队列, 可按需扩容机器数。
1.旧控制台功能不再维护更新, 存在一些功能缺陷;
2.新控制台拥有旧功能, 且持续更新新功能;
控制台操作和 消息API操作 效果相同。
需注册开发者, 新建应用。
进入 应用管理控制台。
请注意确保 拥有权限包: 消息服务, 其他特定业务权限包;
没有权限包会导致相关消息无法可见, 无法被订阅。
进入新消息控制台。
1)消息接入说明:不同订阅方式有不同的稳定性保障和费用支出。
① MQ订阅/ONS订阅
产品介绍:点击查看;
产品使用:点击查看;
仅适用于商品正向逆向交易等官方消息, 以及部分支持mq订阅的其他消息;
需要购买聚石塔mq中间件实例, 消息存储时间由mq运维控制台自行控制;
需编码接入消费云中间件 mq。
② TMC订阅
适用于基本所有类型的消息;
消息存储时间仅最多24小时;
需要调用api消费消息(低性能) 或 编码tmcClient长连接消费消息(高性能)。
2)消息类型说明
① 广播消息: 消费者下单等操作产生的广播消息, 需要卖家用户授权appkey 且 appkey添加卖家用户到分组;
② 点对点消息: appkey跟内部应用沟通好发送场景后, 内部应用自行向appkey发送消息;
③ 回流消息: 将信息回流到淘宝,做商品数源服务等。
不同消息类型有不同的管理方式。
消息属于用户(卖家), 需把用户添加到目标分组, 淘宝内部将消息广播给所有拥有用户授权的appkey;
① appkey引导用户(卖家)授权
授权工具 或 服务市场订购流程;
② 管理分组/创建分组 为用户开通消息 并 添加用户到分组。
进入新消息控制台。
把用户(卖家)添加到分组中, 推荐使用默认分组 default。
(可等价于api操作 taobao.tmc.user.permit 和 taobao.tmc.group.add[仅非dafault需要])
③ 校验分组 用户 授权和开通是否正常
进入新消息控制台。
(可等价于api查询 taobao.tmc.user.get 获取用户已开通消息)
④ 删除用户 不再接收用户的广播消息。
(可等价于api操作 taobao.tmc.group.delete[仅非dafault需要]和 taobao.tmc.user.cancel)
消息属于appkey, 跟用户无关, 淘宝内部将定向发消息给appkey;
一般发往 appkey的 default 默认分组。
下载SDK,SDK包含常用的结构对象,简化开发成本。
目前支持JAVA语言,其它语言建议采用推模式接收消息。
相较于推模式,拉模式能有效避免流量高峰期消息消费不均、网卡堆积及重发率高的问题,对提升应用服务质量有积极作用。
JAVA示例代码
package ali; import com.alibaba.fastjson.JSON; import com.taobao.api.internal.tmc.Message; import com.taobao.api.internal.tmc.MessageHandler; import com.taobao.api.internal.tmc.MessageStatus; import com.taobao.api.internal.tmc.TmcClient; /** * 简易tmcClient案例 * 第一次启动 若连接失败会 自动退出程序, 可避免连接未建立, 不正常而不感知 * 第一次正常连接后, 后面断开连接 会自动重连 */ public class IsvTmcClientSimple { public static void main(String[] args) throws Exception { TmcClient client = new TmcClientBuilder() .setAppKey(Env.appkey) .setAppSecret(Env.secret) .setGroupName("default") // 关于default参考消息分组说明 .setPullMode(true) // 拉模式 - true,推模式 - false .build(); client.setQueueSize(1000); //被推送的缓冲队列 不要设置太大 避免堆积过久 导致未即时处理 导致雪崩重发 client.setThreadCount(Math.max(1, Runtime.getRuntime().availableProcessors() - 1)); //处理消息线程数 client.setMessageHandler(new MessageHandler() { public void onMessage(Message message, MessageStatus status) { // 注意处理时间耗时, 收到消息后若阻塞1分钟仍未处理完毕(当前onMessage函数未执行完毕), 将导致确认超时, 会导致重发消息, 请根据处理tps设置分组流控! // 注意日志级别 主动打些日志 避免收到消息而不能断定是否收到了消息 System.out.println("receiveMsg " + message.getTopic() + " " + message.getContent() ); try { // todo 业务处理 // xxxxxx } catch (Exception e) { // todo 自行记录异常信息 !!! 重要 避免排查为什么消息不确认 大部分场景是isv代码逻辑抛出异常了 System.out.println(e.getMessage()); e.printStackTrace(); // todo 若希望这条消息等会再重发一次过来 则标记消息消费失败 约6分钟后重新加入消费队列待消费 // status.fail(e.getMessage()); // 标记消息消费失败 // 重试注意:不是所有的异常都需要系统重试。 // 对于字段不全、主键冲突问题,导致写DB异常,不可重试,否则消息会一直重发 // 对于,由于网络问题,权限问题导致的失败,可重试。 // 重试时间 6分钟不等,不要滥用,否则会引起雪崩 } } }); //连接失败时会抛出异常, 请不要捕获, 避免未连接而不知晓, 第一次启动成功后, 后续断开会自动重连 client.connect("ws://mc.api.taobao.com"); // todo 请关注当前client需要消费消息的环境 各个环境消息隔离 但分组配置公用 // , "ws://premc.api.taobao.com/" // 预发环境 测试环境 // , "ws://mc.api.taobao.com/" // 生产环境 线上环境 System.out.println("tmc-client信息 client:" + JSON.toJSONString(client)); System.out.println("api调用地址 getApiUrl: " + client.getApiUrl()); System.out.println("tmc-client 在线状态 isOnline: " + client.isOnline()); //连接正常时 注意tmcClient实例 不能被销毁 (对象回收机制) if(client.isOnline()) { Thread.sleep(Long.MAX_VALUE); } } }
JAVA示例代码
package ali; import com.alibaba.fastjson.JSON; import com.taobao.api.internal.tmc.Message; import com.taobao.api.internal.tmc.MessageHandler; import com.taobao.api.internal.tmc.MessageStatus; import com.taobao.api.internal.tmc.TmcClient; /** * 简易tmcClient案例 * 第一次启动 若连接失败会 自动退出程序, 可避免连接未建立, 不正常而不感知 * 第一次正常连接后, 后面断开连接 会自动重连 */ public class IsvTmcClientSimple { public static void main(String[] args) throws Exception { TmcClient client = new TmcClient(Env.appkey, Env.secret, "default"); // 关于default参考消息分组说明 client.setQueueSize(1000); //被推送的缓冲队列 不要设置太大 避免堆积过久 导致未即时处理 导致雪崩重发 client.setThreadCount(Math.max(1, Runtime.getRuntime().availableProcessors() - 1)); //处理消息线程数 client.setMessageHandler(new MessageHandler() { public void onMessage(Message message, MessageStatus status) { // 注意处理时间耗时, 收到消息后若阻塞1分钟仍未处理完毕(当前onMessage函数未执行完毕), 将导致确认超时, 会导致重发消息, 请根据处理tps设置分组流控! // 注意日志级别 主动打些日志 避免收到消息而不能断定是否收到了消息 System.out.println("receiveMsg " + message.getTopic() + " " + message.getContent() ); try { // todo 业务处理 // xxxxxx } catch (Exception e) { // todo 自行记录异常信息 !!! 重要 避免排查为什么消息不确认 大部分场景是isv代码逻辑抛出异常了 System.out.println(e.getMessage()); e.printStackTrace(); // todo 若希望这条消息等会再重发一次过来 则标记消息消费失败 约6分钟后重新加入消费队列待消费 // status.fail(e.getMessage()); // 标记消息消费失败 // 重试注意:不是所有的异常都需要系统重试。 // 对于字段不全、主键冲突问题,导致写DB异常,不可重试,否则消息会一直重发 // 对于,由于网络问题,权限问题导致的失败,可重试。 // 重试时间 6分钟不等,不要滥用,否则会引起雪崩 } } }); //连接失败时会抛出异常, 请不要捕获, 避免未连接而不知晓, 第一次启动成功后, 后续断开会自动重连 client.connect("ws://mc.api.taobao.com"); // todo 请关注当前client需要消费消息的环境 各个环境消息隔离 但分组配置公用 // , "ws://premc.api.taobao.com/" // 预发环境 测试环境 // , "ws://mc.api.taobao.com/" // 生产环境 线上环境 System.out.println("tmc-client信息 client:" + JSON.toJSONString(client)); System.out.println("api调用地址 getApiUrl: " + client.getApiUrl()); System.out.println("tmc-client 在线状态 isOnline: " + client.isOnline()); //连接正常时 注意tmcClient实例 不能被销毁 (对象回收机制) if(client.isOnline()) { Thread.sleep(Long.MAX_VALUE); } } }
注意保障TmcClient单例,且实例不要被销毁!
不限编程语言
API调用流量有限
提供API接收消息的目的是那种对多线程和长连接处理不方便的语言使用的,比如PHP、Python,这些语言官方暂时没有提供SDK,可以通过下面两个API配合使用也能达到接收和确认消息的目的;
推荐尽量用SDK方式,如果必须使用API,建议调用taobao.tmc.messages.consume接口时尽量不要并发或并发量不要太大,可根据返回的消息数量酌情增减拉取消息的频次, 如拉取消息量为空时, 等待1秒以上再轮询;
API使用存在实时性不是很高的情况,如果实时性要求高建议还是用SDK。
① 首先消费消息:API名称:taobao.tmc.messages.consume 消息消费后,指针自动后移,下次调用自动获取到未消费过的消息,但是消费确认后的消息无法再次获取。
② 然后确认消息:API名称:taobao.tmc.messages.confirm 获取消息后,如果不确认,消息服务会选择时机(约6分钟)重发,重发次数由消息服务控制。
JAVA示例代码
TaobaoClient client = new DefaultTaobaoClient("http://gw.api.taobao.com/router/rest", "app_key", "app_secret", "json"); do { long quantity = 100L; TmcMessagesConsumeResponse rsp = null; do { TmcMessagesConsumeRequest req = new TmcMessagesConsumeRequest(); req.setQuantity(quantity); req.setGroupName("default"); rsp = client.execute(req); if (rsp.isSuccess() && rsp.getMessages() != null) { for (TmcMessage msg : rsp.getMessages()) { // handle message System.out.println(msg.getContent()); System.out.println(msg.getTopic()); // confirm message TmcMessagesConfirmRequest cReq = new TmcMessagesConfirmRequest(); cReq.setGroupName("default"); cReq.setsMessageIds(String.valueOf(msg.getId())); TmcMessagesConfirmResponse cRsp = client.execute(cReq); System.out.println(cRsp.getBody()); } } System.out.println(rsp.getBody()); } while (rsp != null && rsp.isSuccess() && rsp.getMessages() != null && rsp.getMessages().size() == quantity); Thread.sleep(1000L); } while (true);
进入新消息控制台
1)监控
如图可 查看 消息产生 消息消费 消息确认 连接数等情况。
2)告警
配置如下。
注意核对告警接收人 接收规则
查看报警记录 。注意:存在发送报警限流情况(如,每10分钟仅收到一条报警邮件)。
参考rocketMq使用 最佳实践。
MQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:
① 同一类tmcClient appkey group,通过增加或减少 tmcClient 实例数量来调整并行度。可以通过加机器,或者在已有机器启动多个进程的方式。
② 提高或降低单个 tmcClient 的消费并行线程,通过修改参数 实现。
TmcClient client = new TmcClient(appKey, appSecret, group); client.setQueueSize(1000); //任务最大堆积量 默认 2000 client.setThreadCount(16); //处理线程数 默认 Runtime.getRuntime().availableProcessors() * 10
发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:
public void onMessage(Message message, MessageStatus status) { if( 开关 降级 & ! message.getUserId().equal(vip)) return; }
根据tmcClient的确认qps能力来设置分组流控, 避免拉取过多消息下来而又无法按时确认, 导致 大量消息反复发送, 严重影响性能和正常业务。
进入新消息控制台
查看监控。
按实际单机tmcClient确认qps 手动控制 修改单个tmcClient流控 实时(10s)生效 。
消息总量qps: 消息产生量: 消息源产生的消息都存储在 消息服务的 mq队列中;
推送量qps: 消息服务 把mq队列中的 消息 推送给 tmcClient;
确认量qps: tmcClient收到消息后, 业务处理完成, 主动上报确认;
正常情况下, 确认量 <= 推送量 <= 消息总量 且相互相差不大 意味着即时消费且确认;
当有堆积时:
推送量qps
= tmcClient机器数
* 单机流控值(低于32时,有毛刺,但均值正常,第一秒为32,后续31秒为0)
* 24(倍率)
异常情况
① 消息总量 > 推送量, 会有堆积, 受限于推送量不足, 可按公式, 增加机器数, 调整流控值;
② 推送量 > 确认量, 会有重发, 受限于确认量不足, 可按公式, 降低推送量(降低机器数,调整流控值), 或可提升tmcClient确认能力;
③ 推送量 > 消息量, 说明有堆积, 在额外把堆积中的消息推送到tmcClient, 若确认量跟不上, 按②处理。
常见方法
① 调整单机流控为极小值 1;
② 观察单机确认量qps;
③ 逐步扩大单机流控值 为 32,64,128..., 每批次10分钟, 观察单机确认量qps, 若确认量qps<推送量qps,则 回调单机流控值为上次的值;
④ 当前单机流控值则为单机确认qps极限, 不可再调大, 可调优后重复步骤③;
⑤ 若当前确认qps量小于消息产生量qps, 则按需增加tmcClient机器数, 或等待慢慢消费削峰填谷。
消息丢失概率极低, 请先自查。
进入新消息控制台。
确保订阅修改时间, 授权过期时间, 和预期产生消息的场景时间匹配(时序), (点对点消息需指定业务场景产生消息)
(可等价于api操作 taobao.tmc.user.get)
进入新消息控制台
① 轨迹1 消息产生记录
② 轨迹2 消息消费记录
消费记录中 每一条均代表推送给了目标isvClient一次该消息
其中可看到
收到该消息的 时间 202x-xx-xx xx:xx:xx.xxx
收到该消息的 tmcClient机器 ip地址 xx.xx.xx.xx
"ip":"30.7.202.155:42.120.74.122:11.8.4.163"消息确认链路 三个ip 分别对应
tmcClient的内网ip、tmcClient的公网ip、tmc服务器ip。 注意: 旧sdk中间ip为 null。
消息发送链路关系为
消息产生->(消息记录)->tmc服务器ip->(发送记录)->tmcClient的公网ip->tmcClient的内网ip ->确认消息->(确认记录)
ip可能并非tmcClient的最终ip, 但一定是 tmcClient -> tmcServer ip链路上的ip, 可考虑使用 traceroute 指令跟踪ip链路。
③ 轨迹3 消息确认记录
代表 tmcClient已经收到了消息且按时确认了消息, 链路完成
若以上排查不出结果可以提交问题到支持中心附上:AppKey、(用户nick)、消息产生来源业务场景、消息状态、消息大概时间、订单的tid、商品的num_iid。
1)appkey一直不消费, 堆积过多(万级)或过久(半小时以上)会, 消费流控值预期会*30%;
2)appkey消费了不确认, 每间隔约6分钟重发一次;
3)appkey消费了不确认, 确认比过低(20%), 会自适应限制消费速率, 长期如此, 会酌情加白禁止消费消息;
4)消息最多保留8至24小时(不同集群有差异), 不论发没发, 有没有重发, 紧急情况下会进一步减少保留时间, 请即时消费并正常确认消息;
5)订阅关系失效后, 短时间(5分钟左右)可能会查出来缓存的关系;
关系失效场景: 取消订阅, 授权过期, 删除用户;
失效影响: 不再产生新的消息, 但已产生的待消费堆积消息依然继续推送;
6)授权过期问题有效期问题;
7)交易类taobao_trade_xxx消息体里面的状态字段 status 问题;
交易类消息体里面的状态是指:支付状态;
支付状态 只有有限的付款相关状态枚举值 [ 等待付款(未付款), 等待卖家发货(已付款), 交易成功(确认收货), 交易关闭(退款完成) ], 没有已发货的状态;
具体请以api核查订单详情, 物流详情为准;
8)分组问题
消息分组是用户消息隔离的一种方式,组内用户的消息只会发送到相同组名的连接上。同一个组支持多个连接,同一组的消息,随机发送到组内的某一个连接。如果要用户的类型对消息区别对待,比如优先保证付费用户,然后再保证免费用户,就可以通过消息分组来接收不同用户的消息。每个应用最多创建50个分组,每个分组用户数不限。
分组路由链路
9)多连接接收消息
多连接收消息是指同一分组内ISV服务器与TOP的消息服务器建立多个连接来协作消费消息。多链接是对同一个分组而言,消息在下发时随机选择从分组内的多个连接中选择一个连接下发消息。一般一台机器部署一个tmcClient, 可调整处理消息的线程数, 多部署几台机器进行协作消费负载;
10)确认消息无效, 确认后还反复重发消息: 常见于http api确认时, 确认消息outgoingId long型精度丢失, 导致无法正常确认;
11)错误码 isp.system-error: unknown errors,isv.tmc-switch-off: appkey,the app do not enable messaging-channel feature;
应用未订阅(开通)消息服务,就使用TmcClient来接收消息, 请于消息控制台订阅任意消息。
需要isv先跟内部对接的二方沟通,让对接的二方在开放平台内部去申请创建 isv -> taobao 流向的消息topic
使用前 请先咨询内部对应的小二同学。