# 数据订阅文档
数据订阅服务旨在为第三方云平台提供高速、稳定、安全的数据服务。 由于推送服务不保留数据及网络延迟性等问题的存在,会造成数据的丢失,而本服务的数据将会保留一天,相比之下比APP推送服务更可靠。通过定制Kafka的认证、鉴权,数据订阅服务鉴权与HanClouds平台的鉴权机制完全一致,数据均通过加密处理,拥有足够的安全保障。数据订阅服务服务采用SASL/PLAIN认证机制,为充分保障数据的安全性,用户想要成功订阅到数据,必须通过认证、鉴权校验。
订阅步骤概要说明:
1.用户向HanClouds提交开通数据订阅的申请,由管理员进行审核,并为其开通。
2.数据订阅服务开通后,平台会自动根据用户下的产品级参数productKey进行topic的创建及授权,并将数据通过生成的dataSecret进行加密发送。
3.用户调用Kafka Client API开启SASL/PLAIN认证,并通过productKey,鉴权参数以及数据密钥订阅并消费数据。
# 快速开始
# 开通服务
如需使用数据订阅服务,可以在数据订阅界面选择所需开通该项服务的产品,并点击申请开通,在填写完成基本资料之后即可提交审核,审核通过后方可使用该服务。开通成功后可以在界面设置推送数据类型,目前支持设备日志、设备数据流、设备事件、设备命令、大数据计算结果五种数据类型的任意排列组合,设置后立即生效。并且还可以在界面暂停推送,暂停超过七天后,服务将会自动关闭,请谨慎使用。
# 编写订阅客户端
HanClouds提供了完整的客户端示例 (opens new window),只需简单修改一些配置即可使用。客户端版本分为V1和V2,V1版本的客户端只针对数据订阅服务的前身云接入服务的用户使用,数据订阅服务将不再使用该客户端进行数据的拉取。下面将介绍如何修改这些配置信息(主要针对V2客户端进行介绍):
- 将示例代码中的
PRODUCT_KEY
变量修改为需要订阅数据的productKey,数据订阅服务是使用productKey进行认证,鉴权以及创建topic; - 将
QUERY_KEY
以及QUERY_SECRET
修改为需订阅的产品下的queryKey和querySecret。queryKey以及querySecret主要是用于认证、鉴权,可在数据订阅服务界面点击鉴权信息获取; - 修改
DATA_SECRECT
为开通数据订阅服务时生成的dataSecret(数据密钥),用于数据加密,可在数据订阅服务界面获取; - 如果成功修改了上述配置,可以直接运行程序,控制台将打印出订阅的数据,当然你也可以对五种不同的数据类型做不同的处理,五种数据类型通过获取到的数据中的
dataType
字段进行区分,下文会进行更加详细的介绍,HanClouds只是提供一个可供参考的模板。
# 客户端详细说明
由于订阅服务的是通过Kafka实现的,所以订阅客户端的本质就是Kafka Client API中的Consumer API,每个数据订阅服务所对应的Topic就是productKey,但是由于数据订阅服务定制了Kafka的认证、鉴权机制,客户端需开启SASL认证,需要在客户端中需添加SaslConfigs.SASL_JAAS_CONFIG
配置选项,并将productKey作为userName,password则是对鉴权参数(queryKey,querySecret)通过HMAC-SHA1
签名算法所得。
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
PlainLoginModule.class.getName() + " required username=\"%s\" " + "password=\"%s\";",
USER_NAME,
password
));
另外因为Kafka客户端与服务端之间的通信协议是开源的,为提升用户数据的安全性,数据本身也进行了加密操作,所以需要在成功订阅消费到数据之后需要使用dataSecrect解密才能使用数据明文:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
PushServiceData accessData = JSON.parseObject(EncryptUtil.decodeWithAesCbc(DATA_SECRET, record.value()),
if (accessData != null){
Object data = PushServiceDataTypeEnum.getJsonDataByType(accessData.getDataType(), accessData.getData());
if (data != null){
System.out.printf("topic=%s, partition=%s, offset = %d, key = %s, value = %s%n",
record.topic(), record.partition(), record.offset(), record.key(), data.toString());
}
}
...
# 数据格式说明
数据订阅所支持的五种数据类型都会存在于同一个topic里面,通过dataType
进行区分,在示例工程中的PushServiceDataTypeEnum
中已经体现出了这种区别,完整的一条数据格式为:
{dataType=1,data='{"data":"test",....}'}
dataType
与数据类型的对应关系为:
设备数据流=1;
设备日志=2;
设备命令=3;
大数据计算结果=4;
设备事件=5;
每种数据的格式都会有所区别,下面将对这五种数据类型做一个简单的介绍。数据中的data字段根据type字段的值不同而表示不同的类型,映射关系如下:
type值 | 数据流类型 | data类型 | 补充说明 |
---|---|---|---|
0 | 无效的数据类型 | ||
1 | Json | String | UTF-8编码的Json串 |
2 | Int | Int | |
3 | String | String | UTF-8编码的字符串 |
4 | Double | Double | |
5 | Bin | String | 二进制进行base64后的字符串,字符编码格式为UTF-8 |
6 | Boolean | Boolean | |
7 | Array | String | UTF-8编码的Json数组 |
8 | Enum | Int | |
9 | Float | Float | |
10 | Gps | String | UTF-8编码的Json串 |
# 设备数据流数据格式
/**
* userKey
*/
private String userKey;
/**
* productKey
*/
private String productKey;
/**
* deviceKey
*/
private String deviceKey;
/**
* 设备sn号
*/
private String sn;
/**
* 时间戳
*/
private long time;
/**
* 数据流名称
*/
private String stream;
/**
* 数据类型
*/
private int type;
/**
* 数据值,视类型不同,形式有所不同
*/
private Object data;
...
# 设备日志数据格式
/**
* deviceKey
*/
private String deviceKey;
/**
* 设备sn号
*/
private String sn;
/**
* 设备日志类型,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/Event.html
*/
private int event;
/**
* productKey
*/
private String productKey;
/**
* 日志时间
*/
private long time;
/**
* userKey
*/
private String userKey;
# 设备命令数据格式
/**
* 命令下发目标设备所属的userKey
*/
private String userKey;
/**
* 命令下发目标设备所属的productKey
*/
private String productKey;
/**
* 命令下发目标的deviceKey
*/
private String deviceKey;
/**
* 设备sn号
*/
private String sn;
/**
* 命令Id,每个命令都有一个独一无二的命令Id
*/
private String cmdId;
/**
* 命令的状态,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/Command.html#%E5%91%BD%E4%BB%A4%E7%8A%B6%E6%80%81%E5%88%97%E8%A1%A8
*/
private int state;
/**
* 命令状态发生的时间
*/
private long time;
/**
* 命令到期时间
*/
private long timeDue;
/**
* 数据类型,具体编码参见 https://console.hanclouds.com/doc/api/dataTypes/DataType.html#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E5%88%97%E8%A1%A8
*/
private int type;
/**
* 下发命令或设备响应命令的内容
*/
private Object data;
# 大数据计算结果数据格式
/**
* userKey
*/
private String userKey;
/**
* productKey
*/
private String productKey;
/**
* deviceKey
*/
private String deviceKey;
/**
* 设备sn号
*/
private String sn;
/**
* 时间戳
*/
private long time;
/**
* 数据流名称
*/
private String stream;
/**
* 数据类型
*/
private int type;
/**
* 数据值
*/
private Object data;
/**
* 推送类型
*/
private int pushType;
# 设备事件类型数据格式
/**
* userKey
*/
private String userKey;
/**
* productKey
*/
private String productKey;
/**
* deviceKey
*/
private String deviceKey;
/**
* 设备sn号
*/
private String sn;
/**
* 数据值
*/
private Object data;
/**
* 时间戳
*/
private long time;
/**
* 事件类型(1: info (信息)2: Alert (告警)3: Error(故障))
*/
private Integer eventType;
/**
* 输出数据
*/
private Object output;
/**
* 命令标识符
*/
private String identifier;
# 安全保障
# 签名认证
签名采用 HMAC-SHA1
算法,将生成的签名字符串与对应的密钥作为参数进行签名,并进行Base64
编码,并将得到的signature
值与UUID、当前时间戳通过 "-" 连接以得到最终的加密字符串。说明如下:
● 签名字符串:productKey-queryKey-nonce-timestamp(nonce为生成的UUID,timestamp为当前时间戳)
● 密钥:querySecret
代码示例:
public static String encryptPassword(String productKey, String queryKey, String querySecret) {
final String bar = "\u002d";
final String nonce = UUID.randomUUID().toString().replaceAll(bar, "");
long timestamp = System.currentTimeMillis();
String content = String.format("%s%s%s%s%s%s%s", productKey, bar, queryKey, bar, nonce, bar, timestamp);
String tempSignature = signWithHmacsha1(querySecret, content);
return String.format("%s%s%s%s%s", tempSignature, bar, nonce, bar, timestamp);
}
private static String signWithHmacsha1(String secret, String content) {
try {
byte[] keyBytes = secret.getBytes("utf-8");
SecretKey secretKey = new SecretKeySpec(keyBytes, "HmacSHA1");
Mac mac = Mac.getInstance("HmacSHA1");
mac.init(secretKey);
byte[] rawHmac = mac.doFinal(content.getBytes("utf-8"));
return (new BASE64Encoder()).encode(rawHmac);
} catch (Exception var) {
//异常处理
}
return null;
}
# 数据解密
这里的解密是指对获取到的数据进行解密以获取数据明文,平台中的数据都是通过AES/CBC/PKCS5Padding
算法加密,说明如下:
●加密秘钥:dataSecret
●密文:contentString
代码示例:
public static String decodeWithAesCbc(String dataSecret,String contentString) {
if (secret.length() < AES_KEY_SIZE) {
return null;
}
if (secret.length() > AES_KEY_SIZE) {
secret = secret.substring(0, AES_KEY_SIZE);
}
try {
byte[] content = Base64.decodeBase64(contentString);
SecretKeySpec key = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8.name()), AES);
Cipher cipher = Cipher.getInstance(AES_CBC);
byte[] ivBytes = new byte[16];
System.arraycopy(content, 0, ivBytes, 0, 16);
IvParameterSpec iv = new IvParameterSpec(ivBytes);
cipher.init(Cipher.DECRYPT_MODE, key, iv);
byte[] encBytes = new byte[content.length - 16];
System.arraycopy(content, 16, encBytes, 0, encBytes.length);
byte[] result = cipher.doFinal(encBytes);
return new String(result);
} catch (Exception e) {
//异常处理
}
return null;
}