# 数据订阅文档

数据订阅服务旨在为第三方云平台提供高速、稳定、安全的数据服务。 由于推送服务不保留数据及网络延迟性等问题的存在,会造成数据的丢失,而本服务的数据将会保留一天,相比之下比APP推送服务更可靠。通过定制Kafka的认证、鉴权,数据订阅服务鉴权与HanClouds平台的鉴权机制完全一致,数据均通过加密处理,拥有足够的安全保障。数据订阅服务服务采用SASL/PLAIN认证机制,为充分保障数据的安全性,用户想要成功订阅到数据,必须通过认证、鉴权校验。

data-subscription-flow

订阅步骤概要说明:

1.用户向HanClouds提交开通数据订阅的申请,由管理员进行审核,并为其开通。

2.数据订阅服务开通后,平台会自动根据用户下的产品级参数productKey进行topic的创建及授权,并将数据通过生成的dataSecret进行加密发送。

3.用户调用Kafka Client API开启SASL/PLAIN认证,并通过productKey,鉴权参数以及数据密钥订阅并消费数据。

# 快速开始

# 开通服务

如需使用数据订阅服务,可以在数据订阅界面选择所需开通该项服务的产品,并点击申请开通,在填写完成基本资料之后即可提交审核,审核通过后方可使用该服务。开通成功后可以在界面设置推送数据类型,目前支持设备日志、设备数据流、设备事件、设备命令、大数据计算结果五种数据类型的任意排列组合,设置后立即生效。并且还可以在界面暂停推送,暂停超过七天后,服务将会自动关闭,请谨慎使用。

# 编写订阅客户端

HanClouds提供了完整的客户端示例 (opens new window),只需简单修改一些配置即可使用。客户端版本分为V1和V2,V1版本的客户端只针对数据订阅服务的前身云接入服务的用户使用,数据订阅服务将不再使用该客户端进行数据的拉取。下面将介绍如何修改这些配置信息(主要针对V2客户端进行介绍):

  1. 将示例代码中的PRODUCT_KEY变量修改为需要订阅数据的productKey,数据订阅服务是使用productKey进行认证,鉴权以及创建topic;
  2. QUERY_KEY以及QUERY_SECRET修改为需订阅的产品下的queryKey和querySecret。queryKey以及querySecret主要是用于认证、鉴权,可在数据订阅服务界面点击鉴权信息获取;
  3. 修改DATA_SECRECT为开通数据订阅服务时生成的dataSecret(数据密钥),用于数据加密,可在数据订阅服务界面获取;
  4. 如果成功修改了上述配置,可以直接运行程序,控制台将打印出订阅的数据,当然你也可以对五种不同的数据类型做不同的处理,五种数据类型通过获取到的数据中的dataType字段进行区分,下文会进行更加详细的介绍,HanClouds只是提供一个可供参考的模板。

# 客户端详细说明

由于订阅服务的是通过Kafka实现的,所以订阅客户端的本质就是Kafka Client API中的Consumer API,每个数据订阅服务所对应的Topic就是productKey,但是由于数据订阅服务定制了Kafka的认证、鉴权机制,客户端需开启SASL认证,需要在客户端中需添加SaslConfigs.SASL_JAAS_CONFIG配置选项,并将productKey作为userName,password则是对鉴权参数(queryKey,querySecret)通过HMAC-SHA1签名算法所得。

        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                PlainLoginModule.class.getName() + " required username=\"%s\" " + "password=\"%s\";",
                USER_NAME,
                password
        ));

另外因为Kafka客户端与服务端之间的通信协议是开源的,为提升用户数据的安全性,数据本身也进行了加密操作,所以需要在成功订阅消费到数据之后需要使用dataSecrect解密才能使用数据明文:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    PushServiceData accessData =     JSON.parseObject(EncryptUtil.decodeWithAesCbc(DATA_SECRET, record.value()),   
    if (accessData != null){
      Object data = PushServiceDataTypeEnum.getJsonDataByType(accessData.getDataType(),   accessData.getData());
    if (data != null){
      System.out.printf("topic=%s, partition=%s, offset = %d, key = %s, value = %s%n",
      record.topic(), record.partition(),     record.offset(), record.key(), data.toString());
      }
}
    ...

# 数据格式说明

数据订阅所支持的五种数据类型都会存在于同一个topic里面,通过dataType进行区分,在示例工程中的PushServiceDataTypeEnum中已经体现出了这种区别,完整的一条数据格式为:

{dataType=1,data='{"data":"test",....}'}

dataType与数据类型的对应关系为:

设备数据流=1;

设备日志=2;

设备命令=3;

大数据计算结果=4;

设备事件=5;

每种数据的格式都会有所区别,下面将对这五种数据类型做一个简单的介绍。数据中的data字段根据type字段的值不同而表示不同的类型,映射关系如下:

type值 数据流类型 data类型 补充说明
0 无效的数据类型
1 Json String UTF-8编码的Json串
2 Int Int
3 String String UTF-8编码的字符串
4 Double Double
5 Bin String 二进制进行base64后的字符串,字符编码格式为UTF-8
6 Boolean Boolean
7 Array String UTF-8编码的Json数组
8 Enum Int
9 Float Float
10 Gps String UTF-8编码的Json串

# 设备数据流数据格式

    /**
     * userKey
     */
    private String userKey;
    /**
     * productKey
     */
    private String productKey;
    /**
     * deviceKey
     */
    private String deviceKey;
    /**
     * 设备sn号
     */
    private String sn;
    /**
     * 时间戳
     */
    private long time;
    /**
     * 数据流名称
     */
    private String stream;
    /**
     * 数据类型
     */
    private int type;
    /**
     * 数据值,视类型不同,形式有所不同
     */
    private Object data;
    
    ...

# 设备日志数据格式

   /**
     * deviceKey
     */
    private String deviceKey;
    /**
     * 设备sn号
     */
    private String sn;    
    /**
     * 设备日志类型,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/Event.html
     */
    private int event;
    /**
     * productKey
     */
    private String productKey;
    /**
     * 日志时间
     */
    private long time;
    /**
     * userKey
     */
    private String userKey;

# 设备命令数据格式

   /**
     * 命令下发目标设备所属的userKey
     */
    private String userKey;
    /**
     * 命令下发目标设备所属的productKey
     */
    private String productKey;
    /**
     * 命令下发目标的deviceKey
     */
    private String deviceKey;
    /**
     * 设备sn号
     */
    private String sn;  
    /**
     * 命令Id,每个命令都有一个独一无二的命令Id
     */
    private String cmdId;
    /**
     * 命令的状态,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/Command.html#%E5%91%BD%E4%BB%A4%E7%8A%B6%E6%80%81%E5%88%97%E8%A1%A8
     */
    private int state;
    /**
     * 命令状态发生的时间
     */
    private long time;
    /**
     * 命令到期时间
     */
    private long timeDue;

    /**
     * 数据类型,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/DataType.html#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E5%88%97%E8%A1%A8
     */
    private int type;
    /**
     * 下发命令或设备响应命令的内容
     */
    private Object data;

# 大数据计算结果数据格式

   /**
     * userKey
     */
    private String userKey;
    /**
     * productKey
     */
    private String productKey;
    /**
     * deviceKey
     */
    private String deviceKey;
    /**
     * 设备sn号
     */
    private String sn;  
    /**
     * 时间戳
     */
    private long time;
    /**
     * 数据流名称
     */
    private String stream;
    /**
     * 数据类型
     */
    private int type;
    /**
     * 数据值
     */
    private Object data;
    /**
     * 推送类型
     */
    private int pushType;

# 设备事件类型数据格式

    /**
     * userKey
     */
    private String userKey;

    /**
     * productKey
     */
    private String productKey;

    /**
     * deviceKey
     */
    private String deviceKey;
    
    /**
     * 设备sn号
     */
    private String sn;  

    /**
     * 数据值
     */
    private Object data;
    /**
     * 时间戳
     */
    private long time;

    /**
     * 事件类型(1: info (信息)2: Alert (告警)3: Error(故障))
     */
    private Integer eventType;

    /**
     * 输出数据
     */
    private Object output;
    /**
     * 命令标识符
     */
    private String identifier;

# 安全保障

# 签名认证

签名采用 HMAC-SHA1 算法,将生成的签名字符串与对应的密钥作为参数进行签名,并进行Base64编码,并将得到的signature值与UUID、当前时间戳通过 "-" 连接以得到最终的加密字符串。说明如下:

● 签名字符串:productKey-queryKey-nonce-timestamp(nonce为生成的UUID,timestamp为当前时间戳)

● 密钥:querySecret

代码示例:

   public static String encryptPassword(String productKey, String queryKey, String querySecret) {
        final String bar = "\u002d";
        final String nonce = UUID.randomUUID().toString().replaceAll(bar, "");
        long timestamp = System.currentTimeMillis();
        String content = String.format("%s%s%s%s%s%s%s", productKey, bar, queryKey, bar, nonce, bar, timestamp);
        String tempSignature = signWithHmacsha1(querySecret, content);
        return String.format("%s%s%s%s%s", tempSignature, bar, nonce, bar, timestamp);
    }
    private static String signWithHmacsha1(String secret, String content) {
        try {
            byte[] keyBytes = secret.getBytes("utf-8");
            SecretKey secretKey = new SecretKeySpec(keyBytes, "HmacSHA1");
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(secretKey);
            byte[] rawHmac = mac.doFinal(content.getBytes("utf-8"));
            return (new BASE64Encoder()).encode(rawHmac);
        } catch (Exception var) {
           //异常处理
        }
        return null;
    }

# 数据解密

这里的解密是指对获取到的数据进行解密以获取数据明文,平台中的数据都是通过AES/CBC/PKCS5Padding算法加密,说明如下:

●加密秘钥:dataSecret

●密文:contentString

代码示例:

  public static String decodeWithAesCbc(String dataSecret,String contentString) {
        if (secret.length() < AES_KEY_SIZE) {
            return null;
        }
        if (secret.length() > AES_KEY_SIZE) {
            secret = secret.substring(0, AES_KEY_SIZE);
        }
        try {
            byte[] content = Base64.decodeBase64(contentString);
            SecretKeySpec key = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8.name()), AES);
            Cipher cipher = Cipher.getInstance(AES_CBC);
            byte[] ivBytes = new byte[16];
            System.arraycopy(content, 0, ivBytes, 0, 16);
            IvParameterSpec iv = new IvParameterSpec(ivBytes);
            cipher.init(Cipher.DECRYPT_MODE, key, iv);
            byte[] encBytes = new byte[content.length - 16];
            System.arraycopy(content, 16, encBytes, 0, encBytes.length);
            byte[] result = cipher.doFinal(encBytes);
            return new String(result);
        } catch (Exception e) {
          //异常处理

        }
        return null;
    }