about云开发

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

查看: 225|回复: 0
打印 上一主题 下一主题

[总结型] Flink关于读取Kudu的源码实现及相关内容总结

[复制链接]

56

主题

3

听众

1

收听

超级版主

Rank: 8Rank: 8

积分
444
跳转到指定楼层
楼主
本帖最后由 林宝宝 于 2019-8-8 12:28 编辑
问题导读:

1.Flink自定义kudu的source和sink是如何实现的?
2.flinkStreamSQL对于sql扩展了什么功能?
3.Flink Kudu Connector如何使用?


Flink 自定义 Kudu的 Sink 和 source
        
参照github上bahir-flink项目写的,主要是将POJO类写入kudu,

自定义Flink的sink或source


flink-customize-master.zip (34.71 KB, 下载次数: 0)

FlinkStreamSQL的Kudu Sink

还有一种是根据fieldNames 和fieldTypes写入,详细可以看flinkStreamSQL中的kudu Sink,

基于开源的flink,对其实时sql进行扩展;主要实现了流与维表的join,支持原生flink SQL所有的语法

将原有的异步查询修改为callBack方式

kudu-sink 与现有pom保持一致 添加-${git.branch}


flinkStreamSQL-master.zip (570.94 KB, 下载次数: 0)

Flink Kudu Connector

Flink Kudu连接器提供可以读写Kudu的源(KuduInputFormat)和接收器/输出(分别为KuduSink和KuduOutputFormat)。 要使用此连接器,可以将下面这些依赖项添加到项目中:

[XML] 纯文本查看 复制代码
<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-kudu_2.11</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>

版本兼容性:此模块与Apache Kudu 1.7.1(最新稳定版本)兼容。 注意,流连接器并不是Flink二进制包的一部分。 你需要将它们链接到作业jar以进行集群执行。 在了解如何链接它们以进行群集执行。 安装Kudu 按照的说明进行操作。 你可以选择性地使用dockers文件夹中提供的docker镜像。

Kudu输入格式
[Java] 纯文本查看 复制代码
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(PARALLELISM);

// create a table info object
KuduTableInfo tableInfo = KuduTableInfo.Builder
        .create("books")
        .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
        .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
        .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
        .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
        .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
        .build();
    
// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips
env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo))
        .count();
        
env.execute();

Kudu输出格式
[Java] 纯文本查看 复制代码
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(PARALLELISM);

// create a table info object
KuduTableInfo tableInfo = KuduTableInfo.Builder
        .create("books")
        .createIfNotExist(true)
        .replicas(1)
        .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
        .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
        .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
        .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
        .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
        .build();

...

env.fromCollection(books)
        .output(new KuduOutputFormat<>("172.25.0.6", tableInfo));

env.execute();

KuduSink
[Java] 纯文本查看 复制代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(PARALLELISM);

// create a table info object
KuduTableInfo tableInfo = KuduTableInfo.Builder
        .create("books")
        .createIfNotExist(true)
        .replicas(1)
        .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
        .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
        .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
        .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
        .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
        .build();

...

env.fromCollection(books)
    .addSink(new KuduSink<>("172.25.0.6", tableInfo));

env.execute();

本文来源:
               

最新经典文章,欢迎关注公众号


您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /3 下一条

|小黑屋|about云开发-学问论坛|社区 ( ) 

GMT+8, 2019-8-11 21:45 , Processed in 0。507507 second(s), 35 queries , Gzip On。

Powered by X3.2 Licensed

快速回复 返回顶部 返回列表
545彩票计划群 极速赛车是哪里的 极速赛车的龙虎怎么买 M5彩票计划群 金福彩票计划群 五福彩票计划群 山东11选5走势 极速赛车登陆 必发彩票计划群 极速赛车登陆