### Environment Recovery
**Install a new MySQL**
```shell
# Command 1
sudo apt-get update
# Command 2
sudo apt-get install mysql-server
# Initial security setup (optional)
sudo mysql_secure_installation
# Remote access and privileges (optional)
# Note: a freshly initialized MySQL does not allow remote login, which is good for security. If remote access is needed, it is strongly recommended to create a lower-privileged user for it rather than logging in remotely as root, which is risky: the database could be dropped by an attacker in no time.
# First, edit /etc/mysql/my.cnf and comment out the line `bind-address = 127.0.0.1` to remove the address binding (or bind your own fixed address, though that is impractical with a dynamically assigned broadband IP).
# Then grant a user remote login by running the two statements below.
grant all PRIVILEGES on *.* to user1@'%' identified by '123456' WITH GRANT OPTION;
FLUSH PRIVILEGES;
service mysql restart
```
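Note that on MySQL 8.0+ the combined `GRANT ... IDENTIFIED BY` syntax above is no longer accepted; a minimal sketch of the equivalent, assuming MySQL 8 and reusing the user/password from above:
```sql
-- MySQL 8.0+: create the user first, then grant
CREATE USER 'user1'@'%' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* TO 'user1'@'%' WITH GRANT OPTION;
FLUSH PRIVILEGES;
```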
**Restart Hive**
Still on the `37` test machine, under `/home/anxin/apache-hive-3.1.2-bin`
```sh
./schematool -initSchema -dbType mysql
# Load my environment variables, because this machine also has Ambari's Hive installed
source /etc/profile
hive --service metastore
# P.S. My environment variables
export JAVA_HOME=/usr/local/java/jdk1.8.0_131
export JAVA_HOME=/home/anxin/jdk8_322/jdk8u322-b06
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib:$JRE_HOME/lib
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
export HIVE_HOME=/home/anxin/apache-hive-3.1.2-bin
export HIVE_CONF_DIR=$HIVE_HOME/conf
export PATH=$HIVE_HOME/bin:$PATH
export HADOOP_HOME=/usr/hdp/3.1.4.0-315/hadoop
export HADOOP_CONF_DIR=/usr/hdp/3.1.4.0-315/hadoop/conf
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export FLINK_HOME=/home/anxin/flink-1.13.6
```
### Hive Basics
Reference: https://www.cnblogs.com/wangrd/p/6275162.html
```sql
-- This creates a tableset.db folder under /user/hive/warehouse/ on HDFS.
CREATE DATABASE tableset;
-- Switch the current database
USE tableset;
-- Create a table (general syntax)
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
-- Example:
CREATE TABLE t_order (
id int,
name string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' -- field delimiter
STORED AS TEXTFILE; -- storage format
-- Show the table schema
DESC t_order;
-- Load data
load data local inpath '/home/anxin/data/data.txt' [OVERWRITE] into table t_order;
-- EXTERNAL tables
-- Creating an external table leaves the source files where they are
-- Dropping an external table does not delete the source files
CREATE EXTERNAL TABLE ex_order (
id int,
name string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/external/hive';
-- Partitions (a partitioned variant of t_order)
CREATE TABLE t_order(id int,name string) partitioned by (part_flag string)
row format delimited fields terminated by '\t';
load data local inpath '/home/hadoop/ip.txt' overwrite into table t_order
partition(part_flag='part1'); -- the data is uploaded into the part1 subdirectory
-- List tables
SHOW TABLES;
SHOW TABLES 'TMP';
SHOW PARTITIONS TMP_TABLE; -- list the table's partitions
DESCRIBE TMP_TABLE; -- show the table schema
-- Bucketed table
create table stu_buck(Sno int,Sname string,Sex string,Sage int,Sdept string)
clustered by(Sno)
sorted by(Sno DESC)
into 4 buckets
row format delimited
fields terminated by ',';
-- Insert data via insert into ... select ...
set hive.enforce.bucketing = true;
set mapreduce.job.reduces=4;
insert overwrite table stu_buck
select * from student cluster by(Sno); -- equivalent to distribute by(Sno) sort by(Sno asc);
-- Drop a table
DROP TABLE tablename;
-- Temporary table (CTAS)
CREATE TABLE tmp_table
AS
SELECT id,name
FROM t_order
SORT BY id;
-- UDF: user-defined functions
-- Extend the UDF class, package it into a jar, then register the function
CREATE TEMPORARY function tolowercase as 'cn.demo.Namespace';
select id,tolowercase(name) from t_order;
```
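The jar containing the UDF class also has to be added to the Hive session before the function is registered; a minimal sketch (the jar path is a placeholder, the class name is the one used above):
```sql
-- add the UDF jar to the session, then register and use the function
ADD JAR /path/to/my-udfs.jar;
CREATE TEMPORARY FUNCTION tolowercase AS 'cn.demo.Namespace';
SELECT id, tolowercase(name) FROM t_order;
```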
### Hadoop Basics
In this example we ultimately chose to store the Iceberg data through a Hadoop catalog:
```sh
# -skipTrash: delete directly without moving the file to the trash
hdfs dfs -rm -skipTrash /path/to/file/you/want/to/remove/permanently
# Purge everything currently in the trash
hdfs dfs -expunge
## Delete all data under a given directory
hdfs dfs -rm -r -skipTrash /user/hadoop/*
## Hadoop startup errors:
chown -R hdfs:hdfs /hadoop/hdfs/namenode
# DataNode fails to start: possibly caused by formatting more than once. Make the DataNode's clusterID match the NameNode's in:
/hadoop/hdfs/data/current/VERSION
/hadoop/hdfs/namenode/current/VERSION
# Check the DataNode startup log
root@node38:/var/log/hadoop/hdfs# tail -n 1000 hadoop-hdfs-datanode-node38.log
```
Check the restored Hadoop cluster:
![image-20220127110428706](imgs/数据湖2/image-20220127110428706.png)
### Flink SQL: Streaming from Kafka to Hive
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
SQL for reading from Kafka:
```sql
-- In the Scala program: tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
create table myhive.testhive.iotaKafkatable(
`userId` STRING,
`dimensionId` STRING,
`dimCapId` STRING,
`scheduleId` STRING,
`jobId` STRING,
`jobRepeatId` STRING,
`thingId` STRING ,
`deviceId` STRING,
`taskId` STRING,
`triggerTime` STRING,
`finishTime` STRING,
`seq` STRING,
`result` STRING,
`data` STRING
)with
('connector' = 'kafka',
'topic'='iceberg',
'properties.bootstrap.servers' = '10.8.30.37:6667',
'properties.group.id' = 'iceberg-demo' ,
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.ignore-parse-errors'='true')
```
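The `myhive` catalog referenced above is not defined anywhere in this note; a minimal sketch of registering it as a Flink Hive catalog (the `hive-conf-dir` path is an assumption, copied from the Hive install used elsewhere in this document):
```sql
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  -- assumed conf dir; matches the Hive installation used in this note
  'hive-conf-dir' = '/home/anxin/apache-hive-3.1.2-bin/conf'
);
-- the testhive database is assumed to already exist in that Hive instance
-- CREATE DATABASE IF NOT EXISTS myhive.testhive;
```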
Create the Hive table:
```sql
-- In the Scala program: tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
CREATE TABLE myhive.testhive.iotatable2(
`userId` STRING,
`dimensionId` STRING,
`dimCapId` STRING,
`scheduleId` STRING,
`jobId` STRING,
`jobRepeatId` STRING,
`thingId` STRING ,
`deviceId` STRING,
`taskId` STRING,
`triggerTime` TIMESTAMP,
`seq` STRING,
`result` STRING,
`data` STRING
)
PARTITIONED BY (finishTime STRING) -- partition column; it does not hold actual row data
STORED AS PARQUET
TBLPROPERTIES (
'sink.partition-commit.policy.kind' = 'metastore,success-file',
'partition.time-extractor.timestamp-pattern' = '$finishTime'
)
```
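The note does not show the insert that connects the two tables; a minimal sketch, back in the default dialect and assuming the `triggerTime` strings are in a format `TO_TIMESTAMP` accepts:
```sql
INSERT INTO myhive.testhive.iotatable2
SELECT userId, dimensionId, dimCapId, scheduleId, jobId, jobRepeatId,
       thingId, deviceId, taskId,
       TO_TIMESTAMP(triggerTime),   -- assumes 'yyyy-MM-dd HH:mm:ss' strings
       seq, `result`, data,
       finishTime                   -- the partition column goes last
FROM myhive.testhive.iotaKafkatable;
```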
### IceBerg
Revisiting the concepts:
> Recommended reading:
>
> + [数据湖存储架构选型](https://blog.csdn.net/u011598442/article/details/110152352)
Reference: [Flink+IceBerg+对象存储,构建数据湖方案](https://baijiahao.baidu.com/s?id=1705407920794793309&wfr=spider&for=pc)
![img](imgs/数据湖2/b58f8c5494eef01f5824f06566c8492dbc317d19.jpeg)![img](imgs/数据湖2/f3d3572c11dfa9ec7f198010e3e6270b918fc146.jpeg)
Iceberg table data organization:
Namespace -> Table -> Snapshot -> Table data (Parquet/ORC/Avro, etc.)
- **Snapshot metadata**: table schema, partition, partition spec, manifest list path, current snapshot, etc.
- **Manifest list**: manifest file paths and their partitions, plus data file statistics.
- **Manifest file**: data file paths and per-column lower/upper bounds.
- **Data file**: the actual table data, stored in Parquet, ORC, Avro, etc.
![img](imgs/数据湖2/a6efce1b9d16fdfa26174a12c9b95c5c95ee7b96.jpeg)
A DataWorker reads and parses the incoming records' metadata, then hands each record to Iceberg for storage; Iceberg writes the record into the predefined partition, producing new files.
When Flink completes a checkpoint it finalizes this batch of files, generates a manifest of the batch, and hands it to the Commit Worker.
The Commit Worker reads the current snapshot information, merges it with the file list produced in this round, generates a new manifest list plus the metadata files for the new table version, and then commits; once the commit succeeds, a new snapshot is formed.
![img](imgs/数据湖2/77094b36acaf2edd63d01449f226d1e139019328.jpeg)
![img](imgs/数据湖2/377adab44aed2e73ddb8d5980337718386d6faf4.jpeg)
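Because commits (and thus new snapshots) only happen when a checkpoint completes, the checkpoint interval effectively controls the commit frequency. In the SQL client this can be set as below; the 10s value is just an example, matching the interval used later in this note:
```sql
SET execution.checkpointing.interval = 10s;
```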
A catalog is the component Iceberg uses to manage tables (create, drop, rename, etc.). Iceberg currently supports two main implementations: HiveCatalog and HadoopCatalog.
HiveCatalog provides ACID guarantees through the metastore database (usually MySQL), while HadoopCatalog relies on optimistic locking and the atomicity of HDFS rename to guarantee ACID commits.
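For reference, the two catalog flavors as Flink SQL DDL, mirroring the definitions used later in this document (the catalog names here are illustrative, and the `hive-conf-dir` path is an assumption):
```sql
-- HiveCatalog: table metadata is tracked in the Hive metastore
CREATE CATALOG iceberg_hive WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'hive-conf-dir' = '/home/anxin/apache-hive-3.1.2-bin/conf'
);
-- HadoopCatalog: table metadata lives directly on HDFS
CREATE CATALOG iceberg_hadoop WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'hdfs://node37:8020/user/hadoop',
  'property-version' = '1'
);
```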
Flink compatibility:
![image-20220119142219318](imgs/数据湖2/image-20220119142219318.png)
### Writing to Iceberg
+ Iceberg site: https://iceberg.apache.org/#flink/
+ Chinese translation of the docs: https://www.cnblogs.com/swordfall/p/14548574.html
+ HiveCatalog issue (data not written to Hive): https://issueexplorer.com/issue/apache/iceberg/3092
+ [Flink + Iceberg: How to Construct a Whole-scenario Real-time Data Warehouse](https://www.alibabacloud.com/blog/flink-%2B-iceberg-how-to-construct-a-whole-scenario-real-time-data-warehouse_597824)
+ A thorough walkthrough: https://miaowenting.site/2021/01/20/Apache-Iceberg/
#### 1. Using HadoopCatalog
https://cloud.tencent.com/developer/article/1807008
Key code:
svn: http://svn.anxinyun.cn/Iota/branches/fs-iot/code/flink-iceberg/flink-iceberg/src/main/scala/com/fs/IceBergDealHadoopApplication.scala
```scala
...
```
#### 2. Using HiveCatalog
> Status: ??? the data can be queried from Hive, but not from Flink SQL
Key code, annotated:
```scala
env.enableCheckpointing(5000)
// Create the Iceberg catalog and database
val createIcebergCatalogSql =
"""CREATE CATALOG iceberg WITH(
| 'type'='iceberg',
| 'catalog-type'='hive',
| 'hive-conf-dir'='E:\Iota\branches\fs-iot\code\flink-iceberg\flink-iceberg'
|)
""".stripMargin
// Create the raw data table iota_raw
val createIotaRawSql =
"""CREATE TABLE iceberg.iceberg_dba.iota_raw (
|`userId` STRING,
|`dimensionId` STRING,
|`dimCapId` STRING,
|`scheduleId` STRING,
|`jobId` STRING,
|`jobRepeatId` STRING,
|`thingId` STRING ,
|`deviceId` STRING,
|`taskId` STRING,
|`triggerTime` TIMESTAMP,
|`day` STRING,
|`seq` STRING,
|`result` STRING,
| `data` STRING
|) PARTITIONED BY (`thingId`,`day`)
|WITH (
| 'engine.hive.enabled' = 'true',
| 'table.exec.sink.not-null-enforcer'='ERROR'
|)
""".stripMargin
val kafka_iota_sql =
"""create table myhive.testhive.iotaKafkatable(
|`userId` STRING,
|`dimensionId` STRING,
|`dimCapId` STRING,
|`scheduleId` STRING,
|`jobId` STRING,
|`jobRepeatId` STRING,
|`thingId` STRING ,
|`deviceId` STRING,
|`taskId` STRING,
|`triggerTime` STRING,
|`finishTime` STRING,
|`seq` STRING,
|`result` STRING,
| `data` STRING
|)with
|('connector' = 'kafka',
|'topic'='iceberg',
|'properties.bootstrap.servers' = '10.8.30.37:6667',
|'properties.group.id' = 'iceberg-demo' ,
|'scan.startup.mode' = 'latest-offset',
|'format' = 'json',
|'json.ignore-parse-errors'='true'
|)
""".stripMargin
// Register the custom transform functions
tenv.createTemporarySystemFunction("dcFunction", classOf[DateCgFunction])
tenv.createTemporarySystemFunction("tcFunction", classOf[TimeStampFunction])
val insertSql =
"""
|insert into iceberg.iceberg_dba.iota_raw
| select userId, dimensionId,dimCapId,scheduleId,jobId,jobRepeatId,thingId,deviceId,taskId,
|tcFunction(triggerTime),
|DATE_FORMAT(dcFunction(triggerTime),'yyyy-MM-dd'),
|seq,`result`,data
|from myhive.testhive.iotakafkatable
""".stripMargin
```
> 1. With the HiveCatalog approach, 'engine.hive.enabled' = 'true' must be specified.
>
> 2. 'table.exec.sink.not-null-enforcer'='ERROR' controls what happens when a NULL is inserted into a NOT NULL column.
>
> 3. Custom function implementations:
>
> ```scala
> class TimeStampFunction extends ScalarFunction {
> def eval(@DataTypeHint(inputGroup = InputGroup.UNKNOWN) o: String): Timestamp = {
> val v = DateParser.parse(o)
> if (v.isEmpty) {
> null
> } else {
> new Timestamp(v.get.getMillis)
> }
> }
> }
> ```
>
> 4. PARTITIONED BY (`thingId`,`day`) partitions by thingId and date; the file paths then look like: http://10.8.30.37:50070/explorer.html#/user/hive/warehouse/iceberg_dba.db/iota_raw/data/thingId=b6cfc716-3766-4949-88bc-71cb0dbf31ee/day=2022-01-20
>
> 5. Full code: http://svn.anxinyun.cn/Iota/branches/fs-iot/code/flink-iceberg/flink-iceberg/src/main/scala/com/fs/DataDealApplication.scala
Check the DDL generated for the table:
```sql
show create table iota_raw;
CREATE EXTERNAL TABLE `iota_raw`(
`userid` string COMMENT 'from deserializer',
`dimensionid` string COMMENT 'from deserializer',
`dimcapid` string COMMENT 'from deserializer',
`scheduleid` string COMMENT 'from deserializer',
`jobid` string COMMENT 'from deserializer',
`jobrepeatid` string COMMENT 'from deserializer',
`thingid` string COMMENT 'from deserializer',
`deviceid` string COMMENT 'from deserializer',
`taskid` string COMMENT 'from deserializer',
`triggertime` timestamp COMMENT 'from deserializer',
`day` string COMMENT 'from deserializer',
`seq` string COMMENT 'from deserializer',
`result` string COMMENT 'from deserializer',
`data` string COMMENT 'from deserializer')
ROW FORMAT SERDE
'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
STORED BY
'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION
'hdfs://node37:8020/user/hive/warehouse/iceberg_dba.db/iota_raw'
TBLPROPERTIES (
'engine.hive.enabled'='true',
'metadata_location'='hdfs://node37:8020/user/hive/warehouse/iceberg_dba.db/iota_raw/metadata/00010-547022ad-c615-4e2e-854e-8f85592db7b6.metadata.json',
'previous_metadata_location'='hdfs://node37:8020/user/hive/warehouse/iceberg_dba.db/iota_raw/metadata/00009-abfb6af1-13dd-439a-88f5-9cb822d6c0e4.metadata.json',
'table_type'='ICEBERG',
'transient_lastDdlTime'='1642579682')
```
Query the data from Hive:
```sql
hive> add jar /tmp/iceberg-hive-runtime-0.12.1.jar;
hive> select * from iota_raw;
```
#### Error Log
1. HiveTableOperations$WaitingForLockException
```sql
-- In the Hive metastore's HIVE_LOCKS table, delete the lock records belonging to the table that threw the error
select hl_lock_ext_id,hl_table,hl_lock_state,hl_lock_type,hl_last_heartbeat,hl_blockedby_ext_id from HIVE_LOCKS;
delete from HIVE_LOCKS; -- or narrow it with WHERE hl_table = '<table>' to drop only that table's locks
```
### Querying Iceberg
#### Starting the Flink SQL Client
In the Flink conf, set `masters` to `localhost:8081` and `workers` to `localhost`.
Configure `flink-conf.yaml` (optional):
```ini
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
```
Configure sql-client-defaults.yaml (optional):
```yaml
execution:
# select the implementation responsible for planning table programs
# possible values are 'blink' (used by default) or 'old'
planner: blink
# 'batch' or 'streaming' execution
type: streaming
# allow 'event-time' or only 'processing-time' in sources
time-characteristic: event-time
# interval in ms for emitting periodic watermarks
periodic-watermarks-interval: 200
# 'changelog', 'table' or 'tableau' presentation of results
result-mode: table
# maximum number of maintained rows in 'table' presentation of results
max-table-result-rows: 1000000
# parallelism of the program
# parallelism: 1
# maximum parallelism
max-parallelism: 128
# minimum idle state retention in ms
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
# current catalog ('default_catalog' by default)
current-catalog: default_catalog
# current database of the current catalog (default database of the catalog by default)
current-database: default_database
# controls how table programs are restarted in case of a failures
# restart-strategy:
# strategy type
# possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
# type: fallback
```
Start the Flink cluster:
```sh
./bin/start-cluster.sh
```
Open the Flink UI at http://node37:8081
Start the SQL client:
```sh
export HADOOP_CLASSPATH=`hadoop classpath`
./bin/sql-client.sh embedded \
-j /home/anxin/iceberg/iceberg-flink-runtime-0.12.0.jar \
-j /home/anxin/iceberg/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar \
-j /home/anxin/flink-1.11.4/lib/flink-sql-connector-kafka-0.11_2.11-1.11.4.jar \
shell
```
#### Query Basics
```sql
CREATE CATALOG iceberg WITH(
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://node37:8020/user/hadoop',
'property-version'='1'
);
use catalog iceberg;
use iceberg_db; -- select the database
-- Optional settings
SET; -- show the current configuration
SET sql-client.execution.result-mode = table; -- changelog/tableau
SET sql-client.verbose=true; -- print full exception stack traces
SET sql-client.execution.max-table-result.rows=1000000; -- rows cached in table result mode
SET table.planner = blink; -- planner: either blink (default) or old
SET execution.runtime-mode = streaming; -- execution mode either batch or streaming
SET sql-client.execution.result-mode = table; -- available values: table, changelog and tableau
SET parallelism.default = 1; -- optional: Flink's parallelism (1 by default)
SET pipeline.auto-watermark-interval = 200; --optional: interval for periodic watermarks
SET pipeline.max-parallelism = 10; -- optional: Flink's maximum parallelism
SET table.exec.state.ttl = 1000; -- optional: table program's idle state time
SET restart-strategy = fixed-delay;
SET table.optimizer.join-reorder-enabled = true;
SET table.exec.spill-compression.enabled = true;
SET table.exec.spill-compression.block-size = 128kb;
SET execution.savepoint.path = tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab; -- restore from the specific savepoint path
-- Execute a set of SQL statements
BEGIN STATEMENT SET;
-- one or more INSERT INTO statements
{ INSERT INTO|OVERWRITE <select_statement>; }+
END;
```
![image-20220120164032739](imgs/数据湖2/image-20220120164032739.png)
#### Batch Reads
> Running a query after executing SET in the Flink SQL client always failed with an error:
>
>
>
> so a few parameters have to be set before launching the SQL client instead.
Edit `conf/sql-client-defaults.yaml`:
execution.type=batch
```yaml
catalogs:
# A typical catalog definition looks like:
- name: myhive
type: hive
hive-conf-dir: /home/anxin/apache-hive-3.1.2-bin/conf
# default-database: ...
- name: hadoop_catalog
type: iceberg
warehouse: hdfs://node37:8020/user/hadoop
catalog-type: hadoop
#==============================================================================
# Modules
#==============================================================================
# Define modules here.
#modules: # note the following modules will be of the order they are specified
# - name: core
# type: core
#==============================================================================
# Execution properties
#==============================================================================
# Properties that change the fundamental execution behavior of a table program.
execution:
# select the implementation responsible for planning table programs
# possible values are 'blink' (used by default) or 'old'
planner: blink
# 'batch' or 'streaming' execution
type: batch
# allow 'event-time' or only 'processing-time' in sources
time-characteristic: event-time
# interval in ms for emitting periodic watermarks
periodic-watermarks-interval: 200
# 'changelog', 'table' or 'tableau' presentation of results
result-mode: table
# maximum number of maintained rows in 'table' presentation of results
max-table-result-rows: 1000000
# parallelism of the program
# parallelism: 1
# maximum parallelism
max-parallelism: 128
# minimum idle state retention in ms
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
# current catalog ('default_catalog' by default)
current-catalog: default_catalog
# current database of the current catalog (default database of the catalog by default)
current-database: default_database
# controls how table programs are restarted in case of a failures
# restart-strategy:
# strategy type
# possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
# type: fallback
#==============================================================================
# Configuration options
#==============================================================================
# Configuration options for adjusting and tuning table programs.
# A full list of options and their default values can be found
# on the dedicated "Configuration" web page.
# A configuration can look like:
configuration:
table.exec.spill-compression.enabled: true
table.exec.spill-compression.block-size: 128kb
table.optimizer.join-reorder-enabled: true
# execution.checkpointing.interval: 10s
table.dynamic-table-options.enabled: true
```
#### Streaming Reads
Edit `conf/sql-client-defaults.yaml`:
execution.type=streaming
execution.checkpointing.interval: 10s
table.dynamic-table-options.enabled: true  // enable [dynamic table options (SQL hints)](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/hints/)
```sql
-- Submit the flink job in streaming mode for current session.
SET execution.type = streaming ;
-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;
-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM iota_raw /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s')*/ ;
-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM iota_raw /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s', 'start-snapshot-id'='3821550127947089987')*/ ;
```
#### Querying Through an External Hive Table
```sql
-- HIVE SHELL
add jar /tmp/iceberg-hive-runtime-0.12.1.jar;
use iceberg_dba;
SET engine.hive.enabled=true;
SET iceberg.engine.hive.enabled=true;
SET iceberg.mr.catalog=hive;
CREATE EXTERNAL TABLE iceberg_dba.iota_rawe(
`userId` STRING,
`dimensionId` STRING,
`dimCapId` STRING,
`scheduleId` STRING,
`jobId` STRING,
`jobRepeatId` STRING,
`thingId` STRING ,
`deviceId` STRING,
`taskId` STRING,
`triggerTime` TIMESTAMP,
`day` STRING,
`seq` STRING,
`result` STRING,
`data` STRING
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION '/user/hadoop/iceberg_db/iota_raw'
TBLPROPERTIES (
'iceberg.mr.catalog'='hadoop',
'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://node37:8020/user/hadoop/iceberg_db/iota_raw'
);
```
#### Three Ways to Handle Small Files
https://zhuanlan.zhihu.com/p/349420627
1. Set write.distribution-mode=hash on the Iceberg table:
```sql
CREATE TABLE sample (
id BIGINT,
data STRING
) PARTITIONED BY (data) WITH (
'write.distribution-mode'='hash'
);
```
2. Periodically run a major compaction on the Apache Iceberg table to merge its small files. This is currently a Flink batch job submitted through a Java API; see document [8] of the linked article for usage, and the snippet below.
3. Attach an operator after each Flink sink streaming job to compact small files automatically. This feature has not been merged into the community version yet; because it touches on the format v2 compaction discussions, it is planned for the 0.12.0 release.
> Iceberg provides API to rewrite small files into large files by submitting flink batch job. The behavior of this flink action is the same as the spark's rewriteDataFiles.
```java
import org.apache.iceberg.flink.actions.Actions;
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
.rewriteDataFiles()
.execute();
```
For more details on the options of the rewrite-files action, see [RewriteDataFilesAction](https://iceberg.apache.org/#javadoc/0.12.1/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html)
#### Aside: [Flink and Hudi](https://hudi.apache.org/docs/0.8.0/flink-quick-start-guide/)
Streaming read:
```sql
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'oss://vvr-daily/hudi/t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- this option enables streaming read
'read.streaming.start-commit' = '20210316134557', -- specifies the start commit instant time
'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s
);
-- Then query the table in stream mode
select * from t1;
```
#### Error Log:
1. java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable (Hadoop classes are missing from the classpath; exporting HADOOP_CLASSPATH as in the sql-client startup above should, presumably, resolve it)
2. Running Flink SQL fails with `[ERROR] Could not execute SQL statement. Reason: java.net.ConnectException: Connection refused`
Start the Flink cluster:
```sh
./bin/start-cluster.sh
```
3. When running a batch query:
```scala
val bsSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tenv = TableEnvironment.create(bsSetting)
```
```sh
Error:(22, 43) Static methods in interface require -target:jvm-1.8
val tenv = TableEnvironment.create(bsSetting)
```
Unresolved; kept using StreamTableEnvironment instead. (The message suggests the Scala compiler needs `-target:jvm-1.8` in its options, but that was not verified here.)
4. MiniCluster is not yet running or has already been shut down
Happened while debugging the write job and the query job locally at the same time; the two programs cannot be debugged simultaneously.
??
5. A Flink SQL program fails with `Job client must be a CoordinationRequestGateway. This is a bug`
Submit the job from the command line instead:
```shell
./bin/flink run -c com.fs.OfficialRewriteData -p 1 ./flink-iceberg-1.0-SNAPSHOT-shaded.jar --host localhost --port 8081
```
6. On job submission: Unable to instantiate java compiler
```sh
Unable to instantiate java compiler: a Calcite dependency conflict
```
Reference: https://blog.csdn.net/weixin_44056920/article/details/118110262
7. Flink reports OOM
Increase the Flink memory settings:
```yaml
jobmanager.memory.process.size: 2600m
taskmanager.memory.jvm-metaspace.size: 1000m
jobmanager.memory.jvm-metaspace.size: 1000m
```
8. A round-up of related issues from the web:
> IceBerg+Kafka+FlinkSQL https://blog.csdn.net/qq_33476283/article/details/119138610
### Data Lake Best Practices
Roadmap for implementing a data lake:
+ Build the infrastructure (Hadoop cluster)
+ Organize the zones of the data lake (create zones for the different user groups and load data into them)
+ Set up self-service for the data lake (create a data asset catalog, access control mechanisms, and the tools analysts will use)
+ Open the data lake to users
Planning the data lake zones:
+ Raw zone: stores the ingested data
+ Production zone: cleansed and processed data
+ Work zone: where data scientists analyze the data, usually organized by user, project, or topic; moved to the production zone once productized
+ Sensitive zone
Traditional databases are schema-on-write; data lakes (Hadoop, etc.) are schema-on-read.
Michael Hausenblas:
> A data lake is generally associated with data at rest. The basic idea is to introduce a self-service approach to data exploration, so that relevant business datasets can be shared across the organization.
+ Data storage: HDFS, HBase, Cassandra, Kafka
+ Processing engines: Spark, Flink, Beam
+ Interaction: Zeppelin/Spark notebook, Tableau/Datameer
### Appendix
1. hive-site.xml
```xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://10.8.30.37:3306/metastore_db?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://10.8.30.37:9083</value>
</property>
<!-- hiveserver2 -->
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>10.8.30.37</value>
</property>
</configuration>
```