Flink基础(一)

本文主要参考资料为

1.Flink官方网站

2.《Flink原理和实践》–鲁蔚征编著

前言

大数据的特点被归纳为5个V。

V content
volume 数据量大
velocity 数据产生速度快
variety 数据类型繁多
veracity 数据真实性
value 数据价值

计算机诞生之后,一般是在单台计算机上处理数据。大数据时代到来后,传统的数据处理方法逐渐无法满足大数据的处理需求。将一组计算机组织为一个集群,利用集群的力量来处理大数据逐渐成为主流。这种使用集群进行计算的方式被称为分布式计算,当前几乎所有的大数据系统都在使用集群进行分布式计算。分布式计算背后的思想很朴素,即分而治之,又称分治法,指将一个原始问题分解为多个子问题,多个子问题分别在多台计算机上求解,借助必要的数据交换和合并策略,将子结果汇总即可求出最终结果的方法。

我们简单介绍了大数据的5个V的特点,大数据的分治法,而实际面对海量数据和各不相同的业务逻辑,很难用一套技术或一套方案来解决所有大数据问题。实际上,大数据技术是一整套方案,包括存储、计算和提供在线服务等多个重要部分,而且与数据形态、业务逻辑、提供何种价值等多方面的因素有关。

大数据的5个V中提到,数据量大且产生速度快。从时间维度讲,数据源源不断产生,形成一个无界数据流(Bounded Data Stream)。如图所示,单条数据被称为事件(Event),事件按照时序排列会形成一个数据流。例如,我们每时每刻的运动数据都会累积到手机传感器上,金融交易随时随地都在发生。数据流中的某段有界数据流(Bounded Data Stream)可以组成一个数据集。我们通常所说的对某份数据进行分析,指的是对某个数据集进行分析。相对应地,数据以流(Stream)的方式持续不断地产生着的,流处理(Stream Processing)就是对数据流进行处理,批处理(Batch Processing)是指对一批数据进行处理。

有界数据与无界数据

流处理的一些基本概念

延迟和吞吐

延迟表示一个事件被系统处理的总时间,一般以毫秒为单位。吞吐表示一个系统最多能处理多少事件,一般以单位时间处理的事件数量为标准。延迟与吞吐其实并不是相互孤立的,它们相互影响。如果延迟高,那么很可能吞吐较低,系统处理不了太多数据。

窗口

在流处理场景下,数据以源源不断的流的形式存在,数据一直在产生,没有始末,因此在处理时往往需要明确一个时间窗口。常见的时间窗口有三种:滚动窗口、滑动窗口、会话窗口。

三种窗口
  • 滚动窗口(Tumbling Window)模式一般定义一个固定的窗口长度,长度是一个时间间隔,比如小时级的窗口或分钟级的窗口。窗口像车轮一样,滚动向前,任意两个窗口之间不会包含同样的数据。

  • 滑动窗口(Sliding Window)模式也设有一个固定的窗口长度。假如我们想每分钟开启一个窗口,统计10分钟内的股票价格波动,就使用滑动窗口模式。当窗口的长度大于滑动的间隔,可能会导致两个窗口之间包含同样的事件。其实,滚动窗口模式是滑动窗口模式的一个特例,滚动窗口模式中滑动的间隔正好等于窗口的大小。

  • 会话窗口(Session Window)模式的窗口长度不固定,而是通过一个间隔来确定窗口,这个间隔被称为会话间隔(Session Gap)。当两个事件之间的间隔大于会话间隔,则两个事件被划分到不同的窗口中;当事件之间的间隔小于会话间隔,则两个事件被划分到同一窗口。

    会话(Session)本身是一个用户交互概念,常常出现在互联网应用上,一般指用户在某App或某网站上短期内产生的一系列行为。比如,用户在手机淘宝上短时间大量的搜索和点击的行为,这系列行为事件组成了一个会话。接着可能因为一些其他因素,用户暂停了与App的交互,过一会用户又使用App,经过一系列搜索、点击、与客服沟通,最终下单。

时间

常见的时间语义有:

  • Event Time:事件实际发生的事件
  • Processing Time:事件被流处理引擎处理的事件

对于一个事件,自其发生起,Event Time就已经确定不会改变。因各类延迟、流处理引擎各个模块先后处理顺序等因素,不同节点、系统内不同模块、同一数据不同次处理都会产生不同的Processing Time。

Watermark

虽然使用Event Time更准确,但问题在于,因为各种不可控因素,事件上报会有延迟,那么最多要等待多长时间呢?从服务器的角度来看,在事件到达之前,我们也无法确定是否有事件发生了延迟,如何设置等待时间是一个很难的问题。数据的实时性和准确性二者不可兼得,Watermark是一种折中解决方案,它假设某个时间点上,不会有比这个时间点更晚的上报数据

状态

状态是流处理区别于批处理的特有概念。举一个温度报警的例子,当系统在监听到“高温”事件后10分钟内又监听到“冒烟”事件,系统必须及时报警。在这个场景下,流处理引擎把“高温”事件作为状态记录下来,并判断这个状态接下来十分钟内是否有“冒烟”事件。

检查点

检查点(Checkpoint)机制其实并不是一个新鲜事物,它广泛存在于各类计算任务上,主要作用是将中间数据保存下来。当计算任务出现问题,重启后可以根据Checkpoint中保存的数据重新恢复任务。在流处理中,Checkpoint主要保存状态数据。

数据一致性保障

流处理任务可能因为各种原因出现故障,比如数据量暴涨导致内存溢出、输入数据发生变化而无法解析、网络故障、集群维护等。事件进入流处理引擎,如果遇到故障并重启,该事件是否被成功处理了呢?

  • At-Most-Once:每个事件最多被处理一次,也就是说,有可能某些事件直接被丢弃,不进行任何处理。这种投递保障最不安全,因为一个流处理系统完全可以把接收到的所有事件都丢弃。
  • At-Least-Once:无论遇到何种状况,流处理引擎能够保证接收到的事件至少被处理一次,有些事件可能被处理多次。例如,我们统计文本数据流中的单词出现次数,事件被处理多次会导致统计结果并不准确。
  • Exactly-Once:无论是否有故障重启,每个事件只被处理一次。Exactly-Once意味着事件不能有任何丢失,也不能被多次处理。比起前两种保障,Exactly-Once的实现难度非常高。如遇故障重启,Exactly-Once就必须确认哪些事件已经被处理、哪些还未被处理。Flink在某些情况下能提供Exactly-Once的保障。

Flink概述

Flink是由德国3所大学发起的学术项目,后来不断发展壮大,并于2014年年末成为Apache顶级项目之一,在2015年前后逐渐成熟的Flink是一个支持在有界和无界数据流上做有状态计算的大数据处理框架。在德语中,“fink”表示快速、敏捷,以此来表征这款计算框架的特点。

Flink数据流图

Flink程序分为三大部分,第1部分读取数据源(Source),第2部分对数据做转换操作(Transformation),第3部分将转换结果输出到一个目的地(Sink)。

Flink分布式架构与核心组件

为了支持分布式执行,Flink跟其他大数据框架一样,采用了主从(Master-Worker)架构。Flink执行时主要包括如下两个组件。Master是一个Flink作业的主进程。它起到了协调管理的作用。TaskManager,又被称为Worker或Slave,是执行计算任务的进程。它拥有CPU、内存等计算资源。Flink作业需要将计算任务分发到多个TaskManager上并行执行。

考虑到数据分布在多个节点的情况,逻辑视图只是一种抽象,需要将逻辑视图转化为物理执行图,才能在分布式环境下执行。在物理视图中,数据流分布在2个分区上。空心箭头部分表示数据流分区,圆圈部分表示算子在分区上的算子子任务(Operator Subtask)。

逻辑视图
物理执行图

Flink作业提交过程

Flink任务提交流程
  1. 用户编写应用程序代码,并通过Flink客户端(Client)提交作业。程序一般为Java或Scala语言,调用Flink API,构建逻辑视图。代码和相关配置文件被编译打包,被提交到Master的Dispatcher,形成一个应用作业(Application)
  2. Dispatcher接收到这个作业,启动JobManager,这个JobManager会负责本次作业的各项协调工
  3. JobManager向ResourceManager申请本次作业所需资源
  4. 由于在第0步中TaskManager已经向ResourceManager中注册了资源,这时闲置的TaskManager会被反馈给JobManager
  5. JobManager将用户作业中的逻辑视图转化为图所示的并行化的物理执行图,将计算任务分发部署到多个TaskManager上。至此,一个Flink作业就开始执行了

Flink核心组件

在作业提交流程上,我们对涉及的各个组件进行更详细的介绍。

Client

用户一般使用Client提交作业,比如Flink主目录下bin目录中提供的命令行工具。Client会对用户提交的Flink作业进行预处理,并把作业提交到Flink集群上。Client提交作业时需要配置一些必要的参数,比如使用Standalone集群还是YARN集群等。整个作业被打成了JAR包,DataStream API被转换成了JobGraph,JobGraph是一种类似图3-2所示的逻辑视图。

Dispatcher

Dispatcher可以接收多个作业,每接收一个作业,Dispatcher都会为这个作业分配一个JobManager。Dispatcher对外提供一个表述性状态转移(RepresentationalState Transfer,REST)式的接口,以超文本传输协议(Hyper Text TransferProtocal,HTTP)来对外提供服务。

JobManager

JobManager是单个Flink作业的协调者,一个作业会有一个JobManager来负责。JobManager会将Client提交的JobGraph转化为ExecutionGraph,ExecutionGraph是类似图3-3所示的并行的物理执行图。JobManager会向ResourceManager申请必要的资源,当获取足够的资源后,JobManager将ExecutionGraph以及具体的计算任务分发部署到多个TaskManager上。同时,JobManager还负责管理多个TaskManager,包括收集作业的状态信息、生成检查点、必要时进行故障恢复等。

ResourceManager

如前文所述,Flink现在可以部署在Standalone、YARN或Kubernetes等环境上,不同环境中对计算资源的管理模式略有不同,Flink使用一个名为ResourceManager的模块来统一处理资源分配上的问题。在Flink中,计算资源的基本单位是TaskManager上的任务槽位(Task Slot,简称Slot)。ResourceManager的职责主要是从YARN等资源提供方获取计算资源,当JobManager有计算需求时,将空闲的Slot分配给JobManager。当计算任务结束时,ResourceManager还会重新收回这些Slot。

TaskManager

TaskManager是实际负责执行计算的节点。一般地,一个Flink作业是分布在多个TaskManager上执行的,单个TaskManager上提供一定量的Slot。一个TaskManager启动后,相关Slot信息会被注册到ResourceManager中。当某个Flink作业提交后,ResourceManager会将空闲的Slot提供给JobManager。JobManager获取到空闲的Slot后会将具体的计算任务部署到空闲Slot之上,任务开始在这些Slot上执行。在执行过程,由于要进行数据交换,TaskManager还要和其他TaskManager进行必要的数据通信。总之,TaskManager负责具体计算任务的执行,启动时它会将Slot资源向ResourceManager注册。

任务执行与资源划分

了解了Flink的分布式架构和核心组件,这里我们从更细粒度上来介绍从逻辑视图转化为物理执行图的过程,该过程可以分成4层:StreamGraph→JobGraph→ExecutionGraph→物理执行图。我们根据下图来大致了解这些图的功能。

Flink的四种图
  • StreamGraph:根据用户编写的代码生成的最初的图,用来表示一个Flink流处理作业的拓扑结构。在StreamGraph中,节点StreamNode就是算子。
  • JobGraph:JobGraph是被提交给JobManager的数据结构。StreamGraph经过优化后生成了JobGraph,主要的优化为,将多个符合条件的节点链接在一起作为一个JobVertex节点,这样可以减少数据交换所需要的传输开销。这个链接的过程叫算子链(Operator Chain)
  • ExecutionGraph:JobManager将JobGraph转化为ExecutionGraph。ExecutionGraph是JobGraph的并行化版本:假如某个JobVertex的并行度是2,那么它将被划分为2个ExecutionVertex,ExecutionVertex表示一个算子子任务,它监控着单个子任务的执行情况。每个ExecutionVertex会输出一个IntermediateResultPartition,这是单个子任务的输出,再经过ExecutionEdge输出到下游节点。ExecutionJobVertex是这些并行子任务的合集,它监控着整个算子的执行情况。ExecutionGraph是调度层非常核心的数据结构。
  • 物理执行图:JobManager根据ExecutionGraph对作业进行调度后,在各个TaskManager上部署具体的任务,物理执行图并不是一个具体的数据结构。

DataStream API的介绍和使用

Flink程序的骨架结构

一个Flink程序的核心业务逻辑主要包括:设置执行环境、进行Source、Transformation和Sink操作,最后要调用执行环境的execute()方法。

常见Transformation的使用用法

Flink的Transformation主要包括4种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。

单数据流基本转换

map()

对一个DataStream中的每个元素使用用户自定义的Mapper函数进行处理,每个输入元素对应一个输出元素,最终整个数据流被转换成一个新的DataStream。我们可以重写MapFunction或RichMapFunction来自定义map(),RichMapFunction类是一种RichFunction类,它除了提供MapFunction类的基础功能,还提供了一系列其他方法,包括open()、close()、getRuntimeContext()和setRuntimeContext()等方法,重写这些方法可以创建状态数据、对数据进行广播、获取累加器和计数器等。下图展示了map()的工作原理。

map图示
filter()

filter()对每个元素进行过滤,过滤的过程使用一个Filter函数进行逻辑判断。也可以继承FilterFunction或RichFilterFunction,然后重写filter()方法,下图展示了filter()的工作原理。

filter图示
flatMap()

输入是包含圆形或正方形的列表,fatMap()过滤掉圆形,正方形列表被展平,以单个元素的形式输出。

flatMap图示

基于Key的分组转换

keyBy()

大多数情况下,我们要根据事件的某种属性或数据的某个字段进行分组,然后对一个分组内的数据进行处理。keyBy()将DataStream转换成一个KeyedStream。KeyedStream是一种特殊的DataStream,事实上,KeyedStream继承了DataStream,DataStream的各元素随机分布在各算子子任务中,KeyedStream的各元素按照Key分组,相同Key的数据会被分配到同一算子子任务中。我们需要向keyBy()传递一个参数,以告知Flink以什么作为Key进行分组。

keyBy图示

指定Key本质上是实现一个KeySelector,在Flink源码中,它定义如下:

1
2
3
4
5
6
// IN为数据流元素,KEY为所选择的Key 
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
// 选择一个字段作为Key
KEY getKey(IN value) throws Exception;
}

我们可以重写getKey()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Word { 
public String word;
public int count;
public Word() {}
public Word(String word, int count) {
this.word = word;
this.count = count;
}
public static Word of(String word, int count) {
return new Word(word, count);
}
@Override
public String toString() {
return this.word + ": " + this.count;
}
}

DataStream<Word> wordStream = senv.fromElements(
Word.of("Hello", 1), Word.of("Flink", 1),
Word.of("Hello", 2), Word.of("Flink", 2)
);

// 使用KeySelector
DataStream<Word> keySelectorStream = wordStream.keyBy(new KeySelector<Word, String> {
@Override
public String getKey(Word in) {
return in.word;
}
}).sum("count");
Aggregation

常见的聚合(Aggregation)函数有sum()、max()、min()等,这些聚合函数统称为聚合。使用聚合函数时,我们需要一个参数来指定按照哪个字段进行聚合。跟keyBy()相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以实现一个KeySelector。

  • sum()对该字段进行加和,并将结果保存在该字段中,它无法确定其他字段的数值,或者说无法保证其他字段的计算结果。
  • max()/min()对该字段求最大值,并将结果保存在该字段中。对于其他字段,该函数并不能保证其数值的计算结果。
  • maxBy()/minBy()对该字段求最大值,maxBy()与max()的区别在于,maxBy()同时保留其他字段的数值,即maxBy()返回数据流中最大的整个元素,包括其他字段。

其实,这些聚合函数里已经使用了状态数据,比如,sum()内部记录了当前的和,max()内部记录了当前的最大值。聚合函数的计算过程其实就是不断更新状态数据的过程。由于内部使用了状态数据,而且状态数据并不会被清除,因此一定要慎重地在一个无限数据流上使用这些聚合函数。对于一个KeyedStream,一次只能使用一个聚合函数,无法链式使用多个。

reduce

reduce()的原理:reduce()在KeyedStream上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。

reduce图示

多数据流转换

union

在DataStream上使用union()可以合并多个同类型的数据流,或者说,可以将多个DataStream合并为一个新的DataStream。数据将按照先进先出(First In First Out)的模式合并,且不去重。

union图示
connect

union()虽然可以合并多个数据流,但有一个限制:多个数据流的数据类型必须相同。connect()提供了和union()类似的功能,即连接两个数据流,它与union()的区别如下:

  • connect()只能连接两个数据流,union()可以连接多个数据流
  • connect()所连接的两个数据流的数据类型可以不一致,union()所连接的两个或多个数据流的数据类型必须一致
  • 两个DataStream经过connect()之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且两个流之间可以共享状态。如图所示,connect()经常被应用于使用一个控制流对另一个数据流进行控制的场景,控制流可以是阈值、规则、机器学习模型或其他参数
connect图示

两个DataStream经过connect()之后被转化为ConnectedStreams。对于ConnectedStreams,我们需要重写CoMapFunction或CoFlatMapFunction。这两个接口都提供了3个泛型。这3个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型。在重写函数时,对于CoMapFunction,map1()方法处理第一个流的数据,map2()方法处理第二个流的数据;对于CoFlatMapFunction,fatMap1()方法处理第一个流的数据,fatMap2()方法处理第二个流的数据。

Flink并不能保证map1()/fatMap1()和map2()/fatMap2()两个方法调用的顺序,两个方法的调用顺序依赖于两个数据流中数据流入的先后顺序,即第一个数据流有数据到达时,map1()方法或fatMap1()方法会被调用,第二个数据流有数据到达时,map2()方法或fatMap2()方法会被调用。

并行度和数据重分布

并行度

Flink使用并行度来定义某个算子被切分为多少个算子子任务,并行度可以在一个Flink作业的执行环境层面统一设置,这样将影响该作业所有算子并行度,也可以对某个算子单独设置其并行度。如果不进行任何设置,默认情况下,一个作业所有算子的并行度会依赖于这个作业的执行环境。

数据重分布

默认情况下,数据是自动分配到多个子任务上的。有的时候,我们需要手动在多个子任务上进行数据分布。例如,我们知道某个子任务上的数据过多,其他子任务上的数据较少,产生了数据倾斜,这时需要将数据均匀分布到各个子任务上,以避免部分子任务负载过重。数据倾斜问题会导致整个作业的计算时间过长或者内存不足等问题。

本节涉及的各个数据重分布算子的输入是DataStream,输出也是DataStream。

  • shuffle:shuffe()基于正态分布,将数据随机分布到下游各算子子任务上。
  • rebalance和rescale;rebalance()使用Round-Ribon方法将数据均匀分布到各子任务上。Round-Ribon是负载均衡领域经常使用的均匀分布的方法,上游的数据会轮询式地均匀分布到下游的所有的子任务上。rescale()与rebalance()很像,也是将数据均匀分布到下游各子任务上,但它的传输开销更小,因为rescale()并不是将每个数据轮询地发送给下游每个子任务,而是就近发送给下游子任务。
  • broadcast;数据被复制并广播发送给下游的所有子任务上。
  • global:global()会将所有数据发送给下游算子的第一个子任务上,使用global()时要小心,以免造成严重的性能问题。
  • partitionCustom:partitionCustom()表示我们可以在DataStream上使用partitionCustom()来自定义数据重分布逻辑。partitionCustom()它有两个参数:第一个参数是自定义的Partitioner,我们需要重写里面的partition()方法,partition()方法返回一个整数,表示该元素将被分配到下游第几个子任务。;第二个参数表示对数据流哪个字段使用partiton()方法。

数据类型和序列化

几乎所有的大数据框架都要面临分布式计算、数据传输和持久化问题。数据传输过程前后要进行数据的序列化和反序列化:序列化就是将一个内存对象转换成二进制串,形成可网络传输或者可持久化的数据流。反序列化将二进制串转换为内存对象,这样就可以直接在编程语言中读/写和操作这个对象。一种最简单的序列化方法就是将复杂数据结构转化成JavaScript对象表示法(JavaScript ObjectNotation,JSON)格式。

Flink支持的数据类型

Flink支持的数据类型

用户自定义函数

接口、Lamda表达式

对于map()、fatMap()、reduce()等函数,我们可以实现MapFunction、FlatMapFunction、ReduceFunction等接口。这些接口签名中都有泛型参数,用来定义该函数的输入或输出的数据类型。我们要继承这些类,并重写里面的自定义函数。

Rich函数类

在上面两种自定义方法的基础上,Flink还提供了RichFunction函数类。从名称上来看,这种接口在普通的接口上增加了Rich前缀,比如RichMapFunction、RichFlatMapFunction或RichReduceFunction等。比起不带Rich前缀的函数类,Rich函数类增加了如下方法。

  • open()方法:Flink在算子调用前会执行这个方法,可以用来进行一些初始化工作。
  • close()方法:Flink在算子最后一次调用结束后执行这个方法,可以用来释放一些资源。
  • getRuntimeContext()方法:获取运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子执行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据。
Flink基础(二) Melancholy
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×