Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
74d9da4
First draft of function to resolve concepts
awildturtok Nov 12, 2025
e028841
remove qualification from CTConditionContext to make integration with…
awildturtok Nov 12, 2025
4cccd65
Collect Auxiliary columns to then generate proper signature
awildturtok Nov 13, 2025
209d881
properly insert function (only postgres atm)
awildturtok Nov 17, 2025
8620c65
first draft towards applying matching stats
awildturtok Nov 18, 2025
51f1aea
Reworks registration of MatchingStats.
awildturtok Jan 29, 2025
98b69c4
fixes usage in MatchingStatsTests.java
awildturtok Jan 29, 2025
f09cdaa
wip
awildturtok Jan 30, 2025
107407d
implements conversion and extraction of matching stats for Hana and P…
awildturtok Dec 4, 2025
1b4949f
Merge remote-tracking branch 'origin/develop' into feature/matching-s…
awildturtok Jan 7, 2026
3f0b1bb
disable daterange and money type compat check
awildturtok Jan 7, 2026
ff18d93
adds missing coalesce for primaryColumn
awildturtok Jan 7, 2026
b86af08
don't remove connector column
awildturtok Jan 7, 2026
7735740
removes very verbose logging
awildturtok Jan 8, 2026
ed98bd2
use cursor to iterate results
awildturtok Jan 8, 2026
368b4f5
adds timing to SQL matching stats fetching
awildturtok Jan 8, 2026
5ffdade
use transaction to disable autocommit
awildturtok Jan 8, 2026
0cb22e8
log select statement for debugging
awildturtok Jan 8, 2026
8e64d47
add PARALLEL SAFE marker to created functions
awildturtok Jan 8, 2026
8d5cd0a
reworks SqlMatchingStats function as flattened table
awildturtok Jan 14, 2026
44899ee
adds some logging
awildturtok Jan 14, 2026
e2df161
adds missing concept Id
awildturtok Jan 14, 2026
750a03b
adds missing error handling in outer loop
awildturtok Jan 14, 2026
152fc72
adds grouping by params to map into most specific child
awildturtok Jan 14, 2026
37009b0
fixes wrong usage of immutable datastructure
awildturtok Jan 14, 2026
67a0547
try to insert the join-tables
awildturtok Jan 15, 2026
34b8bb5
adds typing to fields
awildturtok Jan 15, 2026
4ff4776
remove primary key (nullability issue) and add index
awildturtok Jan 15, 2026
11bd235
fix index creation
awildturtok Jan 15, 2026
8e56eda
fix index creation #2
awildturtok Jan 15, 2026
0c355b3
fix index creation #3
awildturtok Jan 15, 2026
811afeb
outcomment index creation
awildturtok Jan 15, 2026
ced3e38
delete prior table
awildturtok Jan 15, 2026
bf1f37e
first draft of using join tables
awildturtok Jan 19, 2026
f04d739
fix dupe join
awildturtok Jan 19, 2026
6aef402
cleanup of SqlMatchingStats
awildturtok Jan 20, 2026
f5fe46e
fix naming
awildturtok Jan 20, 2026
8ef440f
adds exception handling
awildturtok Jan 20, 2026
556f5ee
hopefully fixes reference on ColumnValue
awildturtok Jan 20, 2026
4faabfb
hopefully fixes reference on ColumnValue
awildturtok Jan 20, 2026
46f46a1
hopefully fixes reference on ColumnValue
awildturtok Jan 21, 2026
c53ce5f
hopefully fixes reference on ColumnValue
awildturtok Jan 22, 2026
3093a9a
minor fixes for extraction of matching stats
awildturtok Jan 22, 2026
53ba076
more cleanup
awildturtok Jan 22, 2026
11c304f
more cleanup
awildturtok Jan 22, 2026
3d02375
fix union to intersection
awildturtok Jan 22, 2026
4366d85
cleanup
awildturtok Feb 2, 2026
1ef85b9
Cleanup of failing tests
awildturtok Feb 11, 2026
e301226
Merge branch 'develop' into feature/matching-stats-as-join-table
awildturtok Feb 17, 2026
14b72b5
fix hana insertion
awildturtok Feb 17, 2026
7b9d34d
Some more Hana fixes
awildturtok Feb 18, 2026
6e7d423
Merge branch 'develop' into feature/matching-stats-as-join-table
awildturtok Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,32 @@
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId;
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.Namespace;
import lombok.Data;
import lombok.RequiredArgsConstructor;

/**
* Listener for updates of stored entities in ConQuery.
*/
public interface StorageListener {
@Data
public abstract class StorageListener<T extends Namespace>{
Comment on lines +19 to +20
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ich hätte das Interface beibehalten und davon diese abstract Class abgeleitet.

Interfaces können auch praktisch sein, da man für diese Proxies implementieren kann


void onAddSecondaryId(SecondaryIdDescription secondaryId);
private final JobManager jobManager;
private final DatasetRegistry<T> datasetRegistry;

void onDeleteSecondaryId(SecondaryIdDescriptionId description);
public abstract void onAddSecondaryId(SecondaryIdDescription secondaryId);

void onAddTable(Table table);
public abstract void onDeleteSecondaryId(SecondaryIdDescriptionId description);

void onRemoveTable(TableId table);
public abstract void onAddTable(Table table);

void onAddConcept(Concept<?> concept);
public abstract void onRemoveTable(TableId table);

void onDeleteConcept(ConceptId concept);
public abstract void onAddConcept(Concept<?> concept);

public abstract void onDeleteConcept(ConceptId concept);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,48 @@
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.WorkerHandler;
import lombok.AllArgsConstructor;

/**
* Propagates changes of stored entities to relevant ConQuery shards in the cluster.
*/
@AllArgsConstructor
public
class ClusterStorageListener implements StorageListener {
public class ClusterStorageListener extends StorageListener<DistributedNamespace> {

private final JobManager jobManager;
private final DatasetRegistry<DistributedNamespace> datasetRegistry;

public ClusterStorageListener(JobManager jobManager, DatasetRegistry<DistributedNamespace> datasetRegistry) {
super(jobManager, datasetRegistry);
}

@Override
public void onAddSecondaryId(SecondaryIdDescription secondaryId) {
datasetRegistry.get(secondaryId.getDataset()).getWorkerHandler().sendToAll(new UpdateSecondaryId(secondaryId));
getDatasetRegistry().get(secondaryId.getDataset()).getWorkerHandler().sendToAll(new UpdateSecondaryId(secondaryId));
}

@Override
public void onDeleteSecondaryId(SecondaryIdDescriptionId secondaryId) {
datasetRegistry.get(secondaryId.getDataset()).getWorkerHandler().sendToAll(new RemoveSecondaryId(secondaryId));
getDatasetRegistry().get(secondaryId.getDataset()).getWorkerHandler().sendToAll(new RemoveSecondaryId(secondaryId));
}

@Override
public void onAddTable(Table table) {
datasetRegistry.get(table.getDataset()).getWorkerHandler().sendToAll(new UpdateTable(table));
getDatasetRegistry().get(table.getDataset()).getWorkerHandler().sendToAll(new UpdateTable(table));
}

@Override
public void onRemoveTable(TableId table) {
datasetRegistry.get(table.getDataset()).getWorkerHandler().sendToAll(new RemoveTable(table));
getDatasetRegistry().get(table.getDataset()).getWorkerHandler().sendToAll(new RemoveTable(table));
}

@Override
public void onAddConcept(Concept<?> concept) {
WorkerHandler handler = datasetRegistry.get(concept.getDataset()).getWorkerHandler();
WorkerHandler handler = getDatasetRegistry().get(concept.getDataset()).getWorkerHandler();
SimpleJob simpleJob = new SimpleJob(String.format("sendToAll : Add %s ", concept.getId()), () -> handler.sendToAll(new UpdateConcept(concept)));
jobManager.addSlowJob(simpleJob);
getJobManager().addSlowJob(simpleJob);
}

@Override
public void onDeleteConcept(ConceptId concept) {
WorkerHandler handler = datasetRegistry.get(concept.getDataset()).getWorkerHandler();
WorkerHandler handler = getDatasetRegistry().get(concept.getDataset()).getWorkerHandler();
SimpleJob simpleJob = new SimpleJob("sendToAll: remove " + concept, () -> handler.sendToAll(new RemoveConcept(concept)));
jobManager.addSlowJob(simpleJob);
getJobManager().addSlowJob(simpleJob);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.bakdata.conquery.mode.NamespaceHandler;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.models.worker.ShardNodeInformation;
Expand All @@ -33,19 +34,20 @@ public LocalManagerProvider(SqlDialectFactory dialectFactory) {

public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Environment environment) {

final JobManager jobManager = ManagerProvider.newJobManager(config);

final MetaStorage storage = new MetaStorage(config.getStorage());
final InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator());
final NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, internalMapperFactory, dialectFactory);
final DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, internalMapperFactory);


return new DelegateManager<>(
config,
environment,
datasetRegistry,
storage,
new FailingImportHandler(),
new LocalStorageListener(),
new LocalStorageListener(jobManager, datasetRegistry),
EMPTY_NODE_PROVIDER,
List.of(),
internalMapperFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.bakdata.conquery.sql.DSLContextWrapper;
import com.bakdata.conquery.sql.DslContextFactory;
import com.bakdata.conquery.sql.conquery.SqlExecutionManager;
import com.bakdata.conquery.sql.conquery.SqlMatchingStats;
import com.bakdata.conquery.sql.conversion.NodeConversions;
import com.bakdata.conquery.sql.conversion.SqlConverter;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
Expand Down Expand Up @@ -72,7 +73,8 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto
sqlStorageHandler,
namespaceData.jobManager(),
namespaceData.filterSearch(),
sqlEntityResolver
sqlEntityResolver,
databaseConfig
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,22 @@
import com.bakdata.conquery.models.datasets.SecondaryIdDescription;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId;
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
import lombok.Data;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;

@Data
public class LocalStorageListener implements StorageListener {
public class LocalStorageListener extends StorageListener<LocalNamespace> {


public LocalStorageListener(
JobManager jobManager,
DatasetRegistry<LocalNamespace> datasetRegistry) {
super(jobManager, datasetRegistry);
}

@Override
public void onAddSecondaryId(SecondaryIdDescription secondaryId) {
Expand All @@ -31,9 +40,13 @@ public void onRemoveTable(TableId table) {

@Override
public void onAddConcept(Concept<?> concept) {
LocalNamespace namespace = getDatasetRegistry().get(concept.getDataset());
namespace.getMatchingStats().createConceptIdJoinTable((TreeConcept) concept);
}

@Override
public void onDeleteConcept(ConceptId concept) {
LocalNamespace namespace = getDatasetRegistry().get(concept.getDataset());
namespace.getMatchingStats().deleteConceptIdJoinTable(concept);
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Löschen?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ne eigentlich eher die Logik wenn es dann passt hierhin migrieren

Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.bakdata.conquery.mode.local;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.sql.conquery.SqlMatchingStats;
import com.google.common.base.Stopwatch;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.ToStringExclude;

@Slf4j
@Data
public class UpdateMatchingStatsSqlJob extends Job {

@ToString.Exclude
private final List<Concept<?>> concepts;
private final Dataset dataset;

@ToString.Exclude
private final SqlMatchingStats matchingStats;


@Override
public void execute() throws Exception {

log.info("BEGIN collecting SQL matching stats for {}", dataset);

Stopwatch stopwatch = Stopwatch.createStarted();

ExecutorService executorService = Executors.newSingleThreadExecutor();

List<CompletableFuture<?>> jobs = new ArrayList<>();


for (Concept<?> concept : concepts) {
if (!(concept instanceof TreeConcept)) {
continue;
}
jobs.add(matchingStats.collectMatchingStatsForConcept((TreeConcept) concept, executorService).toCompletableFuture());
}

CompletableFuture<Void> all = CompletableFuture.allOf(jobs.toArray(CompletableFuture[]::new));
while (!all.isDone()) {
if (isCancelled()) {
all.cancel(true);
log.debug("CANCELLED update matching stats for {}", getDataset(), all.exceptionNow());
return;
}

all.get(5, TimeUnit.SECONDS);
log.trace("WAITING for matching stats to finish {}", getDataset());

if (all.isCompletedExceptionally()) {
log.error("FAILED update matching stats for {}", getDataset(), all.exceptionNow());
return;
}
}

log.debug("DONE collecting SQL matching stats for {} within {}", dataset, stopwatch);
}

@Override
public String getLabel() {
return "Collect matching stats for %s (%s concepts)".formatted(dataset.getName(), concepts.size());
}
}
Loading
Loading