摘要
为了高效清洗具有时序性、周期性等特点的工业数据,首先利用分布式组件设计了一套流式清洗系统,系统以Mosquitto作为采集数据的汇集中心,以Flume为连接组件,以Kafka为缓冲组件,对接数据清洗组件,使系统具有高吞吐、大缓冲等优势。然后基于速度约束模型,设计了一种周期性数据清洗算法,综合工业数据的时序性、周期性、物理意义等特性,在原有速度约束算法基础上增加周期性检测和数据切片机制,以解决速度约束算法处理周期性数据的失真问题,提高可用度。最后文中以盾构掘进数据集为样本,验证了系统和算法的有效性,以及改进算法的适用性。
“工业4.0”背景下,生产制造过程、工程装备状态监测等环节产生了海量数
目前,常见的工业数据清洗模式为离线清洗。离线清洗系统对实时数据先存储、后清洗,如Kumar等
流式数据清洗系统通常集成了高精度、低偏差的清洗算法。工业时序数据的清洗算法通常以处理缺失值、错列、异常值等为目
本文为了高效清洗具有时序性、周期性等特点的工业数据,设计了一套面向周期性工业时序数据的流式清洗系统和清洗算法,使系统在处理周期性突变数据时,也具有良好的数据修复效果。首先,利用Flume、Kafka等组件,构建一套流式数据处理系统,克服传统离线清洗系统先存储、后清洗的实时性差问题;其次,在构建的流式清洗系统基础上,使用基于速度约束的清洗方
为满足工业数据先清洗、再存储的实时处理需求,本文使用主流的大数据框架,搭建了一套用于工业时序数据清洗、存储的流式处理系统。整套系统包括数据汇集组件,连接组件,缓存组件,清洗组件,存储组件,系统框架如

图1 清洗系统组件和架构
Fig.1 Component and architecture in the cleaning system
数据汇集组件选用Mosquitto MQTT Broker,其位于系统下层,使用MQTT协议实现底层采集装置和数据清洗系统的通信。采集的数据须包含时间戳以及至少一列的有效观测数据。Mosquitto汇集并封装采集的数据,随后数据被发往数据连接组件进行后续流转。
数据连接组件选用Flume框架,其在清洗系统中的位置和业务逻辑如
数据缓存组件由Kafka集群组成,用于待清洗数据的缓存和系统解耦,适配高速产生的数据和清洗组件较慢的处理速率。
数据清洗组件是数据清洗系统的核心部分。清洗组件从Kafka持续获取指定Topic下的数据,完成基于速度约束的数据清洗工作。消费者每隔一定间隔从Kafka消费数据,依据数据格式进行预处理,把原始数据封装成可清洗数据集;随后经过处理算法完成异常值检测和异常数据修复工作。
数据存储组件由HDFS、Hive、HBase构成。清洗组件依据Flume连接阶段数据标签的差异,调用不同API写入存储组件。清洗系统修复所有不满足约束的异常数据,并存储修复结果。但被清洗组件检测为异常的数据以及被连接组件标记为Error的数据,会被冗余存储,方便应用于故障检测等工业应用;存储组件同时存储修复前异常值和修复后结果,为数据清洗后的数据分析工作提供保障。
清洗系统由上述组件构成,作为清洗算法的载体,能够实现海量数据的流式处理和清洗任务,相比传统清洗系统具有低耦合、可扩展、易维护、大缓冲等特点。
首先给出速度约束的概念:一组时序数据中,若两个时序点数值的变化率限制在一定范围内,则称两点满足速度约束。

图2 时间序列和速度约束示
Fig.2 Time series and speed constraints
考察
(1) |
式中:xn,xm为两个点的数值;tn,tm为数据点对应的时间戳;smin, smax分别为两点速度约束的下限和上限。
现有的速度约束清洗方
根据速度约束条件,xk应当在xi的约束s所限制的范围内,
再考查
将xk本身和xk前时刻点约束范围端点以及后时刻点xk+1,xk+2,…,xk+m的回溯结果成候选结果集,记作Xk,则
为了满足最小修复距离原则,位于候选集Xk中间的数值通常具有较小的修复距离。把候选结果集Xk的中位数记作,就作为数据点的修复结果,
通常情况下,该方法能完成大多数清洗任

图3 盾构机样本数据和清洗结果
Fig.3 Sample data of a tunnel boring machine and the cleaning results
而速度约束方法用于此类设备的数据清洗时,不能获得良好的清洗结果。速度约束方法会把每个工作周期的数据按照连续数据清洗,导致两个周期间、周期末尾的数据点发生较大失真,见
为提高速度约束算法的适用性,对周期性数据清洗前,应先识别并划分数据的周期,限制窗口范围。以油缸行程数据为例,应仅对盾构推进时的数据进行清洗,油缸回退时的数据不应当参与速度约束求解。
算法1描述了本清洗系统中周期性数据速度约束清洗算法(Periodic Data Cleaning Based on Speed Method)的流程。数据清洗算法主要面向时序数据中的单点异常值,异常值的识别依据
算法模型以Kafka Consumer身份接入系统并作为清洗组件。清洗组件先依据Kafka配置连接到Kafka集群,再通过Kafka主题订阅待清洗数据。清洗组件每隔一段间隔调用数据拉取函数从集群中获取一批数据,记作DatasRecords。
随后,清洗组件调用预处理函数对每条数据Record做格式转换等预处理,预处理方式按照不同数据格式定制,预处理后的数据被暂存到RangofResult队列中等待清洗;当RangofResult中的数据达到一定窗口时进行数据清洗工作。数据清洗工作通过调用清洗周期时间序列函数完成,清洗后的结果被记录在CleanedDatas集合中。
算法1中的清洗周期时间序列函数会先用调用判断函数对队列RangOfResult的数据逐条判断,判断数据是否属于同一周期。周期判断的方法应依据现场数据进行特性化设计。本例中,有效工作周期和非工作周期数据间有明显的突变,对
数据被划分为周期片段后,调用算法2清洗周期片段函数清洗每一个切片数据。最后,清洗组件将算法2的返回结果写入存储系统。
算法1 数据清洗组件
Algorithm1 DataCleaningModule
输入 待处理的周期性时间序列
输出 清洗后的时间序列
主函数 数据清洗{
数据清洗消费者 = 新的Kafka Consumer(Kafka配置);
数据清洗消费者.连接和订阅(待清洗数据主题);
循环{
DatasRecords = 数据清洗消费者.从Kafka拉取数据(一段间隔);
对于DatasRecords中的每一个Record{
如果Record不为空{
Result = 预处理(Record);
RangofResult.添加(Result);}}
如果RangofResult.大小> 窗口大小{
CleanedDatas = 清洗周期时间序列(RangofResult);
清空RangofResult;}
写入数据存储组件(CleanedDatas );}}
函数 清洗周期时间序列(RangofResult){
对于RangofResult中的每一个Record{
归属的周期 = 判断(Record );
OneSplitPeriod.添加(归属的周期, Record );}
如果(OneSplitPeriod 不为空){
清洗后的序列片段 = 清洗周期片段 (OneSplitPeriod);
返回 清洗后的序列片段;}}
单个周期片段数据的速度约束清洗流程如算法2所
算法2 清洗周期片段
Algorithm2 CleanAFragmentofTimeSeries
输入 一个时间序列片段;
输出 清洗后的时间序列片段;
清洗后的时间序列片段 = 新建 一个列表;
对于 一个时间序列片段 中的每一个 xk{
候选集 = 新建 一个列表;
;
;
m个后续点 = 获取xk后的m个数据点;
对于 m个后续点 中的每一个 {
;
.添加(); |
;
.添加();} |
.add(); .add(); .add();
xk的清洗结果 = 判断包含关系(候选集的中位数, , );
清洗后的时间序列片段.添加(xk的清洗结果);}
返回 清洗后的时间序列片段。
为了测试周期性数据清洗算法用于周期时序数据的有效性,试验选取了来源于上海申通盾构集团的中铁CREC929-932系列盾构机掘进数据集进行模拟实验。原始样本集包含油缸行程、油缸推力等200维数据,选取周期性明显的A组油缸行程数据为清洗对象,再从此列数据中选取2 500个时序点组成原始序列,如

图4 油缸行程数据和异常值点示例
Fig.4 Cylinder stroke data and outliers
所有实验程序用Java实现,其中数据生成程序会模拟工业现场底层采集单元的数据上传。系统中各组件和算法程序运行在Intel(R) Xeon(R) CPU E5-2603 v4 @ 1.70GHz的CPU和8GB内存的PC上,操作系统为CentOS Linux Release 7.8。5台上述规格的PC组成实验集群,编号为PC1~PC5,集群配置如
试验集群组件 | 组件部署节点 |
---|---|
Hadoop-2.7.2 Namenode | PC1 |
Hadoop-2.7.2 Datanode | PC1、PC2、PC3、PC4、PC5 |
Mosquitto-1.4.14 MQTT | PC2 |
Flume-1.7.0 | PC3 |
Kafka-0.11 | PC2、PC3、PC4 |
Zookeeper-3.4.10 | PC1、PC2、PC3、PC4、PC5 |
数据清洗组件 | PC5 |
实验选用移动平均法SMA、加权平均法WMA、速度约束
本实验中,因盾构推进中途油缸不可回缩,油缸行程的速度约束值下限选取为0。速度约束的上限依据施工组织方案中的施工计划设定,油缸推进速度最大为80mm·mi
实验使用异常点替换原始序列中的部分样本点,来模拟工业现场采集到异常数据的场景,验证系统的可用性和算法的有效性。异常点的位置会对数据的清洗效果产生影响,清洗系统面向周期性数据,系统会检测并划分一组数据的不同周期,本实验中异常数据只被添加在油缸每个工作周期的推进行程中。
为体现算法在不同异常值分布下的泛化性,实验使用均匀分布和偏态分布模拟工程装备运转时出现的异常值。均匀分布用于模拟装备在正常工作阶段时普遍出现的异常值。异常数据的位置服从均匀分布,样本集中每个数据点等可能地出现异常;手动添加的均匀分布异常点与原本正常点数值上存在20%~30%的偏差。偏态分布用于模拟装备启动阶段的高概率异常。周期运转的大型装备,在工作循环的启动阶段,由于环境复杂,交叉作业多,易发生信号干扰,初始阶段异常值的出现概率高,正常工作阶段异常概率相对较低。在该场景下,实验使用Zipf分布模拟这种偏态分布;手动添加的偏态分布异常点与原本正常点数值上存在20%~30%的偏差。
首先直观地对比在两种分布下使用不同方法的清洗效果,分别在原始序列中添加1%的服从均匀分布的异常点和5%的服从偏态分布的异常点,图中圆点为手动添加的异常点,如
清洗系统运用不同算法清洗序列的结果如

图5 系统应用不同清洗算法的直观清洗结果(含服从均匀分布的1%异常值)
Fig.5 Cleaning results of different algorithms applied to the system (Including 1 % outliers obeying uniform distribution)

图6 系统应用不同清洗算法的直观清洗结果(含服从偏态分布的5%异常值)
Fig.6 Cleaning results of different algorithms applied to the system (Including 5 % outliers obeying skewed distribution)
为了测试周期性数据清洗算法在不同异常数据占比时的清洗效果,对比各算法在异常点比例为1%、2%、5%…25%的8组样本条件下的异常数据修复距离和,修复结果偏差和,清洗准确度3种指标。
异常数据修复距离是指修复前数据与修复后数据的距离;修复距离越小,则系统越能反应数据原有的变化趋势,修复距离和是指所有点的修复距离加和,如

图7 修复距离和、结果偏差和示
Fig.7 Repair distance and result distance
清洗系统使用不同算法的修复距离和如

图8 清洗系统应用不同算法的修复质量指标对比
Fig.8 Comparison of repair quality indicators for different algorithms used in the cleaning system
所有被修复的数据点和真实数据点的偏差值如
不同算法导致数据变化的比例如
综上,周期性数据清洗算法在用于周期性的工业数据修复时,且具有良好的泛化性,能用于不同类型的异常值分布。周期性数据清洗算法比平滑方法和普通速度约束方法有更小的修复距离,且修复结果能更好的反应数据变化趋势。通过周期识别与切分,提高了周期性清洗算法的准确度,对原本正确的数据影响较小。机器学习方法在异常值较少时清洗效果好,随着异常值比例增多修复效果波动大偏差增加,且受异常数据分布影响,而周期性清洗算法在不同数据分布下均有良好的清洗效果。
本文搭建了一套面向工业时序数据的流式清洗系统,系统以Mosquitto作为底层采集单元的汇集中心,通过Flume连接Mosquitto和Kafka实现整套系统的数据流转,将清洗后的数据进行存储或进分析。该系统相比传统的离线时序数据清洗系统,具有高吞吐,缓冲,解耦等优势。
本文还设计了一种基于速度约束的周期性数据清洗方法,在速度约束算法基础上将周期性数据进行周期识别和数据分片,解决速度约束算法用于周期突变数据的清洗失效问题,更好地完成具有周期性的工业数据清洗任务。最后通过实验验证了本系统在处理周期性工业数据时的可用性,验证了周期性清洗方法比原有速度约束、平滑算法有更小的修复距离和结果偏差,验证了周期性数据清洗算法对原始数据的影响比例更小,修复结果能更好的反应真实数据和现有数据的变化趋势;在不同异常数据分布和占比条件下,周期性数据清洗方法仍具有较好的泛化性。
作者贡献声明
王 耀:进行系统构建和算法设计,论文撰写;
赵 炯:拟定研究方向,项目规划和指导,论文修改和定稿;
周奇才:指导系统构建,指导论文修改;
熊肖磊:帮助数据分析;
陈传林、张恒:提供样本数据,分析实验场景,协助数据分析。
参考文献
王建民. 工业大数据技术综述[J]. 大数据, 2017, 3(6): 3. [百度学术]
WANG Jianmin. Survey on industrial big data[J]. Big Data, 2017, 3(6):3. [百度学术]
丁小欧, 王宏志, 于晟健.工业时序大数据质量管理[J]. 大数据, 2019, 5(6): 1. [百度学术]
DING Xiaoou, WANG hongzhi, YU Shengjian. Data quality management of industrialtemporal big data[J]. Big Data, 2019, 5(6): 1. [百度学术]
金晓航, 王宇, ZHANG Bin. 工业大数据驱动的故障预测与健康管理[J]. 计算机集成制造系统, 2022, 28(5): 1314. [百度学术]
JIN Xiaohang, WANG Yu , ZHANG Bin. Industrial big data-driven fault prognostics and health management[J]. Computer Integrated Manufacturing Systems, 2022, 28(5): 1314. [百度学术]
GIEREJ S. Big data in the industry — overview of selected issues[J]. Management Systems in Production Engineering, 2017, 25(4): 251. [百度学术]
CHU Xu, IHAB F I, KRISHNAN S, et al. Data cleaning: overview and emerging challenges[C]//Proceedings of the 2016 ACM SIGMOD. San Francisco California USA: International Conference on Management of Data, 2016: 2201-2206. [百度学术]
IHAB F I. Effective Data cleaning with continuous evaluation[J]. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 2016, 39(2): 38. [百度学术]
DING Xiaoou, WANG Hongzhi, SU Jiaxuan, et al. Cleanits: a data cleaning system for industrial time series[J]. Proceedings of the VLDB Endowment, 2019, 12(12): 1786. [百度学术]
郑树泉, 覃海焕, 王倩. 工业大数据技术与架构[J]. 大数据, 2017, 3(4):14. [百度学术]
ZHENG Shuquan, TAN Haihuan, WANG Qin.Industrial big data technologies and architecture[J].Big Data, 2017, 3(4): 14. [百度学术]
SONG Shaoxu, ZHANG Aoqian, WANG Jianmin, et al. SCREEN: stream data cleaning under speed constraints[C]//Proceedings of the 2015 ACM SIGMOD. Melbourne Victoria [S.l.]: International Conference on Management of Data, 2015: 827-841. [百度学术]
BHATTACHARJEE A K, PARTHA C, SHAW M P, et al. ETL-based cleaning on database[J]. International Journal of Computer Applications, 2014, 105(8): 34. [百度学术]
陈志云,肖楚乔.基于Storm的工业流水线实时分析系统设计与实现[J]. 计算机应用与软件, 2017, 34(11): 48. [百度学术]
CHEN Zhiyun, XIAO Chuqiao. Design of industrial assembly line real-time analysis system based on storm and its implementation [J]. Computer Applications and Software, 2017, 34(11): 48. [百度学术]
GENG Daoqu , ZHANG Chengyun , XIA Chengjing, et al. Big data-based improved data acquisition and storage system for designing industrial data platform[J]. IEEE Access, 2019, 7: 44574. [百度学术]
张奥千. 时间序列数据清洗方法研究[D].北京:清华大学, 2018. [百度学术]
ZHANG Aoqian, Research on time series data cleaning[D]. Beijing:Tsinghua University, 2018. [百度学术]
丁小欧. 时态数据清洗关键技术研究[D]. 哈尔滨:哈尔滨工业大学, 2021. [百度学术]
DING Xiaoou. Research on key technologies of temporal data cleaning[D]. Harbin:Harbin Institute of Technology, 2021. [百度学术]
WANG Xi, WANG Chen. Time series data cleaning: a survey[J]. IEEE Access, 2020(8): 1866. [百度学术]
陈翅刚. 制造物联网海量RFID感知数据智能清洗处理技术研究[D]. 广州:广东工业大学, 2014. [百度学术]
CHEN Chigang. Research on clean technology of RFID sensing data of manufacturing in IoT[D]. Guangzhou:Guangdong University of Technology, 2014 [百度学术]
JEFFERY S R, ALONSO G, FRANKLIN M J, et al. A pipelined framework for online cleaning of sensor data streams[C]//IEEE 22nd International Conference on Data Engineering (ICDE'06). Atlanta Georgia USA:IEEE, 2006: 140-140. [百度学术]
SONG Shaoxu, GAO Fei, ZHANG Aoqian, et al. Stream data cleaning under speed and acceleration constraints[J]. ACM Transactions on Database Systems (TODS), 2021, 46(3): 1. [百度学术]