(3) Flink on K8s: Writing Checkpoint State to a PV
1. Introduction
As is well known, Flink is a stateful distributed stream-processing engine: every function or operator in Flink can hold state. To make that state fault tolerant, Flink introduces the checkpoint mechanism. Checkpoints allow Flink to restore a job's state and position, giving the job the same semantics as a failure-free execution.
Checkpoint triggering and state management are handled mainly by the JobManager, which has to store the state files produced by each checkpoint in external storage. In an on-YARN deployment this storage is usually HDFS; in an on-Kubernetes deployment it is usually a PV, which lets Flink on K8s avoid depending on additional third-party components such as Hadoop.
2. Checkpoint Program Development
To demonstrate running a Flink checkpoint program on K8s, a new demo program was developed that implements a WordCount word-counting job.
In this example the upstream source is Kafka: a Kafka producer program sends a stream of text lines to the corresponding topic. In real production environments, the Flink source is usually Kafka as well.
The Flink checkpoint program reads the text stream from the Kafka topic, splits it into words, counts them, and sinks the results to a MySQL database.
To make sure that the job keeps accumulating on top of the previous counts after it is stopped and restarted, checkpointing is enabled in the program. For example, if the word Hello has been counted 5 times before the restart, then the first time a line containing Hello arrives after the restart, the count for Hello becomes 6 rather than 1. With checkpointing enabled, the JobManager periodically triggers checkpoints and persists the current counts, i.e. the checkpoint state.
1) Flink main program
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;
import java.util.Arrays;
import java.util.Properties;
public class StreamWordCountWithCP {
    private static Logger logger = Logger.getLogger(StreamWordCountWithCP.class);

    public static void main(String[] args) throws Exception {
        logger.info("******************* StreamWordCountWithCP job start *******************");
        // Kafka bootstrap servers
        String kafkaServer = "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092";
        // Kafka topic
        String kafkaTopic = "flink_test";
        // MySQL host
        String dbHost = "172.16.252.113";
        // MySQL port
        String dbPort = "3306";
        // database name
        String dbName = "flink_test";
        // result table
        String table = "wc";
        // checkpoint storage path
        String checkpointPath = "file:///Users/willwang/flink-on-k8s-demo/checkpoint";
        // checkpoint interval, default 10s
        long interval = 10000;
        // parallelism
        int parallelism = 1;
        // override the defaults with program arguments
        if (args != null && args.length == 9) {
            kafkaServer = args[0];
            kafkaTopic = args[1];
            dbHost = args[2];
            dbPort = args[3];
            dbName = args[4];
            table = args[5];
            checkpointPath = args[6];
            interval = Long.parseLong(args[7]);
            parallelism = Integer.parseInt(args[8]);
            logger.info("******************* kafkaServer=" + args[0] + ", " +
                    "kafkaTopic=" + args[1] + ", " +
                    "dbHost=" + args[2] + ", " +
                    "dbPort=" + args[3] + ", " +
                    "dbName=" + args[4] + ", " +
                    "table=" + args[5] + ", " +
                    "checkpointPath=" + args[6] + ", " +
                    "interval=" + args[7] + ", " +
                    "parallelism=" + args[8]);
        }
        // 0. create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 1. configure checkpointing
        env.enableCheckpointing(interval);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.enableUnalignedCheckpoints();
        checkpointConfig.setCheckpointStorage(checkpointPath);
        // 2. configure the Kafka consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaServer);
        properties.setProperty("group.id", "wc-consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        // 3. create the Kafka source
        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<>(
                kafkaTopic,
                new SimpleStringSchema(),
                properties
        ));
        // 4. split each line into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = stream
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 5. key by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        // 6. sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        // 7. write the results to MySQL with a custom sink
        result.addSink(new MySQLSink(dbHost, dbPort, dbName, table));
        env.execute("StreamWordCountWithCP");
    }
}
2) MySQL sink class
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.log4j.Logger;
import java.sql.*;
public class MySQLSink extends RichSinkFunction<Tuple2<String, Long>> {
    private static Logger logger = Logger.getLogger(MySQLSink.class);
    private Connection conn;
    private PreparedStatement insertPS;
    private PreparedStatement updatePS;
    private String dbHost;
    private String dbPort;
    private String dbName;
    private String table;

    public MySQLSink(String dbHost, String dbPort, String dbName, String table) {
        this.dbHost = dbHost;
        this.dbPort = dbPort;
        this.dbName = dbName;
        this.table = table;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        String url = "jdbc:mysql://" + dbHost + ":" + dbPort + "/" + dbName + "?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC";
        // open the MySQL connection
        conn = DriverManager.getConnection(url, "root", "111...aaa");
        String sql = "INSERT INTO " + table + " (`word`, `cnt`) VALUES(?,?);";
        insertPS = conn.prepareStatement(sql);
        String sql2 = "UPDATE " + table + " set cnt = ? where word = ?;";
        updatePS = conn.prepareStatement(sql2);
        System.out.println("custom sink: opened database connection =====");
        logger.info("*** custom sink: opened database connection =====");
    }

    @Override
    public void close() throws Exception {
        if (insertPS != null) {
            insertPS.close();
        }
        if (updatePS != null) {
            updatePS.close();
        }
        if (conn != null) {
            conn.close();
        }
        System.out.println("custom sink: closed database connection =====");
        logger.info("*** custom sink: closed database connection =====");
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        // check whether the word already has a row in the result table
        String sql = "select count(*) from " + table + " where word = '" + value.f0 + "'";
        System.out.println(sql);
        logger.info("*** " + sql);
        Statement statement = conn.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);
        long cnt = 0;
        while (resultSet.next()) {
            cnt = resultSet.getLong(1);
        }
        resultSet.close();
        statement.close();
        if (cnt > 0) {
            // word already exists: update its count
            System.out.println("update value=" + value);
            updatePS.setLong(1, value.f1);
            updatePS.setString(2, value.f0);
            updatePS.executeUpdate();
        } else {
            // new word: insert a new row
            System.out.println("insert value=" + value);
            insertPS.setString(1, value.f0);
            insertPS.setLong(2, value.f1);
            insertPS.executeUpdate();
        }
    }
}
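The sink assumes that the result table already exists in MySQL. A minimal sketch of creating it, with the database, table, and column names taken from the code above and the column types and primary key being assumptions:
mysql -h 172.16.252.113 -P 3306 -u root -p -e "
CREATE DATABASE IF NOT EXISTS flink_test;
CREATE TABLE IF NOT EXISTS flink_test.wc (
  word VARCHAR(100) NOT NULL PRIMARY KEY,  -- the word
  cnt  BIGINT NOT NULL                     -- its running count
);"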
3. K8s Test Configuration
1) Create a checkpoint PVC to store the checkpoint data
[root@k8s-demo001 ~]# cat flink-checkpoint-application-pvc.yaml
# PVC for persisting Flink checkpoints
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-checkpoint-application-pvc  # checkpoint PVC name
  namespace: flink                        # namespace the PVC belongs to
spec:
  storageClassName: nfs-storage  # storage class name, change to your actual SC
  accessModes:
    - ReadWriteMany              # ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi               # capacity, adjust as needed
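Apply the manifest and confirm that the claim is bound before moving on; a minimal sketch:
kubectl apply -f flink-checkpoint-application-pvc.yaml
# STATUS should show Bound once the nfs-storage provisioner has created the backing PV
kubectl get pvc flink-checkpoint-application-pvc -n flink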
2) Test with an Application-mode deployment
[root@k8s-demo001 ~]# cat application-deployment-checkpoint.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment-checkpoint  # Flink cluster name
spec:
  image: flink:1.13.6            # Flink base image
  flinkVersion: v1_13            # Flink version, 1.13 here
  imagePullPolicy: IfNotPresent  # pull the image from the registry only if it is not present locally
  ingress:                       # ingress config for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: file:///opt/flink/checkpoints
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar          # mount the jar stored on NFS
              mountPath: /opt/flink/jar
            - name: flink-checkpoints  # mount the checkpoint PVC
              mountPath: /opt/flink/checkpoints
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-checkpoints
          persistentVolumeClaim:
            claimName: flink-checkpoint-application-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar  # the job jar is mounted via PV
    entryClass: org.fblinux.StreamWordCountWithCP
    args:  # arguments passed to the job's main method
      - "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092"
      - "flink_test"
      - "172.16.252.113"
      - "3306"
      - "flink_test"
      - "wc"
      - "file:///opt/flink/checkpoints"
      - "10000"
      - "1"
    parallelism: 1
    upgradeMode: stateless
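To submit the job, the fat jar first has to be available on the volume behind flink-jar-pvc, then the FlinkDeployment can be applied. A sketch, assuming the Flink Kubernetes Operator is installed in the cluster and that /nfs_data/flink/jar is the NFS directory backing flink-jar-pvc (both paths are assumptions, adjust to your environment):
# copy the packaged job jar to the jar volume
cp flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar /nfs_data/flink/jar/
# submit the FlinkDeployment; the operator creates the JobManager and TaskManager pods
kubectl apply -f application-deployment-checkpoint.yaml
kubectl get flinkdeployment -n flink
kubectl get pods -n flink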
3) Open the Flink web UI and verify that checkpoints are completed and written to the configured path
http://flink.k8s.io/flink/application-deployment-checkpoint/#/job/453b1bd5f90756e7afe929c3e71f747b/checkpoints
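The checkpoint files can also be inspected directly on the volume; a sketch, assuming the PVC is provisioned from an NFS export mounted at /nfs_data/flink-checkpoints (a hypothetical path, adjust to your storage class):
# each job writes to a directory named after its job ID, containing chk-<n> subdirectories
ls /nfs_data/flink-checkpoints/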

4) Send data to Kafka and verify that the Flink job writes the results to MySQL
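For example, data can be produced with the Kafka console producer and the result checked with the mysql client (script location and flags depend on your Kafka version; older releases use --broker-list instead of --bootstrap-server):
# type lines of space-separated words; each line is consumed by the Flink job
kafka-console-producer.sh --bootstrap-server 172.16.252.129:9092 --topic flink_test
# check the word counts written by the custom sink
mysql -h 172.16.252.113 -P 3306 -u root -p -e "SELECT * FROM flink_test.wc;"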

For an Application-mode job, stopping the job shuts down the whole application cluster: the JobManager and TaskManager pods are released along with it. The retained checkpoint is not picked up automatically on the next start, because a newly submitted job writes its checkpoints under a new path (a new job-ID directory). To restart from the previous state, you therefore have to look up the path of the last checkpoint on the PV.
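A sketch of that lookup, again assuming the PVC is backed by an NFS export at /nfs_data/flink-checkpoints (a hypothetical path); the newest chk-<n> directory of the previous run is the value to put into initialSavepointPath in the manifest below:
# list the job-ID directories on the checkpoint volume
ls /nfs_data/flink-checkpoints/
# find the most recent chk-<n> directory of the previous run
ls -t /nfs_data/flink-checkpoints/4a81f02e391f129fa647121e4014226f/ | head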

1) Stop the job
kubectl delete -f application-deployment-checkpoint.yaml
2) Write the YAML for restarting the job
The manifest is essentially the same as the one used for the first submission; the key difference is that initialSavepointPath is specified under spec.job, pointing at the retained checkpoint the job should start from.
[root@k8s-demo001 ~]# cat application-deployment-checkpoint.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment-checkpoint  # Flink cluster name
spec:
  image: flink:1.13.6            # Flink base image
  flinkVersion: v1_13            # Flink version, 1.13 here
  imagePullPolicy: IfNotPresent  # pull the image from the registry only if it is not present locally
  ingress:                       # ingress config for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: file:///opt/flink/checkpoints
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      hostAliases:
        - ip: "172.16.252.129"
          hostnames:
            - "kafka-01"
        - ip: "172.16.252.130"
          hostnames:
            - "kafka-02"
        - ip: "172.16.252.131"
          hostnames:
            - "kafka-03"
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar          # mount the jar stored on NFS
              mountPath: /opt/flink/jar
            - name: flink-checkpoints  # mount the checkpoint PVC
              mountPath: /opt/flink/checkpoints
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-checkpoints
          persistentVolumeClaim:
            claimName: flink-checkpoint-application-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar  # the job jar is mounted via PV
    entryClass: org.fblinux.StreamWordCountWithCP
    args:  # arguments passed to the job's main method
      - "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092"
      - "flink_test"
      - "172.16.252.113"
      - "3306"
      - "flink_test"
      - "wc"
      - "file:///opt/flink/checkpoints"
      - "10000"
      - "1"
    initialSavepointPath: /opt/flink/checkpoints/4a81f02e391f129fa647121e4014226f/chk-70/  # absolute path of the retained checkpoint
    parallelism: 1
    upgradeMode: stateless
Resubmit the job, then write data to Kafka again; watching the MySQL table shows that the counts continue from the data restored from the checkpoint.
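A quick way to confirm the restore, reusing the Hello example from section 2: send one more line containing Hello and check that its count continues from the pre-restart value instead of starting again at 1 (connection details as above):
mysql -h 172.16.252.113 -P 3306 -u root -p -e "SELECT * FROM flink_test.wc WHERE word = 'Hello';"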