(6) Running Flink SQL on Kubernetes
1. Introduction
In practice we do not only develop Flink jobs in Java; we also write Flink SQL directly and implement jobs purely as SQL. This article uses the official Flink Kubernetes Operator to run Flink SQL jobs on Kubernetes.
2. Program Function Diagram
We develop a generic Flink SQL runner: the path of a SQL script is passed as an argument to the program's entry class, and the program reads the script, parses out the individual SQL statements, and submits them as a Flink job.
The SQL script in this example reads data from Kafka, processes it with Flink, and writes the results to MySQL.

In this case we only need to develop the SqlRunner itself; all job logic is defined in the SQL scripts, so running a different job only requires writing a different script, which makes the program very generic.
3. Developing the Flink SQL Program
Main program:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
public class SqlRunner {
private static final Logger LOG = LoggerFactory.getLogger(SqlRunner.class);
private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;`
private static final String LINE_DELIMITER = "\n";
private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";
public static void main(String[] args) throws Exception {
String scriptFilePath = "simple.sql";
if (args.length == 1) {
scriptFilePath = args[0];
}
String script = FileUtils.readFileUtf8(new File(scriptFilePath));
List<String> statements = parseStatements(script);
// create the stream environment and set the parallelism to 1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// create the table environment
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
TableResult tableResult = null;
for (String statement : statements) {
LOG.info("Executing:\n{}", statement);
System.out.println("Executing: " + statement);
tableResult = tableEnvironment.executeSql(statement);
}
// executeSql is asynchronous: when run from the IDE the process would exit immediately, so we grab the TableResult returned by the last executeSql call and wait for the job to finish
tableResult.getJobClient().get().getJobExecutionResult().get();
}
public static List<String> parseStatements(String script) {
String formatted = formatSqlFile(script).replaceAll(COMMENT_PATTERN, "");
List<String> statements = new ArrayList<>();
StringBuilder current = null;
boolean statementSet = false;
for (String line : formatted.split("\n")) {
String trimmed = line.trim();
if (trimmed.isEmpty()) {
continue;
}
if (current == null) {
current = new StringBuilder();
}
if (trimmed.startsWith("EXECUTE STATEMENT SET")) {
statementSet = true;
}
current.append(trimmed);
current.append("\n");
if (trimmed.endsWith(STATEMENT_DELIMITER)) {
if (!statementSet || trimmed.equals("END;")) {
// statements passed to executeSql must not end with a semicolon
statements.add(current.toString().replace(";", ""));
current = null;
statementSet = false;
}
}
}
return statements;
}
public static String formatSqlFile(String content) {
String trimmed = content.trim();
StringBuilder formatted = new StringBuilder();
formatted.append(trimmed);
if (!trimmed.endsWith(STATEMENT_DELIMITER)) {
formatted.append(STATEMENT_DELIMITER);
}
formatted.append(LINE_DELIMITER);
return formatted.toString();
}
}
SQL script file (placed in the program's root directory):
-- source table: Kafka
CREATE TABLE user_log
(
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = '172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092',
'properties.group.id' = 'testGroup1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
-- sink table: MySQL
CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT,
primary key (dt) not enforced
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.252.113:3306/flink_test?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC',
'table-name' = 'pvuv_sink',
'username' = 'root',
'password' = '111...aaa'
);
-- print to the console (alternative sink for debugging)
-- CREATE TABLE pvuv_sink
-- (
-- dt VARCHAR,
-- pv BIGINT,
-- uv BIGINT
-- ) WITH (
-- 'connector' = 'print'
-- );
-- write the aggregated results to MySQL
INSERT INTO pvuv_sink
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
MySQL DDL for the sink table (the database and table must exist before the job starts; see the note after the DDL):
CREATE TABLE pvuv_sink (
dt VARCHAR(30),
pv BIGINT,
uv BIGINT,
primary key (dt)
)
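The flink_test database and the pvuv_sink table have to be created up front, since the Flink JDBC sink does not create them. A minimal sketch using the mysql command-line client, with the host, user, and password taken from the JDBC URL in the sink DDL above (adjust to your environment):
# create the target database and table on the MySQL host referenced in the JDBC URL
mysql -h 172.16.252.113 -uroot -p -e "CREATE DATABASE IF NOT EXISTS flink_test;"
mysql -h 172.16.252.113 -uroot -p flink_test -e "CREATE TABLE IF NOT EXISTS pvuv_sink (dt VARCHAR(30), pv BIGINT, uv BIGINT, PRIMARY KEY (dt));"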
Run the job locally and send messages to Kafka. The test data is JSON and must match the schema of the Kafka table defined above; a way to produce it is sketched after the sample records.
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 01:00:00"}
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 02:00:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 03:36:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 04:36:00"}
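One way to produce these records for the local test is the standard Kafka console producer. This is a sketch that assumes the Kafka CLI tools are available and the user_behavior topic already exists (on Kafka versions before 2.5 use --broker-list instead of --bootstrap-server):
# start a console producer against one of the brokers from the source DDL,
# then paste the JSON records above, one per line
bin/kafka-console-producer.sh \
  --bootstrap-server 172.16.252.129:9092 \
  --topic user_behavior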
Once the local test passes, change the scope of the Flink-provided dependencies to provided when packaging, to reduce the size of the jar:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
Upload the jar and simple.sql to the PV directory backing flink-jar-pvc.
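A sketch of the packaging and upload step; it assumes the fat jar is produced by a Maven assembly build (the *-jar-with-dependencies artifact referenced in the deployment YAML below), and /nfs/flink-jar-pv is a placeholder for the actual directory backing flink-jar-pvc:
# build the fat jar
mvn clean package -DskipTests
# copy the jar and the SQL script to the PV directory (placeholder path, replace with the real NFS mount)
cp target/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar /nfs/flink-jar-pv/
cp simple.sql /nfs/flink-jar-pv/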
4. Building the Flink SQL Image
Because the JobManager needs to connect to Kafka and MySQL as soon as it starts, and the stock Flink image does not ship the Kafka and MySQL jars in its lib directory, we build a new flink-sql image that adds those jars to /opt/flink/lib.
mkdir flink-sql
cd flink-sql/
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.13.6/flink-connector-jdbc_2.12-1.13.6.jar
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar
Write the Dockerfile:
[root@k8s-demo001 flink-sql]# cat Dockerfile
FROM flink:1.13.6
WORKDIR /opt/flink
COPY flink-connector-jdbc_2.12-1.13.6.jar /opt/flink/lib/flink-connector-jdbc_2.12-1.13.6.jar
COPY flink-sql-connector-kafka_2.12-1.13.6.jar /opt/flink/lib/flink-sql-connector-kafka_2.12-1.13.6.jar
COPY mysql-connector-java-5.1.49.jar /opt/flink/lib/mysql-connector-java-5.1.49.jar
RUN chown -R flink:flink /opt/flink/lib
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["help"]
Build the image and push it to the image registry:
docker build -f Dockerfile -t flink-sql:1.13.6 .
docker tag flink-sql:1.13.6 172.16.252.110:8011/flinkonk8s/flink-sql:1.13.6
docker push 172.16.252.110:8011/flinkonk8s/flink-sql:1.13.6
5. Submitting the Flink SQL Job
1. Write the Flink SQL job YAML and submit it
[root@k8s-demo001 ~]# cat sql-application.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: sql-application
spec:
  image: 172.16.252.110:8011/flinkonk8s/flink-sql:1.13.6  # use the flink-sql image built above
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent  # pull the image from the registry only if it is not present locally
  ingress:  # ingress settings for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    classloader.resolve-order: parent-first  # default is child-first; use parent-first so Flink's own jars are loaded first, otherwise class conflicts and type-mismatch errors are likely
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      hostAliases:
        - ip: "172.16.252.129"
          hostnames:
            - "Kafka-01"
        - ip: "172.16.252.130"
          hostnames:
            - "Kafka-02"
        - ip: "172.16.252.131"
          hostnames:
            - "Kafka-03"
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar  # mount the jar directory from NFS
              mountPath: /opt/flink/jar
            - name: flink-log
              mountPath: /opt/flink/log
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-log
          persistentVolumeClaim:
            claimName: flink-log-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar  # jar is mounted via the PV
    entryClass: org.fblinux.SqlRunner
    args:  # arguments passed to the job's main method
      - "/opt/flink/jar/simple.sql"
    parallelism: 1
    upgradeMode: stateless
[root@k8s-demo001 ~]# kubectl apply -f sql-application.yaml
Verify that the pods were created:
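A minimal sketch for checking the deployment (resource names follow the FlinkDeployment above; exact output depends on the operator version):
# list the JobManager/TaskManager pods created for the job
kubectl -n flink get pods | grep sql-application
# check the FlinkDeployment status reported by the operator
kubectl -n flink get flinkdeployment sql-application
# tail the JobManager logs if the job does not reach RUNNING
kubectl -n flink logs -f deploy/sql-application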

Send test data through Kafka; a query to check the MySQL results follows the sample records:
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 11:00:00"}
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2023-03-26 12:00:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 13:36:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2023-03-26 14:36:00"}
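Shortly after the messages are consumed, the hourly PV/UV aggregates should show up in the sink table; a quick check with the mysql client (connection details as in the sink DDL, adjust to your environment):
# confirm the aggregated results were written by the job
mysql -h 172.16.252.113 -uroot -p flink_test -e "SELECT * FROM pvuv_sink ORDER BY dt;"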