Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
Expand All @@ -43,10 +44,12 @@
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.LockRemover;
import org.apache.iceberg.flink.maintenance.operator.LockRemoverOperatorFactory;
import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
import org.apache.iceberg.flink.maintenance.operator.TriggerManagerOperatorFactory;
import org.apache.iceberg.flink.sink.IcebergSink;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -71,39 +74,77 @@ private TableMaintenance() {}
*
* @param changeStream the table changes
* @param tableLoader used for accessing the table
* @param lockFactory used for preventing concurrent task runs
 * @param lockFactory used for preventing concurrent task runs; if null, the coordination lock is used.
* @return builder for the maintenance stream
* @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link #forChangeStream(DataStream,
* TableLoader)} instead.
*/
@Deprecated
@Internal
public static Builder forChangeStream(
DataStream<TableChange> changeStream,
TableLoader tableLoader,
TriggerLockFactory lockFactory) {
@Nullable TriggerLockFactory lockFactory) {
Preconditions.checkNotNull(changeStream, "The change stream should not be null");
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");

return new Builder(null, changeStream, tableLoader, lockFactory);
}

/**
 * Creates a maintenance-stream builder from an already existing change stream, such as the one
 * provided by {@link IcebergSink#addPostCommitTopology(DataStream)}.
 *
 * @param changeStream the table changes
 * @param tableLoader used for accessing the table
 * @return builder for the maintenance stream
 */
@Internal
public static Builder forChangeStream(
    DataStream<TableChange> changeStream, TableLoader tableLoader) {
  // Fail fast on missing required inputs.
  Preconditions.checkNotNull(changeStream, "The change stream should not be null");
  Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");

  // Passing a null lock factory selects the coordination-based lock (see the deprecated
  // overload's Javadoc for the null semantics).
  return new Builder(null, changeStream, tableLoader, null);
}

/**
 * Use this for a standalone maintenance job. It creates a monitor source that detects table
 * changes and builds the maintenance pipelines afterwards.
*
* @param env used to register the monitor source
* @param tableLoader used for accessing the table
* @param lockFactory used for preventing concurrent task runs
 * @param lockFactory used for preventing concurrent task runs; if null, the coordination lock is used.
* @return builder for the maintenance stream
* @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link
* #forTable(StreamExecutionEnvironment, TableLoader)} instead.
*/
@Deprecated
public static Builder forTable(
StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
StreamExecutionEnvironment env,
TableLoader tableLoader,
@Nullable TriggerLockFactory lockFactory) {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");

return new Builder(env, null, tableLoader, lockFactory);
}

/**
 * Use this for a standalone maintenance job. It creates a monitor source that detects table
 * changes and builds the maintenance pipelines afterwards. The coordination lock is used by
 * default.
 *
 * @param env used to register the monitor source
 * @param tableLoader used for accessing the table
 * @return builder for the maintenance stream
 */
public static Builder forTable(StreamExecutionEnvironment env, TableLoader tableLoader) {
  Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
  Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");

  return new Builder(env, null, tableLoader, null);
}

public static class Builder {
private final StreamExecutionEnvironment env;
private final DataStream<TableChange> inputStream;
Expand Down Expand Up @@ -226,21 +267,43 @@ public void append() throws IOException {
try (TableLoader loader = tableLoader.clone()) {
loader.open();
String tableName = loader.loadTable().name();
DataStream<Trigger> triggers =
DataStreamUtils.reinterpretAsKeyedStream(
changeStream(tableName, loader), unused -> true)
.process(
new TriggerManager(
loader,
lockFactory,
taskNames,
evaluators,
rateLimit.toMillis(),
lockCheckDelay.toMillis()))
.name(TRIGGER_MANAGER_OPERATOR_NAME)
.uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
.slotSharingGroup(slotSharingGroup)
.forceNonParallel()
DataStream<Trigger> triggers;
if (lockFactory == null) {
triggers =
DataStreamUtils.reinterpretAsKeyedStream(
changeStream(tableName, loader), unused -> true)
.transform(
TRIGGER_MANAGER_OPERATOR_NAME,
TypeInformation.of(Trigger.class),
new TriggerManagerOperatorFactory(
tableName,
taskNames,
evaluators,
rateLimit.toMillis(),
lockCheckDelay.toMillis()))
.uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
.slotSharingGroup(slotSharingGroup)
.forceNonParallel();
} else {
triggers =
DataStreamUtils.reinterpretAsKeyedStream(
changeStream(tableName, loader), unused -> true)
.process(
new TriggerManager(
loader,
lockFactory,
taskNames,
evaluators,
rateLimit.toMillis(),
lockCheckDelay.toMillis()))
.name(TRIGGER_MANAGER_OPERATOR_NAME)
.uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
.slotSharingGroup(slotSharingGroup)
.forceNonParallel();
}

triggers =
triggers
.assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
.name(WATERMARK_ASSIGNER_OPERATOR_NAME)
.uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
Expand Down Expand Up @@ -277,14 +340,25 @@ public void append() throws IOException {
}

// Add the LockRemover to the end
unioned
.transform(
LOCK_REMOVER_OPERATOR_NAME,
TypeInformation.of(Void.class),
new LockRemover(tableName, lockFactory, taskNames))
.forceNonParallel()
.uid("lock-remover-" + uidSuffix)
.slotSharingGroup(slotSharingGroup);
if (lockFactory == null) {
unioned
.transform(
LOCK_REMOVER_OPERATOR_NAME,
TypeInformation.of(Void.class),
new LockRemoverOperatorFactory(tableName, taskNames))
.uid("lock-remover-" + uidSuffix)
.forceNonParallel()
.slotSharingGroup(slotSharingGroup);
} else {
unioned
.transform(
LOCK_REMOVER_OPERATOR_NAME,
TypeInformation.of(Void.class),
new LockRemover(tableName, lockFactory, taskNames))
.forceNonParallel()
.uid("lock-remover-" + uidSuffix)
.slotSharingGroup(slotSharingGroup);
}
}
}

Expand Down
Loading