联 系 我 们
售前咨询
售后咨询
微信关注:星环科技服务号
更多联系方式 >

技术博客

首页>博客资讯>2013 Hadoop 中国技术峰会 (China Hadoop Summit 2013) 现场视频>

2013 Hadoop 中国技术峰会 (China Hadoop Summit 2013) 现场视频

发布时间 2013-11-21

基于流的SQL引擎:StreamSQL(基础介绍)中已介绍的,Inceptor StreamSQL是用于替代Scala和API 来简化流计算编程的类SQL声明式语言。StreamSQL的计算运行于流计算引擎Transwarp Slipstream之上,该引擎混合了事件驱动和微批处理,因此既可以支持有低延迟需求的任务也可以处理高吞吐任务,能够应对不同类型业务。事件驱动模式是Slipstream的重要特性,本文将针对此模式,介绍如何正确的利用StreamSQL设置事件驱动模式,通过Slipstream进行低延迟的计算处理。

事件驱动的流处理

事件驱动的流处理,是指StreamSQL引擎逐条读取数据源流入的每条记录,进行必要的业务逻辑加工后,再输出到StreamSQL支持的输出终端。相比于等待消息集合后再打包的微批(mini-batch)处理模式,事件驱动的流处理延迟更低,在对延迟敏感的业务场景中表现更佳。

启用事件驱动模式

为了让StreamSQL运行在事件驱动模式下,我们需要在Transwarp Manager上为StreamSQL服务配置相应的参数,设置NGMR_ENGINE_MODE参数为morphling。注意,如果让StreamSQL运行在微批模式,要把NGMR_ENGINE_MODE设置为mapred。

 

应用实例

用户Emily有三个任务需要以低延迟实现。为了让StreamSQL以事件驱动的模式来处理流数据,她需要在触发StreamJob前配置参数streamsql.use.eventmode=true。StreamSQL中触发StreamJob的详细步骤可以参考基于流的SQL引擎:StreamSQL(基础介绍)

1. 从流表导数据到普通表

Emily希望从Topic tps1中查询数据,并放在普通表中,在事件驱动模式下实现此业务,通过如下的语句完成:

SET streamsql.use.eventmode=true;

CREATE STREAM s1(score INT, name STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092");

CREATE TABLE t1(score INT, name STRING);

INSERT INTO t1 SELECT * FROM s1;

任务运行过程中,Emily在任务管理页面上发现,事件驱动模式下提交的StreamJob是一个常驻的Active Job

 

2. Window Stream聚合后插入普通表

接着Emily要根据Topic tps1创建一个流,对它按事件时间切分窗口,再以窗口为时间区间做聚合,然后将结果插入普通表。这个业务涉及到窗口的概念,下面先来对窗口做以简单介绍。StreamSQL中流处理的窗口(STREAMWINDOW)分为下面两种。

滑动窗口:滑动窗口需要由两个量来定义,窗口长度(LENGTH)和滑动间隔(SLIDE)。滑动窗口是指按照一定的SLIDE 向未来滑动的长度为 LENGTH 的窗口。例如:如果窗口长度为2s,滑动间隔为1s,那么第一个窗口为[0s,2s),第二个窗口为[1s, 3s),第三个窗口为[2s, 4s),以此类推。

跳动窗口:当窗口间隔和滑动间隔相同,滑动窗口就退化为跳动窗口。换句话说,跳动窗口就是滑动窗口 LENGTH =SLIDE 的特例。例如:INTERVAL 为2s跳动窗口第一个区间为[0s, 2s),第二个区间为[2s, 4s),第三个区间为[4s, 6s),以此类推。

另外,StreamSQL中有两种切分窗口的方式:

系统时间(System Time)切分:以流处理引擎处理的时间为基准切分窗口。

事件时间(Event Time)切分:将数据中的某指定个字段作为时间字段切分窗口。

接下来,Emily将在事件驱动模式下,创建一个按事件时间切分的Window Stream,其中LENGTH为4s,SLIDE为2s,对它做聚合然后将结果插入普通表:

SET streamsql.use.eventmode=true;

-- 使用事件时间

SET streamsql.use.eventtime=true;

CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts","timeformat"="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true");

-- 创建Window Stream

CREATE STREAM s1win AS SELECCT * FROM s1 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND);

CREATE TABLE t1(score INT, name STRING);

INSERT INTO t1 SELECT SUM(score), name FROM s1win GROUP BY NAME;

3. 两个Window Stream关联后插入普通表

后Emily需要在事件驱动模式下创建两个带窗口的流(LENGTH为4s,SLIDE为2s),并对它们在name字段上做关联,然后将结果写入某张普通表。实现的语句是这样的:

SET streamsql.use.eventmode=true;

-- 使用事件时间

SET streamsql.use.eventtime=true;

CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts","timeformat"="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true");

-- 创建Window Stream s1win

CREATE STREAM s1win AS SELECT * FROM s1 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND);

 

CREATE STREAM s2(class INT, name STRING, ts STRING) TBLPROPERTIES(“topic”=”tps2”,"kafka.zookeeper"="tw-node127:2181", “kafka.broker.list”=”tw-node127:9092”,”timefield”=”ts”,”timeformat”=”yyyy-MM-dd HH:mm:ss”,"use.lowlevel.consumer"="true");

-- 创建Window Stream s2win

CREATE STREAM s2win AS SELECCT * FROM s2 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND);

CREATE TABLE t1(score INT, class INT, name STRING);

INSERT INTO t1 SELECT score, class, s1win.name FROM s1win JOIN s2win ON s1win.name=s2win.name;

 

结语

由于Slipstream对于事件驱动模式和微批模式的混合支持,用户可以方便的实现低延时和高吞吐的实时计算。在事件驱动的模式下,数据触发的计算任务延迟可以低至5毫秒,这样就可以用来开发对延迟时间敏感度较高的应用,例如在线反欺诈应用。而在微批处理的模式下,Slipstream能够提供很高的吞吐,适合运用在某些对吞吐量要求较高的特殊行业,例如交通的视频检测。而有了StreamSQL对模式切换的语义支持,用户就可以灵活的借助Slipstream的实时计算能力应对不同业务的需求。

关键词:
Hadoop,中国技术峰会,SQL

热门产品

  • TDC星环数据云平台(TDC),基于云原生技术融合数据 PaaS、分析PaaS、应用 PaaS,实现数据端到端全生命周期管理。

  • TDS数据开发 | 数据治理 | 共享交换 支撑企业级数据治理和数据资产平台建设

  • SophonSophon-星环智能分析工具,分布式计算、多模态处理、图形化建模、隐私密保护、云边化一体。

  • KunDB星环分布式交易型数据库 SQL兼容、强一致、高性能、高可用

  • ArgoDBTranswarp ArgoDB 是星环科技自主研发的分布式分析型闪存数据库,可以替代Hadoop+MPP混合架构。支持标准SQL语法,提供多模分析、实时数据处理、存算解耦、混合负载、数据联邦、异构服务器混合部署等领先技术能力。