Skip to content

Commit

Permalink
parallelizing L2 cache lookup (#3032)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #3032

X-link: facebookresearch/FBGEMM#130

Change sets
1. instead of allocating an intermediate tensor to collect the L2 cache miss info, we do all the embedding copies inside the originally provided tensor and mark the related indices as -1
2. parallelize the cache lookup logic using multiple cachelib pools, which helps reduce LRU contention
3. fix cachelib->UVA tensor data copy bug(wrong offset)

Reviewed By: ehsanardestani

Differential Revision: D61417947
  • Loading branch information
duduyi2013 authored and facebook-github-bot committed Aug 28, 2024
1 parent a9a3713 commit 4237e3a
Show file tree
Hide file tree
Showing 17 changed files with 437 additions and 263 deletions.
2 changes: 2 additions & 0 deletions fbgemm_gpu/fbgemm_gpu/tbe/ssd/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ def max_ty_D(ty: SparseType) -> int:
),
ps_client_thread_num if ps_client_thread_num is not None else 32,
ps_max_key_per_request if ps_max_key_per_request is not None else 500,
0, # ssd_block_cache_size
self.max_D_cache,
)

# pyre-fixme[20]: Argument `self` expected.
Expand Down
5 changes: 3 additions & 2 deletions fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def __init__(
use_passed_in_path: int = True,
gather_ssd_cache_stats: Optional[bool] = False,
stats_reporter_config: Optional[TBEStatsReporterConfig] = None,
l2_cache_size: int = 0,
l2_cache_size: int = 1,
# Set to True to enable pipeline prefetching
prefetch_pipeline: bool = False,
# Set to True to alloc a UVM tensor using malloc+cudaHostRegister.
Expand Down Expand Up @@ -358,7 +358,7 @@ def __init__(
logging.info(f"tbe_unique_id: {tbe_unique_id}")
if not ps_hosts:
logging.info(
f"Logging SSD offloading setup, tbe_unique_id:{tbe_unique_id},"
f"Logging SSD offloading setup, tbe_unique_id:{tbe_unique_id}, l2_cache_size:{l2_cache_size}GB, "
f"passed_in_path={ssd_directory}, num_shards={ssd_rocksdb_shards},num_threads={ssd_rocksdb_shards},"
f"memtable_flush_period={ssd_memtable_flush_period},memtable_flush_offset={ssd_memtable_flush_offset},"
f"l0_files_per_compact={ssd_l0_files_per_compact},max_D={self.max_D},rate_limit_mbps={ssd_rate_limit_mbps},"
Expand Down Expand Up @@ -406,6 +406,7 @@ def __init__(
ps_client_thread_num if ps_client_thread_num is not None else 32,
ps_max_key_per_request if ps_max_key_per_request is not None else 500,
l2_cache_size,
self.max_D,
)
# pyre-fixme[20]: Argument `self` expected.
(low_priority, high_priority) = torch.cuda.Stream.priority_range()
Expand Down
142 changes: 142 additions & 0 deletions fbgemm_gpu/include/fbgemm_gpu/split_embeddings_cache/cachelib_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once
#include <ATen/ATen.h>
#include <cachelib/allocator/CacheAllocator.h>
#include <cachelib/facebook/admin/CacheAdmin.h>

#include <cstdint>
#include <iostream>
#include "fbgemm_gpu/split_embeddings_cache/kv_db_cpp_utils.h"

namespace l2_cache {

/// @ingroup embedding-ssd
///
/// @brief A Cachelib wrapper class for Cachelib interaction
///
/// It is for maintaining all the cache related operations, including
/// initialization, insertion, lookup and eviction.
/// It is stateful for eviction logic that caller has to specifically
/// fetch and reset eviction related states.
/// Cachelib related optimization will be captured inside this class
/// e.g. fetch and delayed markUseful to boost up get performance
///
/// @note that this class only handles single Cachelib read/update.
/// parallelism is done on the caller side
class CacheLibCache {
public:
using Cache = facebook::cachelib::LruAllocator;
struct CacheConfig {
size_t cacheSizeBytes;
};

explicit CacheLibCache(size_t cacheSizeBytes, int64_t num_shards)
: cacheConfig_(CacheConfig{.cacheSizeBytes = cacheSizeBytes}),
cache_(initializeCacheLib(cacheConfig_)),
admin_(createCacheAdmin(*cache_)) {
for (int i = 0; i < num_shards; i++) {
pool_ids_.push_back(cache_->addPool(
fmt::format("shard_{}", i),
cache_->getCacheMemoryStats().ramCacheSize / num_shards));
}
}

std::unique_ptr<Cache> initializeCacheLib(const CacheConfig& config) {
Cache::Config cacheLibConfig;
cacheLibConfig.setCacheSize(static_cast<uint64_t>(config.cacheSizeBytes))
.setCacheName("TBEL2Cache")
.setAccessConfig({25 /* bucket power */, 10 /* lock power */})
.setFullCoredump(false)
.validate();
return std::make_unique<Cache>(cacheLibConfig);
}

std::unique_ptr<facebook::cachelib::CacheAdmin> createCacheAdmin(
Cache& cache) {
facebook::cachelib::CacheAdmin::Config adminConfig;
adminConfig.oncall = "mvai";
return std::make_unique<facebook::cachelib::CacheAdmin>(
cache, std::move(adminConfig));
}

/// Find the stored embeddings from a given embedding indices, aka key
///
/// @param key embedding index to look up
///
/// @return an optional value, return none on cache misses, if cache hit
/// return a pointer to the cachelib underlying storage of associated
/// embeddings
///
/// @note that this is not thread safe, caller needs to make sure the data is
/// fully processed before doing cache insertion, otherwise the returned space
/// might be overwritten if cache is full
std::optional<void*> get(int64_t key) {
auto key_str = folly::StringPiece(
reinterpret_cast<const char*>(&key), sizeof(int64_t));
auto item = cache_->find(key_str);
if (!item) {
return std::nullopt;
}
return const_cast<void*>(item->getMemory());
}

/// Cachelib wrapper specific hash function
///
/// @param key embedding index to get hashed
///
/// @return an hashed value ranges from [0, num_pools)
size_t get_shard_id(int64_t key) {
return kv_db_utils::hash_shard(key, pool_ids_.size());
}

/// get pool id given an embedding index
///
/// @param key embedding index to get pool id
///
/// @return a pool id associated with the given key, this is to build a
/// deterministic mapping from a embedding index to a specific pool id
facebook::cachelib::PoolId get_pool_id(int64_t key) {
return pool_ids_[get_shard_id(key)];
}

/// Add an embedding index and embeddings into cachelib
///
/// @param key embedding index to insert
///
/// @return true on success insertion, false on failure insertion, a failure
/// insertion could happen if the refcount of bottom K items in LRU queue
/// isn't 0.

/// @note In training use case, this is not expected to happen as we do
/// bulk read and bluk write sequentially
///
/// @note cache_->allocation will trigger eviction callback func
bool put(int64_t key, const at::Tensor& data) {
auto key_str = folly::StringPiece(
reinterpret_cast<const char*>(&key), sizeof(int64_t));
auto item = cache_->allocate(get_pool_id(key), key_str, data.nbytes());
if (!item) {
XLOG(ERR) << fmt::format(
"Failed to allocate item {} in cache, skip", key);
return false;
}
std::memcpy(item->getMemory(), data.data_ptr(), data.nbytes());
cache_->insertOrReplace(std::move(item));
return true;
}

private:
const CacheConfig cacheConfig_;
std::unique_ptr<Cache> cache_;
std::vector<facebook::cachelib::PoolId> pool_ids_;
std::unique_ptr<facebook::cachelib::CacheAdmin> admin_;
};

} // namespace l2_cache
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <folly/hash/Hash.h>
#include <stddef.h>
#include <stdint.h>

namespace kv_db_utils {

/// @ingroup embedding-ssd
///
/// @brief hash function used for SSD L2 cache and rocksdb sharding algorithm
///
/// @param id sharding key
/// @param num_shards sharding range
///
/// @return shard id in the range [0, num_shards)
inline size_t hash_shard(int64_t id, size_t num_shards) {
  // Hash the raw bytes of the sharding key with folly's 64-bit FNV.
  const auto digest =
      folly::hash::fnv64_buf(reinterpret_cast<const char*>(&id), sizeof(id));
  // Scale the 64-bit digest into [0, num_shards): take the upper 64 bits of
  // the 128-bit product num_shards * digest instead of using a modulo.
  const __uint128_t product = __uint128_t{num_shards} * digest;
  const auto shard = static_cast<size_t>(product >> 64);
  return shard;
}

}; // namespace kv_db_utils
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class EmbeddingParameterServerWrapper : public torch::jit::CustomClassHolder {
int64_t maxLocalIndexLength = 54,
int64_t num_threads = 32,
int64_t maxKeysPerRequest = 500,
int64_t l2_cache_size_gb = 0) {
int64_t l2_cache_size_gb = 0,
int64_t max_D = 0) {
TORCH_CHECK(
tps_ips.size() == tps_ports.size(),
"tps_ips and tps_ports must have the same size");
Expand All @@ -38,7 +39,8 @@ class EmbeddingParameterServerWrapper : public torch::jit::CustomClassHolder {
maxLocalIndexLength,
num_threads,
maxKeysPerRequest,
l2_cache_size_gb);
l2_cache_size_gb,
max_D);
}

void
Expand Down Expand Up @@ -86,6 +88,7 @@ static auto embedding_parameter_server_wrapper =
int64_t,
int64_t,
int64_t,
int64_t,
int64_t>())
.def("set_cuda", &EmbeddingParameterServerWrapper::set_cuda)
.def("get_cuda", &EmbeddingParameterServerWrapper::get_cuda)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ class EmbeddingParameterServer : public kv_db::EmbeddingKVDB {
int64_t maxLocalIndexLength = 54,
int64_t num_threads = 32,
int64_t maxKeysPerRequest = 500,
int64_t l2_cache_size_gb = 0)
: kv_db::EmbeddingKVDB(l2_cache_size_gb),
int64_t l2_cache_size_gb = 0,
int64_t max_D = 0)
: kv_db::EmbeddingKVDB(
num_threads,
max_D,
l2_cache_size_gb,
tbe_id), // update this interface
tps_client_(
std::make_shared<mvai_infra::experimental::ps_training::tps_client::
TrainingParameterServiceClient>(
Expand Down
80 changes: 0 additions & 80 deletions fbgemm_gpu/src/split_embeddings_cache/CacheLibCache.h

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* LICENSE file in the root directory of this source tree.
*/

#include "kv_db_utils.h"
#include "kv_db_cuda_utils.h"
#include <ATen/cuda/CUDAContext.h>

namespace kv_db_utils {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

/// @defgroup embedding-ssd Embedding SSD Operators

#pragma once
#include <cuda_runtime.h>
#include <folly/executors/CPUThreadPoolExecutor.h>

Expand Down
Loading

0 comments on commit 4237e3a

Please sign in to comment.