Skip to content

Commit

Permalink
Update vendored DuckDB sources to d590207
Browse files Browse the repository at this point in the history
  • Loading branch information
duckdblabs-bot committed Nov 14, 2024
1 parent d590207 commit 17b1477
Show file tree
Hide file tree
Showing 51 changed files with 862 additions and 294 deletions.
7 changes: 6 additions & 1 deletion src/duckdb/src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,12 @@ vector<CatalogSearchEntry> GetCatalogEntries(CatalogEntryRetriever &retriever, c
entries.emplace_back(catalog_name, schema);
}
if (entries.empty()) {
entries.emplace_back(DatabaseManager::GetDefaultDatabase(context), schema);
auto &default_entry = search_path.GetDefault();
if (!IsInvalidCatalog(default_entry.catalog)) {
entries.emplace_back(default_entry.catalog, schema);
} else {
entries.emplace_back(DatabaseManager::GetDefaultDatabase(context), schema);
}
}
} else if (IsInvalidSchema(schema)) {
auto schemas = search_path.GetSchemasForCatalog(catalog);
Expand Down
5 changes: 5 additions & 0 deletions src/duckdb/src/catalog/catalog_entry/duck_schema_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ optional_ptr<CatalogEntry> DuckSchemaEntry::GetEntry(CatalogTransaction transact
return GetCatalogSet(type).GetEntry(transaction, name);
}

CatalogSet::EntryLookup DuckSchemaEntry::GetEntryDetailed(CatalogTransaction transaction, CatalogType type,
const string &name) {
return GetCatalogSet(type).GetEntryDetailed(transaction, name);
}

SimilarCatalogEntry DuckSchemaEntry::GetSimilarEntry(CatalogTransaction transaction, CatalogType type,
const string &name) {
return GetCatalogSet(type).SimilarEntry(transaction, name);
Expand Down
14 changes: 14 additions & 0 deletions src/duckdb/src/catalog/catalog_entry/schema_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ SimilarCatalogEntry SchemaCatalogEntry::GetSimilarEntry(CatalogTransaction trans
return result;
}

//! This should not be used, it's only implemented to not put the burden of implementing it on every derived class of
//! SchemaCatalogEntry
CatalogSet::EntryLookup SchemaCatalogEntry::GetEntryDetailed(CatalogTransaction transaction, CatalogType type,
const string &name) {
CatalogSet::EntryLookup result;
result.result = GetEntry(transaction, type, name);
if (!result.result) {
result.reason = CatalogSet::EntryLookup::FailureReason::DELETED;
} else {
result.reason = CatalogSet::EntryLookup::FailureReason::SUCCESS;
}
return result;
}

unique_ptr<CreateInfo> SchemaCatalogEntry::GetInfo() const {
auto result = make_uniq<CreateSchemaInfo>();
result->schema = name;
Expand Down
97 changes: 70 additions & 27 deletions src/duckdb/src/catalog/catalog_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ CatalogSet::CatalogSet(Catalog &catalog_p, unique_ptr<DefaultGenerator> defaults
CatalogSet::~CatalogSet() {
}

bool IsDependencyEntry(CatalogEntry &entry) {
return entry.type == CatalogType::DEPENDENCY_ENTRY;
}

bool CatalogSet::StartChain(CatalogTransaction transaction, const string &name, unique_lock<mutex> &read_lock) {
D_ASSERT(!map.GetEntry(name));

Expand All @@ -101,9 +97,8 @@ bool CatalogSet::StartChain(CatalogTransaction transaction, const string &name,
return false;
}

// first create a dummy deleted entry for this entry
// so transactions started before the commit of this transaction don't
// see it yet
// first create a dummy deleted entry
// so other transactions will see that instead of the entry that is to be added.
auto dummy_node = make_uniq<InCatalogEntry>(CatalogType::INVALID, catalog, name);
dummy_node->timestamp = 0;
dummy_node->deleted = true;
Expand All @@ -114,20 +109,23 @@ bool CatalogSet::StartChain(CatalogTransaction transaction, const string &name,
}

bool CatalogSet::VerifyVacancy(CatalogTransaction transaction, CatalogEntry &entry) {
// if it does, we have to check version numbers
if (HasConflict(transaction, entry.timestamp)) {
// current version has been written to by a currently active
// transaction
// A transaction that is not visible to our snapshot has already made a change to this entry.
// Because of Catalog limitations we can't push our change on this, even if the change was made by another
// active transaction that might end up being aborted. So we have to cancel this transaction.
throw TransactionException("Catalog write-write conflict on create with \"%s\"", entry.name);
}
// there is a current version that has been committed
// if it has not been deleted there is a conflict
// The entry is visible to our snapshot
if (!entry.deleted) {
return false;
}
return true;
}

static bool IsDependencyEntry(CatalogEntry &entry) {
return entry.type == CatalogType::DEPENDENCY_ENTRY;
}

void CatalogSet::CheckCatalogEntryInvariants(CatalogEntry &value, const string &name) {
if (value.internal && !catalog.IsSystemCatalog() && name != DEFAULT_SCHEMA) {
throw InternalException("Attempting to create internal entry \"%s\" in non-system catalog - internal entries "
Expand Down Expand Up @@ -160,7 +158,7 @@ optional_ptr<CatalogEntry> CatalogSet::CreateCommittedEntry(unique_ptr<CatalogEn
auto catalog_entry = entry.get();

entry->set = this;
// Set the timestamp to the first committed transaction
// Give the entry commit id 0, so it is visible to all transactions
entry->timestamp = 0;
map.AddEntry(std::move(entry));

Expand All @@ -176,7 +174,7 @@ bool CatalogSet::CreateEntryInternal(CatalogTransaction transaction, const strin
return false;
}
} else if (should_be_empty) {
// Verify that the chain is deleted, not altered by another transaction
// Verify that the entry is deleted, not altered by another transaction
if (!VerifyVacancy(transaction, *entry_value)) {
return false;
}
Expand All @@ -185,7 +183,7 @@ bool CatalogSet::CreateEntryInternal(CatalogTransaction transaction, const strin
// Finally add the new entry to the chain
auto value_ptr = value.get();
map.UpdateEntry(std::move(value));
// push the old entry in the undo buffer for this transaction
// Push the old entry in the undo buffer for this transaction, so it can be restored in the event of failure
if (transaction.transaction) {
DuckTransactionManager::Get(GetCatalog().GetAttached())
.PushCatalogEntry(*transaction.transaction, value_ptr->Child());
Expand All @@ -197,10 +195,9 @@ bool CatalogSet::CreateEntry(CatalogTransaction transaction, const string &name,
const LogicalDependencyList &dependencies) {
CheckCatalogEntryInvariants(*value, name);

// Set the timestamp to the timestamp of the current transaction
// Mark this entry as being created by the current active transaction
value->timestamp = transaction.transaction_id;
value->set = this;
// now add the dependency set of this object to the dependency manager
catalog.GetDependencyManager().AddObject(transaction, *value, dependencies);

// lock the catalog for writing
Expand All @@ -216,24 +213,24 @@ bool CatalogSet::CreateEntry(ClientContext &context, const string &name, unique_
return CreateEntry(catalog.GetCatalogTransaction(context), name, std::move(value), dependencies);
}

//! This method is used to retrieve an entry for the purpose of making a new version, through an alter/drop/create
optional_ptr<CatalogEntry> CatalogSet::GetEntryInternal(CatalogTransaction transaction, const string &name) {
auto entry_value = map.GetEntry(name);
if (!entry_value) {
// the entry does not exist, check if we can create a default entry
return nullptr;
}
auto &catalog_entry = *entry_value;

// if it does: we have to retrieve the entry and to check version numbers
// Check if this entry is visible to our snapshot
if (HasConflict(transaction, catalog_entry.timestamp)) {
// current version has been written to by a currently active
// transaction
// We intend to create a new version of the entry.
// Another transaction has already made an edit to this catalog entry, because of limitations in the Catalog we
// can't create an edit alongside this even if the other transaction might end up getting aborted. So we have to
// abort the transaction.
throw TransactionException("Catalog write-write conflict on alter with \"%s\"", catalog_entry.name);
}
// there is a current version that has been committed by this transaction
// The entry is visible to our snapshot, check if it's deleted
if (catalog_entry.deleted) {
// if the entry was already deleted, it now does not exist anymore
// so we return that we could not find it
return nullptr;
}
return &catalog_entry;
Expand Down Expand Up @@ -434,6 +431,35 @@ bool CatalogSet::DropEntry(ClientContext &context, const string &name, bool casc
return DropEntry(catalog.GetCatalogTransaction(context), name, cascade, allow_drop_internal);
}

//! Verify that the object referenced by the dependency still exists when we commit the dependency
void CatalogSet::VerifyExistenceOfDependency(transaction_t commit_id, CatalogEntry &entry) {
auto &duck_catalog = GetCatalog();

// Make sure that we don't see any uncommitted changes
auto transaction_id = MAX_TRANSACTION_ID;
// This will allow us to see all committed changes made before this COMMIT happened
auto tx_start_time = commit_id;
CatalogTransaction commit_transaction(duck_catalog.GetDatabase(), transaction_id, tx_start_time);

D_ASSERT(entry.type == CatalogType::DEPENDENCY_ENTRY);
auto &dep = entry.Cast<DependencyEntry>();
duck_catalog.GetDependencyManager().VerifyExistence(commit_transaction, dep);
}

//! Verify that no dependencies creations were committed since our transaction started, that reference the entry we're
//! dropping
void CatalogSet::CommitDrop(transaction_t commit_id, transaction_t start_time, CatalogEntry &entry) {
auto &duck_catalog = GetCatalog();

// Make sure that we don't see any uncommitted changes
auto transaction_id = MAX_TRANSACTION_ID;
// This will allow us to see all committed changes made before this COMMIT happened
auto tx_start_time = commit_id;
CatalogTransaction commit_transaction(duck_catalog.GetDatabase(), transaction_id, tx_start_time);

duck_catalog.GetDependencyManager().VerifyCommitDrop(commit_transaction, start_time, entry);
}

DuckCatalog &CatalogSet::GetCatalog() {
return catalog;
}
Expand Down Expand Up @@ -467,6 +493,11 @@ bool CatalogSet::HasConflict(CatalogTransaction transaction, transaction_t times
return CreatedByOtherActiveTransaction(transaction, timestamp) || CommittedAfterStarting(transaction, timestamp);
}

bool CatalogSet::IsCommitted(transaction_t timestamp) {
//! FIXME: `transaction_t` itself should be a class that has these methods
return timestamp < TRANSACTION_ID_START;
}

bool CatalogSet::UseTimestamp(CatalogTransaction transaction, transaction_t timestamp) {
if (timestamp == transaction.transaction_id) {
// we created this version
Expand All @@ -480,13 +511,20 @@ bool CatalogSet::UseTimestamp(CatalogTransaction transaction, transaction_t time
}

CatalogEntry &CatalogSet::GetEntryForTransaction(CatalogTransaction transaction, CatalogEntry &current) {
bool visible;
return GetEntryForTransaction(transaction, current, visible);
}

CatalogEntry &CatalogSet::GetEntryForTransaction(CatalogTransaction transaction, CatalogEntry &current, bool &visible) {
reference<CatalogEntry> entry(current);
while (entry.get().HasChild()) {
if (UseTimestamp(transaction, entry.get().timestamp)) {
break;
visible = true;
return entry.get();
}
entry = entry.get().Child();
}
visible = false;
return entry.get();
}

Expand Down Expand Up @@ -554,9 +592,14 @@ CatalogSet::EntryLookup CatalogSet::GetEntryDetailed(CatalogTransaction transact
// check the version numbers

auto &catalog_entry = *entry_value;
auto &current = GetEntryForTransaction(transaction, catalog_entry);
bool visible;
auto &current = GetEntryForTransaction(transaction, catalog_entry, visible);
if (current.deleted) {
return EntryLookup {nullptr, EntryLookup::FailureReason::DELETED};
if (!visible) {
return EntryLookup {nullptr, EntryLookup::FailureReason::INVISIBLE};
} else {
return EntryLookup {nullptr, EntryLookup::FailureReason::DELETED};
}
}
D_ASSERT(StringUtil::CIEquals(name, current.name));
return EntryLookup {&current, EntryLookup::FailureReason::SUCCESS};
Expand Down
90 changes: 85 additions & 5 deletions src/duckdb/src/catalog/dependency_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,17 +433,86 @@ string DependencyManager::CollectDependents(CatalogTransaction transaction, cata
return result;
}

void DependencyManager::DropObject(CatalogTransaction transaction, CatalogEntry &object, bool cascade) {
void DependencyManager::VerifyExistence(CatalogTransaction transaction, DependencyEntry &object) {
auto &subject = object.Subject();

CatalogEntryInfo info;
if (subject.flags.IsOwnership()) {
info = object.SourceInfo();
} else {
info = object.EntryInfo();
}

auto &type = info.type;
auto &schema = info.schema;
auto &name = info.name;

auto &duck_catalog = catalog.Cast<DuckCatalog>();
auto &schema_catalog_set = duck_catalog.GetSchemaCatalogSet();

CatalogSet::EntryLookup lookup_result;
lookup_result = schema_catalog_set.GetEntryDetailed(transaction, schema);

if (type != CatalogType::SCHEMA_ENTRY && lookup_result.result) {
auto &schema_entry = lookup_result.result->Cast<SchemaCatalogEntry>();
lookup_result = schema_entry.GetEntryDetailed(transaction, type, name);
}

if (lookup_result.reason == CatalogSet::EntryLookup::FailureReason::DELETED) {
throw DependencyException("Could not commit creation of dependency, subject \"%s\" has been deleted",
object.SourceInfo().name);
}
}

void DependencyManager::VerifyCommitDrop(CatalogTransaction transaction, transaction_t start_time,
CatalogEntry &object) {
if (IsSystemEntry(object)) {
// Don't do anything for this
return;
}

auto info = GetLookupProperties(object);
// Check if there are any entries that block the DROP because they still depend on the object
catalog_entry_set_t to_drop;
ScanDependents(transaction, info, [&](DependencyEntry &dep) {
auto dep_committed_at = dep.timestamp.load();
if (dep_committed_at > start_time) {
// In the event of a CASCADE, the dependency drop has not committed yet
// so we would be halted by the existence of a dependency we are already dropping unless we check the
// timestamp
//
// Which differentiates between objects that we were already aware of (and will subsequently be dropped) and
// objects that were introduced inbetween, which should cause this error:
throw DependencyException(
"Could not commit DROP of \"%s\" because a dependency was created after the transaction started",
object.name);
}
});
ScanSubjects(transaction, info, [&](DependencyEntry &dep) {
auto dep_committed_at = dep.timestamp.load();
if (!dep.Dependent().flags.IsOwnedBy()) {
return;
}
D_ASSERT(dep.Subject().flags.IsOwnership());
if (dep_committed_at > start_time) {
// Same as above, objects that are owned by the object that is being dropped will be dropped as part of this
// transaction. Only objects that were introduced by other transactions, that this transaction could not
// see, should cause this error:
throw DependencyException(
"Could not commit DROP of \"%s\" because a dependency was created after the transaction started",
object.name);
}
});
}

catalog_entry_set_t DependencyManager::CheckDropDependencies(CatalogTransaction transaction, CatalogEntry &object,
bool cascade) {
if (IsSystemEntry(object)) {
// Don't do anything for this
return catalog_entry_set_t();
}

catalog_entry_set_t to_drop;
catalog_entry_set_t blocking_dependents;

auto info = GetLookupProperties(object);
// Look through all the objects that depend on the 'object'
ScanDependents(transaction, info, [&](DependencyEntry &dep) {
// It makes no sense to have a schema depend on anything
D_ASSERT(dep.EntryInfo().type != CatalogType::SCHEMA_ENTRY);
Expand All @@ -467,6 +536,7 @@ void DependencyManager::DropObject(CatalogTransaction transaction, CatalogEntry
throw DependencyException(error_string);
}

// Look through all the entries that 'object' depends on
ScanSubjects(transaction, info, [&](DependencyEntry &dep) {
auto flags = dep.Subject().flags;
if (flags.IsOwnership()) {
Expand All @@ -475,7 +545,17 @@ void DependencyManager::DropObject(CatalogTransaction transaction, CatalogEntry
to_drop.insert(*entry);
}
});
return to_drop;
}

void DependencyManager::DropObject(CatalogTransaction transaction, CatalogEntry &object, bool cascade) {
if (IsSystemEntry(object)) {
// Don't do anything for this
return;
}

// Check if there are any entries that block the DROP because they still depend on the object
auto to_drop = CheckDropDependencies(transaction, object, cascade);
CleanupDependencies(transaction, object);

for (auto &entry : to_drop) {
Expand Down
4 changes: 4 additions & 0 deletions src/duckdb/src/catalog/duck_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ void DuckCatalog::ScanSchemas(std::function<void(SchemaCatalogEntry &)> callback
schemas->Scan([&](CatalogEntry &entry) { callback(entry.Cast<SchemaCatalogEntry>()); });
}

CatalogSet &DuckCatalog::GetSchemaCatalogSet() {
return *schemas;
}

optional_ptr<SchemaCatalogEntry> DuckCatalog::GetSchema(CatalogTransaction transaction, const string &schema_name,
OnEntryNotFound if_not_found, QueryErrorContext error_context) {
D_ASSERT(!schema_name.empty());
Expand Down
Loading

0 comments on commit 17b1477

Please sign in to comment.