# 数据订阅

通过数据订阅功能,瀚云平台实时将收到的设备上传数据,通过kafka订阅的方式推送到第三方平台。可实时向应用侧推送数据、事件或者命令及响应。具有实时高效、数据不丢失、数据加密的优势。

# 数据订阅

# 如何开通数据订阅功能?

  1. 在左侧导航栏选择“数据订阅”,进入数据订阅列表,选择需要开通数据订阅的产品,点击“申请开通”。

img

2.填写如下信息后,后台人员将会在1个工作日内完成审核,审核通过后即可设置订阅的数据类型。

img

3.设置订阅内容,勾选您需要订阅的内容。

img

# 如何暂停数据订阅功能?

通过点击“暂停”,暂停数据订阅功能,暂停后可再次启动。

img

# 消费组管理

消费组是Kafka提供的可扩展且具有容错性的消费者机制。使用kafka提供的消费组功能,同一个消费组的多个消费者就能分布到多个物理机器上以加速消费。平台为每个产品默认创建一个对应的消费组。

平台支持多消费组。当您有多个应用程序从同一个主题消费的需求,为每个需要获取一个或多个主题全部消息的应用创建一个消费组,就可以让它们获取到主题所有的消息。当前普通账号仅支持创建最多2个消费组。

# 如何创建消费组?

1.在左侧导航栏选择“数据订阅”,然后点击“消费组列表”。

img

2.点击 “创建消费组”,输入所要创建的消费组名称(一般设置为应用的名称),点击确定完成创建。创建成功后,将会自动生成对应的消费组ID。

img

# 如何设置消费组?

若您需要使用不同消费组共同消费同一个topic,在完成对应应用的消费组创建后,您需要通过以下操作步骤为消费组授权对应的topic消费权限。

  1. 在左侧导航栏选择“数据订阅”,进入数据订阅列表,选择需要订阅的产品,点击“设置”(需先开通数据订阅功能权限后,才可设置,如何开通消息订阅功能请 点击文档“数据订阅”)。

img

2.勾选您需要授权的消费组,完成授权。

img

# 调用平台接口实现订阅

不使用数据订阅功能时,需要应用侧定时主动调用抽取设备数据,订阅者调用平台的接口实现订阅的过程中,将会用到鉴权参数。

点击对应产品的“鉴权参数”,即可查看对应鉴权参数。

img

img

鉴权参数说明:

  1. 接入(accessKey/accessSecret),主要在设备通过MQTT协议接入或者通过API创建设备时使用此鉴权参数。
  2. 查询(queryKey/querySecret),通过api查询该产品下的数据时,使用此鉴权。
  3. 命令(cmdKey/cmdSecret),通过api给该产品下的设备下发控制命令时,使用此鉴权。
  4. 上报(uploadKey/uploadSecret),通过api以某个设备的身份上传数据时使用此鉴权,新增和更新设备档案时使用此鉴权。

# HTTP推送

通过HTTP推送功能,瀚云平台将收到的设备数据按订阅类型转发到指定的HTTP地址,功能较单一但简单易用且高效。目前支持的订阅类型消息有:设备日志、设备数据流、设备事件、设备命令。

# 创建HTTP实例

1.在左侧导航栏选择“数据订阅”-“HTTP推送”,选择HTTP实例。

img

2.填写实例名称、token、推送地址等信息。填写完推送地址后,平台会向该地址发送HTTP GET请求,进行地址有效性验证,校验成功方可添加实例成功。

img

token:选填,如填写则作为消息推送数字签名的依据,需用户妥善保管。服务端通过token校验瀚云平台身份,保证消息不被篡改。

# 如何使用token?

如果用户新建实例时,填写了token,服务端接收到请求后,需要通过signature字段对请求进行签名校验。校验方法如下:

新建实例时填写的token+随机字符串nonce+推送消息msg拼接到一起做MD5加密,然后将此加密数据做Base64转换,最后将转换后的数据与请求中signature进行比较,相等即为校验成功。具体token加密方式参考token加密

有效性验证的请求形式示例如下:

http://url?msg=validateUrl&nonce=xxx&signature=xxx

具体校验方式参考地址有效性验证

消息加密方式:分为安全模式和明文模式,安全模式采用AES加密算法,通过产品的dataSecret加密密钥。

# 应用层消息处理:

如果用户选择安全模式,产品订阅成功后,会生成对应的dataSecret作为消息加密的AES密钥。用户可以调用瀚云提供的SDK接口(使用SDK),通过productKey得到秘钥dataSecret(dataSecret获取),再进行AES解密获得消息内容。

具体处理方式参考消息处理

# 订阅产品

1.在左侧导航栏选择“数据订阅”-“HTTP推送”,选择HTTP订阅产品。

img

2.选择产品后,点击“订阅设置”或“批量订阅”,选择推送内容,选择HTTP实例(实例服务器地址需验证成功)

img

3.订阅成功的产品会生成DataSecret,用于安全模式中消息的解密密钥。

# 消息推送

推送消息格式如下:

{
      "msg":  "xxxxxxxx", # 消息内容,明文模式或安全模式解密后为JSON字符串数组,包含多条同一类型的数据
      "nonce": "abcdefgh", # 随机字符串
      "signature": "message signature", # 数字签名
      "messageType":  1,  # 订阅类型
      "productKey":  "asdsdgsdg"  # 产品key
}

字段说明:

字段 类型 说明
msg string 数据内容,包括设备日志、设备数据流、设备事件、设备命令
nonce string 用于计算签名字符的随机串
signature string 数字签名,用以校验推送客户端身份合法性,校验方法见实例验证
messageType int 订阅类型 1.设备数据流 2.设备日志 3.设备事件 4.设备命令
productKey string 产品key

msg在明文模式下传输或安全模式解密后得到的数据会有四种数据类型,各种类型的msg示例如下:

1、设备数据流

[{
    "deviceId": 188741,
    "deviceKey": "a31f0dbff15846079be89ab5323c02e3",
    "productId": 36295,
    "productKey": "a4weklJs",
    "productId": 36295,
    "userId": 6,
    "userKey": "zlW6xF5j",
    "stream": "streamTest",  //数据流名称
    "time": 1646620812231,  //消息到达平台的时间戳
    "type": 1,   //数据类型,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/DataType.html#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E5%88%97%E8%A1%A8
    "data": "{\"Temperature\":19.7,\"radiation\":448.5}"   //数据值,视类型不同,形式有所不同
}]

2、设备日志

[{
    "deviceId": 188741,
    "deviceKey": "a31f0dbff15846079be89ab5323c02e3",
    "productId": 36295,
    "productKey": "a4weklJs",
    "productId": 36295,
    "userId": 6,
    "userKey": "zlW6xF5j",
    "time": 1646620812231,   //消息到达平台的时间戳
    "event": 0,    //设备日志类型(1:设备上线 -1:设备下线 0:设备在线 20:新增设备 21:删除设备 3:命令校验错误 41:数据格式不匹配 42:数据流不存在 51:事件格式不正确 52:事件数据流不存在
    //61:数据流脚本执行错误 71:命令脚本执行错误 81:同步拓扑结构 82:获取云端拓扑结构 83:拓扑结构同步失败 84:代理关系校验错误 85:topic解析信息错误 86:topic类别标识错误 87:topic格式校验错误)
    "detail": "设备下线"  //详细信息
}]

3、设备事件

[{
    "deviceId": 188741,
    "deviceKey": "a31f0dbff15846079be89ab5323c02e3",
    "productId": 36295,
    "productKey": "a4weklJs",
    "productId": 36295,
    "userId": 6,
    "userKey": "zlW6xF5j",
    "time": 1646620812231,  //消息到达平台的时间戳
    "event": "dis",   //事件名称
    "eventType": 1,   //事件类型(1: info (信息)2: Alert (告警)3: Error(故障))
    "data": "{\"msg\":13}"   //数据值,视类型不同,形式有所不同
}]

4、设备命令

[{
    "deviceId": 188741,
    "deviceKey": "a31f0dbff15846079be89ab5323c02e3",
    "productId": 36295,
    "productKey": "a4weklJs",
    "productId": 36295,
    "userId": 6,
    "userKey": "zlW6xF5j",
    "time": 1646620812231,  //消息到达平台的时间戳
    "cmdId": "64f02492dd6c4153a1d7cacd2ab1dc00",  //命令Id,每个命令都有一个独一无二的命令Id
    "identifier": "cmd",  //命令标识符
    "state":1,  //命令状态,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/Command.html#%E5%91%BD%E4%BB%A4%E7%8A%B6%E6%80%81%E5%88%97%E8%A1%A8
    "type":1,  //数据类型,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/DataType.html#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E5%88%97%E8%A1%A8
    "data":"{\"in\": 10}"  //数据值,视类型不同,形式有所不同
}]

# 推送说明:

瀚云平台以HTTP POST请求向服务端推送数据,服务端接收到数据后需要返回HTTP状态码 200,否则瀚云平台会认为此次推送无效并重试,最多重试5次。且等待服务端的响应都设有时限(目前是5秒),在规定时限内没有收到响应会认为发送失败,连续失败100次(包括重试次数,计数规则是根据HTTP实例计算。)则会认为服务端地址不可用,推送服务将停止,对应实例状态显示为"验证失败"。

当实例"验证失败",用户解决连续推送失败的问题后,在HTTP实例列表中,将"验证失败"的实例点击"验证"或编辑重新输入连接正常的推送地址,恢复推送。

建议服务端接收到数据时,先做数据缓存,再做业务逻辑处理。 其中请求的Content-Typeapplication/json。重试间隔如下:

重试次数 间隔时间
1 5秒
2 10秒
3 30秒
4 60秒
5 120秒

# JAVA代码示例

# token加密

/**
 * token加密
 *
 * @param msg   推送消息内容
 * @param token 瀚云平台填写的token
 * @param nonce 平台生成的随机字符串
 * @return
 * @throws Exception
 */
public String signWithMD5(String msg, String token, String nonce) throws Exception {
    String content = new StringBuilder(token).append(nonce).append(msg).toString();
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    byte[] digest = md5.digest(content.getBytes(StandardCharsets.UTF_8));
    return new Base64().encodeAsString(digest);
}

# 消息处理

/**
 * 明文模式消息处理
 *
 * @param pushData 平台发送的数据
 * @param response  响应
 * @return
 * @throws Exception
 */
@PostMapping("")
public String clearMode(@RequestBody HttpPushData pushData, HttpServletResponse response) throws Exception{
    String signature = pushData.getSignature();
    String msg = pushData.getMsg();
    if(!StringUtils.isEmpty(signature)) {
        String signWithMD5 = signWithMD5(token + pushData.getNonce() + msg);
        boolean equals = pushData.getSignature().equals(signWithMD5);
        if (!equals) {
            response.setStatus(HttpStatus.BAD_REQUEST.value());
            logger.error("明文模式验证token失败...");
            return "FAIL";
        }
    }
    logger.info("明文模式验证token成功...保存msg,msg={}",msg);
    return "OK";
}


/**
 * 安全模式消息处理
 *
 * @param pushData 平台发送的数据
 * @param response  响应
 * @return
 * @throws Exception
 */
@PostMapping("")
public String safeMode(@RequestBody HttpPushData pushData, HttpServletResponse response) throws Exception {
    String signature = pushData.getSignature();
    String msg = pushData.getMsg();
    // token验证
    if(!StringUtils.isEmpty(signature)) {
        String signWithMD5 = signWithMD5(token + pushData.getNonce() + msg);
        boolean equals = pushData.getSignature().equals(signWithMD5);
        if (!equals) {
            response.setStatus(HttpStatus.BAD_REQUEST.value());
            logger.error("安全模式验证token失败...");
            return "FAIL";
        }
    }
    // 根据productKey 查询dataSecret
    ProductDataSecretRequest request = new ProductDataSecretRequest();
    request.setProductKey(pushData.getProductKey());
    HanCloudsClient hanCloudsClient = new HanCloudsClient("https://api.hanclouds.com/api/v1");
    // userKey, authKey, authSecret均为用户级的鉴权信息。
    hanCloudsClient.putUserAuthParams(userKey, authKey, authSecret);
    try {
        StringResponse sdkResponse =  hanCloudsClient.execute(request);
        if (sdkResponse.isSucceed()) {
            //成功后处理代码
            String dataSecret = sdkResponse.getValue();
            msg = decodeWithAES(dataSecret, msg);
            logger.info("安全模式验证token成功...保存msg,msg={}",msg);
            return "OK";
        }else {
            response.setStatus(HttpStatus.BAD_REQUEST.value());
            logger.error("查询dataSecret失败...");
            return "FAIL";
        }
    } catch (HanCloudsException e) {
        response.setStatus(HttpStatus.BAD_REQUEST.value());
        logger.error("查询dataSecret失败...");
        return "FAIL";
    }
}

/**
 * 推送消息解密
 *
 * @param dataSecret 产品对应的AES秘钥
 * @param content    加密消息体
 * @return
 * @throws Exception
 */
public String decodeWithAES(String dataSecret, String content) throws Exception {
    KeyGenerator kgen = KeyGenerator.getInstance("AES");
    SecureRandom secureRandom = SecureRandom.getInstance("SHA1PRNG");
    secureRandom.setSeed(dataSecret.getBytes());
    kgen.init(128, secureRandom);
    SecretKey secretKey = kgen.generateKey();
    byte[] data = (new Base64()).decode(content);
    byte[] enCodeFormat = secretKey.getEncoded();
    SecretKeySpec key = new SecretKeySpec(enCodeFormat, "AES");
    Cipher cipher = Cipher.getInstance("AES");
    cipher.init(2, key);
    byte[] result = cipher.doFinal(data);
    return new String(result);
}

# 地址有效性验证

String token = "控制台页面生成的token";
/**
 * 地址有效性验证
 *
 * @param msg   验证消息
 * @param nonce 随机字符串
 * @param signature 数字签名
 * @return
 * @throws Exception
 */
@GetMapping("")
public String verifyUrl( HttpServletResponse response,
                           @RequestParam(value = "msg") String msg,
                           @RequestParam(value = "nonce") String nonce,
                           @RequestParam(value = "signature") String signature) throws Exception {
    if(!StringUtils.isEmpty(signature)) {
        String signWithMD5 = signWithMD5(token + nonce + msg);
        if (!signature.equals(signWithMD5)) {
            response.setStatus(HttpStatus.BAD_REQUEST.value());
            return "FAIL";
        } 
    }
    return "OK";
}

/**
 * token加密
 *
 * @param content   待加密数据
 * @return
 */
public String signWithMD5(String content) {
    try {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] digest = md5.digest(content.getBytes(StandardCharsets.UTF_8));
        return new Base64().encodeAsString(digest);
    } catch (Exception var3) {
        logger.info("signWithMD5({}) failed. {}", content, var3.getMessage());
        return null;
    }
}