Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class IcebergExecuteActionFactory {
public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
public static final String REWRITE_DATA_FILES = "rewrite_data_files";
public static final String PUBLISH_CHANGES = "publish_changes";
public static final String REWRITE_MANIFESTS = "rewrite_manifests";

/**
* Create an Iceberg-specific ExecuteAction instance.
Expand Down Expand Up @@ -84,6 +85,9 @@ public static ExecuteAction createAction(String actionType, Map<String, String>
case PUBLISH_CHANGES:
return new IcebergPublishChangesAction(properties, partitionNamesInfo,
whereCondition);
case REWRITE_MANIFESTS:
return new IcebergRewriteManifestsAction(properties, partitionNamesInfo,
whereCondition);
default:
throw new DdlException("Unsupported Iceberg procedure: " + actionType
+ ". Supported procedures: " + String.join(", ", getSupportedActions()));
Expand All @@ -104,7 +108,8 @@ public static String[] getSupportedActions() {
FAST_FORWARD,
EXPIRE_SNAPSHOTS,
REWRITE_DATA_FILES,
PUBLISH_CHANGES
PUBLISH_CHANGES,
REWRITE_MANIFESTS
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.rewrite.ManifestRewriteExecutor;
import org.apache.doris.info.PartitionNamesInfo;
import org.apache.doris.nereids.trees.expressions.Expression;

import com.google.common.collect.Lists;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

/**
* Action for rewriting Iceberg manifest files to optimize metadata layout
*/
public class IcebergRewriteManifestsAction extends BaseIcebergAction {
private static final Logger LOG = LogManager.getLogger(IcebergRewriteManifestsAction.class);
public static final String CLUSTER_BY_PARTITION = "cluster-by-partition";
public static final String REWRITE_ALL = "rewrite-all";
public static final String MIN_MANIFEST_SIZE_BYTES = "min-manifest-size-bytes";
public static final String MAX_MANIFEST_SIZE_BYTES = "max-manifest-size-bytes";
public static final String SCAN_THREAD_POOL_SIZE = "scan-thread-pool-size";

public IcebergRewriteManifestsAction(Map<String, String> properties,
Optional<PartitionNamesInfo> partitionNamesInfo,
Optional<Expression> whereCondition) {
super("rewrite_manifests", properties, partitionNamesInfo, whereCondition);
}

@Override
protected void registerIcebergArguments() {
namedArguments.registerOptionalArgument(CLUSTER_BY_PARTITION,
"Cluster manifests by partition fields",
true,
ArgumentParsers.booleanValue(CLUSTER_BY_PARTITION));

namedArguments.registerOptionalArgument(REWRITE_ALL,
"Rewrite all manifests when true; otherwise use size thresholds",
true,
ArgumentParsers.booleanValue(REWRITE_ALL));

namedArguments.registerOptionalArgument(MIN_MANIFEST_SIZE_BYTES,
"Minimum manifest file size to be considered for rewrite",
0L,
ArgumentParsers.positiveLong(MIN_MANIFEST_SIZE_BYTES));

namedArguments.registerOptionalArgument(MAX_MANIFEST_SIZE_BYTES,
"Maximum manifest file size to be considered for rewrite",
0L,
ArgumentParsers.positiveLong(MAX_MANIFEST_SIZE_BYTES));

namedArguments.registerOptionalArgument(SCAN_THREAD_POOL_SIZE,
"Thread pool size for parallel manifest scanning",
0,
ArgumentParsers.intRange(SCAN_THREAD_POOL_SIZE, 0, 16));
}

@Override
protected void validateIcebergAction() throws UserException {
validateNoPartitions();
validateNoWhereCondition();

// Validate size parameter relationships
long minSize = namedArguments.getLong(MIN_MANIFEST_SIZE_BYTES);
long maxSize = namedArguments.getLong(MAX_MANIFEST_SIZE_BYTES);

if (maxSize > 0 && minSize > maxSize) {
throw new UserException("min-manifest-size-bytes (" + minSize
+ ") cannot be greater than max-manifest-size-bytes (" + maxSize + ")");
}
}

@Override
protected List<String> executeAction(TableIf table) throws UserException {
try {
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
Snapshot current = icebergTable.currentSnapshot();
if (current == null) {
LOG.info("Table {} has no current snapshot, no manifests to rewrite",
table.getName());
return Lists.newArrayList("0", "0");
}

// Build predicate for manifest selection
Predicate<ManifestFile> predicate = buildManifestPredicate();

// Execute rewrite operation
boolean clusterByPartition = namedArguments.getBoolean(CLUSTER_BY_PARTITION);
int scanThreads = namedArguments.getInt(SCAN_THREAD_POOL_SIZE);

ManifestRewriteExecutor executor = new ManifestRewriteExecutor();
ManifestRewriteExecutor.Result result = executor.execute(
icebergTable,
(ExternalTable) table,
clusterByPartition,
scanThreads,
predicate);

return result.toStringList();
} catch (Exception e) {
LOG.error("Failed to rewrite manifests for table: {}", table.getName(), e);
throw new UserException("Rewrite manifests failed: " + e.getMessage(), e);
}
}

/**
* Build predicate for selecting manifest files to rewrite
*/
private Predicate<ManifestFile> buildManifestPredicate() {
boolean rewriteAll = namedArguments.getBoolean(REWRITE_ALL);
long minSize = namedArguments.getLong(MIN_MANIFEST_SIZE_BYTES);
long maxSize = namedArguments.getLong(MAX_MANIFEST_SIZE_BYTES);

if (rewriteAll) {
return mf -> true;
}

if (minSize == 0 && maxSize == 0) {
return mf -> true;
}

return mf -> {
long len = mf.length();
boolean tooSmall = minSize > 0 && len < minSize;
boolean tooLarge = maxSize > 0 && len > maxSize;
return tooSmall || tooLarge;
};
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(
new Column("rewritten_manifests_count", Type.INT, false,
"Number of data manifests rewritten by this command"),
new Column("total_data_manifests_count", Type.INT, false,
"Total number of data manifests before rewrite")
);
}

@Override
public String getDescription() {
return "Rewrite Iceberg manifest files to optimize metadata layout";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.rewrite;

import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.ExternalTable;

import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RewriteManifests;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/**
* Executor for manifest rewrite operations
*/
public class ManifestRewriteExecutor {
private static final Logger LOG = LogManager.getLogger(ManifestRewriteExecutor.class);

public static class Result {
private final int rewrittenCount;
private final int totalCount;

public Result(int rewrittenCount, int totalCount) {
this.rewrittenCount = rewrittenCount;
this.totalCount = totalCount;
}

public java.util.List<String> toStringList() {
return java.util.Arrays.asList(String.valueOf(rewrittenCount),
String.valueOf(totalCount));
}
}

/**
* Execute manifest rewrite using Iceberg RewriteManifests API
*/
public Result execute(Table table, ExternalTable extTable,
boolean clusterByPartition, int scanThreads,
Predicate<ManifestFile> predicate) {
ExecutorService executor = null;
try {
Snapshot currentSnapshot = table.currentSnapshot();
if (currentSnapshot == null) {
return new Result(0, 0);
}

// Get manifest statistics before rewrite
List<ManifestFile> dataManifests = currentSnapshot.dataManifests(table.io());
int totalManifests = dataManifests.size();
int selectedManifests = (int) dataManifests.stream()
.filter(predicate)
.count();

// Execute rewrite operation
RewriteManifests rm = table.rewriteManifests();

// Optional: cluster by partition
if (clusterByPartition) {
rm.clusterBy(ContentFile::partition);
}

// Optional: use parallel scanning
if (scanThreads > 0) {
executor = Executors.newFixedThreadPool(scanThreads);
rm.scanManifestsWith(executor);
}

// Execute rewrite based on predicate
rm.rewriteIf(predicate).commit();

// Invalidate cache
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(extTable);
return new Result(selectedManifests, totalManifests);
} finally {
if (executor != null) {
shutdownExecutor(executor);
}
}
}

private void shutdownExecutor(ExecutorService executor) {
// Disable new tasks from being submitted
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
// Log warning if executor doesn't terminate
LOG.warn("ExecutorService did not terminate");
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
Loading