Skip to content

Commit

Permalink
Revert "Fix metaserver deadlock caused by bthread coroutine switching"
Browse files Browse the repository at this point in the history
This reverts commit 48014b5.

Signed-off-by: Hanqing Wu <[email protected]>
  • Loading branch information
wu-hanqing committed Dec 10, 2023
1 parent a81c673 commit 831b3d4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 109 deletions.
97 changes: 41 additions & 56 deletions curvefs/src/metaserver/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include <memory>
#include <string>
#include <utility>
#include <future>

#include "curvefs/proto/metaserver.pb.h"
#include "curvefs/src/metaserver/copyset/copyset_node_manager.h"
Expand Down Expand Up @@ -532,68 +531,54 @@ MetaStatusCode Partition::GetAllBlockGroup(
}

void Partition::StartS3Compact() {
// register s3 compaction task in a separate thread, since the caller may
// holds a pthread wrlock when calling this function, and create `S3Compact`
// will acquire a bthread rwlock, may cause thread switching, thus causing a
// deadlock.
// FIXME(wuhanqing): handle it in a more elegant way
auto handle = std::async(std::launch::async, [this]() {
S3CompactManager::GetInstance().Register(
S3Compact{inodeManager_, partitionInfo_});
});

handle.wait();
S3CompactManager::GetInstance().Register(
S3Compact{inodeManager_, partitionInfo_});
}

void Partition::CancelS3Compact() {
S3CompactManager::GetInstance().Cancel(partitionInfo_.partitionid());
}

void Partition::StartVolumeDeallocate() {
// FIXME(wuhanqing): same as `StartS3Compact`
auto handle = std::async(std::launch::async, [this]() {
FsInfo fsInfo;
bool ok = FsInfoManager::GetInstance().GetFsInfo(partitionInfo_.fsid(),
&fsInfo);
if (!ok) {
LOG(ERROR) << "Partition start volume deallocate fail, get fsinfo "
"fail. fsid="
<< partitionInfo_.fsid();
return;
}

if (!fsInfo.detail().has_volume()) {
LOG(INFO) << "Partition not belong to volume, do not need start "
"deallocate. partitionInfo="
<< partitionInfo_.DebugString();
return;
}

VolumeDeallocateCalOption calOpt;
calOpt.kvStorage = kvStorage_;
calOpt.inodeStorage = inodeStorage_;
calOpt.nameGen = nameGen_;
auto copysetNode =
copyset::CopysetNodeManager::GetInstance().GetSharedCopysetNode(
partitionInfo_.poolid(), partitionInfo_.copysetid());
if (copysetNode == nullptr) {
LOG(ERROR) << "Partition get copyset node failed. poolid="
<< partitionInfo_.poolid()
<< ", copysetid=" << partitionInfo_.copysetid();
return;
}

InodeVolumeSpaceDeallocate task(
partitionInfo_.fsid(), partitionInfo_.partitionid(), copysetNode);
task.Init(calOpt);

VolumeDeallocateManager::GetInstance().Register(std::move(task));

VLOG(3) << "Partition start volume deallocate success. partitionInfo="
<< partitionInfo_.DebugString();
});

handle.wait();
FsInfo fsInfo;
bool ok =
FsInfoManager::GetInstance().GetFsInfo(partitionInfo_.fsid(), &fsInfo);
if (!ok) {
LOG(ERROR)
<< "Partition start volume deallocate fail, get fsinfo fail. fsid="
<< partitionInfo_.fsid();
return;
}

if (!fsInfo.detail().has_volume()) {
LOG(INFO) << "Partition not belong to volume, do not need start "
"deallocate. partitionInfo="
<< partitionInfo_.DebugString();
return;
}

VolumeDeallocateCalOption calOpt;
calOpt.kvStorage = kvStorage_;
calOpt.inodeStorage = inodeStorage_;
calOpt.nameGen = nameGen_;
auto copysetNode =
copyset::CopysetNodeManager::GetInstance().GetSharedCopysetNode(
partitionInfo_.poolid(), partitionInfo_.copysetid());
if (copysetNode == nullptr) {
LOG(ERROR) << "Partition get copyset node failed. poolid="
<< partitionInfo_.poolid()
<< ", copysetid=" << partitionInfo_.copysetid();
return;
}

InodeVolumeSpaceDeallocate task(partitionInfo_.fsid(),
partitionInfo_.partitionid(), copysetNode);
task.Init(calOpt);

VolumeDeallocateManager::GetInstance().Register(std::move(task));

VLOG(3) << "Partition start volume deallocate success. partitionInfo="
<< partitionInfo_.DebugString();
}

void Partition::CancelVolumeDeallocate() {
Expand Down
58 changes: 5 additions & 53 deletions src/common/concurrent/rw_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,13 @@
#ifndef SRC_COMMON_CONCURRENT_RW_LOCK_H_
#define SRC_COMMON_CONCURRENT_RW_LOCK_H_

#include <pthread.h>
#include <assert.h>
#include <bthread/bthread.h>
#include <glog/logging.h>
#include <pthread.h>
#include <sys/types.h> // gettid
#include <bthread/bthread.h>

#include "include/curve_compiler_specific.h"
#include "src/common/uncopyable.h"

// Due to the mixed use of bthread and pthread in some cases, acquiring another
// bthread lock(mutex/rwlock) after acquiring a write lock on a pthread rwlock
// may result in switching the bthread coroutine, and then the operation of
// releasing the previous write lock in the other pthread will not take effect
// (implying that the write lock is still held), thus causing a deadlock.

// Check pthread rwlock tid between wrlock and unlock
#if defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID) && \
(ENABLE_CHECK_PTHREAD_WRLOCK_TID == 1)
#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1
#elif !defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID)
#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1
#else
#define CURVE_CHECK_PTHREAD_WRLOCK_TID 0
#endif

namespace curve {
namespace common {

Expand All @@ -69,21 +51,10 @@ class PthreadRWLockBase : public RWLockBase {
void WRLock() override {
int ret = pthread_rwlock_wrlock(&rwlock_);
CHECK(0 == ret) << "wlock failed: " << ret << ", " << strerror(ret);
#if CURVE_CHECK_PTHREAD_WRLOCK_TID
tid_ = gettid();
#endif
}

int TryWRLock() override {
int ret = pthread_rwlock_trywrlock(&rwlock_);
if (CURVE_UNLIKELY(ret != 0)) {
return ret;
}

#if CURVE_CHECK_PTHREAD_WRLOCK_TID
tid_ = gettid();
#endif
return 0;
return pthread_rwlock_trywrlock(&rwlock_);
}

void RDLock() override {
Expand All @@ -96,19 +67,6 @@ class PthreadRWLockBase : public RWLockBase {
}

void Unlock() override {
#if CURVE_CHECK_PTHREAD_WRLOCK_TID
if (tid_ != 0) {
const pid_t current = gettid();
// If CHECK here is triggered, please look at the comments at the
// beginning of the file.
// In the meantime, the simplest solution might be to use
// `BthreadRWLock` locks everywhere.
CHECK(tid_ == current)
<< ", tid has changed, previous tid: " << tid_
<< ", current tid: " << current;
tid_ = 0;
}
#endif
pthread_rwlock_unlock(&rwlock_);
}

Expand All @@ -118,14 +76,8 @@ class PthreadRWLockBase : public RWLockBase {

pthread_rwlock_t rwlock_;
pthread_rwlockattr_t rwlockAttr_;

#if CURVE_CHECK_PTHREAD_WRLOCK_TID
pid_t tid_ = 0;
#endif
};

#undef CURVE_CHECK_PTHREAD_WRLOCK_TID

class RWLock : public PthreadRWLockBase {
public:
RWLock() {
Expand Down Expand Up @@ -170,7 +122,7 @@ class BthreadRWLock : public RWLockBase {
}

int TryWRLock() override {
LOG(WARNING) << "TryWRLock not support yet";
// not support yet
return EINVAL;
}

Expand All @@ -180,7 +132,7 @@ class BthreadRWLock : public RWLockBase {
}

int TryRDLock() override {
LOG(WARNING) << "TryRDLock not support yet";
// not support yet
return EINVAL;
}

Expand Down

0 comments on commit 831b3d4

Please sign in to comment.