首页 / 新闻

04.

22

2017

用StreamSQL实现事件驱动的实时计算

技术博客

基于流的SQL引擎:StreamSQL(基础介绍)中已介绍的,Inceptor StreamSQL是用于替代Scala和API 来简化流计算编程的类SQL声明式语言。StreamSQL的计算运行于流计算引擎Transwarp Slipstream之上,该引擎混合了事件驱动和微批处理,因此既可以支持有低延迟需求的任务也可以处理高吞吐任务,能够应对不同类型业务。事件驱动模式是Slipstream的重要特性,本文将针对此模式,介绍如何正确的利用StreamSQL设置事件驱动模式,通过Slipstream进行低延迟的计算处理。

事件驱动的流处理

事件驱动的流处理,是指StreamSQL引擎逐条读取数据源流入的每条记录,进行必要的业务逻辑加工后,再输出到StreamSQL支持的输出终端。相比于等待消息集合后再打包的微批(mini-batch)处理模式,事件驱动的流处理延迟更低,在对延迟敏感的业务场景中表现更佳。

启用事件驱动模式

为了让StreamSQL运行在事件驱动模式下,我们需要在Transwarp Manager上为StreamSQL服务配置相应的参数,设置NGMR_ENGINE_MODE参数为morphling。注意,如果让StreamSQL运行在微批模式,要把NGMR_ENGINE_MODE设置为mapred。

应用实例

用户Emily有三个任务需要以低延迟实现。为了让StreamSQL以事件驱动的模式来处理流数据,她需要在触发StreamJob前配置参数streamsql.use.eventmode=true。StreamSQL中触发StreamJob的详细步骤可以参考基于流的SQL引擎:StreamSQL(基础介绍)

1. 从流表导数据到普通表

Emily希望从Topic tps1中查询数据,并放在普通表中,在事件驱动模式下实现此业务,通过如下的语句完成:

SET streamsql.use.eventmode=true;

CREATE STREAM s1(score INT, name STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092");

CREATE TABLE t1(score INT, name STRING);

INSERT INTO t1 SELECT * FROM s1;

任务运行过程中,Emily在任务管理页面上发现,事件驱动模式下提交的StreamJob是一个常驻的Active Job,如下图所示

2. Window Stream聚合后插入普通表

接着Emily要根据Topic tps1创建一个流,对它按事件时间切分窗口,再以窗口为时间区间做聚合,然后将结果插入普通表。这个业务涉及到窗口的概念,下面先来对窗口做以简单介绍。StreamSQL中流处理的窗口(STREAMWINDOW)分为下面两种。

  • 滑动窗口:滑动窗口需要由两个量来定义,窗口长度(LENGTH)和滑动间隔(SLIDE)。滑动窗口是指按照一定的SLIDE 向未来滑动的长度为 LENGTH 的窗口。例如:如果窗口长度为2s,滑动间隔为1s,那么第一个窗口为[0s,2s),第二个窗口为[1s, 3s),第三个窗口为[2s, 4s),以此类推。

  • 跳动窗口:当窗口间隔和滑动间隔相同,滑动窗口就退化为跳动窗口。换句话说,跳动窗口就是滑动窗口 LENGTH =SLIDE 的特例。例如:INTERVAL 为2s跳动窗口第一个区间为[0s, 2s),第二个区间为[2s, 4s),第三个区间为[4s, 6s),以此类推。

另外,StreamSQL中有两种切分窗口的方式:

  • 系统时间(System Time)切分:以流处理引擎处理的时间为基准切分窗口。

  • 事件时间(Event Time)切分:将数据中的某指定个字段作为时间字段切分窗口。

接下来,Emily将在事件驱动模式下,创建一个按事件时间切分的Window Stream,其中LENGTH为4s,SLIDE为2s,对它做聚合然后将结果插入普通表:

SET streamsql.use.eventmode=true;

-- 使用事件时间

SET streamsql.use.eventtime=true;  

CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts","timeformat"="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true");

-- 创建Window Stream

CREATE STREAM s1win AS SELECCT * FROM s1 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND);  

CREATE TABLE t1(score INT, name STRING);

INSERT INTO t1 SELECT SUM(score), name FROM s1win GROUP BY NAME;

3. 两个Window Stream关联后插入普通表

最后Emily需要在事件驱动模式下创建两个带窗口的流(LENGTH为4s,SLIDE为2s),并对它们在name字段上做关联,然后将结果写入某张普通表。实现的语句是这样的:

SET streamsql.use.eventmode=true;

-- 使用事件时间

SET streamsql.use.eventtime=true; 

CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts","timeformat"="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true");

 -- 创建Window Stream s1win

CREATE STREAM s1win AS SELECT * FROM s1 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND); 

 

CREATE STREAM s2(class INT, name STRING, ts STRING) TBLPROPERTIES(“topic”=”tps2”,"kafka.zookeeper"="tw-node127:2181", “kafka.broker.list”=”tw-node127:9092”,”timefield”=”ts”,”timeformat”=”yyyy-MM-dd HH:mm:ss”,"use.lowlevel.consumer"="true");

 -- 创建Window Stream s2win

CREATE STREAM s2win AS SELECCT * FROM s2 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND);

CREATE TABLE t1(score INT, class INT, name STRING);

INSERT INTO t1 SELECT score, class, s1win.name FROM s1win JOIN s2win ON s1win.name=s2win.name;

结语

由于Slipstream对于事件驱动模式和微批模式的混合支持,用户可以方便的实现低延时和高吞吐的实时计算。在事件驱动的模式下,数据触发的计算任务延迟可以低至5毫秒,这样就可以用来开发对延迟时间敏感度较高的应用,例如在线反欺诈应用。而在微批处理的模式下,Slipstream能够提供极高的吞吐,适合运用在某些对吞吐量要求较高的特殊行业,例如交通的视频检测。而有了StreamSQL对模式切换的语义支持,用户就可以灵活的借助Slipstream的实时计算能力应对不同业务的需求。

对此篇文章如有任何问题,欢迎以邮件形式联系我们:bigdataopenlab@transwarp.io