# 数据订阅
通过数据订阅功能,瀚云平台实时将收到的设备上传数据,通过kafka订阅的方式推送到第三方平台。可实时向应用侧推送数据、事件或者命令及响应。具有实时高效、数据不丢失、数据加密的优势。
# 数据订阅
# 如何开通数据订阅功能?
- 在左侧导航栏选择“数据订阅”,进入数据订阅列表,选择需要开通数据订阅的产品,点击“申请开通”。
2.填写如下信息后,后台人员将会在1个工作日内完成审核,审核通过后即可设置订阅的数据类型。
3.设置订阅内容,勾选您需要订阅的内容。
# 如何暂停数据订阅功能?
通过点击“暂停”,暂停数据订阅功能,暂停后可再次启动。
# 消费组管理
消费组是Kafka提供的可扩展且具有容错性的消费者机制。使用kafka提供的消费组功能,同一个消费组的多个消费者就能分布到多个物理机器上以加速消费。平台为每个产品默认创建一个对应的消费组。
平台支持多消费组。当您有多个应用程序从同一个主题消费的需求,为每个需要获取一个或多个主题全部消息的应用创建一个消费组,就可以让它们获取到主题所有的消息。当前普通账号仅支持创建最多2个消费组。
# 如何创建消费组?
1.在左侧导航栏选择“数据订阅”,然后点击“消费组列表”。
2.点击 “创建消费组”,输入所要创建的消费组名称(一般设置为应用的名称),点击确定完成创建。创建成功后,将会自动生成对应的消费组ID。
# 如何设置消费组?
若您需要使用不同消费组共同消费同一个topic,在完成对应应用的消费组创建后,您需要通过以下操作步骤为消费组授权对应的topic消费权限。
- 在左侧导航栏选择“数据订阅”,进入数据订阅列表,选择需要订阅的产品,点击“设置”(需先开通数据订阅功能权限后,才可设置,如何开通消息订阅功能请 点击文档“数据订阅”)。
2.勾选您需要授权的消费组,完成授权。
# 调用平台接口实现订阅
不使用数据订阅功能时,需要应用侧定时主动调用抽取设备数据,订阅者调用平台的接口实现订阅的过程中,将会用到鉴权参数。
点击对应产品的“鉴权参数”,即可查看对应鉴权参数。
鉴权参数说明:
- 接入(accessKey/accessSecret),主要在设备通过MQTT协议接入或者通过API创建设备时使用此鉴权参数。
- 查询(queryKey/querySecret),通过api查询该产品下的数据时,使用此鉴权。
- 命令(cmdKey/cmdSecret),通过api给该产品下的设备下发控制命令时,使用此鉴权。
- 上报(uploadKey/uploadSecret),通过api以某个设备的身份上传数据时使用此鉴权,新增和更新设备档案时使用此鉴权。
# HTTP推送
通过HTTP推送功能,瀚云平台将收到的设备数据按订阅类型转发到指定的HTTP地址,功能较单一但简单易用且高效。目前支持的订阅类型消息有:设备日志、设备数据流、设备事件、设备命令。
# 创建HTTP实例
1.在左侧导航栏选择“数据订阅”-“HTTP推送”,选择HTTP实例。
2.填写实例名称、token、推送地址等信息。填写完推送地址后,平台会向该地址发送HTTP GET请求,进行地址有效性验证,校验成功方可添加实例成功。
token:选填,如填写则作为消息推送数字签名的依据,需用户妥善保管。服务端通过token校验瀚云平台身份,保证消息不被篡改。
# 如何使用token?
如果用户新建实例时,填写了token,服务端接收到请求后,需要通过signature字段对请求进行签名校验。校验方法如下:
新建实例时填写的token+随机字符串nonce+推送消息msg拼接到一起做MD5加密,然后将此加密数据做Base64转换,最后将转换后的数据与请求中signature进行比较,相等即为校验成功。具体token加密方式参考token加密
有效性验证的请求形式示例如下:
http://url?msg=validateUrl&nonce=xxx&signature=xxx
具体校验方式参考地址有效性验证
消息加密方式:分为安全模式和明文模式,安全模式采用AES加密算法,通过产品的dataSecret加密密钥。
# 应用层消息处理:
如果用户选择安全模式,产品订阅成功后,会生成对应的dataSecret作为消息加密的AES密钥。用户可以调用瀚云提供的SDK接口(使用SDK),通过productKey得到秘钥dataSecret(dataSecret获取),再进行AES解密获得消息内容。
具体处理方式参考消息处理。
# 订阅产品
1.在左侧导航栏选择“数据订阅”-“HTTP推送”,选择HTTP订阅产品。
2.选择产品后,点击“订阅设置”或“批量订阅”,选择推送内容,选择HTTP实例(实例服务器地址需验证成功)
3.订阅成功的产品会生成DataSecret,用于安全模式中消息的解密密钥。
# 消息推送
推送消息格式如下:
{
"msg": "xxxxxxxx", # 消息内容,明文模式或安全模式解密后为JSON字符串数组,包含多条同一类型的数据
"nonce": "abcdefgh", # 随机字符串
"signature": "message signature", # 数字签名
"messageType": 1, # 订阅类型
"productKey": "asdsdgsdg" # 产品key
}
字段说明:
字段 | 类型 | 说明 |
---|---|---|
msg | string | 数据内容,包括设备日志、设备数据流、设备事件、设备命令 |
nonce | string | 用于计算签名字符的随机串 |
signature | string | 数字签名,用以校验推送客户端身份合法性,校验方法见实例验证 |
messageType | int | 订阅类型 1.设备数据流 2.设备日志 3.设备事件 4.设备命令 |
productKey | string | 产品key |
msg在明文模式下传输或安全模式解密后得到的数据会有四种数据类型,各种类型的msg示例如下:
1、设备数据流
[{
"deviceId": 188741,
"deviceKey": "a31f0dbff15846079be89ab5323c02e3",
"productId": 36295,
"productKey": "a4weklJs",
"productId": 36295,
"userId": 6,
"userKey": "zlW6xF5j",
"stream": "streamTest", //数据流名称
"time": 1646620812231, //消息到达平台的时间戳
"type": 1, //数据类型,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/DataType.html#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E5%88%97%E8%A1%A8
"data": "{\"Temperature\":19.7,\"radiation\":448.5}" //数据值,视类型不同,形式有所不同
}]
2、设备日志
[{
"deviceId": 188741,
"deviceKey": "a31f0dbff15846079be89ab5323c02e3",
"productId": 36295,
"productKey": "a4weklJs",
"productId": 36295,
"userId": 6,
"userKey": "zlW6xF5j",
"time": 1646620812231, //消息到达平台的时间戳
"event": 0, //设备日志类型(1:设备上线 -1:设备下线 0:设备在线 20:新增设备 21:删除设备 3:命令校验错误 41:数据格式不匹配 42:数据流不存在 51:事件格式不正确 52:事件数据流不存在
//61:数据流脚本执行错误 71:命令脚本执行错误 81:同步拓扑结构 82:获取云端拓扑结构 83:拓扑结构同步失败 84:代理关系校验错误 85:topic解析信息错误 86:topic类别标识错误 87:topic格式校验错误)
"detail": "设备下线" //详细信息
}]
3、设备事件
[{
"deviceId": 188741,
"deviceKey": "a31f0dbff15846079be89ab5323c02e3",
"productId": 36295,
"productKey": "a4weklJs",
"productId": 36295,
"userId": 6,
"userKey": "zlW6xF5j",
"time": 1646620812231, //消息到达平台的时间戳
"event": "dis", //事件名称
"eventType": 1, //事件类型(1: info (信息)2: Alert (告警)3: Error(故障))
"data": "{\"msg\":13}" //数据值,视类型不同,形式有所不同
}]
4、设备命令
[{
"deviceId": 188741,
"deviceKey": "a31f0dbff15846079be89ab5323c02e3",
"productId": 36295,
"productKey": "a4weklJs",
"productId": 36295,
"userId": 6,
"userKey": "zlW6xF5j",
"time": 1646620812231, //消息到达平台的时间戳
"cmdId": "64f02492dd6c4153a1d7cacd2ab1dc00", //命令Id,每个命令都有一个独一无二的命令Id
"identifier": "cmd", //命令标识符
"state":1, //命令状态,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/Command.html#%E5%91%BD%E4%BB%A4%E7%8A%B6%E6%80%81%E5%88%97%E8%A1%A8
"type":1, //数据类型,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/DataType.html#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E5%88%97%E8%A1%A8
"data":"{\"in\": 10}" //数据值,视类型不同,形式有所不同
}]
# 推送说明:
瀚云平台以HTTP POST请求向服务端推送数据,服务端接收到数据后需要返回HTTP状态码 200,否则瀚云平台会认为此次推送无效并重试,最多重试5次。且等待服务端的响应都设有时限(目前是5秒),在规定时限内没有收到响应会认为发送失败,连续失败100次(包括重试次数,计数规则是根据HTTP实例计算。)则会认为服务端地址不可用,推送服务将停止,对应实例状态显示为"验证失败"。
当实例"验证失败",用户解决连续推送失败的问题后,在HTTP实例列表中,将"验证失败"的实例点击"验证"或编辑重新输入连接正常的推送地址,恢复推送。
建议服务端接收到数据时,先做数据缓存,再做业务逻辑处理。 其中请求的Content-Type
为application/json
。重试间隔如下:
重试次数 | 间隔时间 |
---|---|
1 | 5秒 |
2 | 10秒 |
3 | 30秒 |
4 | 60秒 |
5 | 120秒 |
# JAVA代码示例
# token加密
/**
* token加密
*
* @param msg 推送消息内容
* @param token 瀚云平台填写的token
* @param nonce 平台生成的随机字符串
* @return
* @throws Exception
*/
public String signWithMD5(String msg, String token, String nonce) throws Exception {
String content = new StringBuilder(token).append(nonce).append(msg).toString();
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] digest = md5.digest(content.getBytes(StandardCharsets.UTF_8));
return new Base64().encodeAsString(digest);
}
# 消息处理
/**
* 明文模式消息处理
*
* @param pushData 平台发送的数据
* @param response 响应
* @return
* @throws Exception
*/
@PostMapping("")
public String clearMode(@RequestBody HttpPushData pushData, HttpServletResponse response) throws Exception{
String signature = pushData.getSignature();
String msg = pushData.getMsg();
if(!StringUtils.isEmpty(signature)) {
String signWithMD5 = signWithMD5(token + pushData.getNonce() + msg);
boolean equals = pushData.getSignature().equals(signWithMD5);
if (!equals) {
response.setStatus(HttpStatus.BAD_REQUEST.value());
logger.error("明文模式验证token失败...");
return "FAIL";
}
}
logger.info("明文模式验证token成功...保存msg,msg={}",msg);
return "OK";
}
/**
* 安全模式消息处理
*
* @param pushData 平台发送的数据
* @param response 响应
* @return
* @throws Exception
*/
@PostMapping("")
public String safeMode(@RequestBody HttpPushData pushData, HttpServletResponse response) throws Exception {
String signature = pushData.getSignature();
String msg = pushData.getMsg();
// token验证
if(!StringUtils.isEmpty(signature)) {
String signWithMD5 = signWithMD5(token + pushData.getNonce() + msg);
boolean equals = pushData.getSignature().equals(signWithMD5);
if (!equals) {
response.setStatus(HttpStatus.BAD_REQUEST.value());
logger.error("安全模式验证token失败...");
return "FAIL";
}
}
// 根据productKey 查询dataSecret
ProductDataSecretRequest request = new ProductDataSecretRequest();
request.setProductKey(pushData.getProductKey());
HanCloudsClient hanCloudsClient = new HanCloudsClient("https://api.hanclouds.com/api/v1");
// userKey, authKey, authSecret均为用户级的鉴权信息。
hanCloudsClient.putUserAuthParams(userKey, authKey, authSecret);
try {
StringResponse sdkResponse = hanCloudsClient.execute(request);
if (sdkResponse.isSucceed()) {
//成功后处理代码
String dataSecret = sdkResponse.getValue();
msg = decodeWithAES(dataSecret, msg);
logger.info("安全模式验证token成功...保存msg,msg={}",msg);
return "OK";
}else {
response.setStatus(HttpStatus.BAD_REQUEST.value());
logger.error("查询dataSecret失败...");
return "FAIL";
}
} catch (HanCloudsException e) {
response.setStatus(HttpStatus.BAD_REQUEST.value());
logger.error("查询dataSecret失败...");
return "FAIL";
}
}
/**
* 推送消息解密
*
* @param dataSecret 产品对应的AES秘钥
* @param content 加密消息体
* @return
* @throws Exception
*/
public String decodeWithAES(String dataSecret, String content) throws Exception {
KeyGenerator kgen = KeyGenerator.getInstance("AES");
SecureRandom secureRandom = SecureRandom.getInstance("SHA1PRNG");
secureRandom.setSeed(dataSecret.getBytes());
kgen.init(128, secureRandom);
SecretKey secretKey = kgen.generateKey();
byte[] data = (new Base64()).decode(content);
byte[] enCodeFormat = secretKey.getEncoded();
SecretKeySpec key = new SecretKeySpec(enCodeFormat, "AES");
Cipher cipher = Cipher.getInstance("AES");
cipher.init(2, key);
byte[] result = cipher.doFinal(data);
return new String(result);
}
# 地址有效性验证
String token = "控制台页面生成的token";
/**
* 地址有效性验证
*
* @param msg 验证消息
* @param nonce 随机字符串
* @param signature 数字签名
* @return
* @throws Exception
*/
@GetMapping("")
public String verifyUrl( HttpServletResponse response,
@RequestParam(value = "msg") String msg,
@RequestParam(value = "nonce") String nonce,
@RequestParam(value = "signature") String signature) throws Exception {
if(!StringUtils.isEmpty(signature)) {
String signWithMD5 = signWithMD5(token + nonce + msg);
if (!signature.equals(signWithMD5)) {
response.setStatus(HttpStatus.BAD_REQUEST.value());
return "FAIL";
}
}
return "OK";
}
/**
* token加密
*
* @param content 待加密数据
* @return
*/
public String signWithMD5(String content) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] digest = md5.digest(content.getBytes(StandardCharsets.UTF_8));
return new Base64().encodeAsString(digest);
} catch (Exception var3) {
logger.info("signWithMD5({}) failed. {}", content, var3.getMessage());
return null;
}
}