背景SQL,Structured Query Language:结构化查询语言,作为一个通用、流行的查询语言,不仅仅是在传统的数据库,在大数据领域也变得越来越流行,hive、spark、kafka、flink等大数据组件都支持sql的查询,使用sql可以让一些不懂这些组件原理的人,轻松的来操作,大大的降低了使用的门槛,今天我们先来简单的讲讲在flink的流处理中如何使用sql.

实例讲解构造StreamTableEnvironment对象在flink的流处理中,要使用sql,需要首先构造一个StreamTableEnvironment对象,方法比较简单。

sql中用到的catalog、table、function等都需要注册到StreamTableEnvironment才能使用。

代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

注册table接下来要将相应的表的信息注册到StreamTableEnvironment对象中,有以下几种方式可以选择.

以下的代码是基于flink 1.10.0版本进行讲解的,各个版本略有不同。

使用Tuple代码语言:javascript复制

//使用flink的二元组,这个时候需要自定义字段名称

Tuple2 tuple2 = Tuple2.of("jack", 10);

//构造一个Tuple的DataStream

DataStream> tupleStream = env.fromElements(tuple2);

// 注册到StreamTableEnvironment,并且指定对应的字段名

tableEnv.createTemporaryView("usersTuple", tupleStream, "name,age");

//执行一个sql 查询. 然后返回一个table对象

Table table = tableEnv.sqlQuery("select name,age from usersTuple");

// 将table对象转成flink的DataStream,以便后续操作,我们这里将其输出

tableEnv.toAppendStream(table, Row.class).print();

具体的详尽的内容请参考代码中的注释.

使用Rowflink中提供的元组Tuple是有限制的,最多到Tuple25,所以如果我们有更多的字段,可以选择使用flink中的Row对象.

代码语言:javascript复制

//使用Row

Row row = new Row(2);

row.setField(0, "zhangsan");

row.setField(1, 20);

DataStream rowDataStream = env.fromElements(row);

tableEnv.createTemporaryView("usersRow", rowDataStream, "name,age");

Table tableRow = tableEnv.sqlQuery("select name,age from usersRow");

tableEnv.toAppendStream(tableRow, Row.class).print();

使用java的Pojo类首先定一个pojo类

代码语言:javascript复制

public static class User{

private String name;

private int age;

public String getName(){

return name;

}

public void setName(String name){

this.name = name;

}

public int getAge(){

return age;

}

public void setAge(int age){

this.age = age;

}

}

定义这个pojo类是要符合flink的序列化规则,是有一定要求的,具体的可以参考【1】:

该类是public类型并且没有非静态内部类该类拥有公有的无参构造器类(以及所有超类)中的所有非静态、非 transient 字段都是公有的(非 final 的);或者遵循 Java bean 规则,字段是private的,但是具有public类型的 getter 和 setter 方法代码语言:javascript复制

User user = new User();

user.setName("Tom");

user.setAge(20);

DataStream userDataStream = env.fromElements(user);

tableEnv.createTemporaryView("usersPojo", userDataStream);

Table tablePojo = tableEnv.sqlQuery("select name,age from usersPojo");

tableEnv.toAppendStream(tablePojo, Row.class).print();

如果使用的是java pojo类型的DataStream,就不用声明字段名称了,flink会自动解析pojo类中的字段名称和类型来作为table的字段和类型。

使用外部存储代码语言:javascript复制

//连接外部系统,比如文件,kafka等

Schema schema = new Schema()

.field("name", DataTypes.STRING())

.field("age", DataTypes.INT());

tableEnv.connect(new FileSystem().path("...."))

.withFormat(new Csv())

.withSchema(schema)

.createTemporaryTable("usersFile");

Table tableFile = tableEnv.sqlQuery("select name,age from usersFile");

tableEnv.toAppendStream(tableFile, Row.class).print();

使用外部存储的时候需要指定以下对象:

tableEnv.connect(ConnectorDescriptor ...) 指定连接符,目前flink支持Elasticsearch、hbase、kafka、filesystem这几类withFormat(FormatDescriptor format) 这个就是指定我们从上述数据源读取的数据的格式,比如json、csv、parquet等等.withSchema(Schema schema) 给我们的table定义一个schema,也就是字段的名称和类型,用于sql查询.createTemporaryTable("usersFile") 给表起一个名字,并且注册到StreamTableEnvironment中其实还有一些其他的注册方法,但是已经标记为过期了,我们这里就不讲解了。

参考资料:

[1].https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html

完整代码请参考

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/sql/SqlFirst.java