dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
- Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());
-
- Actions actions = Actions.forTable(icebergTableUnPartitioned);
-
- RewriteDataFilesActionResult result = actions
- .rewriteDataFiles()
- .filter(Expressions.equal("data", "0"))
- .execute();
- Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
- Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
-
- // Assert the table records as expected.
- SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
- }
-
- /**
- * a test case to test avoid repeate compress
- *
- * If datafile cannot be combined to CombinedScanTask with other DataFiles, the size of the CombinedScanTask list size
- * is 1, so we remove these CombinedScanTasks to avoid compressed repeatedly.
- *
- * In this test case,we generated 3 data files and set targetSizeInBytes greater than the largest file size so that it
- * cannot be combined a CombinedScanTask with other datafiles. The datafile with the largest file size will not be
- * compressed.
- *
- * @throws IOException IOException
- */
- @Test
- public void testRewriteAvoidRepeateCompress() throws IOException {
- Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
- List expected = Lists.newArrayList();
- Schema schema = icebergTableUnPartitioned.schema();
- GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
- File file = temp.newFile();
- int count = 0;
- try (FileAppender fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
- long filesize = 20000;
- for (; fileAppender.length() < filesize; count++) {
- Record record = SimpleDataUtil.createRecord(count, "iceberg");
- fileAppender.add(record);
- expected.add(record);
- }
- }
-
- DataFile dataFile = DataFiles.builder(icebergTableUnPartitioned.spec())
- .withPath(file.getAbsolutePath())
- .withFileSizeInBytes(file.length())
- .withFormat(format)
- .withRecordCount(count)
- .build();
-
- icebergTableUnPartitioned.newAppend()
- .appendFile(dataFile)
- .commit();
-
- sql("INSERT INTO %s SELECT 1,'a' ", TABLE_NAME_UNPARTITIONED);
- sql("INSERT INTO %s SELECT 2,'b' ", TABLE_NAME_UNPARTITIONED);
-
- icebergTableUnPartitioned.refresh();
-
- CloseableIterable tasks = icebergTableUnPartitioned.newScan().planFiles();
- List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
- Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
-
- Actions actions = Actions.forTable(icebergTableUnPartitioned);
-
- long targetSizeInBytes = file.length() + 10;
- RewriteDataFilesActionResult result = actions
- .rewriteDataFiles()
- .targetSizeInBytes(targetSizeInBytes)
- .splitOpenFileCost(1)
- .execute();
- Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
- Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
-
- icebergTableUnPartitioned.refresh();
-
- CloseableIterable tasks1 = icebergTableUnPartitioned.newScan().planFiles();
- List dataFilesRewrote = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
- Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFilesRewrote.size());
-
- // the biggest file do not be rewrote
- List rewroteDataFileNames = dataFilesRewrote.stream().map(ContentFile::path).collect(Collectors.toList());
- Assert.assertTrue(rewroteDataFileNames.contains(file.getAbsolutePath()));
-
- // Assert the table records as expected.
- expected.add(SimpleDataUtil.createRecord(1, "a"));
- expected.add(SimpleDataUtil.createRecord(2, "b"));
- SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
- }
-}
diff --git a/doc/技术文档/数据湖DEMO/etl/flink2iceberg/src/test/javax/org/apache/iceberg/flink/data/RandomGenericData.java b/doc/技术文档/数据湖DEMO/etl/flink2iceberg/src/test/javax/org/apache/iceberg/flink/data/RandomGenericData.java
deleted file mode 100644
index 4b66103..0000000
--- a/doc/技术文档/数据湖DEMO/etl/flink2iceberg/src/test/javax/org/apache/iceberg/flink/data/RandomGenericData.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.flink.data;
-
-import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Supplier;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.RandomUtil;
-
-import static java.time.temporal.ChronoUnit.MICROS;
-
-public class RandomGenericData {
- private RandomGenericData() {
- }
-
- public static List generate(Schema schema, int numRecords, long seed) {
- return Lists.newArrayList(generateIcebergGenerics(schema, numRecords, () -> new RandomRecordGenerator(seed)));
- }
-
- public static Iterable generateFallbackRecords(Schema schema, int numRecords, long seed, long numDictRows) {
- return generateIcebergGenerics(schema, numRecords, () -> new FallbackGenerator(seed, numDictRows));
- }
-
- public static Iterable generateDictionaryEncodableRecords(Schema schema, int numRecords, long seed) {
- return generateIcebergGenerics(schema, numRecords, () -> new DictionaryEncodedGenerator(seed));
- }
-
- private static Iterable generateIcebergGenerics(Schema schema, int numRecords,
- Supplier> supplier) {
- return () -> new Iterator() {
- private final RandomDataGenerator generator = supplier.get();
- private int count = 0;
-
- @Override
- public boolean hasNext() {
- return count < numRecords;
- }
-
- @Override
- public Record next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- ++count;
- return (Record) TypeUtil.visit(schema, generator);
- }
- };
- }
-
- private static class RandomRecordGenerator extends RandomDataGenerator {
- private RandomRecordGenerator(long seed) {
- super(seed);
- }
-
- @Override
- public Record schema(Schema schema, Supplier