Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
[Join] Inline and parallelize tbb in getAllTableColumnFragments.
Browse files Browse the repository at this point in the history
This commit refactors and simplifies method `getAllTableColumnFragments`.
Also some parallelization added.

Partially resolves: #574

Signed-off-by: Dmitrii Makarenko <[email protected]>
  • Loading branch information
Devjiu committed Oct 2, 2023
1 parent 03855db commit 05b98f3
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 65 deletions.
126 changes: 89 additions & 37 deletions omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@

#include "QueryEngine/ColumnFetcher.h"

#include <memory>

#include "DataMgr/ArrayNoneEncoder.h"
#include "QueryEngine/ErrorHandling.h"
#include "QueryEngine/Execute.h"
#include "Shared/Intervals.h"
#include "Shared/likely.h"
#include "Shared/sqltypes.h"

#include <tbb/parallel_for.h>
#include <memory>

namespace {

std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel) {
Expand Down Expand Up @@ -239,6 +240,11 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
int db_id = col_info->db_id;
int table_id = col_info->table_id;
int col_id = col_info->column_id;

// Array type passed to getAllTableColumnFragments. Should be handled in
// linearization.
CHECK(!col_info->type->isString() && !col_info->type->isArray());

const auto fragments_it = all_tables_fragments.find({db_id, table_id});
CHECK(fragments_it != all_tables_fragments.end());
const auto fragments = fragments_it->second;
Expand All @@ -248,7 +254,6 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
const InputDescriptor table_desc(db_id, table_id, int(0));
{
std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);

auto col_token = data_provider_->getZeroCopyColumnData(*col_info);
if (col_token != nullptr) {
size_t num_rows = col_token->getSize() / col_token->getType()->size();
Expand All @@ -262,44 +267,91 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
}

auto column_it = columnarized_scan_table_cache_.find({table_id, col_id});
if (column_it == columnarized_scan_table_cache_.end()) {
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
if (executor_->getConfig()
.exec.interrupt.enable_non_kernel_time_query_interrupt &&
executor_->checkNonKernelTimeInterrupted()) {
throw QueryExecutionError(Executor::ERR_INTERRUPTED);
}
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
std::list<ChunkIter> chunk_iter_holder;
const auto& fragment = (*fragments)[frag_id];
if (fragment.isEmptyPhysicalFragment()) {
continue;
}
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
auto col_buffer = getOneTableColumnFragment(col_info,
static_cast<int>(frag_id),
all_tables_fragments,
chunk_holder,
chunk_iter_holder,
Data_Namespace::CPU_LEVEL,
int(0),
device_allocator);
column_frags.push_back(
std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
col_buffer,
fragment.getNumTuples(),
chunk_meta_it->second->type(),
thread_idx));
}
auto merged_results =
ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
if (column_it != columnarized_scan_table_cache_.end()) {
table_column = column_it->second.get();
return ColumnFetcher::transferColumnIfNeeded(
table_column, 0, memory_level, device_id, device_allocator);
}

if (executor_->getConfig().exec.interrupt.enable_non_kernel_time_query_interrupt &&
executor_->checkNonKernelTimeInterrupted()) {
throw QueryExecutionError(Executor::ERR_INTERRUPTED);
}

size_t total_row_count = 0;
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
const auto& fragment = (*fragments)[frag_id];
const auto rows_in_frag = fragment.getNumTuples();
total_row_count += rows_in_frag;
}

if (total_row_count == 0) {
std::unique_ptr<ColumnarResults> merged_results(nullptr);

table_column = merged_results.get();
columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
std::move(merged_results));
} else {
table_column = column_it->second.get();

return ColumnFetcher::transferColumnIfNeeded(
table_column, 0, memory_level, device_id, device_allocator);
}

const auto type_width = col_info->type->size();
auto write_ptr =
executor_->row_set_mem_owner_->allocate(type_width * total_row_count);
std::vector<std::pair<int8_t*, size_t>> write_ptrs;
std::vector<size_t> valid_fragments;
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
const auto& fragment = (*fragments)[frag_id];
if (fragment.isEmptyPhysicalFragment()) {
continue;
}
CHECK_EQ(type_width, fragment.getChunkMetadataMap().at(col_id)->type()->size());
write_ptrs.push_back({write_ptr, fragment.getNumTuples() * type_width});
write_ptr += fragment.getNumTuples() * type_width;
valid_fragments.push_back(frag_id);
}

CHECK(!write_ptrs.empty());
size_t valid_frag_count = valid_fragments.size();
tbb::parallel_for(
tbb::blocked_range<size_t>(0, valid_frag_count),
[&](const tbb::blocked_range<size_t>& frag_ids) {
for (size_t v_frag_id = frag_ids.begin(); v_frag_id < frag_ids.end();
++v_frag_id) {
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
std::list<ChunkIter> chunk_iter_holder;
size_t frag_id = valid_fragments[v_frag_id];
const auto& fragment = (*fragments)[frag_id];
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
std::shared_ptr<Chunk_NS::Chunk> chunk;
{
ChunkKey chunk_key{
db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
chunk = data_provider_->getChunk(col_info,
chunk_key,
Data_Namespace::CPU_LEVEL,
0,
chunk_meta_it->second->numBytes(),
chunk_meta_it->second->numElements());
std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
chunk_holder.push_back(chunk);
}
auto ab = chunk->getBuffer();
CHECK(ab->getMemoryPtr());
int8_t* col_buffer =
ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
}
});

std::unique_ptr<ColumnarResults> merged_results(new ColumnarResults(
{write_ptrs[0].first}, total_row_count, col_info->type, thread_idx));

table_column = merged_results.get();
columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
std::move(merged_results));
}
return ColumnFetcher::transferColumnIfNeeded(
table_column, 0, memory_level, device_id, device_allocator);
Expand Down
22 changes: 0 additions & 22 deletions omniscidb/ResultSetRegistry/ColumnarResults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,28 +122,6 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
}
}

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const int8_t* one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
const size_t thread_idx)
: column_buffers_(1)
, num_rows_(num_rows)
, target_types_{target_type}
, parallel_conversion_(false)
, direct_columnar_conversion_(false)
, thread_idx_(thread_idx) {
auto timer = DEBUG_TIMER(__func__);

if (target_type->isVarLen()) {
throw ColumnarConversionNotSupported();
}
const auto buf_size = num_rows * target_type->size();
column_buffers_[0] =
reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

ColumnarResults::ColumnarResults(const std::vector<int8_t*> one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
Expand Down
6 changes: 0 additions & 6 deletions omniscidb/ResultSetRegistry/ColumnarResults.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ class ColumnarResults {
const Config& config,
const bool is_parallel_execution_enforced = false);

ColumnarResults(const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const int8_t* one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
const size_t thread_idx);

ColumnarResults(const std::vector<int8_t*> one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
Expand Down

0 comments on commit 05b98f3

Please sign in to comment.