Flink基础(二)

本文主要参考《Flink原理和实践》–鲁蔚征编著

状态和检查点

有状态的计算是流处理框架要实现的重要功能,因为复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。

状态的获取和更新流程

两种基本类型的状态

Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。Managed State是由Flink管理的,Flink负责存储、恢复和优化;Raw State是由开发者管理的,需要自己进行序列化与反序列化。

对Managed State细分有:Keyed State和Operator State。

Keyed State是KeyedStream上的状态。假如输入流按照ID为Key进行了keyBy()分组,形成一个KeyedSteam,那么数据流中所有ID为1的数据共享一个状态,可以访问和更新这个状态。

Operator State可以用在所有算子上,每个算子子任务共享一个状态,流入这个算子子任务的所有数据都可以访问和更新这个状态。

无论是Keyed State还是Operator State,Flink的状态都是基于本地的,即每个算子子任务维护着自身的状态,不能访问其他算子子任务的状态。

横向扩展问题

状态的横向扩展问题主要是指修改Flink应用的并行度,每个算子的并行子任务数发生了变化,应用需要关停或启动一些算子子任务,某份在原来某个算子子任务上的状态数据需要平滑迁移到新的算子子任务上。Flink上的Checkpoint可以辅助迁移状态数据。算子的本地状态将数据生成快照(Snapshaot),保存到分布式存储系统(如HDFS)上。横向扩展后,算子子任务数变化,子任务重启,相应的状态从分布式存储系统上重建(Restore)。

Flink算子扩容时的状态迁移过程

Checkpoint机制的原理及配置方法

Flink的状态是基于本地的,而Flink又是一个部署在多节点的分布式系统,分布式系统经常出现被kill、节点宕机或网络中断等问题,那么本地状态在遇到故障如何保证不丢失呢?Flink定期保存状态数据到存储空间上,故障发生后从之前的备份中恢复,这个过程被称为Checkpoint机制。

Flink分布式快照流程

Checkpoint机制大致流程

  1. 暂停处理新流入数据,将数据缓存起来
  2. 将算子子任务的本地状态数据复制到一个远程的持久化存储空间
  3. 继续处理新流入的数据,包括刚才缓存的数据

Flink在Chandy-Lamprot算法基础上实现了一种分布式快照算法。Flink的检查点协调器(Checkpoint Coordinator)触发一次Checkpoint,则Checkpint Barrier被插入到数据流中。每个Checkpint Barrier都有一个ID,表示该段数据属于哪次Checkpoint。如图所示,当ID为n的Checkpint Barrier到达每个算子后,表示要对n-1和n之间的状态更新做Snapshot。

数据流与Checkpoint Barrier

Flink的检查点协调器触发一次Checkpoint时,这个请求会发送到Source的各个子任务。Source算子各子任务接收到这个Checkpoint请求后,会将自身状态写入State Backend,生成一次Snapshot,并向下游广播Checkpoint Barrier。

Source算子将自身状态写入状态后端,向下游广播Checkpoint Barrier

Source算子做完Snapshot后,还会给Checkpoint发送确认(Acknowlegement,ACK),ACK包括刚才备份到State Backend的状态的句柄(或者叫指向状态的指针),告知自己已做完了相应工作。至此,Source完成了一次Checkpoint。

对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的连线称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入通道里都会收到ID为n的Checkpoint Barrier;而且不同输入通道里Checkpoint Barrier的流入速度不同,ID为n的Checkpoint Barrier到达的时间不同。Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment)。

对齐的步骤如下:

  1. 算子子任务在某个输入通道中收到第一个ID为n的CheckpointBarrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐。
  2. 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。
  3. 第二个输入通道中ID为n的Checkpoint Barrier抵达该算子子任务,所有通道中ID为n的Checkpoint Barrier都到达该算子子任务;该算子子任务执行Snapshot,将状态写入State Backend;然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。
  4. 对于这个算子子任务,Snapshot执行结束,继续处理各个通道中新流入的数据,包括刚才缓存的数据。
Barrier在算子间传播过程

进行对齐,主要是保证一个Flink作业中所有算子的状态是一致的。也就是说,保证一个Flink作业前后所有算子写入State Backend的状态都基于同样的数据。

数据流图中每个算子子任务都要完成一遍上述的对齐、Snapshot、确认的工作,当最后所有Sink算子确认完成Snapshot之后,说明ID为n的Checkpoint执行结束。Checkpoint Coordinator向State Backend写入一些本次Checkpoint的元数据。

State Backend

在Flink的分布式快照机制中,State Backend起到了持久化存储数据的重要功能。Flink将State Backend抽象成了一种插件,并提供了三种State Backend,每种State Backend对数据的保存和恢复方式略有不同。

MemoryStateBackend

MemoryStateBackend主要基于内存,它将数据存储在Java的堆区。当进行分布式快照时,所有算子子任务将自己内存上的状态同步到JobManager的堆上。因此,一个作业的所有状态数据量要小于JobManager的内存大小。这种方式显然不能存储过大的状态数据,否则将抛出OutOfMemoryError异常。这种方式只适合调试或者实验,不建议在生产环境中使用。

FsStateBackend

这种方式下,数据持久化到文件系统上,文件系统包括本地磁盘、HDFS以及包括AWS、阿里云等在内的云存储服务。使用时,我们要提供文件系统的地址,尤其要写明前缀,比如:file://、hdfs://或s3://。

Flink的本地状态仍然在TaskManager的内存堆区上,直到执行Snapshot时,状态数据会写到所配置的文件系统上。因此,这种方式能够享受本地内存的快速读/写访问,也能保证大容量状态作业的故障恢复能力。

RocksDBStateBackend

这种方式下,本地状态存储在本地的RocksDB上。RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。比起FsStateBackend的本地状态存储在内存中,RocksDB利用了磁盘空间,所以可存储的本地状态数据量更大。然而,每次从RocksDB中读/写数据都需要进行序列化和反序列化,因此读/写本地状态的成本更高。执行Snapshot时,Flink将存储于本地RocksDB的状态同步到远程的存储上,因此使用这种State Backend时,也要配置分布式存储空间的地址。

重启策略

一般情况下,一个作业遇到一些异常情况会导致执行异常,潜在的异常情况包括:硬件故障、部署环境抖动、流量激增、输入数据异常等。如果一个作业发生了重启,并且触发故障的原因没有根除,那么重启之后仍然会出现故障。因此,在解决根本问题之前,一个作业很可能无限次地故障重启,陷入死循环。

为了避免重启死循环,Flink提供了如下3种重启策略。

  • 固定延迟(Fixed Delay)策略:作业每次失败后,按照设定的时间间隔进行重启尝试,重启次数不会超过某个设定值。
  • 失败率(Failure Rate)策略:计算一个时间段内作业失败的次数,如果失败次数小于设定值,继续重启,否则不重启。
  • 不重启(No Restart)策略:不对作业进行重启。

重启策略的前提是作业设置了Checkpoint,如果作业未设置Checkpoint,则会使用No Restart的策略。重启策略可以在conf/fink-conf.yaml中设置,所有使用这个配置文件执行的作业都将采用这样的重启策略;也可以在单个作业的代码中配置重启策略。

Savepoint机制的原理及使用方法

目前,Checkpoint机制和Savepoint机制在代码层面使用的分布式快照逻辑基本相同,生成的数据也近乎一样,那它们到底有哪些功能性的区别呢?Checkpoint机制的目的是为了故障重启,使得作业中的状态数据与故障重启之前的保持一致,是一种应对意外情况的有力保障。Savepoint机制的目的是手动备份数据,以便进行调试、迁移、迭代等,是一种协助开发者的支持功能。

Flink的Checkpoint机制设计初衷为:

  1. Checkpoint过程是轻量级的,尽量不影响正常数据处理;
  2. 故障恢复越快越好。开发者需要进行的操作并不多,少量的操作包括:设置多大的间隔来定期进行Checkpoint,使用何种State Backend。

相比而下,Savepoint机制主要考虑的是:

  1. 刻意备份;
  2. 支持修改状态数据或业务逻辑。

Savepoint相关操作是有计划的、人为的。开发者要手动触发、管理和删除Savepoint。比如,将当前状态保存下来之后,我们可以更新并行度、修改业务逻辑代码,甚至在某份代码基础上生成一个对照组来验证一些实验猜想。可见,Savepoint机制的数据备份和恢复都需要更高的时间和人力成本,Savepoint机制数据也必须有一定的可移植性,能够适应数据或逻辑上的改动。具体而言,Savepoint机制的潜在应用场景如下:

  • 我们可以给同一份作业设置不同的并行度,来找到最佳的并行度设置,每次可以从Savepoint中加载原来的状态数据。
  • 我们想测试一个新功能或修复一个已知的bug,并用新的程序逻辑处理原来的数据。
  • 进行一些A/B测试,使用相同的数据源测试程序的不同版本。
  • 因为状态可以被持久化存储到分布式文件系统上,我们甚至可以将同样一个应用程序从一个集群迁移到另一个集群,只需保证不同的集群都可以访问该文件系统。

可见,Checkpoint机制和Savepoint机制是Flink提供的两个相似的功能,它们满足了不同的需求,以确保一致性、容错性,满足了作业升级、bug修复、迁移、A/B测试等不同场景。

读/写Savepoint中的数据

Flink提供了一个名为State Processor API的功能,允许开发者读/写Savepoint中的数据。它主要基于DataSet API,将Savepoint数据从远程存储空间读到内存中,再对Savepoint数据进行处理,然后保存到远程存储空间上。有了State ProcessorAPI,开发者在状态的修改和更新上有更大的自由度。

Flink提供的另外一个读取状态的API为Queryable State。使用Queryable State可以查询状态中的数据,其原理与State Processor API有相同之处。但是,两者侧重点各有不同,Queryable State重在查询状态,主要针对正在执行的线上服务;State Processor API可以修改状态,主要针对写入Savepoint中的数据。

Flink连接器

在实际生产环境中,数据可能存放在不同的系统中,比如文件系统、数据库或消息队列。一个完整的Flink作业包括Source和Sink两大模块,Source和Sink肩负着Flink与外部系统进行数据交互的重要功能,它们又被称为外部连接器(Connector)。

Flink端到端的Exactly-Once保障

故障恢复与一致性保障

某条数据投递到某个流处理系统后,该系统对这条数据只处理一次,并提供Exactly-Once保障是一种理想的情况。如果系统不出任何故障,那堪称完美。然而在现实世界中,系统经常受到各类意外因素的影响而发生故障,比如流量激增、网络抖动、云服务资源分配出现问题等。如果发生了故障,Flink重启作业,读取Checkpoint中的数据,恢复状态,并重新执行计算。

Checkpoint和故障恢复过程可以保证内部状态的一致性,但有数据重发的问题,如图7-1所示。假设系统最近一次Checkpoint时间戳是3,系统在时间戳10处发生故障,在Checkpoint之后和故障之前的3到10期间,系统已经处理了一些数据(图7-1所示的时间戳为5和8的数据)。在实际场景中,我们无法预知故障发生的时间,只能在故障发生后,收到报警信息,并知道最近一次的Checkpoint时间戳是3。作业重启后,我们可以从最近一次的Checkpoint数据中恢复状态,整个作业的状态被初始化到时间戳3处。为了保证一致性,时间戳3以后的数据需要重新处理一遍。Flink的Checkpoint过程保证了一个作业内部的数据一致性,主要是因为Flink对如下两类数据做了备份。(1)作业中每个算子的状态;(2)输入数据的偏移量Offset。

故障发生之前,可能一部分数据已经被一些算子处理了,甚至可能已经被发送到外部系统了,重启后,这些数据又被重新发送一次。一条数据不是只被处理一次,而是有可能被处理了多次(即At-Least-Once)。从结果的准确性角度来说,我们期望一条数据只影响一次最终结果。如果一个系统能保证一条数据只影响一次最终结果,我们称这个系统提供端到端的Exactly-Once保障。

端到端的Exactly-Once问题是分布式系统领域最具挑战性的问题之一,很多系统都在试图攻克这个问题。在这个问题上,Flink内部状态的一致性主要依赖Checkpoint机制,外部交互的一致性主要依赖Source和Sink提供的功能。Source需要支持重发功能,Sink需要采用一定的数据写入技术,比如幂等写或事务写。对于Source重发功能,只要我们记录了输入的偏移量Offset,作业重启后数据发送方根据该Offset重新开始发送数据即可。Kafka的Producer除了发送数据,还能将数据持久化写到日志文件中。如果下游作业重启,Kafka Producer根据下游作业提供的Offset,从持久化的日志文件中定位到数据,可以重新开始向下游作业发送数据。Source的重发功能会导致数据被处理多次,为了保证只对下游作业产生一次影响,还需要依赖Sink的幂等写或事务写。

幂等写

幂等写(Idempotent Write)是指,任意多次向一个系统写入数据,只对目标系统产生一次结果影响。例如,重复向一个HashMap里插入同一个(Key,Value)二元组,第一次插入时该HashMap发生变化,后续的插入操作不会改变HashMap的结果,这就是一个幂等写操作。重复地对一个整数执行加法操作就不是幂等写,因为多次操作后,该整数会“变大”。

像Cassandra、HBase和Redis这样的Key-Value数据库一般用来作为Sink,用以实现端到端的Exactly-Once保障。需要注意的是,并不是一个Key-Value数据库就完全支持幂等写。幂等写对(Key,Value)数据本身有要求,那就是(Key,Value)必须是可确定性(Deterministic)计算的。假如我们设计的Key是name+curTimestamp,每次执行数据重发时,系统当前时间会发生变化,生成的Key都不相同,就会产生多次结果,整个操作不是幂等的。如果我们把Key改为name+eventTimestamp,由于Event Time的确定性,即使有数据重发,一条数据生成的Key也是可以确定的。因此,为了追求端到端的Exactly-Once保障,我们设计业务逻辑时要尽量使用确定性的计算逻辑和数据模型。

事务写

事务(Transaction)是数据库系统所要解决的核心问题。Flink借鉴了数据库中的事务处理技术,同时结合自身的Checkpoint机制来保证Sink只对外部输出产生一次影响。简单概括,Flink的事务写(Transaction Write)是指,Flink先将待输出的数据保存下来,暂时不向外部系统提交;等到Checkpoint结束,Flink上、下游所有算子的数据都一致时,将之前保存的数据全部提交到外部系统。换句话说,只有经过Checkpoint确认的数据才向外部系统写入。

在事务写的具体实现上,Flink目前提供了两种方式:预写日志(Write-Ahead-Log,WAL)和两阶段提交(Two-Phase-Commit,2PC)。这两种方式也是很多数据库和分布式系统实现事务时经常采用的方式,Flink根据自身的条件对这两种方式做了适应性调整。这两种方式的主要区别在于:Write-Ahead-Log方式使用Operator State缓存待输出的数据;如果外部系统自身支持事务,比如Kafka,就可以使用Two-Phase-Commit方式,待输出数据被缓存在外部系统。

事务写能提供端到端的Exactly-Once保障,它的代价也是非常明显的,即牺牲延迟。输出数据不再实时写入外部系统,而是分批次地提交。目前来说,没有完美的故障恢复和Exactly-Once保障机制,对于开发者来说,需要权衡不同需求。

自定义Source和Sink

Flink1.11之前的Source

这里将先重点介绍老的Source接口,因为老的Source接口更易于理解和实现,之后会简单介绍新的Source接口的原理。Flink 1.11之前的Source接口已经存在较长时间,如果用户想自己定义一个Source,需要实现一个名为SourceFunction的接口。

Source的公开接口:SourceFunction的接口和RichSourceFunction的Rich函数类。自定义Source时必须实现两个方法。

实现SourceFunction接口

run()方法在Source启动后开始执行,一般都会在方法中使用循环,在循环内不断向下游发送数据,发送数据时使用SourceContext.collect()方法。cancel()方法停止向下游继续发送数据。由于run()方法内一般会使用循环,可以使用一个boolean类型的标志位来标记Source是否在执行。当停止Source时,也要修改这个标志位。自定义Source,从0开始计数,将数字发送到下游。

RichSourceFunction提供了RuntimeContext,以及增加了open()方法用来初始化资源,close()方法用来关闭资源。RuntimeContext指运行时上下文,包括并行度、监控项MetricGroup等。比如,我们可以使用getRuntimeContext().getIndexOfThisSubtask()获取当前子任务是多个并行子任务中的哪一个。

可恢复的Source

假如遇到故障,整个作业重启,Source每次从0开始,没有记录遇到故障前的任何信息,所以它不是一个可恢复的Source。我们讨论过,Source需要支持数据重发才能支持端到端的Exactly-Once保障。如果想支持数据重发,需要满足如下两点:

  • Flink开启Checkpoint机制,Source将数据Offset定期写到Checkpoint中。作业重启后,Flink Source从最近一次的Checkpoint中恢复Offset数据。
  • Flink所连接的上游系统支持从某个Offset开始重发数据。如果上游是Kafka,它是支持Offset重发的。如果上游是一个文件系统,读取文件时可以直接跳到Offset所在的位置,从该位置重新读取数据。

继承并实现了CheckpointedFunction,可以使用Operator State。整个作业第一次执行时,Flink会调用initializeState()方法,offset被设置为0,之后每隔一定时间触发一次Checkpoint,触发Checkpoint时会调用snapshotState()方法来更新状态到State Backend。如果遇到故障,重启后会从offsetState状态中恢复上次保存的Offset。

在run()方法中,我们增加了一个同步锁ctx.getCheckpointLock(),是为了当触发这次Checkpoint时,不向下游发送数据。或者说,等本次Checkpoint触发结束,snapshotState()方法执行完,再继续向下游发送数据。如果没有这个步骤,有可能会导致run()方法中Offset和snapshotState()方法中Checkpoint的Offset不一致。

并行版本

上面提到的Source都是并行度为1的版本,或者说启动后只有一个子任务在执行。如果需要在多个子任务上并行执行的Source,可以实现ParallelSourceFunction和RichParallelSourceFunction两个类。

Flink1.11之后的Source

Flink在1.11之后提出了一个新的Source接口,主要目的是统一流处理和批处理两大计算模式,提供更大规模并行处理的能力。

新的Source接口提出了3个重要组件。

  • 分片(Split):Split是将数据源切分后的一小部分。如果数据源是文件系统上的一个文件夹,Split可以是文件夹里的某个文件;如果数据源是一个Kafka数据流,Split可以是一个Kafka Partition。因为对数据源做了切分,Source就可以启动多个实例并行地读取。
  • 读取器(SourceReader):SourceReader负责Split的读取和处理,SourceReader运行在TaskManager上,可以分布式地并行运行。比如,某个SourceReader可以读取文件夹里的单个文件,多个SourceReader实例共同完成读取整个文件夹的任务。
  • 分片枚举器(SplitEnumerator):SplitEnumerator负责发现和分配Split。SplitEnumerator运行在JobManager上,它会读取数据源的元数据并构建Split,然后按照负载均衡策略将多个Split分配给多个SourceReader。

其中,Master进程中的JobManager运行着SplitEnumerator,各个TaskManager中运行着SourceReader,SourceReader每次向SplitEnumerator请求Split,Split Enumerator会分配Split给各个SourceReader。

新Source接口中的3个重要组件

自定义Sink

对于Sink,Flink提供的API为SinkFunction接口和RichSinkFunction函数类。使用时需要实现下面的虚方法。每条数据到达后都会调用invoke()方法发送到外部系统。我们讨论过,如果想提供端到端的Exactly-Once保障,需要使用幂等写和事务写两种方式。

写入外部系统一般是采用更新插入(Upsert)的方式,即将原有数据删除,将新数据插入,或者说将原有数据覆盖。一些Key-Value数据库经常被用来实现幂等写,幂等写也是一种实现成本相对比较低的方式。另外一种提供端到端Exactly-Once保障的方式是事务写,并且有两种具体的实现方式:Write-Ahead-Log和Two-Phase-Commit。两者非常相似,下面分别介绍两种方式的原理,并重点介绍Two-Phase-Commit的具体实现。

Write-Ahead-Log协议的原理

Write-Ahead-Log是一种广泛应用在数据库和分布式系统中的保证事务一致性的协议。Write-Ahead-Log的核心思想是,在数据写入下游系统之前,先把数据以日志(Log)的形式缓存下来,等收到明确的确认提交信息后,再将Log中的数据提交到下游系统。由于数据都写到了Log里,即使出现故障恢复,也可以根据Log中的数据决定是否需要恢复、如何进行恢复。

Two-Phase-Commit协议的原理和实现

Two-Phase-Commit是另一种广泛应用在数据库和分布式系统中的事务协议。与刚刚介绍的Write-Ahead-Log相比,Flink中的Two-Phase-Commit协议不将数据缓存在Operator State,而是将数据直接写入外部系统,比如支持事务的Kafka。因为Kafka提供了事务机制,开发者可以通过“预提交-提交”的两阶段提交方式将数据写入Kafka。

Flink中常用的Connector

Flink支持了绝大多数的常见大数据系统,从系统的类型上,包括了消息队列、数据库、文件系统等;从具体的技术上,包括了Kafka、Elasticsearch、HBase、Cassandra、JDBC、Kinesis、Redis等。各个大数据系统使用起来略有不同,接下来将重点介绍一下Flink内置I/O接口和Flink Kafka Connector,这两类Connector被广泛应用在很多业务场景中,具有很强的代表性。

Flink中常用的Connector

内置I/O接口

之所以给这类Connector起名为内置I/O接口,是因为这些接口直接集成在了Flink的核心代码中,无论在任何环境中,我们都可以调用这些接口进行数据输入/输出操作。与内置I/O接口相对应的是fink-connector子项目以及Apache Bahir项目中的Connector,fink-connector虽然是Flink开源项目的一个子项目,但是并没有直接集成到二进制包中。因此,使用Flink的内置I/O接口,一般不需要额外添加依赖,使用其他Connector需要添加相应的依赖。

Flink的内置I/O接口如下:

  • 基于Socket的Source和Sink
  • 基于内存集合的Source
  • 输出到标准输出的Sink
  • 基于文件系统的Source和Sink

像Socket、内存集合和打印这3类接口非常适合调试。此外,文件系统被广泛用于大数据的持久化,是大数据架构中经常涉及的一种组件。

Kafka是一个消息队列,它可以在Flink的上游向Flink发送数据,也可以在Flink的下游接收Flink的输出。Kafka是一个很多公司都采用的消息队列,因此非常具有代表性。具体接入方式这里不做介绍。

Table API & SQL的介绍和使用

为了方便开发和迭代,Flink基于DataStream/DataSet API提供了一个更高层的关系型数据库式的API——Table API & SQL。Table API & SQL有以下优点:

  • 结合了流处理和批处理两种场景,提供统一的对外接口
  • Table API & SQL均以关系型数据库中的表为基础模型,Table API和SQL两者结合非常紧密。Table API & SQL与其他平台使用习惯相似,例如Hive SQL、Spark DataFrame& SQL、Python pandas等,数据科学家可以快速将其他平台的使用方法迁移到Flink平台上
  • 比起DataStream/DataSet API,Table API & SQL的开发成本较低,可以广泛应用于数据探索、业务报表、商业智能等各类场景,适合企业大规模推广

Table API & SQL综述

在具体执行层面,Flink使用一个名为执行计划器(Planner)的组件将Table API或SQL语句中的关系型查询转换为可执行的Flink作业,并对作业进行优化。

Table API & SQL程序的骨架结构

目前的Table API & SQL要与DataStream/DataSet API相结合来使用,主要需要以下步骤:

  1. 创建执行环境(ExecutionEnvironment)和表环境(TableEnvironment)。
  2. 获取表。
  3. 使用Table API或SQL在表上做查询等操作。
  4. 将结果输出到外部系统。
  5. 调用execute(),执行作业。
创建TableEnvironment

使用Table API & SQL之前,要确定使用何种编程语言(Java/Scala)、进行批处理还是流处理以及使用哪种Planner。

获取表

在传统的关系型数据库中,表一般由开发者定义,在后续对外部系统提供服务的过程中,表是常驻数据库的,开发者不断在表上进行增、删、查、改操作。在数据分析领域,表的概念被拓展,表不仅包括了关系型数据库中传统意义上的表,也包括了文件、消息队列等。Flink是一个计算引擎,它不提供数据存储的功能,但是可以通过Connector连接不同的外部系统。为了基于外部数据进行Table API & SQL计算,Flink使用表表示广义上的表。它包括物理上确实存在的表,也包括基于物理表经过一些计算而生成的虚拟表,虚拟表又被称为视图(View)。

可见,如果想在Flink中使用表来查询数据,最重要的一步是将数据(数据库、文件或消息队列)读取并转化成一个表。我们可以在Flink作业运行时注册一个新的表,也可以获取已创建好的常驻集群的表。在每个Flink作业启动后临时创建的表是临时表(Temporary Table),随着Flink作业的结束,该表也被销毁,它只能在一个Flink Session中使用。

但是在更多的情况下,我们想跟传统的数据库一样提前创建好表,这些表后续可以为整个集群上的所有用户和所有作业提供服务,这种表被称为常驻表(Permanent Table)。常驻表可以在多个Flink Session中使用。

为了管理多个常驻表,Flink使用Catalog来维护多个常驻表的名字、类型(文件、消息队列或数据库)、数据存储位置等元数据(Metadata)信息。一个Flink作业可以连接某个Catalog,这样就可以直接读取其中的数据,生成表。有了Catalog功能,数据管理团队对数据源更了解,他们可以提前在Catalog中创建常驻表,注册好该表的Schema、注明该表使用何种底层技术、写明数据存储位置等;数据分析团队可以完全不用关心这些元数据信息,无须了解该表到底是存储在Kafka还是HDFS中,直接在该表上进行查询。

在表上执行语句
Table API

基于表,我们可以调用Table API来查询其中的数据。Table API和编程语言结合更紧密,我们可以在Table类上使用链式调用,调用Table类中的各种方法,执行各类关系型操作。

SQL

我们也可以直接对表执行SQL语句。SQL标准中定义了一系列语法和关键字,FlinkSQL用户可以基于SQL标准来编写SQL语句。与Table API中函数调用的方式不同,SQL语句是纯文本形式的。Flink SQL基于Apache Calcite(以下简称Calcite),Calcite提供了SQL解析器,并且Calcite支持SQL标准,因此Flink SQL也支持SQL标准。

将表结果输出

我们可以将查询结果通过TableSink输出到外部系统。TableSink和前文提到的DataStream API中的Sink很像,它是一个数据输出的统一接口,可以将数据以CSV、Parquet、Avro等格式序列化,并将数据发送到关系型数据库、Key-Value数据库、消息队列或文件系统上。TableSink与Catalog、Schema等概念紧密相关。

执行作业

Table API或者SQL调用经过Planner最终转化为一个JobGraph,Planner在中间起到一个转换和优化的作用。Table API和SQL首先转换为一个未经过优化的逻辑执行计划(LogicalPlan),其中Flink SQL使用Calcite进行SQL解析。之后优化器(Optimizer)会对Logical Plan进行优化,得到物理执行计划(Physical Plan)。Physical Plan转换为Flink的Transformation,然后转换为JobGraph,JobGraph可以提交到Flink集群上。

Table API & SQL从调用到执行的大致流程
获取表的具体方式

在Flink 1.11中,Table API & SQL与外部系统交互主要有如下两种方式:

  • 在程序中使用代码编程配置。
  • 使用声明式的语言,如SQL的数据库定义语言(Data Definition Language,DDL)或YAML文件。

无论哪种方式,都需要配置外部系统的必要参数、序列化方式和Schema。

具体细节不再做介绍,需要使用时补充于本小节。

Flink部署和配置

Flink的集群部署模式

Standalone集群

一个Standalone集群包括至少一个Master进程和至少一个TaskManager进程,每个进程作为一个单独的Java JVM进程。其中,Master节点上运行Dispatcher、ResourceManager和JobManager,Worker节点将运行TaskManager。

Hadoop YARN集群

在YARN上使用Flink有3种模式:Per-Job模式、Session模式和Application模式。Per-Job模式指每次向YARN提交一个作业,YARN为这个作业单独分配资源,基于这些资源启动一个Flink集群,该作业运行结束后,相应的资源会被释放。Session模式在YARN上启动一个长期运行的Flink集群,用户可以向这个集群提交多个作业。

Kubernetes集群

Kubernetes(简称K8s)是一个开源的Container编排平台。近年来,Container以及Kubernetes大行其道,获得了业界的广泛关注,很多信息系统正在逐渐将业务迁移到Kubernetes上。

在Flink 1.10之前,Flink的Kubernetes部署需要用户对Kubernetes各组件和工具有一定的了解,而Kubernetes涉及的组件和概念较多,学习成本较高。和YARN一样,Flink Kubernetes部署方式支持Per-Job和Session两种模式。为了进一步减小Kubernetes部署的难度,Flink 1.10提出了原生Kubernetes部署,同时也保留了之前的模式。新的Kubernetes部署非常简单,将会成为未来的趋势,因此本小节只介绍这种原生Kubernetes部署方式。

命令行工具

在生产环境中,Flink使用命令行工具(Command Line Interface)来管理作业的执行。命令行工具本质上是一个可执行脚本,名为fink,放置在Flink的主目录下的bin文件夹中。我们在之前也曾多次介绍过,它的功能主要包括:提交、取消作业,罗列当前正在执行和排队的作业、获取某个作业的信息,设置Savepoint等。

命令行工具完成以上功能的前提是,我们已经启动了一个Flink集群,命令行工具能够直接连接到这个集群上。默认情况下,命令行工具会从conf/fink-conf.yaml里读取配置信息。

提交作业

我们要提供一个打包好的用户作业JAR包。打包需要使用Maven,在自己的Java工程目录下执行mvn package,在target文件夹下找到相应的JAR包。

1
./bin/flink run [OPTIONS] <xxx.jar> [ARGUMENTS] 

任何一个Java程序都需要一个主类和main方法作为入口,启动WorldCount程序时,我们并没有提及主类,因为程序在pom.xml文件中设置了主类。确切地说,经过Maven打包生成的JAR包有文件META-INF/MANIFEST.MF,该文件里定义了主类。如果我们想明确使用自己所需要的主类,可以使用-c<classname>--class<classname>来指定程序的主类。在一个包含众多main()方法的JAR包里,必须指定一个主类,否则会报错。

我们也可以在命令行中用-p选项设置这个作业的并行度。

管理作业

罗列当前作业的命令如下:

1
./bin/flink list 

触发一个作业执行Savepoint的命令如下,这行命令会通知作业ID为jobId的作业执行Savepoint,可以在后面添加路径,Savepoint会写入对应目录,该路径必须是Flink Master可访问到的目录,例如一个HDFS路径。

1
./bin/flink savepoint <jobId> [savepointDirectory] 

关停一个Flink作业的命令如下:

1
./bin/flink cancel <jobID>  

关停一个带Savepoint的作业的命令如下:

1
./bin/flink stop <jobID> 

从一个Savepoint恢复一个作业的命令如下:

1
./bin/flink run -s <savepointPath> [OPTIONS] <xxx.jar>

与Hadoop集成

需要时仔细了解。

双眼是雾 Flink基础(一)
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×