@ -0,0 +1,54 @@ |
|||
2022规划:物联网感知平台(物联网数据接入中台服务) |
|||
|
|||
1. 数据分析工具 |
|||
|
|||
基于notebook+python实现在线数据分析功能,提供hive/iceberg数据源。实现行业服务科常用分析方法,提供可视化界面,实现分析算法和可视化组件的动态组合,可以自定义分析流程、制定分析任务,支持分析结果报表文件的生成与导出下载。
|||
|
|||
2. 原型定义扩展 |
|||
|
|||
原型组合、单位可选、公式可选。 |
|||
|
|||
增加监测原型灵活性,支持公式选择、单位选择(之前2.0的遗留功能)。 |
|||
|
|||
3. 动态数据接入和边缘网关能力 |
|||
|
|||
加强平台动态数据处理能力,主要考虑边缘计算+数据湖/OSS存储方案。 |
|||
|
|||
扩展边缘网关振动采集、DAC采集能力,实现动态数据在边缘节点的计算和存储。可实现边缘独立工作和云边协同处理能力,数据最终可上报到平台进行存储分析。(可扩展云厂商存储能力)
|||
|
|||
4. 存储 |
|||
|
|||
应用数据湖技术。ES存储能力协同HDFS文档型存储,提供hive/iceberg抽象层定义,存储海量异构数据。存储介质上考虑自建机房SSD热数据存储+通用机械硬盘阵列温数据备份,补充购买使用云厂商OSS服务存储冷数据,实现数据的容灾以及不同使用场景的存储需求。 |
|||
|
|||
5. ETL |
|||
|
|||
构建通用的Flink+Python 批流一体处理框架,在现有通用数据处理流程之外,为各个智慧应用提供自定义的数据处理能力,包括实时的数据处理、预告警、反向控制,以及历史数据的批处理分析、机器学习和AI训练能力。
|||
|
|||
6. 超融合,租户资源隔离 |
|||
|
|||
超融合是将服务器硬件资源打散融合,按需分配。实现一套简单的IaaS服务,部署我们的PaaS和SaaS平台,实现对用户资源的隔离、限制。 |
|||
|
|||
7. 继续提高平台稳定性、健壮性 |
|||
1. DAC故障跟踪解决,提升数据接入的稳定性
|||
2. 限流算法在数据接入、接口请求方面的应用 |
|||
3. 支持埋点跟踪数据日志 |
|||
4. 研发运维能力:服务进程状态/性能跟踪 |
|||
|
|||
8. 视频接入优化和性能提升 |
|||
|
|||
语言技术栈统一,支持基于ffmpeg的通用数据流格式推流解析。支持分布式负载均衡部署。
|||
|
|||
9. 3D、BIM展示应用和GIS展示 |
|||
|
|||
持续研究以上内容在动效、性能、交互能力上的提升 |
|||
|
|||
10. 大屏展示组件化,低代码开发 |
|||
|
|||
研究低代码实现大屏的可能性,实现自定义大屏模板、组件拖拽、主题定义、数据绑定组态功能。 |
|||
|
|||
11. 其他: |
|||
|
|||
1. 工作流引擎持续定制化 |
|||
2. 协议、计算脚本化扩展能力:扩展支持python/JavaScript/Lua等通用脚本语言与Scala的互调,实现更多可自定义的处理能力。 |
|||
3. 拥抱云原生,全面容器化,使用k8s/m-k8s全套部署方案,加强k8s监控,扩展弹性伸缩能力。 |
|||
4. 提供混合云服务,提供多场景的应用部署能力。 |
@ -0,0 +1,292 @@ |
|||
## 部署启动 |
|||
|
|||
### EDGE |
|||
|
|||
**设备型号**:ok-3399C |
|||
|
|||
**系统**:ubuntu-18.04
|||
|
|||
**默认用户**:forlinx / forlinx |
|||
|
|||
**网络**: 通过netplan (apply)设置网络地址 |
|||
|
|||
**基础服务:** |
|||
|
|||
+ influxdb |
|||
|
|||
数据库。安装方法参见https://portal.influxdata.com/downloads/ |
|||
|
|||
启动数据库:执行 `influxd`,然后访问 http://localip:8086/ 完成初始化(设置用户密码 admin/admin123)
|||
|
|||
获取全局Token (后续edge配置使用) |
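下面给出一段使用该Token向本机InfluxDB写入数据的Go示意代码(基于第三方客户端 influxdb-client-go v2;organization、bucket、measurement 名称均为假设值,请按初始化时的实际配置填写):

```go
package main

import (
	"context"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// EDGE本机InfluxDB地址 + 初始化时获取的全局Token(示例占位)
	client := influxdb2.NewClient("http://localhost:8086", "YOUR_INFLUX_TOKEN")
	defer client.Close()

	// org/bucket 为假设名称
	writeAPI := client.WriteAPIBlocking("edge-org", "edge-bucket")

	// 写入一条振弦采集示例数据
	p := influxdb2.NewPoint("vibrating_wire",
		map[string]string{"device": "001"},
		map[string]interface{}{"freq": 1652.3},
		time.Now())
	if err := writeAPI.WritePoint(context.Background(), p); err != nil {
		panic(err)
	}
}
```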
|||
|
|||
**启动EDGE** |
|||
|
|||
`edge.conf` |
|||
|
|||
```json |
|||
{ |
|||
"msg.mqtt.center": "10.8.30.236:1883", -- 服务端MQTT服务地址 |
|||
"serial_no": "001", -- 测试设备序列号 |
|||
"influx.token": "rBqy73hzOc1Fk5xxofGjqy5bKSmHBVLQouRBkt8eaXUmhum9c4m5nEMWVkG83ihR8CQjWbzTaLvUMoFp0xegYw==", -- influ操作token |
|||
"db.type":"file", |
|||
"db.dir":"../../resources/test", |
|||
"log.file":true, |
|||
"log.file.loc":"runtime/logs/log" |
|||
} |
|||
``` |
|||
|
|||
```shell |
|||
# 启动主程序 |
|||
chmod +x ./edge |
|||
./edge |
|||
``` |
|||
|
|||
|
|||
|
|||
|
|||
|
|||
### SERVER |
|||
|
|||
**基础服务** |
|||
|
|||
+ Emqx |
|||
|
|||
启动MQTT代理服务:`emqx start`
|||
|
|||
+ Prometheus |
|||
|
|||
配置抓取设备指标 |
|||
|
|||
```yaml |
|||
scrape_configs: |
|||
- job_name: "edge-server" |
|||
static_configs: |
|||
- targets: ["localhost:19202"] |
|||
# 调试使用(抓取内网设备上的监控指标) |
|||
- job_name: "dac" |
|||
static_configs: |
|||
- targets: ["10.8.30.244:19201"] |
|||
``` |
|||
|
|||
默认UI地址: http://localhost:9090/ |
|||
|
|||
+ Grafana |
|||
|
|||
配合Prometheus显示EDGE状态和性能指标。 |
|||
|
|||
+ 其他 |
|||
|
|||
+ 连接测试Iota数据库 `postgres://postgres:postgres@10.8.30.156:5432/iota20211206?sslmode=disable` |
|||
+ 部署以太网站 http://10.8.30.38/ |
|||
+ Postman调试工具 |
|||
|
|||
|
|||
|
|||
**启动SERVER** |
|||
|
|||
配置`server.conf` |
|||
|
|||
```json |
|||
{ |
|||
"msg.mqtt.center": "10.8.30.236:1883", -- MQTT Broker地址 |
|||
"web.url":":8088", -- WEB接口地址 |
|||
"db.type": "postgres", |
|||
"db.conn": "postgres://postgres:postgres@10.8.30.156:5432/iota20211206?sslmode=disable", -- 以太数据库地址 |
|||
"log.file":true, |
|||
"log.file.loc":"runtime/logs/log" |
|||
} |
|||
``` |
|||
|
|||
启动Server. |
|||
|
|||
|
|||
|
|||
## 功能演示 |
|||
|
|||
|
|||
|
|||
### 平台新增边缘网关 |
|||
|
|||
目前已经实现CRUD API |
|||
|
|||
**新增设备:** |
|||
|
|||
URL:POST http://localhost:8088/edges
|||
|
|||
BODY: |
|||
|
|||
```json |
|||
{"serial_no":"002","name":"DEMO-2","hardware":{"name":"FS-EDGE-01"},"software":{"ver":"0.2.1"}} |
|||
``` |
|||
|
|||
RET: 200 |
|||
|
|||
> 平台serial_no设置必须和设备端SerialNo匹配,才能进行设备控制 |
|||
|
|||
|
|||
|
|||
**查询当前所有设备**: |
|||
|
|||
URL: GET localhost:8088/edges |
|||
|
|||
RET: |
|||
|
|||
```json |
|||
{"001":{"serial_no":"001","name":"DEMO-WW","hardware":{"name":"FS-EDGE-01"},"software":{"ver":"0.2.1"},"set_ver":"1","config_ver":"9"},"002":{"serial_no":"002","name":"DEMO-2","properties":{"hb":"true"},"hardware":{"name":"FS-EDGE-01"},"software":{"ver":"0.2.1"},"set_ver":"0","config_ver":"0"}} |
|||
``` |
|||
|
|||
|
|||
|
|||
其他: **修改PUT** 和 **删除 DELETE** |
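上述接口可以用Postman调试,也可以用一小段Go代码联调(示意,地址与报文沿用本文示例):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	base := "http://localhost:8088"

	// 新增设备:POST /edges
	body := []byte(`{"serial_no":"002","name":"DEMO-2","hardware":{"name":"FS-EDGE-01"},"software":{"ver":"0.2.1"}}`)
	resp, err := http.Post(base+"/edges", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("create status:", resp.StatusCode)

	// 查询当前所有设备:GET /edges
	resp, err = http.Get(base + "/edges")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	data, _ := io.ReadAll(resp.Body)
	fmt.Println(string(data))
}
```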
|||
|
|||
|
|||
|
|||
### 网关在线状态和性能在线统计 |
|||
|
|||
通过网关心跳数据上报,Prometheus抓取,可通过Grafana查看: |
|||
|
|||
![image-20220121162513190](imgs/EDGE-V0.1调试手册/image-20220121162513190.png) |
|||
|
|||
其中心跳数据格式如下: |
|||
|
|||
```json |
|||
{ |
|||
"time": 1642734937400741643, -- 当前数据的设备时间(用于校时) |
|||
"ver": { |
|||
"pv": "v0.0.1" -- 当前配置版本(包括设备配置和采集配置) |
|||
}, |
|||
"machine": { |
|||
"mt": 3845, -- 总内存 |
|||
"mf": 2616, -- 空闲内存 |
|||
"mp": 10.074738688877986, -- 内存使用比 |
|||
"dt": 12031, -- 总磁盘 |
|||
"df": 7320, -- 剩余磁盘空间 |
|||
"dp": 36, -- 磁盘使用率 |
|||
"u": 7547, -- 系统启动时长 |
|||
"pform": "ubuntu", -- 系统信息 |
|||
"pver": "18.04", -- 系统版本 |
|||
"load1": 0.09, -- 1分钟内平均负载 |
|||
"load5": 0.02, -- 5分钟内平均负载 |
|||
"load15": 0.01 -- 15分钟内平均负载 |
|||
} |
|||
} |
|||
``` |
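按上述字段含义,服务端可以用类似下面的Go结构体解析心跳报文(字段名与取值取自本文示例,仅作示意):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Heartbeat 对应上文心跳JSON(示意)
type Heartbeat struct {
	Time int64 `json:"time"` // 设备时间,纳秒时间戳(用于校时)
	Ver  struct {
		PV string `json:"pv"` // 当前配置版本
	} `json:"ver"`
	Machine struct {
		MemTotal  uint64  `json:"mt"`     // 总内存
		MemFree   uint64  `json:"mf"`     // 空闲内存
		MemPct    float64 `json:"mp"`     // 内存使用比
		DiskTotal uint64  `json:"dt"`     // 总磁盘
		DiskFree  uint64  `json:"df"`     // 剩余磁盘空间
		DiskPct   float64 `json:"dp"`     // 磁盘使用率
		Uptime    uint64  `json:"u"`      // 系统启动时长
		Platform  string  `json:"pform"`  // 系统信息
		PVer      string  `json:"pver"`   // 系统版本
		Load1     float64 `json:"load1"`  // 1分钟平均负载
		Load5     float64 `json:"load5"`  // 5分钟平均负载
		Load15    float64 `json:"load15"` // 15分钟平均负载
	} `json:"machine"`
}

func main() {
	raw := []byte(`{"time":1642734937400741643,"ver":{"pv":"v0.0.1"},"machine":{"mt":3845,"mf":2616,"mp":10.07,"dt":12031,"df":7320,"dp":36,"u":7547,"pform":"ubuntu","pver":"18.04","load1":0.09,"load5":0.02,"load15":0.01}}`)
	var hb Heartbeat
	if err := json.Unmarshal(raw, &hb); err != nil {
		panic(err)
	}
	fmt.Printf("platform=%s %s, mem used %.1f%%\n", hb.Machine.Platform, hb.Machine.PVer, hb.Machine.MemPct)
}
```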
|||
|
|||
|
|||
|
|||
### 绑定结构物到网关 |
|||
|
|||
在以太(测试环境)建立结构物,我们这里模拟一个振弦采集的场景,如下
|||
|
|||
![image-20220121135940527](imgs/EDGE-V0.1调试手册/image-20220121135940527.png) |
|||
|
|||
下发该结构物到边缘网关 |
|||
|
|||
URL:POST http://localhost:8088/edge/002/things
|||
|
|||
BODY: |
|||
|
|||
```json |
|||
["f73d1b17-f2d5-46dd-9dd1-ebbb66b11854"] |
|||
``` |
|||
|
|||
RET: 200 |
|||
|
|||
> 获取指定网关绑定的结构物 GET http://localhost:8088/edge/002/things
|||
|
|||
|
|||
|
|||
下发后,边缘网关自动更新配置(如果未在线,会在下次上线后更新配置),并重启
|||
|
|||
![image-20220121152314499](imgs/EDGE-V0.1调试手册/image-20220121152314499.png) |
|||
|
|||
|
|||
|
|||
模拟DTU设备上线到边缘网关, |
|||
|
|||
<img src="imgs/EDGE-V0.1调试手册/image-20220121152705457.png" width=600 align=left/> |
|||
|
|||
|
|||
|
|||
随后边缘网关按照配置的采集规则进行采集,目前可以通过边缘端InfluxDB的Web UI查看数据: |
|||
|
|||
![image-20220121163903101](imgs/EDGE-V0.1调试手册/image-20220121163903101.png) |
|||
|
|||
采集的数据会通过MQTT消息发送到服务端,见下节(采集数据实时预览)。 |
|||
|
|||
同时,在平台更改采集配置(部署)后,通过 POST http://localhost:8088/edge/002/sync 可以触发网关进行配置同步。
|||
|
|||
|
|||
|
|||
### 采集数据实时预览 |
|||
|
|||
DAC采集的数据会实时推送到服务器MQTT上,服务端进行**入库**操作,并支持通过WebSocket向前端**推送**。
|||
|
|||
ws地址:ws://localhost:8088/edge/ws/{device} |
|||
|
|||
实时数据预览界面:http://localhost:8088/edge/rt/{device} |
|||
|
|||
![image-20220121162951692](imgs/EDGE-V0.1调试手册/image-20220121162951692.png) |
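除前端页面外,联调时也可以用Go程序直接订阅上面的WS地址(基于第三方库 gorilla/websocket,设备号沿用本文示例001):

```go
package main

import (
	"fmt"

	"github.com/gorilla/websocket"
)

func main() {
	// 订阅 001 号设备的实时数据
	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8088/edge/ws/001", nil)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			fmt.Println("read error:", err)
			return
		}
		fmt.Println("recv:", string(msg))
	}
}
```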
|||
|
|||
|
|||
|
|||
### 绑定包含振动设备的结构物 |
|||
|
|||
新建包含振动设备的结构物,测试如下: |
|||
|
|||
![image-20220121163144291](imgs/EDGE-V0.1调试手册/image-20220121163144291.png) |
|||
|
|||
同上,执行结构物绑定网关操作。 |
|||
|
|||
|
|||
|
|||
模拟振动设备连接到网关,通过日志可以看到网关开始采集振动传感器: |
|||
|
|||
![image-20220121164158554](imgs/EDGE-V0.1调试手册/image-20220121164158554.png) |
|||
|
|||
振动数据存储在本地,通过数据库的定时聚集功能(CQ),生成分钟级聚集数据。查看实时数据如下: |
|||
|
|||
![image-20220121164306992](imgs/EDGE-V0.1调试手册/image-20220121164306992.png) |
|||
|
|||
|
|||
|
|||
### 动态数据实时预览 |
|||
|
|||
振动的实时数据**默认不会**直接推送到平台。 |
|||
|
|||
前端打开振动设备实时数据界面,将发布WS订阅,此时会通知设备开始上报数据(类似视频推流服务的实现),之后类似普通数据的处理方式。 |
|||
|
|||
实时数据刷新界面如下: |
|||
|
|||
![image-20220121164715214](imgs/EDGE-V0.1调试手册/image-20220121164715214.png) |
|||
|
|||
WS订阅退出后,会通知设备关闭实时推流(节约流量、性能和服务端存储)。 |
|||
|
|||
后面会实现云端保存最近一段播放历史、设备上的历史数据回放功能。 |
|||
|
|||
|
|||
|
|||
### 作为单机振动采集软件使用
|||
|
|||
包含振动采集的配置、采集、计算、存储、转发功能。可以替换某些场景下本地工控机上的DAAS软件。 |
|||
|
|||
> 注:云端工作模式,访问设备上的Vib界面,可以查看配置,但是不能进行修改。 |
|||
|
|||
|
|||
|
|||
振动设备配置:http://10.8.30.244:8828/vib |
|||
|
|||
![image-20220121165041737](imgs/EDGE-V0.1调试手册/image-20220121165041737.png) |
|||
|
|||
振动通道配置: |
|||
|
|||
![image-20220121165146403](imgs/EDGE-V0.1调试手册/image-20220121165146403.png) |
|||
|
|||
IP设置: |
|||
|
|||
![image-20220121165230596](imgs/EDGE-V0.1调试手册/image-20220121165230596.png) |
|||
|
|||
网关侧实时数据预览: |
|||
|
|||
![image-20220121165302506](imgs/EDGE-V0.1调试手册/image-20220121165302506.png) |
@ -0,0 +1 @@ |
|||
1. 历史数据查询 |
@ -0,0 +1,286 @@ |
|||
## 部署启动 |
|||
|
|||
### EDGE |
|||
|
|||
**设备型号**:ok-3399C |
|||
|
|||
**系统**:ubuntu-18.04
|||
|
|||
**默认用户**:forlinx / forlinx |
|||
|
|||
**网络**: 通过netplan (apply)设置网络地址 |
|||
|
|||
**安装程序:** |
|||
|
|||
```sh |
|||
#通过串口线连接Console口,或者设置好网络后通过IP地址,远程SSH到板子上 |
|||
# 安装目前只支持在线模式,设备必须接入因特网 |
|||
# 1. 安装docker |
|||
$ sudo apt-get update |
|||
$ sudo apt-get upgrade |
|||
$ curl -fsSL test.docker.com -o get-docker.sh && sh get-docker.sh |
|||
$ sudo usermod -aG docker $USER |
|||
$ sudo apt install gnupg2 pass |
|||
|
|||
# 2. 安装程序 |
|||
# 复制disk包到网关上 |
|||
$ chmod +x docker-compose
|||
$ docker-compose up -d |
|||
``` |
|||
|
|||
|
|||
|
|||
安装完成之后,在浏览器中访问 http://ip:8828 ,进入如下界面,表示设备初始化成功 |
|||
|
|||
|
|||
|
|||
![image-20220322090946149](imgs/EDGE-V0.2调试手册/image-20220322090946149.png) |
|||
|
|||
|
|||
|
|||
|
|||
|
|||
### SERVER |
|||
|
|||
**基础服务** |
|||
|
|||
+ Emqx |
|||
|
|||
启动MQTT代理服务:`emqx start`
|||
|
|||
+ Prometheus |
|||
|
|||
配置抓取设备指标 |
|||
|
|||
```yaml |
|||
scrape_configs: |
|||
- job_name: "edge-server" |
|||
static_configs: |
|||
- targets: ["localhost:19202"] |
|||
# 调试使用(抓取内网设备上的监控指标) |
|||
- job_name: "dac" |
|||
static_configs: |
|||
- targets: ["10.8.30.244:19201"] |
|||
``` |
|||
|
|||
默认UI地址: http://localhost:9090/ |
|||
|
|||
+ Grafana |
|||
|
|||
配合Prometheus显示EDGE状态和性能指标。 |
|||
|
|||
+ 其他 |
|||
|
|||
+ 连接测试Iota数据库 `postgres://postgres:postgres@10.8.30.156:5432/iota20211206?sslmode=disable` |
|||
+ 部署以太网站 http://10.8.30.38/ |
|||
+ Postman调试工具 |
|||
|
|||
|
|||
|
|||
**启动SERVER** |
|||
|
|||
配置`server.conf` |
|||
|
|||
```json |
|||
{ |
|||
"msg.mqtt.center": "10.8.30.236:1883", -- MQTT Broker地址 |
|||
"web.url":":8088", -- WEB接口地址 |
|||
"db.type": "postgres", |
|||
"db.conn": "postgres://postgres:postgres@10.8.30.156:5432/iota20211206?sslmode=disable", -- 以太数据库地址 |
|||
"log.file":true, |
|||
"log.file.loc":"runtime/logs/log" |
|||
} |
|||
``` |
|||
|
|||
启动Server. |
|||
|
|||
|
|||
|
|||
## 功能演示 |
|||
|
|||
|
|||
|
|||
### 平台新增边缘网关 |
|||
|
|||
目前已经实现CRUD API |
|||
|
|||
**新增设备:** |
|||
|
|||
URL:POST http://localhost:8088/edges
|||
|
|||
BODY: |
|||
|
|||
```json |
|||
{"serial_no":"002","name":"DEMO-2","hardware":{"name":"FS-EDGE-01"},"software":{"ver":"0.2.1"}} |
|||
``` |
|||
|
|||
RET: 200 |
|||
|
|||
> 平台serial_no设置必须和设备端SerialNo匹配,才能进行设备控制 |
|||
|
|||
|
|||
|
|||
**查询当前所有设备**: |
|||
|
|||
URL: GET localhost:8088/edges |
|||
|
|||
RET: |
|||
|
|||
```json |
|||
{"001":{"serial_no":"001","name":"DEMO-WW","hardware":{"name":"FS-EDGE-01"},"software":{"ver":"0.2.1"},"set_ver":"1","config_ver":"9"},"002":{"serial_no":"002","name":"DEMO-2","properties":{"hb":"true"},"hardware":{"name":"FS-EDGE-01"},"software":{"ver":"0.2.1"},"set_ver":"0","config_ver":"0"}} |
|||
``` |
|||
|
|||
|
|||
|
|||
其他: **修改PUT** 和 **删除 DELETE** |
|||
|
|||
|
|||
|
|||
### 网关在线状态和性能在线统计 |
|||
|
|||
通过网关心跳数据上报,Prometheus抓取,可通过Grafana查看: |
|||
|
|||
![image-20220121162513190](imgs/EDGE-V0.1调试手册/image-20220121162513190.png) |
|||
|
|||
其中心跳数据格式如下: |
|||
|
|||
```json |
|||
{ |
|||
"time": 1642734937400741643, -- 当前数据的设备时间(用于校时) |
|||
"ver": { |
|||
"pv": "v0.0.1" -- 当前配置版本(包括设备配置和采集配置) |
|||
}, |
|||
"machine": { |
|||
"mt": 3845, -- 总内存 |
|||
"mf": 2616, -- 空闲内存 |
|||
"mp": 10.074738688877986, -- 内存使用比 |
|||
"dt": 12031, -- 总磁盘 |
|||
"df": 7320, -- 剩余磁盘空间 |
|||
"dp": 36, -- 磁盘使用率 |
|||
"u": 7547, -- 系统启动时长 |
|||
"pform": "ubuntu", -- 系统信息 |
|||
"pver": "18.04", -- 系统版本 |
|||
"load1": 0.09, -- 1分钟内平均负载 |
|||
"load5": 0.02, -- 5分钟内平均负载 |
|||
"load15": 0.01 -- 15分钟内平均负载 |
|||
} |
|||
} |
|||
``` |
|||
|
|||
|
|||
|
|||
### 绑定结构物到网关 |
|||
|
|||
在以太(测试环境)建立结构物,我们这里模拟一个振弦采集的场景,如下
|||
|
|||
![image-20220121135940527](imgs/EDGE-V0.1调试手册/image-20220121135940527.png) |
|||
|
|||
下发该结构物到边缘网关 |
|||
|
|||
URL:POST http://localhost:8088/edge/002/things
|||
|
|||
BODY: |
|||
|
|||
```json |
|||
["f73d1b17-f2d5-46dd-9dd1-ebbb66b11854"] |
|||
``` |
|||
|
|||
RET: 200 |
|||
|
|||
> 获取指定网关绑定的结构物 GET http://localhost:8088/edge/002/things
|||
|
|||
|
|||
|
|||
下发后,边缘网关自动更新配置(如果未在线,会在下次上线后更新配置),并重启
|||
|
|||
![image-20220121152314499](imgs/EDGE-V0.1调试手册/image-20220121152314499.png) |
|||
|
|||
|
|||
|
|||
模拟DTU设备上线到边缘网关, |
|||
|
|||
<img src="imgs/EDGE-V0.1调试手册/image-20220121152705457.png" width=600 align=left/> |
|||
|
|||
|
|||
|
|||
随后边缘网关按照配置的采集规则进行采集,目前可以通过边缘端InfluxDB的Web UI查看数据: |
|||
|
|||
![image-20220121163903101](imgs/EDGE-V0.1调试手册/image-20220121163903101.png) |
|||
|
|||
采集的数据会通过MQTT消息发送到服务端,见下节(采集数据实时预览)。 |
|||
|
|||
同时,在平台更改采集配置(部署)后,通过 POST http://localhost:8088/edge/002/sync 可以触发网关进行配置同步。
|||
|
|||
|
|||
|
|||
### 采集数据实时预览 |
|||
|
|||
DAC采集的数据会实时推送到服务器MQTT上,服务端进行**入库**操作,并支持通过WebSocket向前端**推送**。
|||
|
|||
ws地址:ws://localhost:8088/edge/ws/{device} |
|||
|
|||
实时数据预览界面:http://localhost:8088/edge/rt/{device} |
|||
|
|||
![image-20220121162951692](imgs/EDGE-V0.1调试手册/image-20220121162951692.png) |
|||
|
|||
|
|||
|
|||
### 绑定包含振动设备的结构物 |
|||
|
|||
新建包含振动设备的结构物,测试如下: |
|||
|
|||
![image-20220121163144291](imgs/EDGE-V0.1调试手册/image-20220121163144291.png) |
|||
|
|||
同上,执行结构物绑定网关操作。 |
|||
|
|||
|
|||
|
|||
模拟振动设备连接到网关,通过日志可以看到网关开始采集振动传感器: |
|||
|
|||
![image-20220121164158554](imgs/EDGE-V0.1调试手册/image-20220121164158554.png) |
|||
|
|||
振动数据存储在本地,通过数据库的定时聚集功能(CQ),生成分钟级聚集数据。查看实时数据如下: |
|||
|
|||
![image-20220121164306992](imgs/EDGE-V0.1调试手册/image-20220121164306992.png) |
|||
|
|||
|
|||
|
|||
### 动态数据实时预览 |
|||
|
|||
振动的实时数据**默认不会**直接推送到平台。 |
|||
|
|||
前端打开振动设备实时数据界面,将发布WS订阅,此时会通知设备开始上报数据(类似视频推流服务的实现),之后类似普通数据的处理方式。 |
|||
|
|||
实时数据刷新界面如下: |
|||
|
|||
![image-20220121164715214](imgs/EDGE-V0.1调试手册/image-20220121164715214.png) |
|||
|
|||
WS订阅退出后,会通知设备关闭实时推流(节约流量、性能和服务端存储)。 |
|||
|
|||
后面会实现云端保存最近一段播放历史、设备上的历史数据回放功能。 |
|||
|
|||
|
|||
|
|||
### 作为单机振动采集软件使用
|||
|
|||
包含振动采集的配置、采集、计算、存储、转发功能。可以替换某些场景下本地工控机上的DAAS软件。 |
|||
|
|||
> 注:云端工作模式,访问设备上的Vib界面,可以查看配置,但是不能进行修改。 |
|||
|
|||
|
|||
|
|||
振动设备配置:http://10.8.30.244:8828/vib |
|||
|
|||
![image-20220121165041737](imgs/EDGE-V0.1调试手册/image-20220121165041737.png) |
|||
|
|||
振动通道配置: |
|||
|
|||
![image-20220121165146403](imgs/EDGE-V0.1调试手册/image-20220121165146403.png) |
|||
|
|||
IP设置: |
|||
|
|||
![image-20220121165230596](imgs/EDGE-V0.1调试手册/image-20220121165230596.png) |
|||
|
|||
网关侧实时数据预览: |
|||
|
|||
![image-20220121165302506](imgs/EDGE-V0.1调试手册/image-20220121165302506.png) |
@ -0,0 +1,69 @@ |
|||
找一根USB转接线连接 板子的Console口,如下: |
|||
|
|||
|
|||
|
|||
![image-20220407085859032](imgs/EDGE-环境准备/image-20220407085859032.png) |
|||
|
|||
|
|||
|
|||
电脑会自动安装驱动,等待自动安装完成,在设备管理界面中,可查看具体的串口号: |
|||
|
|||
![image-20220407090121447](imgs/EDGE-环境准备/image-20220407090121447.png) |
|||
|
|||
|
|||
|
|||
通过putty或xshell等远程工具可以进行SSH远程连接: |
|||
|
|||
![image-20220407090243473](imgs/EDGE-环境准备/image-20220407090243473.png) |
|||
|
|||
|
|||
|
|||
![image-20220407090353559](imgs/EDGE-环境准备/image-20220407090353559.png) |
|||
|
|||
> 默认用户名密码均是forlinx, 可以通过 `sudo su` 命令进入超管账户,密码也是`forlinx` |
|||
|
|||
|
|||
|
|||
进行网络配置: |
|||
|
|||
找一根网线,将板子连接到工作路由上, |
|||
|
|||
```sh |
|||
root@forlinx:/etc/netplan# cd /etc/netplan/ |
|||
root@forlinx:/etc/netplan# ls |
|||
50-cloud-init.yaml |
|||
root@forlinx:/etc/netplan# vi 50-cloud-init.yaml |
|||
network: |
|||
ethernets: |
|||
eth0: |
|||
dhcp4: no |
|||
addresses: [10.8.30.244/24] |
|||
gateway4: 10.8.30.1 |
|||
nameservers: |
|||
addresses: [114.114.114.114] |
|||
search: [localdomain] |
|||
version: 2 |
|||
~ |
|||
root@forlinx:/etc/netplan# netplan apply |
|||
root@forlinx:/etc/netplan# ip a |
|||
``` |
|||
|
|||
![image-20220407090848867](imgs/EDGE-环境准备/image-20220407090848867.png) |
|||
|
|||
这里我的配置是: |
|||
|
|||
```yaml |
|||
network: |
|||
ethernets: |
|||
eth0: |
|||
dhcp4: no |
|||
addresses: [10.8.30.244/24] #网络地址和掩码 |
|||
gateway4: 10.8.30.1 # 网关地址 |
|||
nameservers: |
|||
addresses: [114.114.114.114] # DNS |
|||
search: [localdomain] |
|||
version: 2 |
|||
|
|||
``` |
|||
|
|||
网络配置完成后,即可执行后续命令,具体参照 《EDGE-V-N调试手册.pdf》 |
@ -0,0 +1,505 @@ |
|||
## UCloud云主机 |
|||
|
|||
https://console.ucloud.cn/ |
|||
|
|||
账户密码 FS12345678 |
|||
|
|||
|
|||
|
|||
## 环境准备 |
|||
|
|||
**Postgres** |
|||
|
|||
```sh |
|||
apt update |
|||
apt install postgresql postgresql-contrib |
|||
|
|||
su postgres |
|||
> psql |
|||
> # alter user postgres with password 'ROOT'; |
|||
|
|||
vi /etc/postgresql/9.5/main/pg_hba.conf |
|||
# host all all 10.60.178.0/24 md5 |
|||
service postgresql restart |
|||
|
|||
createdb iOTA_console |
|||
psql -d iOTA_console < dump.sql |
|||
``` |
|||
|
|||
|
|||
|
|||
**Docker** |
|||
|
|||
```sh |
|||
curl -sSL https://get.daocloud.io/docker | sh |
|||
``` |
|||
|
|||
|
|||
|
|||
**Redis** |
|||
|
|||
因为redis默认端口暴露在外网环境不安全,启动ubuntu防火墙 |
|||
|
|||
```sh |
|||
ufw enable |
|||
|
|||
ufw status |
|||
|
|||
# 默认允许外部访问本机 |
|||
ufw default allow |
|||
|
|||
# 禁止6379端口外部访问 |
|||
ufw deny 6379 |
|||
|
|||
# 其他一些 |
|||
# 允许来自10.0.1.0/10访问本机10.8.30.117的7277端口 |
|||
ufw allow proto tcp from 10.0.1.0/10 to 10.8.30.117 port 7277
|||
|
|||
Status: active |
|||
|
|||
To Action From |
|||
-- ------ ---- |
|||
6379 DENY Anywhere |
|||
6379 (v6) DENY Anywhere (v6) |
|||
``` |
|||
|
|||
配置了ufw防火墙后,外网仍然无法访问已开放的端口,还需要进入ucloud控制台:
|||
|
|||
基础网络UNet > 外网防火墙 > 创建防火墙 (自定义规则) |
|||
|
|||
开放所有tcp端口,只禁用redis-6379 |
|||
|
|||
![image-20211122152046659](imgs/UCloud-DAC上云测试/image-20211122152046659.png) |
|||
|
|||
云主机UHost > 关联资源操作 > 更改外网防火墙 |
|||
|
|||
![image-20211122152136855](imgs/UCloud-DAC上云测试/image-20211122152136855.png) |
|||
|
|||
|
|||
|
|||
安装redis |
|||
|
|||
```sh |
|||
apt update |
|||
apt install redis-server |
|||
``` |
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
## 引流测试 |
|||
|
|||
机房搬迁,准备在云上运行单实例dac进行数据采集。 |
|||
|
|||
准备工作:进行线上引流测试。不影响商用dac的采集,准备如下: |
|||
|
|||
1. proxy上被动连接转发到UCloud。 |
|||
1. 流单向复制。设备 -> proxy -> DAC通路, 开路:DAC->proxy-|->设备。 |
|||
2. 主动连接 |
|||
1. mqtt、http主动连接第三方服务器的, |
|||
2. mqtt 的clientid添加后缀 |
|||
3. 截断driver的写入 |
|||
|
|||
关键代码 |
|||
|
|||
```go |
|||
// io.copy无法多次执行 |
|||
|
|||
|
|||
// 如果配置了OutTarget,则进行本地复制到同时向外复制流 |
|||
func Pipeout(conn1, conn2 net.Conn, port string, wg *sync.WaitGroup, reg []byte) { |
|||
if OutTarget != "" { |
|||
tt := fmt.Sprintf("%s:%s", OutTarget, port) |
|||
tw := NewTeeWriter(tt, reg) |
|||
tw.Start() |
|||
if _, err := io.Copy(tw, io.TeeReader(conn2 /*read*/, conn1 /*write*/)); err != nil { |
|||
log.Error("pipeout error: %v", err) |
|||
} |
|||
tw.Close() |
|||
} else { |
|||
io.Copy(conn1, conn2) |
|||
} |
|||
conn1.Close() |
|||
log.Info("[tcp] close the connect at local:%s and remote:%s", conn1.LocalAddr().String(), conn1.RemoteAddr().String()) |
|||
wg.Done() |
|||
} |
|||
|
|||
// 引流写入器 |
|||
type TeeWriter struct { |
|||
target string // 转发目标地址 |
|||
conn net.Conn // 转发连接 |
|||
isConnect bool // 是否连接 |
|||
exitCh chan interface{} // 退出 |
|||
registry []byte |
|||
} |
|||
|
|||
func NewTeeWriter(target string, reg []byte) *TeeWriter { |
|||
return &TeeWriter{ |
|||
target: target, |
|||
exitCh: make(chan interface{}), |
|||
registry: reg, |
|||
} |
|||
} |
|||
|
|||
func (w *TeeWriter) Start() error { |
|||
go w.keep_connect() |
|||
return nil |
|||
} |
|||
|
|||
func (w *TeeWriter) Close() error { |
|||
close(w.exitCh) |
|||
return nil |
|||
} |
|||
|
|||
func (w *TeeWriter) Write(p []byte) (n int, err error) { |
|||
defer func() { |
|||
if err := recover(); err != nil { |
|||
log.Error("teewrite failed %s", w.target) |
|||
} |
|||
}() |
|||
if w.isConnect { |
|||
go w.conn.Write(p) |
|||
} |
|||
// 此方法永远不报错 |
|||
return len(p), nil |
|||
} |
|||
|
|||
func (w *TeeWriter) keep_connect() { |
|||
defer func() { |
|||
if err := recover(); err != nil { |
|||
log.Error("teewrite keep connect error: %v", err) |
|||
} |
|||
}() |
|||
for { |
|||
if cont := func() bool { |
|||
var err error |
|||
w.conn, err = net.Dial("tcp", w.target) |
|||
if err != nil { |
|||
select { |
|||
case <-time.After(time.Second): |
|||
return true |
|||
case <-w.exitCh: |
|||
return false |
|||
} |
|||
} |
|||
w.isConnect = true |
|||
defer func() { |
|||
w.isConnect = false |
|||
}() |
|||
defer w.conn.Close() |
|||
|
|||
if w.registry != nil { |
|||
_, err := w.conn.Write(w.registry) |
|||
if err != nil { |
|||
return true |
|||
} |
|||
} |
|||
|
|||
if err := w.conn.(*net.TCPConn).SetKeepAlive(true); err != nil { |
|||
return true |
|||
} |
|||
if err := w.conn.(*net.TCPConn).SetKeepAlivePeriod(30 * time.Second); err != nil { |
|||
return true |
|||
} |
|||
|
|||
connLostCh := make(chan interface{}) |
|||
defer close(connLostCh) |
|||
|
|||
// 检查远端bconn连接 |
|||
go func() { |
|||
defer func() { |
|||
log.Info("bconn check exit") |
|||
recover() // write to closed channel |
|||
}() |
|||
one := make([]byte, 1) |
|||
for { |
|||
if _, err := w.conn.Read(one); err != nil { |
|||
log.Info("bconn disconnected") |
|||
connLostCh <- err |
|||
return |
|||
} |
|||
time.Sleep(time.Second) |
|||
} |
|||
}() |
|||
|
|||
select { |
|||
case <-connLostCh: |
|||
time.Sleep(10 * time.Second) |
|||
return true |
|||
case <-w.exitCh: |
|||
return false |
|||
} |
|||
}(); !cont { |
|||
break |
|||
} else { |
|||
time.Sleep(time.Second) |
|||
} |
|||
} |
|||
} |
|||
``` |
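补充一个调用上述 Pipeout 的最小示意(假设与上面的代码位于同一包内、OutTarget 已按需设置;监听地址、商用DAC地址均为示例值,设备注册包 reg 此处省略传nil):

```go
package main // 假设:与上文 Pipeout/TeeWriter 同包

import (
	"io"
	"net"
	"sync"
)

// serveProxy 接收设备侧连接并转发到商用DAC,
// 设备->DAC 方向经 Pipeout 同时复制一份到 OutTarget(云上测试DAC)
func serveProxy(listenAddr, dacAddr, port string) error {
	ln, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return err
	}
	for {
		devConn, err := ln.Accept() // 设备侧连接
		if err != nil {
			return err
		}
		dacConn, err := net.Dial("tcp", dacAddr) // 商用DAC侧连接
		if err != nil {
			devConn.Close()
			continue
		}
		var wg sync.WaitGroup
		wg.Add(1)
		// 设备 -> DAC:本地转发的同时向外复制流
		go Pipeout(dacConn, devConn, port, &wg, nil)
		// DAC -> 设备:普通回写,不向外复制
		go func() {
			io.Copy(devConn, dacConn)
			devConn.Close()
		}()
	}
}
```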
|||
|
|||
|
|||
|
|||
引流测试未执行。。。 |
|||
|
|||
|
|||
|
|||
## DAC线上测试 |
|||
|
|||
配置如下 |
|||
|
|||
```json |
|||
|
|||
``` |
|||
|
|||
需要配置 `url.maps.json` |
|||
|
|||
```json |
|||
"47.106.112.113:1883" |
|||
"47.104.249.223:1883" |
|||
"mqtt.starwsn.com:1883" |
|||
"test.tdzntech.com:1883" |
|||
"mqtt.tdzntech.com:1883" |
|||
|
|||
"s1.cn.mqtt.theiota.cn:8883" |
|||
"mqtt.datahub.anxinyun.cn:1883" |
|||
|
|||
"218.3.126.49:3883" |
|||
"221.230.55.28:1883" |
|||
|
|||
"anxin-m1:1883" |
|||
"10.8.25.201:8883" |
|||
"10.8.25.231:1883" |
|||
"iota-m1:1883" |
|||
``` |
|||
|
|||
|
|||
|
|||
以下数据无法获取: |
|||
|
|||
1. gnss数据 |
|||
|
|||
http.get error: Get "http://10.8.25.254:7005/gnss/6542/data?startTime=1575443410000&endTime=1637628026000": dial tcp 10.8.25.254:7005: i/o timeout |
|||
|
|||
2. 时 |
|||
|
|||
|
|||
|
|||
## DAC内存问题排查 |
|||
|
|||
> 文档整理不够清晰,可以参考 https://www.cnblogs.com/gao88/p/9849819.html |
|||
> |
|||
> pprof的使用: |
|||
> |
|||
> https://segmentfault.com/a/1190000020964967 |
|||
> |
|||
> https://cizixs.com/2017/09/11/profiling-golang-program/ |
|||
|
|||
查看进程内存消耗: |
|||
|
|||
```sh |
|||
top -c |
|||
# shift+M |
|||
top - 09:26:25 up 1308 days, 15:32, 2 users, load average: 3.14, 3.70, 4.37 |
|||
Tasks: 582 total, 1 running, 581 sleeping, 0 stopped, 0 zombie |
|||
%Cpu(s): 5.7 us, 1.5 sy, 0.0 ni, 92.1 id, 0.0 wa, 0.0 hi, 0.8 si, 0.0 st |
|||
KiB Mem : 41147560 total, 319216 free, 34545608 used, 6282736 buff/cache |
|||
KiB Swap: 0 total, 0 free, 0 used. 9398588 avail Mem |
|||
|
|||
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND |
|||
18884 root 20 0 11.238g 0.010t 11720 S 48.8 26.7 39:52.43 ./dac |
|||
``` |
|||
|
|||
发现dac内存占用超10G
|||
|
|||
|
|||
|
|||
查看所在容器: |
|||
|
|||
```sh |
|||
root@iota-n3:/home/iota/etwatcher# systemd-cgls | grep 18884 |
|||
│ │ ├─32574 grep --color=auto 18884 |
|||
│ │ └─18884 ./dac |
|||
``` |
|||
|
|||
|
|||
|
|||
```sh |
|||
for i in $(docker container ls --format "{{.ID}}"); do docker inspect -f '{{.State.Pid}} {{.Name}}' $i; done | grep 18884 |
|||
``` |
|||
|
|||
定位到 dac-2 |
|||
|
|||
|
|||
|
|||
> 查看指定容器的pid可以使用:
|||
> |
|||
> docker top container_id |
|||
> |
|||
> 获取所有容器的PID |
|||
> |
|||
> ```sh |
|||
> for l in `docker ps -q`;do docker top $l|awk -v dn="$l" 'NR>1 {print dn " PID is " $2}';done |
|||
> ``` |
|||
> |
|||
> 通过docker inspect方式 |
|||
> |
|||
> ```sh |
|||
> docker inspect --format "{{.State.Pid}}" container_id/name |
|||
> ``` |
|||
|
|||
查看dac-2容器信息 |
|||
|
|||
```sh |
|||
root@iota-n3:~# docker ps | grep dac-2 |
|||
05b04c4667bc repository.anxinyun.cn/iota/dac "./dac" 2 hours ago Up 2 hours k8s_iota-dac_iota-dac-2_iota_d9879026-465b-11ec-ad00-c81f66cfe365_1 |
|||
be5682a82cda theiota.store/iota/filebeat "filebeat -e" 4 hours ago Up 4 hours k8s_iota-filebeat_iota-dac-2_iota_d9879026-465b-11ec-ad00-c81f66cfe365_0 |
|||
f23499bc5c22 gcr.io/google_containers/pause-amd64:3.0 "/pause" 4 hours ago Up 4 hours k8s_POD_iota-dac-2_iota_d9879026-465b-11ec-ad00-c81f66cfe365_0 |
|||
c5bcbf648268 repository.anxinyun.cn/iota/dac "./dac" 6 days ago Up 6 days k8s_iota-dac_iota-dac-2_iota_2364cf27-41a0-11ec-ad00-c81f66cfe365_0 |
|||
``` |
|||
|
|||
> 有两个?(另外一个僵尸进程先不管) |
|||
|
|||
|
|||
|
|||
进入容器: |
|||
|
|||
```sh |
|||
docker exec -it 05b04c4667bc /bin/ash |
|||
``` |
|||
|
|||
|
|||
|
|||
> 容器里没有 curl命令? |
|||
> |
|||
> 使用 wget -q -O - https://www.baidu.com 直接输出返回结果 |
|||
|
|||
|
|||
|
|||
在宿主机: |
|||
|
|||
```sh |
|||
go tool pprof -inuse_space http://10.244.1.235:6060/debug/pprof/heap |
|||
|
|||
# top 查看当前内存占用top10 |
|||
(pprof) top |
|||
Showing nodes accounting for 913.11MB, 85.77% of 1064.60MB total |
|||
Dropped 215 nodes (cum <= 5.32MB) |
|||
Showing top 10 nodes out of 109 |
|||
flat flat% sum% cum cum% |
|||
534.20MB 50.18% 50.18% 534.20MB 50.18% runtime.malg |
|||
95.68MB 8.99% 59.17% 95.68MB 8.99% iota/vendor/github.com/yuin/gopher-lua.newLTable |
|||
61.91MB 5.82% 64.98% 90.47MB 8.50% iota/vendor/github.com/yuin/gopher-lua.newFuncContext |
|||
50.23MB 4.72% 69.70% 50.23MB 4.72% iota/vendor/github.com/yuin/gopher-lua.newRegistry |
|||
34.52MB 3.24% 72.94% 34.52MB 3.24% iota/vendor/github.com/yuin/gopher-lua.(*LTable).RawSetString |
|||
33MB 3.10% 76.04% 33MB 3.10% iota/vendor/github.com/eclipse/paho%2emqtt%2egolang.outgoing |
|||
31MB 2.91% 78.95% 31MB 2.91% iota/vendor/github.com/eclipse/paho%2emqtt%2egolang.errorWatch |
|||
31MB 2.91% 81.87% 31MB 2.91% iota/vendor/github.com/eclipse/paho%2emqtt%2egolang.keepalive |
|||
27.06MB 2.54% 84.41% 27.06MB 2.54% iota/vendor/github.com/yuin/gopher-lua.newFunctionProto (inline) |
|||
14.50MB 1.36% 85.77% 14.50MB 1.36% iota/vendor/github.com/eclipse/paho%2emqtt%2egolang.alllogic |
|||
``` |
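能这样抓取 /debug/pprof/heap,前提是DAC进程内已经开启了pprof HTTP服务;如果要在自己的Go程序里开启,通常是类似下面的写法(示意,6060端口以实际配置为准):

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // 匿名导入即注册 /debug/pprof/* 路由
)

func main() {
	// 单独起一个HTTP服务暴露pprof,供 go tool pprof 远程抓取
	go func() {
		log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
	}()

	// ... 程序主逻辑 ...
	select {}
}
```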
|||
|
|||
|
|||
|
|||
> 列出消耗最大的部分 top |
|||
> |
|||
> 列出函数代码以及对应的取样数据 list |
|||
> |
|||
> 汇编代码以及对应的取样数据 disasm |
|||
> |
|||
> web命令生成svg图 |
|||
|
|||
|
|||
|
|||
在服务器上执行go tool pprof后生成profile文件,拷贝到本机windows机器,执行 |
|||
|
|||
![image-20211116103902511](imgs/UCloud-DAC上云测试/image-20211116103902511.png) |
|||
|
|||
|
|||
|
|||
> 安装 graphviz |
|||
> |
|||
> https://graphviz.gitlab.io/_pages/Download/Download_windows.html |
|||
> |
|||
> 下载zip解压配置系统环境变量 |
|||
> |
|||
> ```sh |
|||
> C:\Users\yww08>dot -version |
|||
> dot - graphviz version 2.45.20200701.0038 (20200701.0038) |
|||
> There is no layout engine support for "dot" |
|||
> Perhaps "dot -c" needs to be run (with installer's privileges) to register the plugins? |
|||
> ``` |
|||
|
|||
> ```sh |
|||
> 执行dot初始化 |
|||
> |
|||
> dot -c |
|||
> ``` |
|||
|
|||
|
|||
|
|||
本机执行pprof |
|||
|
|||
```sh |
|||
go tool pprof --http=:8080 pprof.dac.alloc_objects.alloc_space.inuse_objects.inuse_space.003.pb.gz |
|||
``` |
|||
|
|||
!["sss"](imgs/UCloud-DAC上云测试/image-20211116112452820.png) |
|||
|
|||
内存的占用主要集中在: |
|||
|
|||
runtime malg |
|||
|
|||
去搜寻了大量资料之后,发现go的官网早就有这个issue(官方issue),大佬们知道,只是不好解决,描述如下: |
|||
Your observation is correct. Currently the runtime never frees the g objects created for goroutines, though it does reuse them. The main reason for this is that the scheduler often manipulates g pointers without write barriers (a lot of scheduler code runs without a P, and hence cannot have write barriers), and this makes it very hard to determine when a g can be garbage collected. |
|||
|
|||
大致原因就是go的gc采用的是并发垃圾回收,调度器在操作协程指针的时候不使用写屏障(可以看看draveness大佬的分析),因为很多调度器代码是在没有P(GPM中的P)的情况下运行的,因此不能使用写屏障,所以调度器很难确定一个协程是否可以当成垃圾回收,这样调度器里的协程指针信息就会泄露。
|||
———————————————— |
|||
版权声明:本文为CSDN博主「wuyuhao13579」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 |
|||
原文链接:https://blog.csdn.net/wuyuhao13579/article/details/109079570 |
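结合上面的结论,排查时可以先确认goroutine数量是否只增不减(必要时再用 go tool pprof http://10.244.1.235:6060/debug/pprof/goroutine 查看具体堆栈)。下面是一个周期性打印协程数量的Go示意:

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	// 每30秒打印一次当前goroutine数量,持续增长且不回落通常意味着协程泄漏
	for {
		fmt.Printf("%s goroutines=%d\n", time.Now().Format(time.RFC3339), runtime.NumGoroutine())
		time.Sleep(30 * time.Second)
	}
}
```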
|||
|
|||
|
|||
|
|||
找进程的日志: |
|||
|
|||
发现出问题的DAC日志重复出现 |
|||
|
|||
```sh |
|||
Loss connection |
|||
``` |
|||
|
|||
这是DAC代码中mqtt断连的时候触发的日志。查看源码: |
|||
|
|||
```go |
|||
func (d *Mqtt) Connect() (err error) { |
|||
|
|||
//TODO not safe |
|||
d.setConnStat(statInit) |
|||
//decode |
|||
|
|||
//set opts |
|||
opts := pahomqtt.NewClientOptions().AddBroker(d.config.URL) |
|||
opts.SetClientID(d.config.ClientID) |
|||
opts.SetCleanSession(d.config.CleanSessionFlag) |
|||
opts.SetKeepAlive(time.Second * time.Duration(d.config.KeepAlive)) // 30s |
|||
opts.SetPingTimeout(time.Second * time.Duration(d.config.KeepAlive*2)) |
|||
opts.SetConnectionLostHandler(func(c pahomqtt.Client, err error) { |
|||
// mqtt连接掉线时的回调函数 |
|||
log.Debug("[Mqtt] Loss connection, %s %v", err, d.config) |
|||
d.terminateFlag <- true |
|||
//d.Reconnect() |
|||
}) |
|||
} |
|||
``` |
|||
|
|||
|
|||
|
|||
## 对象存储(OSS) |
|||
|
|||
阿里云 OSS基础概念 https://help.aliyun.com/document_detail/31827.html |
|||
|
|||
|
|||
|
@ -0,0 +1,998 @@ |
|||
### 环境恢复 |
|||
|
|||
**安装新mysql** |
|||
|
|||
```shell |
|||
#命令1 |
|||
sudo apt-get update |
|||
#命令2 |
|||
sudo apt-get install mysql-server |
|||
|
|||
# 初始化安全配置*(可选) |
|||
sudo mysql_secure_installation |
|||
|
|||
# 远程访问和权限问题*(可选) |
|||
#前情提要:事先声明一下,这样做是对安全有好处的。刚初始化好的MySQL是不能进行远程登录的。要实现登录的话,强烈建议新建一个权限低一点的用户再进行远程登录。直接使用root用户远程登录有很大的风险。分分钟数据库就有可能被黑客drop掉。 |
|||
#首先,修改/etc/mysql/my.cnf文件。把bind-address = 127.0.0.1这句给注释掉。解除地址绑定(或者是绑定一个你的固定地址。但宽带上网地址都是随机分配的,固定ip不可行)。 |
|||
#然后,给一个用户授权使他能够远程登录。执行下面两句即可。 |
|||
|
|||
grant all PRIVILEGES on *.* to user1@'%' identified by '123456' WITH GRANT OPTION;
|||
FLUSH PRIVILEGES; |
|||
service mysql restart
|||
``` |
|||
|
|||
|
|||
|
|||
**重新启动Hive** |
|||
|
|||
仍然在 `37测试机` 的 `/home/anxin/apache-hive-3.1.2-bin` 目录下操作
|||
|
|||
```sh |
|||
./schematool -initSchema -dbType mysql |
|||
# 加载我的环境变量,因为本机还安装了ambari的hive
|||
source /etc/profile |
|||
hive --service metastore |
|||
|
|||
#P.S. 我的环境变量 |
|||
export JAVA_HOME=/usr/local/java/jdk1.8.0_131 |
|||
export JAVA_HOME=/home/anxin/jdk8_322/jdk8u322-b06 |
|||
export JRE_HOME=$JAVA_HOME/jre |
|||
export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib:$JRE_HOME/lib |
|||
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH |
|||
export HIVE_HOME=/home/anxin/apache-hive-3.1.2-bin |
|||
export HIVE_CONF_DIR=$HIVE_HOME/conf |
|||
export PATH=$HIVE_HOME/bin:$PATH |
|||
export HADOOP_HOME=/usr/hdp/3.1.4.0-315/hadoop |
|||
export HADOOP_CONF_DIR=/usr/hdp/3.1.4.0-315/hadoop/conf |
|||
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath` |
|||
export FLINK_HOME=/home/anxin/flink-1.13.6 |
|||
|
|||
|
|||
``` |
|||
|
|||
|
|||
|
|||
### Hive基础操作 |
|||
|
|||
参考:https://www.cnblogs.com/wangrd/p/6275162.html |
|||
|
|||
```sql |
|||
--创建数据库后,就会在HDFS的[/user/hive/warehouse/]中生成一个tableset.db文件夹。
|||
CREATE DATABASE tableset; |
|||
|
|||
-- 切换当前数据库 |
|||
USE tableset; |
|||
|
|||
-- 创建表 |
|||
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name |
|||
[(col_name data_type [COMMENT col_comment], ...)] |
|||
[COMMENT table_comment] |
|||
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] |
|||
[CLUSTERED BY (col_name, col_name, ...) |
|||
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] |
|||
[ROW FORMAT row_format] |
|||
[STORED AS file_format] |
|||
[LOCATION hdfs_path] |
|||
|
|||
CREATE TABLE t_order ( |
|||
id int, |
|||
name string |
|||
) |
|||
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' -- 指定字段分隔符 |
|||
STORED AS TEXTFILE; -- 指定数据存储格式 |
|||
|
|||
-- 查看表结构 |
|||
DESC t_order; |
|||
|
|||
-- 导入数据 |
|||
load data local inpath '/home/anxin/data/data.txt' [OVERWRITE] into table t_order; |
|||
|
|||
-- EXTERNAL表 |
|||
-- 创建外部表,不会对源文件位置作任何改变 |
|||
-- 删除外部表不会删除源文件 |
|||
CREATE EXTERNAL TABLE ex_order ( |
|||
id int, |
|||
name string |
|||
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' |
|||
STORED AS TEXTFILE |
|||
LOCATION '/external/hive'; |
|||
|
|||
--分区 |
|||
CREATE TABLE t_order(id int,name string) partitioned by (part_flag string) |
|||
row format delimited fields terminated by '\t'; |
|||
load data local inpath '/home/hadoop/ip.txt' overwrite into table t_order |
|||
partition(part_flag='part1'); -- 数据上传到part1子目录下 |
|||
|
|||
-- 查看所有表 |
|||
SHOW TABLES; |
|||
SHOW TABLES 'TMP'; |
|||
SHOW PARTITIONS TMP_TABLE;-- 查看表有哪些分区 |
|||
DESCRIBE TMP_TABLE; -- 查看表结构 |
|||
|
|||
-- 分桶表 |
|||
create table stu_buck(Sno int,Sname string,Sex string,Sage int,Sdept string) |
|||
clustered by(Sno) |
|||
sorted by(Sno DESC) |
|||
into 4 buckets |
|||
row format delimited |
|||
fields terminated by ','; |
|||
-- 通过insert into ...select...进行数据插入 |
|||
set hive.enforce.bucketing = true; |
|||
set mapreduce.job.reduces=4; |
|||
insert overwrite table stu_buck |
|||
select * from student cluster by(Sno); --等价于 distribute by(Sno) sort by(Sno asc); |
|||
|
|||
-- 删除表 |
|||
DROP TABLE tablename; |
|||
|
|||
-- 临时表 |
|||
CREATE TABLE tmp_table |
|||
AS |
|||
SELECT id,name |
|||
FROM t_order |
|||
SORT BY new_id; |
|||
|
|||
-- UDF 用户定义函数 |
|||
-- 继承UDF类实现函数,打包jar到程序,注册函数
|||
CREATE TEMPORARY function tolowercase as 'cn.demo.Namespace'; |
|||
|
|||
select id,tolowercase(name) from t_order; |
|||
``` |
|||
|
|||
|
|||
|
|||
### Hadoop基础操作 |
|||
|
|||
本例中最终选择通过Hadoop Catalog实现IceBerg数据存储: |
|||
|
|||
```sh |
|||
|
|||
# -skipTrash 直接删除不放到回收站 |
|||
hdfs dfs -rm -skipTrash /path/to/file/you/want/to/remove/permanently |
|||
# 清理所有Trash中的数据 |
|||
hdfs dfs -expunge |
|||
|
|||
## **清理指定文件夹下的所有数据** |
|||
hdfs dfs -rm -r -skipTrash /user/hadoop/* |
|||
|
|||
## hadoop 启动错误: |
|||
chown -R hdfs:hdfs /hadoop/hdfs/namenode |
|||
# DataNode启动失败:可能多次format导致。修改data-node的clusterid和namenode中的一致 |
|||
/hadoop/hdfs/data/current/VERSION |
|||
/hadoop/hdfs/namenode/current/VERSION |
|||
|
|||
# 查看DataNode启动日志 |
|||
root@node38:/var/log/hadoop/hdfs# tail -n 1000 hadoop-hdfs-datanode-node38.log |
|||
``` |
|||
|
|||
|
|||
|
|||
查看恢复的Hadoop集群: |
|||
|
|||
![image-20220127110428706](imgs/数据湖2/image-20220127110428706.png) |
|||
|
|||
|
|||
|
|||
### Flink SQL流式从Kafka到Hive |
|||
|
|||
https://www.cnblogs.com/Springmoon-venn/p/13726089.html |
|||
|
|||
读取kafka的sql: |
|||
|
|||
```sql |
|||
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT) |
|||
|
|||
create table myhive.testhive.iotaKafkatable( |
|||
`userId` STRING, |
|||
`dimensionId` STRING, |
|||
`dimCapId` STRING, |
|||
`scheduleId` STRING, |
|||
`jobId` STRING, |
|||
`jobRepeatId` STRING, |
|||
`thingId` STRING , |
|||
`deviceId` STRING, |
|||
`taskId` STRING, |
|||
`triggerTime` STRING, |
|||
`finishTime` STRING, |
|||
`seq` STRING, |
|||
`result` STRING, |
|||
`data` STRING |
|||
)with |
|||
('connector' = 'kafka', |
|||
'topic'='iceberg', |
|||
'properties.bootstrap.servers' = '10.8.30.37:6667', |
|||
'properties.group.id' = 'iceberg-demo' , |
|||
'scan.startup.mode' = 'latest-offset', |
|||
'format' = 'json', |
|||
'json.ignore-parse-errors'='true') |
|||
``` |
|||
|
|||
创建hive表: |
|||
|
|||
```sql |
|||
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) |
|||
|
|||
CREATE TABLE myhive.testhive.iotatable2( |
|||
`userId` STRING, |
|||
`dimensionId` STRING, |
|||
`dimCapId` STRING, |
|||
`scheduleId` STRING, |
|||
`jobId` STRING, |
|||
`jobRepeatId` STRING, |
|||
`thingId` STRING , |
|||
`deviceId` STRING, |
|||
`taskId` STRING, |
|||
`triggerTime` TIMESTAMP, |
|||
`seq` STRING, |
|||
`result` STRING, |
|||
`data` STRING |
|||
) |
|||
PARTITIONED BY ( finishTime STRING) -- 分区字段,该字段不存放实际的数据内容
|||
STORED AS PARQUET |
|||
TBLPROPERTIES ( |
|||
'sink.partition-commit.policy.kind' = 'metastore,success-file', |
|||
'partition.time-extractor.timestamp-pattern' = '$finishTime' |
|||
) |
|||
``` |
|||
|
|||
|
|||
|
|||
|
|||
|
|||
### IceBerg |
|||
|
|||
概念再解析: |
|||
|
|||
> 好文推荐: |
|||
> |
|||
> + [数据湖存储架构选型](https://blog.csdn.net/u011598442/article/details/110152352) |
|||
> + |
|||
|
|||
参考:[Flink+IceBerg+对象存储,构建数据湖方案](https://baijiahao.baidu.com/s?id=1705407920794793309&wfr=spider&for=pc) |
|||
|
|||
![img](imgs/数据湖2/b58f8c5494eef01f5824f06566c8492dbc317d19.jpeg)![img](imgs/数据湖2/f3d3572c11dfa9ec7f198010e3e6270b918fc146.jpeg) |
|||
|
|||
|
|||
|
|||
IceBerg表数据组织架构: |
|||
|
|||
命名空间 -> 表 -> 快照 -> 表数据(Parquet/ORC/Avro等格式)
|||
|
|||
- **快照 Metadata**:表格 Schema、Partition、Partition spec、Manifest List 路径、当前快照等。 |
|||
- **Manifest List:**Manifest File 路径及其 Partition,数据文件统计信息。 |
|||
- **Manifest File:**Data File 路径及其每列数据上下边界。 |
|||
- **Data File:**实际表内容数据,以 Parque,ORC,Avro 等格式组织。 |
|||
|
|||
![img](imgs/数据湖2/a6efce1b9d16fdfa26174a12c9b95c5c95ee7b96.jpeg) |
|||
|
|||
由DataWorker读取元数据进行解析,然后把一条记录提交给IceBerg存储,IceBerg将记录写入预定义的分区,形成一些新文件。
|||
|
|||
Flink在执行Checkpoint的时候完成这一批文件的写入,然后生成这批文件的清单,提交给Commit Worker. |
|||
|
|||
CommitWorker读出当前快照信息,然后与本次生成的文件列表进行合并,生成新的ManifestList文件以及后续元数据的表文件的信息。之后进行提交,成功后形成新快照。 |
|||
|
|||
![img](imgs/数据湖2/77094b36acaf2edd63d01449f226d1e139019328.jpeg) |
|||
|
|||
![img](imgs/数据湖2/377adab44aed2e73ddb8d5980337718386d6faf4.jpeg) |
|||
|
|||
catalog是Iceberg对表进行管理(create、drop、rename等)的一个组件。目前Iceberg主要支持HiveCatalog和HadoopCatalog两种。 |
|||
|
|||
HiveCatalog通过metastore数据库(一般MySQL)提供ACID,HadoopCatalog基于乐观锁机制和HDFS rename的原子性保障写入提交的ACID。 |
|||
|
|||
|
|||
|
|||
Flink兼容性 |
|||
|
|||
![image-20220119142219318](imgs/数据湖2/image-20220119142219318.png) |
|||
|
|||
|
|||
|
|||
### 写入IceBerg |
|||
|
|||
+ IceBerg官网 https://iceberg.apache.org/#flink/ |
|||
|
|||
+ 官网翻译 https://www.cnblogs.com/swordfall/p/14548574.html |
|||
|
|||
+ 基于HiveCatalog的问题(未写入Hive) https://issueexplorer.com/issue/apache/iceberg/3092 |
|||
|
|||
+ [Flink + Iceberg: How to Construct a Whole-scenario Real-time Data Warehouse](https://www.alibabacloud.com/blog/flink-%2B-iceberg-how-to-construct-a-whole-scenario-real-time-data-warehouse_597824) |
|||
|
|||
+ 被他玩明白了 https://miaowenting.site/2021/01/20/Apache-Iceberg/ |
|||
|
|||
|
|||
|
|||
#### 1.使用HadoopCatalog |
|||
|
|||
https://cloud.tencent.com/developer/article/1807008 |
|||
|
|||
关键代码: |
|||
|
|||
svn: http://svn.anxinyun.cn/Iota/branches/fs-iot/code/flink-iceberg/flink-iceberg/src/main/scala/com/fs/IceBergDealHadoopApplication.scala |
|||
|
|||
```scala |
|||
... |
|||
``` |
|||
|
|||
|
|||
|
|||
#### 2. 使用HiveCatalog |
|||
|
|||
> 进展:存在问题,Hive中可以查询到数据,但在FlinkSQL中查询不到数据(待排查)
|||
|
|||
关键代码说明: |
|||
|
|||
```scala |
|||
env.enableCheckpointing(5000) |
|||
// 创建IceBerg Catalog和Database |
|||
val createIcebergCatalogSql = |
|||
"""CREATE CATALOG iceberg WITH( |
|||
| 'type'='iceberg', |
|||
| 'catalog-type'='hive', |
|||
| 'hive-conf-dir'='E:\Iota\branches\fs-iot\code\flink-iceberg\flink-iceberg' |
|||
|) |
|||
""".stripMargin |
|||
|
|||
// 创建原始数据表 iota_raw |
|||
val createIotaRawSql = |
|||
"""CREATE TABLE iceberg.iceberg_dba.iota_raw ( |
|||
|`userId` STRING, |
|||
|`dimensionId` STRING, |
|||
|`dimCapId` STRING, |
|||
|`scheduleId` STRING, |
|||
|`jobId` STRING, |
|||
|`jobRepeatId` STRING, |
|||
|`thingId` STRING , |
|||
|`deviceId` STRING, |
|||
|`taskId` STRING, |
|||
|`triggerTime` TIMESTAMP, |
|||
|`day` STRING, |
|||
|`seq` STRING, |
|||
|`result` STRING, |
|||
| `data` STRING |
|||
|) PARTITIONED BY (`thingId`,`day`) |
|||
|WITH ( |
|||
| 'engine.hive.enabled' = 'true', |
|||
| 'table.exec.sink.not-null-enforcer'='ERROR' |
|||
|) |
|||
""".stripMargin |
|||
|
|||
val kafka_iota_sql = |
|||
"""create table myhive.testhive.iotaKafkatable( |
|||
|`userId` STRING, |
|||
|`dimensionId` STRING, |
|||
|`dimCapId` STRING, |
|||
|`scheduleId` STRING, |
|||
|`jobId` STRING, |
|||
|`jobRepeatId` STRING, |
|||
|`thingId` STRING , |
|||
|`deviceId` STRING, |
|||
|`taskId` STRING, |
|||
|`triggerTime` STRING, |
|||
|`finishTime` STRING, |
|||
|`seq` STRING, |
|||
|`result` STRING, |
|||
| `data` STRING |
|||
|)with |
|||
|('connector' = 'kafka', |
|||
|'topic'='iceberg', |
|||
|'properties.bootstrap.servers' = '10.8.30.37:6667', |
|||
|'properties.group.id' = 'iceberg-demo' , |
|||
|'scan.startup.mode' = 'latest-offset', |
|||
|'format' = 'json', |
|||
|'json.ignore-parse-errors'='true' |
|||
|) |
|||
""".stripMargin |
|||
|
|||
// 注册自定义函数 Transform |
|||
tenv.createTemporarySystemFunction("dcFunction", classOf[DateCgFunction]) |
|||
tenv.createTemporarySystemFunction("tcFunction", classOf[TimeStampFunction]) |
|||
val insertSql = |
|||
""" |
|||
|insert into iceberg.iceberg_dba.iota_raw |
|||
| select userId, dimensionId,dimCapId,scheduleId,jobId,jobRepeatId,thingId,deviceId,taskId, |
|||
|tcFunction(triggerTime), |
|||
|DATE_FORMAT(dcFunction(triggerTime),'yyyy-MM-dd'), |
|||
|seq,`result`,data |
|||
|from myhive.testhive.iotakafkatable |
|||
""".stripMargin |
|||
``` |
|||
|
|||
> 1. 使用HiveCatalog方式,必须指定 'engine.hive.enabled' = 'true' |
|||
> |
|||
> 2. 'table.exec.sink.not-null-enforcer'='ERROR' 在非空字段插入空值时的处理办法 |
|||
> |
|||
> 3. 自定义函数实现 |
|||
> |
|||
> ```scala |
|||
> class TimeStampFunction extends ScalarFunction { |
|||
> def eval(@DataTypeHint(inputGroup = InputGroup.UNKNOWN) o: String): Timestamp = { |
|||
> val v = DateParser.parse(o) |
|||
> if (v.isEmpty) { |
|||
> null |
|||
> } else { |
|||
> new Timestamp(v.get.getMillis) |
|||
> } |
|||
> } |
|||
> } |
|||
> ``` |
|||
> |
|||
> 4. PARTITIONED BY (`thingId`,`day`) 根据thingid和日期分区,文件路径如: http://10.8.30.37:50070/explorer.html#/user/hive/warehouse/iceberg_dba.db/iota_raw/data/thingId=b6cfc716-3766-4949-88bc-71cb0dbf31ee/day=2022-01-20 |
|||
> |
|||
> 5. 详细代码见 http://svn.anxinyun.cn/Iota/branches/fs-iot/code/flink-iceberg/flink-iceberg/src/main/scala/com/fs/DataDealApplication.scala |
|||
|
|||
|
|||
|
|||
查看创建表结构的语句 |
|||
|
|||
```sql |
|||
show create table iota_raw; |
|||
|
|||
CREATE EXTERNAL TABLE `iota_raw`( |
|||
`userid` string COMMENT 'from deserializer', |
|||
`dimensionid` string COMMENT 'from deserializer', |
|||
`dimcapid` string COMMENT 'from deserializer', |
|||
`scheduleid` string COMMENT 'from deserializer', |
|||
`jobid` string COMMENT 'from deserializer', |
|||
`jobrepeatid` string COMMENT 'from deserializer', |
|||
`thingid` string COMMENT 'from deserializer', |
|||
`deviceid` string COMMENT 'from deserializer', |
|||
`taskid` string COMMENT 'from deserializer', |
|||
`triggertime` timestamp COMMENT 'from deserializer', |
|||
`day` string COMMENT 'from deserializer', |
|||
`seq` string COMMENT 'from deserializer', |
|||
`result` string COMMENT 'from deserializer', |
|||
`data` string COMMENT 'from deserializer') |
|||
ROW FORMAT SERDE |
|||
'org.apache.iceberg.mr.hive.HiveIcebergSerDe' |
|||
STORED BY |
|||
'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' |
|||
|
|||
LOCATION |
|||
'hdfs://node37:8020/user/hive/warehouse/iceberg_dba.db/iota_raw' |
|||
TBLPROPERTIES ( |
|||
'engine.hive.enabled'='true', |
|||
'metadata_location'='hdfs://node37:8020/user/hive/warehouse/iceberg_dba.db/iota_raw/metadata/00010-547022ad-c615-4e2e-854e-8f85592db7b6.metadata.json', |
|||
'previous_metadata_location'='hdfs://node37:8020/user/hive/warehouse/iceberg_dba.db/iota_raw/metadata/00009-abfb6af1-13dd-439a-88f5-9cb822d6c0e4.metadata.json', |
|||
'table_type'='ICEBERG', |
|||
'transient_lastDdlTime'='1642579682') |
|||
``` |
|||
|
|||
在Hive中查看数据 |
|||
|
|||
```sql |
|||
hive> add jar /tmp/iceberg-hive-runtime-0.12.1.jar; |
|||
hive> select * from iota_raw; |
|||
|
|||
``` |
|||
|
|||
#### 报错记录 |
|||
|
|||
1. HiveTableOperations$WaitingForLockException |
|||
|
|||
```sql |
|||
-- HiveMetaStore中的HIVE_LOCKS表 将报错的表所对应的锁记录删除 |
|||
select hl_lock_ext_id,hl_table,hl_lock_state,hl_lock_type,hl_last_heartbeat,hl_blockedby_ext_id from HIVE_LOCKS; |
|||
|
|||
delete from HIVE_LOCKS; |
|||
``` |
|||
|
|||
|
|||
|
|||
|
|||
|
|||
### 查询IceBerg |
|||
|
|||
#### 启动Flink SQL Client |
|||
|
|||
flink 配置master `localhost:8081`,配置workers `localhost`. |
|||
|
|||
配置flink.conf (可选) |
|||
|
|||
```ini |
|||
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. |
|||
|
|||
taskmanager.numberOfTaskSlots: 4 |
|||
|
|||
# The parallelism used for programs that did not specify and other parallelism. |
|||
|
|||
parallelism.default: 1 |
|||
|
|||
``` |
|||
|
|||
配置sql-client-defaults.yaml (可选) |
|||
|
|||
```yaml |
|||
execution: |
|||
# select the implementation responsible for planning table programs |
|||
# possible values are 'blink' (used by default) or 'old' |
|||
planner: blink |
|||
# 'batch' or 'streaming' execution |
|||
type: streaming |
|||
# allow 'event-time' or only 'processing-time' in sources |
|||
time-characteristic: event-time |
|||
# interval in ms for emitting periodic watermarks |
|||
periodic-watermarks-interval: 200 |
|||
# 'changelog', 'table' or 'tableau' presentation of results |
|||
result-mode: table |
|||
# maximum number of maintained rows in 'table' presentation of results |
|||
max-table-result-rows: 1000000 |
|||
# parallelism of the program |
|||
# parallelism: 1 |
|||
# maximum parallelism |
|||
max-parallelism: 128 |
|||
# minimum idle state retention in ms |
|||
min-idle-state-retention: 0 |
|||
# maximum idle state retention in ms |
|||
max-idle-state-retention: 0 |
|||
# current catalog ('default_catalog' by default) |
|||
current-catalog: default_catalog |
|||
# current database of the current catalog (default database of the catalog by default) |
|||
current-database: default_database |
|||
# controls how table programs are restarted in case of a failures |
|||
# restart-strategy: |
|||
# strategy type |
|||
# possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default) |
|||
# type: fallback |
|||
``` |
|||
|
|||
启动flink集群: |
|||
|
|||
```sh |
|||
./bin/start-cluster.sh |
|||
``` |
|||
|
|||
访问Flink UI http://node37:8081 |
|||
|
|||
|
|||
|
|||
启动sql-client |
|||
|
|||
```sh |
|||
export HADOOP_CLASSPATH=`hadoop classpath` |
|||
|
|||
./bin/sql-client.sh embedded \ |
|||
-j /home/anxin/iceberg/iceberg-flink-runtime-0.12.0.jar \ |
|||
-j /home/anxin/iceberg/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar \ |
|||
-j /home/anxin/flink-1.11.4/lib/flink-sql-connector-kafka-0.11_2.11-1.11.4.jar \ |
|||
shell |
|||
``` |
|||
|
|||
#### 查询语句基础 |
|||
|
|||
```sql |
|||
CREATE CATALOG iceberg WITH( |
|||
'type'='iceberg', |
|||
'catalog-type'='hadoop', |
|||
'warehouse'='hdfs://node37:8020/user/hadoop', |
|||
'property-version'='1' |
|||
); |
|||
use catalog iceberg; |
|||
use iceberg_db; -- 选择数据库 |
|||
|
|||
|
|||
--可选区域 |
|||
SET; -- 查看当前配置 |
|||
SET sql-client.execution.result-mode = table; -- changelog/tableau |
|||
SET sql-client.verbose=true; -- 打印异常堆栈 |
|||
SET sql-client.execution.max-table-result.rows=1000000; -- 在表格模式下缓存的行数 |
|||
SET table.planner = blink; -- planner: either blink (default) or old |
|||
SET execution.runtime-mode = streaming; -- execution mode either batch or streaming |
|||
SET sql-client.execution.result-mode = table; -- available values: table, changelog and tableau |
|||
SET parallelism.default = 1; -- optional: Flinks parallelism (1 by default) |
|||
SET pipeline.auto-watermark-interval = 200; --optional: interval for periodic watermarks |
|||
SET pipeline.max-parallelism = 10; -- optional: Flink's maximum parallelism |
|||
SET table.exec.state.ttl = 1000; -- optional: table program's idle state time |
|||
SET restart-strategy = fixed-delay; |
|||
|
|||
SET table.optimizer.join-reorder-enabled = true; |
|||
SET table.exec.spill-compression.enabled = true; |
|||
SET table.exec.spill-compression.block-size = 128kb; |
|||
|
|||
SET execution.savepoint.path = tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab; -- restore from the specific savepoint path |
|||
-- 执行一组SQL指令 |
|||
BEGIN STATEMENT SET; |
|||
-- one or more INSERT INTO statements |
|||
{ INSERT INTO|OVERWRITE <select_statement>; }+ |
|||
END; |
|||
``` |
|||
|
|||
|
|||
|
|||
![image-20220120164032739](imgs/数据湖2/image-20220120164032739.png) |
|||
|
|||
|
|||
|
|||
#### 批量读取 |
|||
|
|||
> 在FlinkSQL执行SET后再执行查询,总是报错: |
|||
> |
|||
> ; |
|||
> |
|||
> 所以需要在执行SQL Client之前设置一些参数 |
|||
|
|||
修改 `conf/sql-client-defaults.yaml` |
|||
|
|||
execution.type=batch |
|||
|
|||
```yaml |
|||
catalogs: |
|||
# A typical catalog definition looks like: |
|||
- name: myhive |
|||
type: hive |
|||
hive-conf-dir: /home/anxin/apache-hive-3.1.2-bin/conf |
|||
# default-database: ... |
|||
- name: hadoop_catalog |
|||
type: iceberg |
|||
warehouse: hdfs://node37:8020/user/hadoop |
|||
catalog-type: hadoop |
|||
|
|||
#============================================================================== |
|||
# Modules |
|||
#============================================================================== |
|||
|
|||
# Define modules here. |
|||
|
|||
#modules: # note the following modules will be of the order they are specified |
|||
# - name: core |
|||
# type: core |
|||
|
|||
#============================================================================== |
|||
# Execution properties |
|||
#============================================================================== |
|||
|
|||
# Properties that change the fundamental execution behavior of a table program. |
|||
|
|||
execution: |
|||
# select the implementation responsible for planning table programs |
|||
# possible values are 'blink' (used by default) or 'old' |
|||
planner: blink |
|||
# 'batch' or 'streaming' execution |
|||
type: batch |
|||
# allow 'event-time' or only 'processing-time' in sources |
|||
time-characteristic: event-time |
|||
# interval in ms for emitting periodic watermarks |
|||
periodic-watermarks-interval: 200 |
|||
# 'changelog', 'table' or 'tableau' presentation of results |
|||
result-mode: table |
|||
# maximum number of maintained rows in 'table' presentation of results |
|||
max-table-result-rows: 1000000 |
|||
# parallelism of the program |
|||
# parallelism: 1 |
|||
# maximum parallelism |
|||
max-parallelism: 128 |
|||
# minimum idle state retention in ms |
|||
min-idle-state-retention: 0 |
|||
# maximum idle state retention in ms |
|||
max-idle-state-retention: 0 |
|||
# current catalog ('default_catalog' by default) |
|||
current-catalog: default_catalog |
|||
# current database of the current catalog (default database of the catalog by default) |
|||
current-database: default_database |
|||
# controls how table programs are restarted in case of a failures |
|||
# restart-strategy: |
|||
# strategy type |
|||
# possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default) |
|||
# type: fallback |
|||
|
|||
#============================================================================== |
|||
# Configuration options |
|||
#============================================================================== |
|||
|
|||
# Configuration options for adjusting and tuning table programs. |
|||
|
|||
# A full list of options and their default values can be found |
|||
# on the dedicated "Configuration" web page. |
|||
|
|||
# A configuration can look like: |
|||
configuration: |
|||
table.exec.spill-compression.enabled: true |
|||
table.exec.spill-compression.block-size: 128kb |
|||
table.optimizer.join-reorder-enabled: true |
|||
# execution.checkpointing.interval: 10s |
|||
table.dynamic-table-options.enabled: true |
|||
``` |
|||
|
|||
|
|||
|
|||
#### 流式读取 |
|||
|
|||
修改 `conf/sql-client-defaults.yaml` |
|||
|
|||
execution.type=streaming |
|||
|
|||
execution.checkpointing.interval: 10s |
|||
|
|||
table.dynamic-table-options.enabled: true // 开启[动态表(Dynamic Table)选项](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/hints/) |
|||
|
|||
|
|||
|
|||
```sql |
|||
-- Submit the flink job in streaming mode for current session. |
|||
SET execution.type = streaming ; |
|||
|
|||
-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options. |
|||
SET table.dynamic-table-options.enabled=true; |
|||
|
|||
-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot. |
|||
SELECT * FROM iota_raw /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s')*/ ; |
|||
|
|||
-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded). |
|||
SELECT * FROM iota_raw /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s', 'start-snapshot-id'='3821550127947089987')*/ ; |
|||
``` |
|||
|
|||
|
|||
|
|||
#### 通过外部Hive表查询 |
|||
|
|||
```sql |
|||
-- HIVE SHELL |
|||
add jar /tmp/iceberg-hive-runtime-0.12.1.jar; |
|||
|
|||
use iceberg_dba; |
|||
|
|||
SET engine.hive.enabled=true; |
|||
SET iceberg.engine.hive.enabled=true; |
|||
SET iceberg.mr.catalog=hive; |
|||
|
|||
CREATE EXTERNAL TABLE iceberg_dba.iota_rawe( |
|||
`userId` STRING, |
|||
`dimensionId` STRING, |
|||
`dimCapId` STRING, |
|||
`scheduleId` STRING, |
|||
`jobId` STRING, |
|||
`jobRepeatId` STRING, |
|||
`thingId` STRING , |
|||
`deviceId` STRING, |
|||
`taskId` STRING, |
|||
`triggerTime` TIMESTAMP, |
|||
`day` STRING, |
|||
`seq` STRING, |
|||
`result` STRING, |
|||
`data` STRING |
|||
) |
|||
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' |
|||
LOCATION '/user/hadoop/iceberg_db/iota_raw' |
|||
TBLPROPERTIES ( |
|||
'iceberg.mr.catalog'='hadoop', |
|||
'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://node37:8020/user/hadoop/iceberg_db/iota_raw' |
|||
); |
|||
``` |
|||
|
|||
|
|||
|
|||
|
|||
|
|||
#### 处理小文件的三种方式 |
|||
|
|||
https://zhuanlan.zhihu.com/p/349420627 |
|||
|
|||
1. Iceberg表中设置 write.distribution-mode=hash |
|||
|
|||
```sql |
|||
CREATE TABLE sample ( |
|||
id BIGINT, |
|||
data STRING |
|||
) PARTITIONED BY (data) WITH ( |
|||
'write.distribution-mode'='hash' |
|||
); |
|||
``` |
|||
|
|||
|
|||
|
|||
2. 定期对 Apache Iceberg 表执行 Major Compaction 来合并 Apache iceberg 表中的小文件。这个作业目前是一个 Flink 的批作业,提供 Java API 的方式来提交作业,使用姿势可以参考文档[8]。 |
|||
|
|||
3. 在每个 Flink Sink 流作业之后,外挂算子用来实现小文件的自动合并。这个功能目前暂未 merge 到社区版本,由于涉及到 format v2 的 compaction 的一些讨论,我们会在 0.12.0 版本中发布该功能。 |
|||
|
|||
> Iceberg provides API to rewrite small files into large files by submitting flink batch job. The behavior of this flink action is the same as the spark's rewriteDataFiles. |
|||
|
|||
```java |
|||
import org.apache.iceberg.flink.actions.Actions; |
|||
|
|||
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path"); |
|||
Table table = tableLoader.loadTable(); |
|||
RewriteDataFilesActionResult result = Actions.forTable(table) |
|||
.rewriteDataFiles() |
|||
.execute(); |
|||
``` |
|||
|
|||
For more doc about options of the rewrite files action, please see [RewriteDataFilesAction](https://iceberg.apache.org/#javadoc/0.12.1/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html) |
|||
|
|||
|
|||
|
|||
#### 插播:[Flink操作HUDI](https://hudi.apache.org/docs/0.8.0/flink-quick-start-guide/)
|||
|
|||
流式读取 |
|||
|
|||
```sql |
|||
CREATE TABLE t1( |
|||
uuid VARCHAR(20), |
|||
name VARCHAR(10), |
|||
age INT, |
|||
ts TIMESTAMP(3), |
|||
`partition` VARCHAR(20) |
|||
) |
|||
PARTITIONED BY (`partition`) |
|||
WITH ( |
|||
'connector' = 'hudi', |
|||
'path' = 'oss://vvr-daily/hudi/t1', |
|||
'table.type' = 'MERGE_ON_READ', |
|||
  'read.streaming.enabled' = 'true',  -- this option enables streaming read
  'read.streaming.start-commit' = '20210316134557', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4'  -- check interval for finding new source commits, default 60s
|||
); |
|||
|
|||
-- Then query the table in stream mode |
|||
select * from t1; |
|||
``` |
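
The quick-start DDL above can also be executed from a Scala job through `StreamTableEnvironment`, which is the pattern the StreamJobSimplify source in the appendix follows. A minimal sketch, assuming `hudi-flink-bundle` 0.8.0 is on the classpath and reusing the quick-start path and start commit verbatim:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object HudiStreamingReadSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tenv = StreamTableEnvironment.create(env)

    // Same table definition as the SQL-client example above.
    tenv.executeSql(
      """CREATE TABLE t1(
        |  uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20)
        |) PARTITIONED BY (`partition`) WITH (
        |  'connector' = 'hudi',
        |  'path' = 'oss://vvr-daily/hudi/t1',
        |  'table.type' = 'MERGE_ON_READ',
        |  'read.streaming.enabled' = 'true',
        |  'read.streaming.start-commit' = '20210316134557',
        |  'read.streaming.check-interval' = '4'
        |)""".stripMargin)

    // Streaming query: keeps printing rows as new commits are discovered.
    tenv.executeSql("select * from t1").print()
  }
}
```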
|||
|
|||
|
|||
|
|||
#### Error log:
|||
|
|||
1. `java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable`
|||
|
|||
|
|||
|
|||
2. Running Flink SQL fails with `[ERROR] Could not execute SQL statement. Reason: java.net.ConnectException: Connection refused`
|||
|
|||
Resolution: start the Flink cluster:
|||
|
|||
```sh |
|||
./bin/start-cluster.sh |
|||
``` |
|||
|
|||
|
|||
|
|||
3. When running a batch query:
|||
|
|||
```scala |
|||
val bsSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() |
|||
val tenv = TableEnvironment.create(bsSetting) |
|||
``` |
|||
|
|||
|
|||
|
|||
```sh |
|||
Error:(22, 43) Static methods in interface require -target:jvm-1.8 |
|||
val tenv = TableEnvironment.create(bsSetting) |
|||
``` |
|||
|
|||
|
|||
|
|||
Unresolved: keep using StreamTableEnvironment for now (a possible compiler-level fix is sketched below).
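
This error usually means scalac is not emitting JVM 1.8 bytecode, which Scala needs in order to call static methods declared on Java interfaces such as `TableEnvironment.create` (Scala 2.11 does not target 1.8 by default). A hedged sketch of the usual fix, written as an sbt setting; this project actually builds with Maven, where the equivalent is adding `-target:jvm-1.8` to the scala-maven-plugin compiler args.

```scala
// build.sbt sketch (assumption: an sbt build; the real project uses Maven).
// Target JVM 1.8 so that static methods on Java interfaces can be compiled against.
scalacOptions ++= Seq("-target:jvm-1.8")
```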
|||
|
|||
4. MiniCluster is not yet running or has already been shut down |
|||
|
|||
Hit when debugging the write job and the query job locally at the same time; two Flink programs cannot be debugged simultaneously.
|||
|
|||
?? |
|||
|
|||
5. A Flink SQL program fails with `Job client must be a CoordinationRequestGateway. This is a bug`
|||
|
|||
Resolution: submit the job from the command line:
|||
|
|||
```shell |
|||
./bin/flink run -c com.fs.OfficialRewriteData -p 1 ./flink-iceberg-1.0-SNAPSHOT-shaded.jar --host localhost --port 8081 |
|||
``` |
|||
|
|||
6. On job submission: `Unable to instantiate java compiler`
|||
|
|||
```sh |
|||
Unable to instantiate java compiler: Calcite dependency conflict
|||
``` |
|||
|
|||
Reference: https://blog.csdn.net/weixin_44056920/article/details/118110262
|||
|
|||
7. Flink throws OOM errors
|||
|
|||
Resolution: increase Flink's memory settings (in flink-conf.yaml):
|||
|
|||
```yaml |
|||
jobmanager.memory.process.size: 2600m |
|||
taskmanager.memory.jvm-metaspace.size: 1000m |
|||
jobmanager.memory.jvm-metaspace.size: 1000m |
|||
``` |
|||
|
|||
|
|||
|
|||
8. Problem roundup posts found online
|||
|
|||
> IceBerg+Kafka+FlinkSQL https://blog.csdn.net/qq_33476283/article/details/119138610 |
|||
|
|||
|
|||
|
|||
### Big data lake best practices
|||
|
|||
Roadmap for implementing a data lake:
|||
|
|||
+ Build the infrastructure (a Hadoop cluster)
+ Organize the zones of the data lake (create zones for the different user groups and load data into them)
+ Set up self-service for the data lake (create a data asset catalog, access-control mechanisms, and the tools analysts will use)
+ Open the data lake up to users
|||
|
|||
Planning the data lake:
|||
|
|||
+ Raw zone: stores the data as ingested
+ Production zone: cleansed and processed data
+ Work zone: where data scientists analyze the data, usually organized by user, project, or subject; moved to the production zone once it goes into production
+ Sensitive zone
|||
|
|||
|
|||
|
|||
Traditional databases are schema-on-write; data lakes (Hadoop and the like) are schema-on-read.
|||
|
|||
|
|||
|
|||
Michael Hausenblas: |
|||
|
|||
> A data lake is generally associated with data at rest. The basic idea is to introduce a self-service approach to data exploration so that relevant business datasets can be shared across the organization.
|||
|
|||
+ Data storage: HDFS, HBase, Cassandra, Kafka
+ Processing engines: Spark, Flink, Beam
+ Interaction: Zeppelin / Spark notebook, Tableau / Datameer
|||
|
|||
|
|||
|
|||
### Appendix
|||
|
|||
1. hive-site.xml |
|||
|
|||
```xml |
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?> |
|||
<configuration> |
|||
<property> |
|||
<name>javax.jdo.option.ConnectionUserName</name> |
|||
<value>root</value> |
|||
</property> |
|||
<property> |
|||
<name>javax.jdo.option.ConnectionPassword</name> |
|||
<value>123456</value> |
|||
</property> |
|||
<property> |
|||
<name>javax.jdo.option.ConnectionURL</name> |
|||
<value>jdbc:mysql://10.8.30.37:3306/metastore_db?createDatabaseIfNotExist=true</value> |
|||
</property> |
|||
<property> |
|||
<name>javax.jdo.option.ConnectionDriverName</name> |
|||
<value>com.mysql.jdbc.Driver</value> |
|||
</property> |
|||
<property> |
|||
<name>hive.metastore.schema.verification</name> |
|||
<value>false</value> |
|||
</property> |
|||
<property> |
|||
<name>hive.cli.print.current.db</name> |
|||
<value>true</value> |
|||
</property> |
|||
<property> |
|||
<name>hive.cli.print.header</name> |
|||
<value>true</value> |
|||
</property> |
|||
|
|||
<property> |
|||
<name>hive.metastore.warehouse.dir</name> |
|||
<value>/user/hive/warehouse</value> |
|||
</property> |
|||
|
|||
<property> |
|||
<name>hive.metastore.local</name> |
|||
<value>false</value> |
|||
</property> |
|||
|
|||
<property> |
|||
<name>hive.metastore.uris</name> |
|||
<value>thrift://10.8.30.37:9083</value> |
|||
</property> |
|||
|
|||
<!-- hiveserver2 --> |
|||
<property> |
|||
<name>hive.server2.thrift.port</name> |
|||
<value>10000</value> |
|||
</property> |
|||
<property> |
|||
<name>hive.server2.thrift.bind.host</name> |
|||
<value>10.8.30.37</value> |
|||
</property> |
|||
</configuration> |
|||
|
|||
``` |
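
For reference, a minimal sketch of how a Flink job consumes this metastore configuration: the directory handed to `HiveCatalog` must contain the hive-site.xml above. This mirrors the catalog registration in the flink2iceberg StreamJob source later in this appendix; the conf-directory path here is a placeholder.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveCatalogSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tenv = StreamTableEnvironment.create(env)

    // "/path/to/conf" is a placeholder for the directory holding the hive-site.xml above.
    val catalog = new HiveCatalog("myhive", "iceberg_db", "/path/to/conf")
    tenv.registerCatalog("myhive", catalog)
    tenv.useCatalog("myhive")

    // Sanity check: list the databases visible through the metastore.
    println(tenv.listDatabases().toList)
  }
}
```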
2. flink2hudi/pom.xml
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" |
|||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<parent> |
|||
<artifactId>etl</artifactId> |
|||
<groupId>com.freesun</groupId> |
|||
<version>1.0-SNAPSHOT</version> |
|||
</parent> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
|
|||
<artifactId>flink2hudi</artifactId> |
|||
|
|||
<properties> |
|||
<hudi.version>0.8.0</hudi.version> |
|||
<avro.version>1.8.2</avro.version> |
|||
</properties> |
|||
|
|||
<dependencies> |
|||
<!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-flink-client --> |
|||
<dependency> |
|||
<groupId>org.apache.hudi</groupId> |
|||
<artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId> |
|||
<version>${hudi.version}</version> |
|||
</dependency> |
|||
|
|||
<!--FLINK TABLE--> |
|||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-table-common</artifactId> |
|||
<version>${flink.version}</version> |
|||
<scope>${flink.scope}</scope> |
|||
</dependency> |
|||
|
|||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-table-planner_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
<scope>${flink.scope}</scope> |
|||
</dependency> |
|||
|
|||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
<scope>${flink.scope}</scope> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
<scope>${flink.scope}</scope> |
|||
</dependency> |
|||
|
|||
<!--HADOOP--> |
|||
<dependency> |
|||
<groupId>org.apache.hadoop</groupId> |
|||
<artifactId>hadoop-client</artifactId> |
|||
<version>${hadoop.version}</version> |
|||
<scope>${hadoop.scope}</scope> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.hadoop</groupId> |
|||
<artifactId>hadoop-common</artifactId> |
|||
<version>${hadoop.version}</version> |
|||
<scope>${hadoop.scope}</scope> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-orc_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-csv</artifactId> |
|||
<version>${flink.version}</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.avro</groupId> |
|||
<artifactId>avro</artifactId> |
|||
<version>${avro.version}</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.hive</groupId> |
|||
<artifactId>hive-exec</artifactId> |
|||
<version>3.1.2</version> |
|||
<exclusions> |
|||
<exclusion> |
|||
<groupId>org.apache.avro</groupId> |
|||
<artifactId>avro</artifactId> |
|||
</exclusion> |
|||
</exclusions> |
|||
</dependency> |
|||
|
|||
</dependencies> |
|||
</project> |
3. flink2hudi StreamJob.scala (Kafka → Hudi, exploratory)
|||
package com.freesun.flink2hudi |
|||
|
|||
import java.util.Properties |
|||
import java.util.concurrent.TimeUnit |
|||
|
|||
import comm.models.IotaData |
|||
import comm.utils.{JsonHelper, Loader} |
|||
import de.javakaffee.kryoserializers.jodatime.{JodaDateTimeSerializer, JodaLocalDateSerializer, JodaLocalDateTimeSerializer} |
|||
import org.apache.flink.api.common.restartstrategy.RestartStrategies |
|||
import org.apache.flink.api.common.serialization.SimpleStringSchema |
|||
import org.apache.flink.api.common.time.Time |
|||
import org.apache.flink.api.common.typeinfo.TypeInformation |
|||
import org.apache.flink.api.java.utils.ParameterTool |
|||
import org.apache.flink.api.scala.typeutils.Types |
|||
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} |
|||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer |
|||
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment |
|||
import org.apache.flink.table.descriptors.{Json, Kafka, Schema} |
|||
import org.joda.time.{DateTime, LocalDate, LocalDateTime} |
|||
import org.slf4j.LoggerFactory |
|||
import org.apache.flink.streaming.api.scala._ |
|||
|
|||
|
|||
import scala.collection.JavaConversions |
|||
|
|||
|
|||
object StreamJob { |
|||
|
|||
private val logger = LoggerFactory.getLogger(getClass) |
|||
|
|||
def main(args: Array[String]): Unit = { |
|||
val props = Loader.from("/config.properties", args: _*) |
|||
logger.info(props.toString) |
|||
import scala.collection.JavaConversions._ |
|||
val params = ParameterTool.fromMap(props.map(p => (p._1, p._2))) |
|||
|
|||
// set up the streaming execution environment |
|||
val env = StreamExecutionEnvironment.getExecutionEnvironment |
|||
|
|||
// make parameters available in the web interface |
|||
env.getConfig.setGlobalJobParameters(params) |
|||
|
|||
    // register Joda-Time Kryo serializers
|||
env.registerTypeWithKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer]) |
|||
env.registerTypeWithKryoSerializer(classOf[LocalDate], classOf[JodaLocalDateSerializer]) |
|||
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer]) |
|||
|
|||
// set restart strategy |
|||
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Int.MaxValue, Time.of(30, TimeUnit.SECONDS))) |
|||
|
|||
val warehouse = "hdfs://node39:9000/user/root/warehouse" |
|||
|
|||
|
|||
// create hadoop catalog |
|||
val tenv = StreamTableEnvironment.create(env) |
|||
|
|||
|
|||
val kafkaProperties = buildKafkaProps(props) |
|||
val dataTopic = props.getProperty("kafka.topics.data") |
|||
val kafkaSource = new FlinkKafkaConsumer[String](dataTopic, new SimpleStringSchema(), kafkaProperties) |
|||
|
|||
val ds: DataStream[IotaData] = |
|||
env.addSource(kafkaSource) |
|||
.map(a => parseIotaData(a)) |
|||
.filter(_.nonEmpty) |
|||
.map(_.get) |
|||
|
|||
tenv.registerDataStream("raw_data", ds) |
|||
val r2 = tenv.sqlQuery("select * from raw_data") |
|||
r2.printSchema() |
|||
return |
|||
|
|||
|
|||
// tenv.executeSql("""CREATE TABLE kafkaTable ( |
|||
// | userId STRING |
|||
// | ) WITH ( |
|||
// | 'connector' = 'kafka', |
|||
// | 'topic' = 'anxinyun_data4', |
|||
// | 'properties.bootstrap.servers' = '10.8.30.37:6667', |
|||
// | 'properties.group.id' = 'flink.raw.hudi', |
|||
// | 'format' = 'json', |
|||
// | 'scan.startup.mode' = 'earliest-offset' |
|||
// | )""".stripMargin) |
|||
// val re1=tenv.sqlQuery("select * from kafkaTable") |
|||
// re1.printSchema() |
|||
|
|||
tenv.executeSql("DROP TABLE IF EXISTS kafka_2_hudi") |
|||
tenv.executeSql( |
|||
""" |
|||
|CREATE TABLE kafka_2_hudi ( |
|||
| userId STRING |
|||
|) WITH ( |
|||
| 'connector.type'='kafka', -- 使用 kafka connector |
|||
| 'connector.version'='universal', |
|||
| 'connector.topic'='anxinyun_data4', -- kafka主题 |
|||
| 'connector.startup-mode'='latest-offset', -- 偏移量 |
|||
| 'connector.properties.bootstrap.servers'='10.8.30.37:6667,10.8.30.38:6667,10.8.30.156:6667', -- KAFKA Brokers |
|||
| 'connector.properties.group.id'='flink.raw.hudi', -- 消费者组 |
|||
| 'format.type'='json' -- 数据源格式为json |
|||
|) |
|||
""".stripMargin) |
|||
val r1 = tenv.sqlQuery("select * from kafka_2_hudi") |
|||
r1.printSchema() |
|||
|
|||
|
|||
val kafka = new Kafka() |
|||
.version("0.10") |
|||
.topic("anxinyun_data4") |
|||
.property("bootstrap.servers", "10.8.30.37:6667,10.8.30.38:6667,10.8.30.156:6667") |
|||
.property("group.id", "flink.raw.hudi") |
|||
.property("zookeeper.connect", "10.8.30.37:2181") |
|||
.startFromLatest() |
|||
|
|||
tenv.connect(kafka) |
|||
.withFormat(new Json().failOnMissingField(true).deriveSchema()) |
|||
.withSchema(new Schema() |
|||
.field("userId", Types.STRING) |
|||
) |
|||
.inAppendMode() |
|||
.createTemporaryTable("kafka_data") |
|||
|
|||
val sql = "select * from kafka_data" |
|||
// val table=tenv.executeSql(sql) |
|||
val table = tenv.sqlQuery(sql) |
|||
table.printSchema() |
|||
implicit val typeInfo = TypeInformation.of(classOf[SimpleIotaData]) |
|||
tenv.toAppendStream[SimpleIotaData](table) |
|||
.print() |
|||
|
|||
val catalogs = tenv.listCatalogs() |
|||
println(catalogs.toList) |
|||
|
|||
val databases = tenv.listDatabases() |
|||
println(databases.toList) |
|||
|
|||
val tables = tenv.listTables() |
|||
println(tables.toList) |
|||
|
|||
// tenv.executeSql( |
|||
// s""" |
|||
// |CREATE CATALOG hadoop_catalog WITH ( |
|||
// | 'type'='hudi', |
|||
// | 'catalog-type'='hadoop', |
|||
// | 'warehouse'='$warehouse', |
|||
// | 'property-version'='1' |
|||
// |) |
|||
// """.stripMargin) |
|||
// |
|||
// // change catalog |
|||
// tenv.useCatalog("hadoop_catalog") |
|||
// tenv.executeSql("CREATE DATABASE if not exists hudi_hadoop_db") |
|||
// tenv.useDatabase("hudi_hadoop_db") |
|||
|
|||
// create hudi result table |
|||
return |
|||
|
|||
tenv.executeSql("drop table if exists hudi_raw") |
|||
val resultTable = tenv.executeSql( |
|||
s"""CREATE TABLE hudi_raw( |
|||
| uuid VARCHAR(20), |
|||
| name VARCHAR(10), |
|||
| age INT, |
|||
| ts TIMESTAMP(3), |
|||
| `partition` VARCHAR(20) |
|||
|) |
|||
|PARTITIONED BY (`partition`) |
|||
|WITH ( |
|||
| 'connector' = 'hudi', |
|||
| 'path' = '$warehouse', |
|||
| 'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE |
|||
|) |
|||
""".stripMargin) |
|||
|
|||
tenv.executeSql("insert into hudi_raw values('id3','yinweiwen',23,TIMESTAMP '1970-01-01 00:00:10','par1')") |
|||
|
|||
val rs = tenv.executeSql("select * from hudi_raw") |
|||
rs.print() |
|||
|
|||
// create kafka stream table |
|||
tenv.executeSql("DROP TABLE IF EXISTS kafka_2_hudi") |
|||
tenv.executeSql( |
|||
""" |
|||
|CREATE TABLE kafka_2_hudi ( |
|||
| userId STRING |
|||
|) WITH ( |
|||
| 'connector.type'='kafka', -- 使用 kafka connector |
|||
| 'connector.version'='universal', |
|||
| 'connector.topic'='anxinyun_data4', -- kafka主题 |
|||
| 'connector.startup-mode'='latest-offset', -- 偏移量 |
|||
| 'connector.properties.bootstrap.servers'='10.8.30.37:6667,10.8.30.38:6667,10.8.30.156:6667', -- KAFKA Brokers |
|||
| 'connector.properties.group.id'='flink.raw.hudi', -- 消费者组 |
|||
| 'format.type'='json' -- 数据源格式为json |
|||
|) |
|||
""".stripMargin) |
|||
|
|||
// copy data from kafka to hadoop |
|||
tenv.executeSql( |
|||
s""" |
|||
|INSERT INTO hudi_raw SELECT userId FROM kafka_2_hudi |
|||
""".stripMargin) |
|||
|
|||
} |
|||
|
|||
|
|||
/** |
|||
* kafka source params builder |
|||
* |
|||
* @param props config props |
|||
* @return |
|||
*/ |
|||
def buildKafkaProps(props: Properties): Properties = { |
|||
val kafkaProps = new Properties() |
|||
kafkaProps.setProperty("bootstrap.servers", props.getProperty("kafka.brokers")) |
|||
kafkaProps.setProperty("group.id", props.getProperty("kafka.group.id")) |
|||
kafkaProps.setProperty("auto.offset.reset", "latest") |
|||
// support kafka properties(start with 'kafkap.') |
|||
JavaConversions.propertiesAsScalaMap(props) |
|||
.filter(_._1.startsWith("kafkap")) |
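      // strip the "kafkap." prefix (7 characters) before passing the key on to the Kafka client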
|||
.map(a => (a._1.substring(7), a._2)) |
|||
.foreach(p => kafkaProps.put(p._1, p._2)) |
|||
kafkaProps |
|||
} |
|||
|
|||
|
|||
def parseIotaData(record: String): Option[IotaData] = { |
|||
val (data, ex) = JsonHelper.Json2Object[IotaData](record) |
|||
if (data.isEmpty) { |
|||
logger.warn(s"data msg parse error: $record") |
|||
} |
|||
data |
|||
} |
|||
} |
|||
|
|||
case class SimpleIotaData(userId: String) |
4. flink2hudi StreamJobSimplify.scala (Kafka stream table → Hudi table)
|||
package com.freesun.flink2hudi |
|||
|
|||
import java.util.{Date, Properties, UUID} |
|||
import java.util.concurrent.TimeUnit |
|||
|
|||
import comm.models.IotaData |
|||
import comm.utils.{JsonHelper, Loader} |
|||
import de.javakaffee.kryoserializers.jodatime.{JodaDateTimeSerializer, JodaLocalDateSerializer, JodaLocalDateTimeSerializer} |
|||
import org.apache.flink.api.common.restartstrategy.RestartStrategies |
|||
import org.apache.flink.api.common.serialization.SimpleStringSchema |
|||
import org.apache.flink.api.common.time.Time |
|||
import org.apache.flink.api.java.utils.ParameterTool |
|||
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _} |
|||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer |
|||
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment |
|||
import org.joda.time.{DateTime, LocalDate, LocalDateTime} |
|||
import org.slf4j.LoggerFactory |
|||
|
|||
import scala.collection.JavaConversions |
|||
|
|||
/** |
|||
* Read from kafka stream table to Hudi Table |
|||
* create at 2021年9月8日 10:55:19 |
|||
*/ |
|||
object StreamJobSimplify { |
|||
|
|||
private val logger = LoggerFactory.getLogger(getClass) |
|||
|
|||
def main(args: Array[String]): Unit = { |
|||
val props = Loader.from("/config.properties", args: _*) |
|||
logger.info(props.toString) |
|||
import scala.collection.JavaConversions._ |
|||
val params = ParameterTool.fromMap(props.map(p => (p._1, p._2))) |
|||
|
|||
// set up the streaming execution environment |
|||
val env = StreamExecutionEnvironment.getExecutionEnvironment |
|||
|
|||
// make parameters available in the web interface |
|||
env.getConfig.setGlobalJobParameters(params) |
|||
|
|||
    // register Joda-Time Kryo serializers
|||
env.registerTypeWithKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer]) |
|||
env.registerTypeWithKryoSerializer(classOf[LocalDate], classOf[JodaLocalDateSerializer]) |
|||
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer]) |
|||
|
|||
// set restart strategy |
|||
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Int.MaxValue, Time.of(30, TimeUnit.SECONDS))) |
|||
|
|||
// define constants |
|||
val warehouse = "hdfs://localhost:9000/user/yww08/warehouse" |
|||
// val warehouse = "hdfs://node39:9000/user/root/warehouse" |
|||
|
|||
|
|||
// create hadoop catalog |
|||
val tenv = StreamTableEnvironment.create(env) |
|||
|
|||
|
|||
// init kafka properties |
|||
val kafkaProperties = buildKafkaProps(props) |
|||
val dataTopic = props.getProperty("kafka.topics.data") |
|||
val kafkaSource = new FlinkKafkaConsumer[String](dataTopic, new SimpleStringSchema(), kafkaProperties) |
|||
|
|||
// create kafka iota data stream |
|||
val ds: DataStream[IotaData] = |
|||
env.addSource(kafkaSource) |
|||
.map(a => { |
|||
logger.info(a) |
|||
val r = parseIotaData(a) |
|||
r |
|||
}) |
|||
.filter(_.nonEmpty) |
|||
.map(_.get) |
|||
tenv.createTemporaryView("raw_data", ds) |
|||
// tenv.registerDataStream("raw_data", ds) |
|||
|
|||
// val rt=tenv.sqlQuery("select * from raw_data") |
|||
// tenv.toAppendStream[IotaData](rt).print() |
|||
|
|||
// create hudi table |
|||
tenv.executeSql("drop table if exists hudi_raw") |
|||
tenv.executeSql( |
|||
s"""CREATE TABLE hudi_raw( |
|||
| uuid VARCHAR(20), |
|||
| `userId` VARCHAR(20), |
|||
| `thingId` VARCHAR(20), |
|||
| ts TIMESTAMP(3) |
|||
|) |
|||
|PARTITIONED BY (`thingId`) |
|||
|WITH ( |
|||
| 'connector' = 'hudi', |
|||
| 'path' = '$warehouse', |
|||
| 'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE |
|||
|) |
|||
""".stripMargin) |
|||
|
|||
tenv.executeSql( |
|||
"""CREATE TABLE tmp_raw( |
|||
| uuid VARCHAR(20), |
|||
| `userId` VARCHAR(20), |
|||
| `thingId` VARCHAR(20), |
|||
| ts TIMESTAMP(3) |
|||
|) |
|||
|PARTITIONED BY (`thingId`) |
|||
|WITH( |
|||
| 'connector'='filesystem', |
|||
| 'path'='file:///tmp/cde', |
|||
| 'format'='orc' |
|||
|)""".stripMargin |
|||
) |
|||
|
|||
tenv.executeSql(s"insert into hudi_raw values ('${UUID.randomUUID().toString}','user2','THINGX',TIMESTAMP '1970-01-01 08:00:00')") |
|||
tenv.executeSql(s"insert into hudi_raw values ('id2','user2','THINGC',TIMESTAMP '1970-01-01 08:00:00')") |
|||
val rs = tenv.sqlQuery("select * from hudi_raw") |
|||
rs.printSchema() |
|||
|
|||
// change data to filesystem |
|||
tenv.executeSql( |
|||
s""" |
|||
|INSERT INTO tmp_raw SELECT '${UUID.randomUUID().toString}',userId,thingId,TIMESTAMP '1970-01-01 00:00:10' FROM raw_data |
|||
""".stripMargin) |
|||
|
|||
// change data to hudi |
|||
tenv.executeSql( |
|||
s""" |
|||
|INSERT INTO hudi_raw SELECT '${UUID.randomUUID().toString}',userId,thingId,TIMESTAMP '1970-01-01 00:00:10' FROM raw_data |
|||
""".stripMargin) |
|||
|
|||
env.execute("flink-kafka-to-hudi") |
|||
} |
|||
|
|||
|
|||
/** |
|||
* kafka source params builder |
|||
* |
|||
* @param props config props |
|||
* @return |
|||
*/ |
|||
def buildKafkaProps(props: Properties): Properties = { |
|||
val kafkaProps = new Properties() |
|||
kafkaProps.setProperty("bootstrap.servers", props.getProperty("kafka.brokers")) |
|||
kafkaProps.setProperty("group.id", props.getProperty("kafka.group.id")) |
|||
kafkaProps.setProperty("auto.offset.reset", "latest") |
|||
// support kafka properties(start with 'kafkap.') |
|||
JavaConversions.propertiesAsScalaMap(props) |
|||
.filter(_._1.startsWith("kafkap")) |
|||
.map(a => (a._1.substring(7), a._2)) |
|||
.foreach(p => kafkaProps.put(p._1, p._2)) |
|||
kafkaProps |
|||
} |
|||
|
|||
|
|||
def parseIotaData(record: String): Option[IotaData] = { |
|||
return Some(IotaData("user1", "thing123", "deviceId", "", "", DateTime.now, DateTime.now, null)) |
|||
// TODO JACKSON VERSION CONFLICT |
|||
/** |
|||
* java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.JsonMappingException.<init>(Ljava/io/Closeable;Ljava/lang/String;)V |
|||
* at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:61) |
|||
* at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:17) |
|||
*/ |
|||
try { |
|||
val (data, ex) = JsonHelper.Json2Object[IotaData](record) |
|||
if (data.isEmpty) { |
|||
logger.warn(s"data msg parse error: $record") |
|||
} |
|||
data |
|||
} catch { |
|||
case ex: Exception => |
|||
logger.info(s"parse iotadata error: ${ex.getMessage}") |
|||
Some(IotaData("user1", "thing123", "deviceId", "", "", DateTime.now, DateTime.now, null)) |
|||
} |
|||
} |
|||
} |
|||
|
|||
case class HudiData(userId: String, thingId: String, ts: Date) |
5. flink2iceberg/pom.xml
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" |
|||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<parent> |
|||
<artifactId>etl</artifactId> |
|||
<groupId>com.freesun</groupId> |
|||
<version>1.0-SNAPSHOT</version> |
|||
</parent> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
|
|||
<artifactId>flink2iceberg</artifactId> |
|||
|
|||
<properties> |
|||
<iceberg.version>0.12.0</iceberg.version> |
|||
<hive.version>2.3.6</hive.version> |
|||
<jackson.version>2.7.8</jackson.version> |
|||
<!--<hive.version>3.1.2</hive.version>--> |
|||
</properties> |
|||
|
|||
|
|||
<dependencies> |
|||
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink --> |
|||
<dependency> |
|||
<groupId>org.apache.iceberg</groupId> |
|||
<artifactId>iceberg-flink</artifactId> |
|||
<version>${iceberg.version}</version> |
|||
</dependency> |
|||
|
|||
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime --> |
|||
<dependency> |
|||
<groupId>org.apache.iceberg</groupId> |
|||
<artifactId>iceberg-flink-runtime</artifactId> |
|||
<version>${iceberg.version}</version> |
|||
</dependency> |
|||
|
|||
|
|||
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-data --> |
|||
<dependency> |
|||
<groupId>org.apache.iceberg</groupId> |
|||
<artifactId>iceberg-data</artifactId> |
|||
<version>${iceberg.version}</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.hadoop</groupId> |
|||
<artifactId>hadoop-client</artifactId> |
|||
<version>${hadoop.version}</version> |
|||
<scope>${hadoop.scope}</scope> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.hadoop</groupId> |
|||
<artifactId>hadoop-common</artifactId> |
|||
<version>${hadoop.version}</version> |
|||
<scope>${hadoop.scope}</scope> |
|||
</dependency> |
|||
|
|||
<!--TEST START--> |
|||
<dependency> |
|||
<groupId>org.apache.parquet</groupId> |
|||
<artifactId>parquet-avro</artifactId> |
|||
<version>1.10.1</version> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.apache.avro</groupId> |
|||
<artifactId>avro</artifactId> |
|||
<version>1.9.0</version> |
|||
</dependency> |
|||
<!--TEST END--> |
|||
|
|||
<!--<dependency>--> |
|||
<!--<groupId>org.apache.hadoop</groupId>--> |
|||
<!--<artifactId>hadoop-client</artifactId>--> |
|||
<!--<version>${hadoop.version}</version>--> |
|||
<!--<scope>provided</scope>--> |
|||
<!--</dependency>--> |
|||
|
|||
|
|||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-table-common</artifactId> |
|||
<version>${flink.version}</version> |
|||
<scope>${flink.scope}</scope> |
|||
</dependency> |
|||
|
|||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-table-planner_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
<scope>${flink.scope}</scope> |
|||
</dependency> |
|||
|
|||
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
<scope>${flink.scope}</scope> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
<scope>${flink.scope}</scope> |
|||
</dependency> |
|||
|
|||
<!-- Flink HIVE Dependency --> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
<scope>${flink.scope}</scope> |
|||
</dependency> |
|||
|
|||
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec --> |
|||
<dependency> |
|||
<groupId>org.apache.hive</groupId> |
|||
<artifactId>hive-exec</artifactId> |
|||
<version>${hive.version}</version> |
|||
<exclusions> |
|||
<exclusion> |
|||
<groupId>com.fasterxml.jackson.core</groupId> |
|||
<artifactId>jackson-databind</artifactId> |
|||
</exclusion> |
|||
<exclusion> |
|||
<groupId>com.fasterxml.jackson.core</groupId> |
|||
<artifactId>jackson-annotations</artifactId> |
|||
</exclusion> |
|||
<exclusion> |
|||
<groupId>com.fasterxml.jackson.core</groupId> |
|||
<artifactId>jackson-core</artifactId> |
|||
</exclusion> |
|||
<exclusion> |
|||
<!--cannot download !!--> |
|||
<groupId>org.pentaho</groupId> |
|||
<artifactId>pentaho-aggdesigner-algorithm</artifactId> |
|||
</exclusion> |
|||
</exclusions> |
|||
</dependency> |
|||
|
|||
<!-- https://mvnrepository.com/artifact/org.pentaho/pentaho-aggdesigner-algorithm --> |
|||
<!--<dependency>--> |
|||
<!--<groupId>org.pentaho</groupId>--> |
|||
<!--<artifactId>pentaho-aggdesigner-algorithm</artifactId>--> |
|||
<!--<version>5.1.5-jhyde</version>--> |
|||
<!--<scope>test</scope>--> |
|||
<!--</dependency>--> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-json</artifactId> |
|||
<version>${flink.version}</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-sql-client_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId> |
|||
<version>${flink.version}</version> |
|||
</dependency> |
|||
|
|||
<!--<dependency>--> |
|||
<!--<groupId>org.apache.flink</groupId>--> |
|||
<!--<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>--> |
|||
<!--<version>${flink.version}</version>--> |
|||
<!--</dependency>--> |
|||
|
|||
</dependencies> |
|||
</project> |
6. log4j.properties
|||
################################################################################ |
|||
# Licensed to the Apache Software Foundation (ASF) under one |
|||
# or more contributor license agreements. See the NOTICE file |
|||
# distributed with this work for additional information |
|||
# regarding copyright ownership. The ASF licenses this file |
|||
# to you under the Apache License, Version 2.0 (the |
|||
# "License"); you may not use this file except in compliance |
|||
# with the License. You may obtain a copy of the License at |
|||
# |
|||
# http://www.apache.org/licenses/LICENSE-2.0 |
|||
# |
|||
# Unless required by applicable law or agreed to in writing, software |
|||
# distributed under the License is distributed on an "AS IS" BASIS, |
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
# See the License for the specific language governing permissions and |
|||
# limitations under the License. |
|||
################################################################################ |
|||
|
|||
log4j.rootLogger=INFO, console |
|||
|
|||
log4j.appender.console=org.apache.log4j.ConsoleAppender |
|||
log4j.appender.console.layout=org.apache.log4j.PatternLayout |
|||
log4j.appender.console.layout.ConversionPattern=[${topic.perfix}]%d{MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n |
|||
|
|||
log4j.logger.org.apache.flink=WARN,stdout |
|||
log4j.logger.org.apache.kafka=WARN,stdout |
|||
log4j.logger.org.apache.zookeeper=WARN,stdout |
|||
log4j.logger.org.I0Itec.zkclient=WARN,stdout |
7. flink2iceberg StreamChangelog.scala
|||
package com.freesun.flink2iceberg |
|||
|
|||
import org.apache.flink.api.scala.typeutils.Types |
|||
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment |
|||
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment |
|||
import org.apache.flink.types.Row |
|||
|
|||
/** |
|||
* Created by yww08 on 2021/9/2. |
|||
*/ |
|||
object StreamChangelog { |
|||
|
|||
def main(args: Array[String]): Unit = { |
|||
val env = StreamExecutionEnvironment.getExecutionEnvironment |
|||
val tenv = StreamTableEnvironment.create(env) |
|||
|
|||
val dataStream = env.fromElements( |
|||
Row.of("Alice", Int.box(10)), |
|||
Row.of("Bob", Int.box(8)), |
|||
Row.of("Alice", Int.box(100)) |
|||
)(Types.ROW(Types.STRING, Types.INT)) |
|||
|
|||
val inputTable = tenv.fromDataStream(dataStream).as("name", "score") |
|||
|
|||
tenv.createTemporaryView("atable", inputTable) |
|||
val resultTalbe = tenv.sqlQuery("select name,SUM(score) from atable group by name") |
|||
|
|||
// error: doesn't support consuming update changes .... |
|||
// val resultStream = tenv.toDataStream(resultTalbe) |
|||
|
|||
// val resultStream = tenv.toChangelogStream(resultTalbe) |
|||
// resultStream.print() |
|||
// env.execute() |
|||
} |
|||
} |
8. flink2iceberg StreamJob.scala (Kafka → Iceberg) and SimpleDataUtil
|||
package com.freesun.flink2iceberg |
|||
|
|||
import java.util.Properties |
|||
import java.util.concurrent.TimeUnit |
|||
|
|||
import comm.utils.{ESHelper, Loader} |
|||
import comm.utils.storage.EsData |
|||
import de.javakaffee.kryoserializers.jodatime.{JodaDateTimeSerializer, JodaLocalDateSerializer, JodaLocalDateTimeSerializer} |
|||
import org.apache.flink.api.common.restartstrategy.RestartStrategies |
|||
import org.apache.flink.api.common.serialization.SimpleStringSchema |
|||
import org.apache.flink.api.common.time.Time |
|||
import org.apache.flink.api.java.utils.ParameterTool |
|||
import org.apache.flink.streaming.api.TimeCharacteristic |
|||
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment |
|||
import org.apache.flink.streaming.connectors.elasticsearch6.{ElasticsearchSink, RestClientFactory} |
|||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer |
|||
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment |
|||
import org.apache.flink.table.api.{DataTypes, TableSchema} |
|||
import org.apache.flink.table.catalog.hive.HiveCatalog |
|||
import org.apache.flink.table.data.{GenericRowData, RowData, StringData} |
|||
import org.apache.flink.table.types.logical.RowType |
|||
import org.apache.flink.types.RowKind |
|||
import org.apache.hadoop.conf.Configuration |
|||
import org.apache.hadoop.fs.Path |
|||
//import org.apache.http.client.config.RequestConfig |
|||
import org.apache.iceberg._ |
|||
import org.apache.iceberg.data.{GenericRecord, Record} |
|||
import org.apache.iceberg.flink.sink.FlinkAppenderFactory |
|||
import org.apache.iceberg.flink.{FlinkSchemaUtil, TableLoader} |
|||
import org.apache.iceberg.flink.source.FlinkSource |
|||
import org.apache.iceberg.hadoop.HadoopOutputFile.fromPath |
|||
import org.apache.iceberg.hadoop.{HadoopInputFile, HadoopTables} |
|||
import org.apache.iceberg.io.{FileAppender, FileAppenderFactory} |
|||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions |
|||
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap |
|||
import org.apache.iceberg.types.Types |
|||
import org.elasticsearch.client.RestClientBuilder |
|||
import org.joda.time.{DateTime, LocalDate, LocalDateTime} |
|||
import org.joda.time.format.{DateTimeFormat, DateTimeFormatterBuilder} |
|||
import org.slf4j.LoggerFactory |
|||
|
|||
import scala.collection.JavaConversions |
|||
import scala.util.Try |
|||
|
|||
/** |
|||
* Created by yww08 on 2021/8/13. |
|||
*/ |
|||
object StreamJob { |
|||
|
|||
private val logger = LoggerFactory.getLogger(getClass) |
|||
|
|||
def main(args: Array[String]): Unit = { |
|||
val props = Loader.from("/config.properties", args: _*) |
|||
logger.info(props.toString) |
|||
|
|||
import scala.collection.JavaConversions._ |
|||
val params = ParameterTool.fromMap(props.map(p => (p._1, p._2))) |
|||
|
|||
// set up the streaming execution environment |
|||
val env = StreamExecutionEnvironment.getExecutionEnvironment |
|||
|
|||
// make parameters available in the web interface |
|||
env.getConfig.setGlobalJobParameters(params) |
|||
|
|||
    // register Joda-Time Kryo serializers
|||
env.registerTypeWithKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer]) |
|||
env.registerTypeWithKryoSerializer(classOf[LocalDate], classOf[JodaLocalDateSerializer]) |
|||
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer]) |
|||
|
|||
// set restart strategy |
|||
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Int.MaxValue, Time.of(30, TimeUnit.SECONDS))) |
|||
|
|||
// iceberg read |
|||
env.enableCheckpointing(100) |
|||
env.setParallelism(2) |
|||
env.setMaxParallelism(2) |
|||
|
|||
// create hadoop catalog |
|||
val tenv = StreamTableEnvironment.create(env) |
|||
tenv.executeSql( |
|||
""" |
|||
|CREATE CATALOG hadoop_catalog WITH ( |
|||
| 'type'='iceberg', |
|||
| 'catalog-type'='hadoop', |
|||
| 'warehouse'='hdfs://node37:8020/user/root/warehouse', |
|||
| 'property-version'='1' |
|||
|) |
|||
""".stripMargin) |
|||
|
|||
// change catalog |
|||
tenv.useCatalog("hadoop_catalog") |
|||
tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db") |
|||
tenv.useDatabase("iceberg_hadoop_db") |
|||
|
|||
// create iceberg result table |
|||
tenv.executeSql("drop table if exists hadoop_catalog.iceberg_hadoop_db.iceberg_raw") |
|||
val resultTable = tenv.executeSql( |
|||
"""CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_raw ( |
|||
| user_id STRING COMMENT 'user_id' |
|||
|) WITH ( |
|||
|'connector'='iceberg', |
|||
|'catalog-type'='hadoop', |
|||
|'warehouse'='hdfs://node37:8020/user/root/warehouse' |
|||
|) |
|||
""".stripMargin) |
|||
|
|||
tenv.executeSql("insert into hadoop_catalog.iceberg_hadoop_db.iceberg_raw values('abc')") |
|||
val rs = tenv.executeSql("select * from hadoop_catalog.iceberg_hadoop_db.iceberg_raw") |
|||
rs.print() |
|||
// val resultTable = tenv.executeSql("CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_002 (\n" + |
|||
// " user_id STRING COMMENT 'user_id',\n" + |
|||
// " order_amount DOUBLE COMMENT 'order_amount',\n" + |
|||
// " log_ts STRING\n" + |
|||
// ")") |
|||
|
|||
resultTable.print() |
|||
|
|||
|
|||
// val hdfsUser = props.getProperty("fs.hdfs.user", "root") |
|||
// if (hdfsUser != null) { |
|||
// System.setProperty("HADOOP_USER_NAME", hdfsUser) |
|||
// // System.setProperty("HADOOP_CLASSPATH",".") |
|||
// } |
|||
|
|||
|
|||
val HIVE_CATALOG = "myhive" |
|||
val DEFAULT_DATABASE = "iceberg_db" |
|||
val HIVE_CONF_DIR = "./" |
|||
val catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR) |
|||
tenv.registerCatalog(HIVE_CATALOG, catalog) |
|||
tenv.useCatalog(HIVE_CATALOG) |
|||
|
|||
// create kafka stream table |
|||
tenv.executeSql("DROP TABLE IF EXISTS kafka_2_iceberg") |
|||
tenv.executeSql( |
|||
""" |
|||
|CREATE TABLE kafka_2_iceberg ( |
|||
| userId STRING |
|||
|) WITH ( |
|||
| 'connector'='kafka', -- 使用 kafka connector |
|||
| 'topic'='anxinyun_data4', -- kafka主题 |
|||
| 'scan.startup.mode'='latest-offset', -- 偏移量 |
|||
| 'properties.bootstrap.servers'='10.8.30.37:6667,10.8.30.38:6667,10.8.30.156:6667', -- KAFKA Brokers |
|||
| 'properties.group.id'='flink.raw.iceberg', -- 消费者组 |
|||
| 'format'='json', -- 数据源格式为json |
|||
| 'json.fail-on-missing-field' = 'false', |
|||
| 'json.ignore-parse-errors' = 'false' |
|||
|) |
|||
""".stripMargin) |
|||
    // 'is_generic' = 'false' -- create a Hive-compatible table
|||
|
|||
// copy data from kafka to hadoop |
|||
tenv.executeSql( |
|||
s""" |
|||
|INSERT INTO hadoop_catalog.iceberg_hadoop_db.iceberg_raw |
|||
| SELECT userId FROM $HIVE_CATALOG.$DEFAULT_DATABASE.kafka_2_iceberg |
|||
""".stripMargin) |
|||
|
|||
|
|||
// val tableLoader = TableLoader.fromHadoopTable("hdfs://node37:8020/user/hive/warehouse/iceberg_db1.db/anxinyun_data") |
|||
// |
|||
// val stream = FlinkSource.forRowData() |
|||
// .env(env.getJavaEnv) |
|||
// .tableLoader(tableLoader) |
|||
// .streaming(true) |
|||
// // .startSnapshotId(snapshotId) // read from special Snapshot |
|||
// .build() |
|||
// |
|||
// stream.print() |
|||
|
|||
|
|||
// val kafkaProperties = buildKafkaProps(props) |
|||
// val dataTopic = props.getProperty("kafka.topics.data") |
|||
// val kafkaSource = new FlinkKafkaConsumer[String](dataTopic, new SimpleStringSchema(), kafkaProperties) |
|||
// |
|||
// |
|||
// setKafkaSource(kafkaSource, props) |
|||
// |
|||
    // // use the event timestamps carried in the data
|||
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) |
|||
// |
|||
// val data = env.addSource(kafkaSource) |
|||
// |
|||
// data.map(_ => println(_)) |
|||
|
|||
// env.execute(props.getProperty("app.name", "iot-iceberg")) |
|||
} |
|||
|
|||
/** |
|||
* kafka source params builder |
|||
* |
|||
* @param props config props |
|||
* @return |
|||
*/ |
|||
def buildKafkaProps(props: Properties): Properties = { |
|||
val kafkaProps = new Properties() |
|||
kafkaProps.setProperty("bootstrap.servers", props.getProperty("kafka.brokers")) |
|||
kafkaProps.setProperty("group.id", props.getProperty("kafka.group.id")) |
|||
kafkaProps.setProperty("auto.offset.reset", "latest") |
|||
// support kafka properties(start with 'kafkap.') |
|||
JavaConversions.propertiesAsScalaMap(props) |
|||
.filter(_._1.startsWith("kafkap")) |
|||
.map(a => (a._1.substring(7), a._2)) |
|||
.foreach(p => kafkaProps.put(p._1, p._2)) |
|||
kafkaProps |
|||
} |
|||
|
|||
/** |
|||
* set kafka source offset |
|||
* |
|||
* @param kafkaSource kafka source |
|||
* @param props config props |
|||
*/ |
|||
def setKafkaSource(kafkaSource: FlinkKafkaConsumer[String], props: Properties): Unit = { |
|||
// set up the start position |
|||
val startMode = props.getProperty("start") |
|||
if (startMode != null) { |
|||
startMode match { |
|||
case "earliest" => |
|||
kafkaSource.setStartFromEarliest() |
|||
logger.info("set kafka start from earliest") |
|||
case "latest" => kafkaSource.setStartFromLatest() |
|||
logger.info("set kafka start from latest") |
|||
case _ => |
|||
val startTimestampOpt = Try( |
|||
new DateTimeFormatterBuilder().append(null, |
|||
Array("yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ssZ") |
|||
.map(pat => DateTimeFormat.forPattern(pat).getParser)) |
|||
.toFormatter |
|||
.parseDateTime(startMode)).toOption |
|||
if (startTimestampOpt.nonEmpty) { |
|||
kafkaSource.setStartFromTimestamp(startTimestampOpt.get.getMillis) |
|||
logger.info(s"set kafka start from $startMode") |
|||
} else { |
|||
throw new Exception(s"unsupport startmode at ($startMode)") |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 简单的数据操作工具 |
|||
*/ |
|||
object SimpleDataUtil { |
|||
|
|||
val SCHEMA: Schema = new Schema( |
|||
Types.NestedField.optional(1, "id", Types.IntegerType.get()), |
|||
Types.NestedField.optional(2, "name", Types.StringType.get()) |
|||
) |
|||
|
|||
val FLINK_SCHEMA: TableSchema = TableSchema.builder.field("id", DataTypes.INT).field("data", DataTypes.STRING).build |
|||
|
|||
val ROW_TYPE: RowType = FLINK_SCHEMA.toRowDataType.getLogicalType.asInstanceOf[RowType] |
|||
|
|||
val RECORD: Record = GenericRecord.create(SCHEMA) |
|||
|
|||
def createTable(path: String, properties: Map[String, String], partitioned: Boolean): Table = { |
|||
val spec: PartitionSpec = |
|||
if (partitioned) PartitionSpec.builderFor(SCHEMA).identity("data").build |
|||
else PartitionSpec.unpartitioned |
|||
new HadoopTables().create(SCHEMA, spec, JavaConversions.mapAsJavaMap(properties), path) |
|||
} |
|||
|
|||
def createRecord(id: Int, data: String): Record = { |
|||
val record: Record = RECORD.copy() |
|||
record.setField("id", id) |
|||
record.setField("name", data) |
|||
record |
|||
} |
|||
|
|||
def createRowData(id: Integer, data: String): RowData = GenericRowData.of(id, StringData.fromString(data)) |
|||
|
|||
def createInsert(id: Integer, data: String): RowData = GenericRowData.ofKind(RowKind.INSERT, id, StringData.fromString(data)) |
|||
|
|||
def createDelete(id: Integer, data: String): RowData = GenericRowData.ofKind(RowKind.DELETE, id, StringData.fromString(data)) |
|||
|
|||
def createUpdateBefore(id: Integer, data: String): RowData = GenericRowData.ofKind(RowKind.UPDATE_BEFORE, id, StringData.fromString(data)) |
|||
|
|||
def createUpdateAfter(id: Integer, data: String): RowData = GenericRowData.ofKind(RowKind.UPDATE_AFTER, id, StringData.fromString(data)) |
|||
|
|||
def writeFile(schema: Schema, spec: PartitionSpec, conf: Configuration, |
|||
location: String, filename: String, rows: Seq[RowData]): DataFile = { |
|||
val path = new Path(location, filename) |
|||
val fileFormat = FileFormat.fromFileName(filename) |
|||
Preconditions.checkNotNull(fileFormat, s"Cannot determine format for file %s", filename) |
|||
|
|||
val flinkSchema = FlinkSchemaUtil.convert(schema) |
|||
val appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec) |
|||
|
|||
val appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat) |
|||
|
|||
try { |
|||
val closeableAppender = appender |
|||
try |
|||
closeableAppender.addAll(JavaConversions.seqAsJavaList(rows)) |
|||
finally if (closeableAppender != null) closeableAppender.close() |
|||
} |
|||
DataFiles.builder(spec) |
|||
.withInputFile(HadoopInputFile.fromPath(path, conf)) |
|||
.withMetrics(appender.metrics()) |
|||
.build() |
|||
} |
|||
} |
9. flink2iceberg StreamTableDemo.scala
|||
package com.freesun.flink2iceberg |
|||
|
|||
import org.apache.flink.api.scala._ |
|||
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment |
|||
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment |
|||
/** |
|||
* Created by yww08 on 2021/9/1. |
|||
*/ |
|||
object StreamTableDemo { |
|||
|
|||
def main(args: Array[String]): Unit = { |
|||
// create environments of both APIs |
|||
val env = StreamExecutionEnvironment.getExecutionEnvironment |
|||
val tableEnv = StreamTableEnvironment.create(env) |
|||
|
|||
// create a DataStream |
|||
val dataStream = env.fromElements("Alice", "Bob", "John") |
|||
|
|||
// interpret the insert-only DataStream as a Table |
|||
val inputTable = tableEnv.fromDataStream(dataStream) |
|||
|
|||
// register the Table object as a view and query it |
|||
tableEnv.createTemporaryView("InputTable", inputTable) |
|||
val resultTable = tableEnv.sqlQuery("SELECT LOWER(f0) FROM InputTable") |
|||
|
|||
// interpret the insert-only Table as a DataStream again |
|||
// val resultStream = tableEnv.toDataStream(resultTable) |
|||
// |
|||
// // add a printing sink and execute in DataStream API |
|||
// resultStream.print() |
|||
// env.execute() |
|||
} |
|||
} |
10. FlinkCatalogTestBase.java (Iceberg Flink test base, kept for reference)
|||
/* |
|||
* Licensed to the Apache Software Foundation (ASF) under one |
|||
* or more contributor license agreements. See the NOTICE file |
|||
* distributed with this work for additional information |
|||
* regarding copyright ownership. The ASF licenses this file |
|||
* to you under the Apache License, Version 2.0 (the |
|||
* "License"); you may not use this file except in compliance |
|||
* with the License. You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, |
|||
* software distributed under the License is distributed on an |
|||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
|||
* KIND, either express or implied. See the License for the |
|||
* specific language governing permissions and limitations |
|||
* under the License. |
|||
*/ |
|||
|
|||
package org.apache.iceberg.flink; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import org.apache.flink.util.ArrayUtils; |
|||
import org.apache.hadoop.hive.conf.HiveConf; |
|||
import org.apache.iceberg.CatalogProperties; |
|||
import org.apache.iceberg.catalog.Catalog; |
|||
import org.apache.iceberg.catalog.Namespace; |
|||
import org.apache.iceberg.catalog.SupportsNamespaces; |
|||
import org.apache.iceberg.hadoop.HadoopCatalog; |
|||
import org.apache.iceberg.relocated.com.google.common.base.Joiner; |
|||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
|||
import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
|||
import org.junit.After; |
|||
import org.junit.AfterClass; |
|||
import org.junit.Before; |
|||
import org.junit.BeforeClass; |
|||
import org.junit.rules.TemporaryFolder; |
|||
import org.junit.runner.RunWith; |
|||
import org.junit.runners.Parameterized; |
|||
|
|||
@RunWith(Parameterized.class) |
|||
public abstract class FlinkCatalogTestBase extends FlinkTestBase { |
|||
|
|||
protected static final String DATABASE = "db"; |
|||
private static TemporaryFolder hiveWarehouse = new TemporaryFolder(); |
|||
private static TemporaryFolder hadoopWarehouse = new TemporaryFolder(); |
|||
|
|||
@BeforeClass |
|||
public static void createWarehouse() throws IOException { |
|||
hiveWarehouse.create(); |
|||
hadoopWarehouse.create(); |
|||
} |
|||
|
|||
@AfterClass |
|||
public static void dropWarehouse() { |
|||
hiveWarehouse.delete(); |
|||
hadoopWarehouse.delete(); |
|||
} |
|||
|
|||
@Before |
|||
public void before() { |
|||
sql("CREATE CATALOG %s WITH %s", catalogName, toWithClause(config)); |
|||
} |
|||
|
|||
@After |
|||
public void clean() { |
|||
sql("DROP CATALOG IF EXISTS %s", catalogName); |
|||
} |
|||
|
|||
@Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}") |
|||
public static Iterable<Object[]> parameters() { |
|||
return Lists.newArrayList( |
|||
new Object[] {"testhive", Namespace.empty()}, |
|||
new Object[] {"testhadoop", Namespace.empty()}, |
|||
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")} |
|||
); |
|||
} |
|||
|
|||
protected final String catalogName; |
|||
protected final Namespace baseNamespace; |
|||
protected final Catalog validationCatalog; |
|||
protected final SupportsNamespaces validationNamespaceCatalog; |
|||
protected final Map<String, String> config = Maps.newHashMap(); |
|||
|
|||
protected final String flinkDatabase; |
|||
protected final Namespace icebergNamespace; |
|||
protected final boolean isHadoopCatalog; |
|||
|
|||
public FlinkCatalogTestBase(String catalogName, Namespace baseNamespace) { |
|||
this.catalogName = catalogName; |
|||
this.baseNamespace = baseNamespace; |
|||
this.isHadoopCatalog = catalogName.startsWith("testhadoop"); |
|||
this.validationCatalog = isHadoopCatalog ? |
|||
new HadoopCatalog(hiveConf, "file:" + hadoopWarehouse.getRoot()) : |
|||
catalog; |
|||
this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog; |
|||
|
|||
config.put("type", "iceberg"); |
|||
if (!baseNamespace.isEmpty()) { |
|||
config.put(FlinkCatalogFactory.BASE_NAMESPACE, baseNamespace.toString()); |
|||
} |
|||
if (isHadoopCatalog) { |
|||
config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"); |
|||
} else { |
|||
config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive"); |
|||
config.put(CatalogProperties.URI, getURI(hiveConf)); |
|||
} |
|||
config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot())); |
|||
|
|||
this.flinkDatabase = catalogName + "." + DATABASE; |
|||
this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace.levels(), new String[] {DATABASE})); |
|||
} |
|||
|
|||
protected String warehouseRoot() { |
|||
if (isHadoopCatalog) { |
|||
return hadoopWarehouse.getRoot().getAbsolutePath(); |
|||
} else { |
|||
return hiveWarehouse.getRoot().getAbsolutePath(); |
|||
} |
|||
} |
|||
|
|||
protected String getFullQualifiedTableName(String tableName) { |
|||
final List<String> levels = Lists.newArrayList(icebergNamespace.levels()); |
|||
levels.add(tableName); |
|||
return Joiner.on('.').join(levels); |
|||
} |
|||
|
|||
static String getURI(HiveConf conf) { |
|||
return conf.get(HiveConf.ConfVars.METASTOREURIS.varname); |
|||
} |
|||
|
|||
static String toWithClause(Map<String, String> props) { |
|||
StringBuilder builder = new StringBuilder(); |
|||
builder.append("("); |
|||
int propCount = 0; |
|||
for (Map.Entry<String, String> entry : props.entrySet()) { |
|||
if (propCount > 0) { |
|||
builder.append(","); |
|||
} |
|||
builder.append("'").append(entry.getKey()).append("'").append("=") |
|||
.append("'").append(entry.getValue()).append("'"); |
|||
propCount++; |
|||
} |
|||
builder.append(")"); |
|||
return builder.toString(); |
|||
} |
|||
} |