2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -678,7 +678,7 @@ public void processAlterTable(AlterTableCommand command) throws UserException {
                 DynamicPartitionUtil.checkAlterAllowed(
                         (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
             }
-            Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionOp) alterOp, false, 0, true);
+            Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionOp) alterOp, false, 0, true, null);
         } else if (alterOp instanceof AddPartitionLikeOp) {
             if (!((AddPartitionLikeOp) alterOp).getTempPartition()) {
                 DynamicPartitionUtil.checkAlterAllowed(
13 changes: 8 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3479,14 +3479,17 @@ public boolean createTable(CreateTableInfo createTableInfo) throws UserException
      * @param isCreateTable this call is for creating table
      * @param generatedPartitionId the preset partition id for the partition to add
      * @param writeEditLog whether to write an edit log for this addition
-     * @return PartitionPersistInfo to be written to editlog. It may be null if no partitions added.
+     * @param batchPartitions output parameter used to batch-write edit logs outside this function; may be null.
+     *                        Each element's first is the edit-log PartitionPersistInfo, second is the added Partition.
      * @throws DdlException
      */
-    public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionOp addPartitionOp,
+    public void addPartition(Database db, String tableName, AddPartitionOp addPartitionOp,
             boolean isCreateTable, long generatedPartitionId,
-            boolean writeEditLog) throws DdlException {
-        return getInternalCatalog().addPartition(db, tableName, addPartitionOp,
-            isCreateTable, generatedPartitionId, writeEditLog);
+            boolean writeEditLog,
+            List<Pair<PartitionPersistInfo, Partition>> batchPartitions)
+            throws DdlException {
+        getInternalCatalog().addPartition(db, tableName, addPartitionOp,
+            isCreateTable, generatedPartitionId, writeEditLog, batchPartitions);
     }

    public void addMultiPartitions(Database db, String tableName, AlterMultiPartitionOp multiPartitionOp)
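Reviewer note: the signature change above turns addPartition into a collect-then-commit API. When writeEditLog is true it registers and logs the partition immediately, as before; when false it appends the (edit-log info, partition) pair to batchPartitions and defers both steps to the caller. Below is a minimal, self-contained model of that contract; the types are hypothetical stand-ins for the Doris FE classes Pair, PartitionPersistInfo, and Partition, so treat it as a sketch of the pattern, not the FE implementation.

    import java.util.ArrayList;
    import java.util.List;

    // Model of the new addPartition contract (hypothetical stand-in types).
    public class BatchAddSketch {
        record Pair<F, S>(F first, S second) {}
        record PersistInfo(String partitionName) {} // stands in for PartitionPersistInfo
        record Partition(String name) {}            // stands in for the catalog Partition

        static final List<Partition> table = new ArrayList<>();     // in-memory catalog state
        static final List<PersistInfo> editLog = new ArrayList<>(); // persisted edit log

        // writeEditLog == true: register and log immediately (the old behavior).
        // writeEditLog == false: defer both; hand (info, partition) to the caller,
        // which registers and logs in batch after the cloud commit RPC succeeds.
        static void addPartition(String name, boolean writeEditLog,
                                 List<Pair<PersistInfo, Partition>> batchPartitions) {
            Partition partition = new Partition(name);
            PersistInfo info = new PersistInfo(name);
            if (writeEditLog) {
                table.add(partition);
                editLog.add(info);
            } else {
                batchPartitions.add(new Pair<>(info, partition)); // postponed
            }
        }

        public static void main(String[] args) {
            List<Pair<PersistInfo, Partition>> batch = new ArrayList<>();
            addPartition("p20240101", false, batch);
            addPartition("p20240102", false, batch);
            // ... commit the partitions to the cloud meta-service here ...
            for (Pair<PersistInfo, Partition> p : batch) { // replay in batch
                table.add(p.second());
                editLog.add(p.first());
            }
            System.out.println("registered=" + table.size() + ", logged=" + editLog.size());
        }
    }

Deferring registration as well as logging means a partition never becomes visible before the cloud commit has succeeded, which is what the batch path in the scheduler below relies on.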
fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -774,7 +774,7 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
         cloudBatchBeforeCreatePartitions(executeFirstTime, addPartitionOps, olapTable, indexIds,
                 db, tableName, generatedPartitionIds);

-        List<PartitionPersistInfo> partsInfo = new ArrayList<>();
+        List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo = new ArrayList<>();
         for (int i = 0; i < addPartitionOps.size(); i++) {
             try {
                 boolean needWriteEditLog = true;
@@ -783,15 +783,10 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
                 if (Config.isCloudMode()) {
                     needWriteEditLog = !executeFirstTime;
                 }
-                PartitionPersistInfo info =
-                        Env.getCurrentEnv().addPartition(db, tableName, addPartitionOps.get(i),
+                Env.getCurrentEnv().addPartition(db, tableName, addPartitionOps.get(i),
                         executeFirstTime,
                         executeFirstTime && Config.isCloudMode() ? generatedPartitionIds.get(i) : 0,
-                        needWriteEditLog);
-                if (info == null) {
-                    throw new Exception("null persisted partition returned");
-                }
-                partsInfo.add(info);
+                        needWriteEditLog, batchPartsInfo);
                 clearCreatePartitionFailedMsg(olapTable.getId());
             } catch (Exception e) {
                 recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
@@ -802,7 +797,7 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
                 }
             }
         }
-        cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo,
+        cloudBatchAfterCreatePartitions(executeFirstTime, batchPartsInfo,
                 addPartitionOps, db, olapTable, indexIds, tableName);

        // ATTN: Breaking up dynamic partition table scheduling, consuming peak CPU consumption
@@ -822,15 +817,16 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
         }
     }

-    private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<PartitionPersistInfo> partsInfo,
-                                                 ArrayList<AddPartitionOp> addPartitionOps, Database db,
-                                                 OlapTable olapTable, List<Long> indexIds,
-                                                 String tableName) throws DdlException {
+    private void cloudBatchAfterCreatePartitions(boolean executeFirstTime,
+                                                 List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo,
+                                                 ArrayList<AddPartitionOp> addPartitionOps, Database db,
+                                                 OlapTable olapTable, List<Long> indexIds,
+                                                 String tableName) throws DdlException {
         if (Config.isNotCloudMode()) {
             return;
         }
-        List<Long> succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo
-                -> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList());
+        List<Long> succeedPartitionIds = batchPartsInfo.stream().map(partitionInfo
+                -> partitionInfo.first.getPartition().getId()).collect(Collectors.toList());
         if (!executeFirstTime || addPartitionOps.isEmpty()) {
             LOG.info("cloud commit rpc in batch, {}-{}", !executeFirstTime, addPartitionOps.size());
             return;
@@ -847,28 +843,56 @@ private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<Part
                     succeedPartitionIds, indexIds, true /* isCreateTable */, false /* isBatchCommit */);
             LOG.info("begin write edit log to add partitions in batch, "
                     + "numPartitions: {}, db: {}, table: {}, tableId: {}",
-                    partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
+                    batchPartsInfo.size(), db.getFullName(), tableName, olapTable.getId());
             // ATTN: here, edit log must after commit cloud partition,
             // prevent commit RPC failure from causing data loss
             if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) {
                 LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e");
                 // committed, but not log edit
                 throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition");
             }
-            for (int i = 0; i < partsInfo.size(); i++) {
-                Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i));
-                if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
-                    if (i == partsInfo.size() / 2) {
-                        LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
-                        // committed, but log some edit, others failed
-                        throw new Exception("debug point FE.DynamicPartitionScheduler"
-                                + ".in.commitCloudPartition");
-                    }
-                }
-            }
+
+            for (int i = 0; i < batchPartsInfo.size(); i++) {
+                // take the table write lock: adding the partition, edit log and table state changes must be atomic
+                olapTable.writeLockOrDdlException();
+                try {
+                    boolean isTempPartition = addPartitionOps.get(i).isTempPartition();
+                    Partition toAddPartition = batchPartsInfo.get(i).second;
+                    String partitionName = toAddPartition.getName();
+                    // ATTN: Check here to see if the newly created dynamic
+                    // partition has already been added by another process.
+                    // If it has, do not add this dynamic partition again,
+                    // and call `onErasePartition` to clean up any remaining information.
+                    Partition checkIsAdded = olapTable.getPartition(partitionName, isTempPartition);
+                    if (checkIsAdded != null) {
+                        LOG.warn("dynamic partition has been added, skip it. "
+                                + "db: {}, table: {}, partition: {}, tableId: {}",
+                                db.getFullName(), tableName, partitionName, olapTable.getId());
+                        Env.getCurrentEnv().onErasePartition(toAddPartition);
+                        continue;
+                    }
+                    if (isTempPartition) {
+                        olapTable.addTempPartition(toAddPartition);
+                    } else {
+                        olapTable.addPartition(toAddPartition);
+                    }
+
+                    Env.getCurrentEnv().getEditLog().logAddPartition(batchPartsInfo.get(i).first);
+                    if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
+                        if (i == batchPartsInfo.size() / 2) {
+                            LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
+                            // committed, but log some edit, others failed
+                            throw new Exception("debug point FE.DynamicPartitionScheduler"
+                                    + ".in.commitCloudPartition");
+                        }
+                    }
+                } finally {
+                    olapTable.writeUnlock();
+                }
+            }
             LOG.info("finish write edit log to add partitions in batch, "
                     + "numPartitions: {}, db: {}, table: {}, tableId: {}",
-                    partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
+                    batchPartsInfo.size(), db.getFullName(), tableName, olapTable.getId());
         } catch (Exception e) {
             LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}",
                     db.getFullName(), tableName, olapTable.getId(), e.getMessage());
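Reviewer note: the ordering in cloudBatchAfterCreatePartitions is the crux of the change. The cloud commit RPC is the durability point; only afterwards are partitions registered and edit-logged, one at a time, under the table write lock, with an existence check that lets a re-run (for example after an FE restart between the commit and some of the log writes) skip partitions that already made it in. The following is a self-contained model of that replay loop, with stand-in types and simplified cleanup; a sketch of the pattern, not the FE code.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;

    // Model of the replay-safe batch registration loop (hypothetical stand-ins).
    public class BatchCommitSketch {
        record Partition(String name) {}

        static final Map<String, Partition> table = new LinkedHashMap<>();
        static final List<String> editLog = new ArrayList<>();
        static final ReentrantLock tableWriteLock = new ReentrantLock();

        static void replay(List<Partition> batch) {
            for (Partition p : batch) {
                tableWriteLock.lock(); // registration + edit log must be atomic
                try {
                    if (table.containsKey(p.name())) {
                        // already added by a previous run or a concurrent pass: skip
                        // (the real code also calls onErasePartition to drop leftovers)
                        continue;
                    }
                    table.put(p.name(), p);
                    editLog.add("ADD_PARTITION " + p.name());
                } finally {
                    tableWriteLock.unlock();
                }
            }
        }

        public static void main(String[] args) {
            List<Partition> batch = List.of(new Partition("p1"), new Partition("p2"));
            replay(batch);
            replay(batch); // the second replay is a no-op: the loop is idempotent
            System.out.println(table.keySet() + " " + editLog);
        }
    }

Running replay twice in the model changes nothing the second time, which is exactly the property the existence check buys for crash recovery.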
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1354,7 +1354,7 @@ public void addPartitionLike(Database db, String tableName, AddPartitionLikeOp a
             } finally {
                 table.readUnlock();
             }
-            addPartition(db, tableName, addPartitionOp, false, 0, true);
+            addPartition(db, tableName, addPartitionOp, false, 0, true, null);

         } catch (UserException e) {
             throw new DdlException("Failed to ADD PARTITION " + addPartitionLikeOp.getPartitionName()
@@ -1443,9 +1443,11 @@ private ResultOr<CompletableFuture<Void>, DdlException> getCurrentPartitionFutur
         }
     }

-    public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionOp addPartitionOp,
+    public void addPartition(Database db, String tableName, AddPartitionOp addPartitionOp,
             boolean isCreateTable, long generatedPartitionId,
-            boolean writeEditLog) throws DdlException {
+            boolean writeEditLog,
+            List<Pair<PartitionPersistInfo, Partition>> batchPartitions)
+            throws DdlException {
         // in cloud mode, isCreateTable == true, create dynamic partition use, so partitionId must have been generated.
         // isCreateTable == false, other case, partitionId generate in below, must be set 0
         if (!FeConstants.runningUnitTest && Config.isCloudMode()
@@ -1474,7 +1476,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
             if (singlePartitionDesc.isSetIfNotExists()) {
                 LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
                 if (!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) {
-                    return null;
+                    return;
                 }
             } else {
                 ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
@@ -1641,7 +1643,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
                 db, tableName, olapTable, partitionName, singlePartitionDesc);
         if (ownerFutureOr.isErr()) {
             if (ownerFutureOr.unwrapErr() == null) {
-                return null;
+                return;
             } else {
                 throw ownerFutureOr.unwrapErr();
             }
@@ -1697,7 +1699,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
                 LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
                 if (singlePartitionDesc.isSetIfNotExists()) {
                     failedCleanCallback.run();
-                    return null;
+                    return;
                 } else {
                     ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
                 }
@@ -1755,12 +1757,6 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
             // update partition info
             partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId, isTempPartition);

-            if (isTempPartition) {
-                olapTable.addTempPartition(partition);
-            } else {
-                olapTable.addPartition(partition);
-            }
-
             // log
             PartitionPersistInfo info = null;
             if (partitionInfo.getType() == PartitionType.RANGE) {
@@ -1786,11 +1782,16 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
             }
             if (writeEditLog) {
                 Env.getCurrentEnv().getEditLog().logAddPartition(info);
+                if (isTempPartition) {
+                    olapTable.addTempPartition(partition);
+                } else {
+                    olapTable.addPartition(partition);
+                }
                 LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
+            } else {
+                batchPartitions.add(Pair.of(info, partition));
+                LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition);
             }
-            return info;
         } finally {
             olapTable.writeUnlock();
         }
@@ -1837,7 +1838,7 @@ public void addMultiPartitions(Database db, String tableName, AlterMultiPartitio
                 AddPartitionOp addPartitionOp = new AddPartitionOp(
                         singlePartitionDesc.translateToPartitionDefinition(), null,
                         multiPartitionOp.getProperties(), false);
-                addPartition(db, tableName, addPartitionOp, false, 0, true);
+                addPartition(db, tableName, addPartitionOp, false, 0, true, null);
         }
     }
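Reviewer note: with the void return, the "partition already exists" early-outs above are no longer observable through a null result. The old caller-side probe in executeDynamicPartition (if (info == null) throw ...) is gone accordingly; in the deferred path the only signal that a partition was skipped is that nothing was appended to batchPartitions, while in the immediate path the IF NOT EXISTS / ERR_SAME_NAME_PARTITION handling is unchanged.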
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -405,7 +405,7 @@ public static void addPartition(MTMV mtmv, PartitionKeyDesc oldPartitionKeyDesc)
                 mtmv.getDefaultDistributionInfo().toDistributionDesc().toDistributionDescriptor(),
                 partitionProperties, false);
         Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause,
-                false, 0, true);
+                false, 0, true, null);
     }

     /**
22 changes: 21 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1666,14 +1666,29 @@ public void logRefreshExternalTableSchema(RefreshExternalTableInfo info) {
     }

     public long logAddPartition(PartitionPersistInfo info) {
+        if (DebugPointUtil.isEnable("FE.logAddPartition.slow")) {
+            DebugPointUtil.DebugPoint debugPoint = DebugPointUtil.getDebugPoint("FE.logAddPartition.slow");
+            String pName = debugPoint.param("pName", "");
+            if (info.getPartition().getName().equals(pName)) {
+                int sleepMs = debugPoint.param("sleep", 1000);
+                LOG.info("logAddPartition debug point hit, pName {}, sleep {} ms", pName, sleepMs);
+                try {
+                    Thread.sleep(sleepMs);
+                } catch (InterruptedException e) {
+                    LOG.warn("sleep interrupted", e);
+                }
+            }
+        }
         long logId = logEdit(OperationType.OP_ADD_PARTITION, info);
+        LOG.info("log add partition, logId: {}, info: {}", logId, info.toJson());
         AddPartitionRecord record = new AddPartitionRecord(logId, info);
         Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(record);
         return logId;
     }

     public long logDropPartition(DropPartitionInfo info) {
         long logId = logEdit(OperationType.OP_DROP_PARTITION, info);
+        LOG.info("log drop partition, logId: {}, info: {}", logId, info.toJson());
         Env.getCurrentEnv().getBinlogManager().addDropPartitionRecord(info, logId);
         return logId;
     }
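Reviewer note: FE.logAddPartition.slow stalls the edit-log write for one named partition, which is what makes races between the batch replay and a concurrent addPartition reproducible in tests; the sleep parameter is in milliseconds (default 1000). Assuming the standard FE debug-point HTTP endpoint, a test would arm it with something like curl -X POST "http://<fe_host>:<fe_http_port>/api/debug_point/add/FE.logAddPartition.slow?pName=p20240101&sleep=5000" before triggering the partition creation; the exact URL and parameter passing are an assumption based on Doris's debug-point facility, and pName here is an illustrative partition name.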
@@ -1684,6 +1699,7 @@ public void logErasePartition(long partitionId) {

     public void logRecoverPartition(RecoverInfo info) {
         long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info);
+        LOG.info("log recover partition, logId: {}, info: {}", logId, info.toJson());
         Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
     }

@@ -1702,6 +1718,7 @@ public void logBatchModifyPartition(BatchModifyPartitionsInfo info) {

     public void logDropTable(DropInfo info) {
         long logId = logEdit(OperationType.OP_DROP_TABLE, info);
+        LOG.info("log drop table, logId: {}, info: {}", logId, info);
         if (Strings.isNullOrEmpty(info.getCtl()) || info.getCtl().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
             DropTableRecord record = new DropTableRecord(logId, info);
             Env.getCurrentEnv().getBinlogManager().addDropTableRecord(record);
@@ -1714,11 +1731,13 @@ public void logEraseTable(long tableId) {

     public void logRecoverTable(RecoverInfo info) {
         long logId = logEdit(OperationType.OP_RECOVER_TABLE, info);
+        LOG.info("log recover table, logId: {}, info: {}", logId, info);
         Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
     }

     public void logDropRollup(DropInfo info) {
         long logId = logEdit(OperationType.OP_DROP_ROLLUP, info);
+        LOG.info("log drop rollup, logId: {}, info: {}", logId, info);
         Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
     }

@@ -1835,7 +1854,8 @@ public void logDropRole(PrivInfo info) {
     }

     public void logDatabaseRename(DatabaseInfo databaseInfo) {
-        logEdit(OperationType.OP_RENAME_DB, databaseInfo);
+        long logId = logEdit(OperationType.OP_RENAME_DB, databaseInfo);
+        LOG.info("log database rename, logId: {}, info: {}", logId, databaseInfo);
     }

     public void logTableRename(TableInfo tableInfo) {