Skip to content

Commit

Permalink
some more little fixes and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Desour committed Oct 7, 2024
1 parent 9733534 commit 64985bb
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/benchmark/benchmark_ipc_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ TEST_CASE("benchmark_ipc_channel")
{
auto end_a_thread_b_p = make_test_ipc_channel([](IPCChannelEnd end_b) {
// echos back messages. stops if "" is sent
for (;;) {
while (true) {
end_b.recv();
end_b.send(end_b.getRecvData(), end_b.getRecvSize());
if (end_b.getRecvSize() == 0)
Expand Down
39 changes: 20 additions & 19 deletions src/threading/ipc_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
if (buf->futex.load(std::memory_order_acquire) == 1) {
// yes
// reset it. (relaxed ordering is sufficient, because the other thread
// does not need to see the side effects we did before writing 0)
// does not need to see the side effects we did before unposting)
buf->futex.store(0, std::memory_order_relaxed);
return true;
}
Expand All @@ -93,7 +93,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
// wait with futex
while (true) {
// write 2 to show that we're futexing
if (buf->futex.exchange(2, std::memory_order_acq_rel) == 1) {
if (buf->futex.exchange(2, std::memory_order_acquire) == 1) {
// it was posted in the meantime
buf->futex.store(0, std::memory_order_relaxed);
return true;
Expand All @@ -113,7 +113,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept

static void post(IPCChannelBuffer *buf) noexcept
{
if (buf->futex.exchange(1, std::memory_order_acq_rel) == 2) {
if (buf->futex.exchange(1, std::memory_order_release) == 2) {
// 2 means reader needs to be notified
int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0);
if (s == -1) {
Expand All @@ -130,17 +130,18 @@ static void post(IPCChannelBuffer *buf) noexcept
// returns false on timeout
static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
{
bool timed_out = false;
pthread_mutex_lock(&buf->mutex);
if (!buf->posted) {
if (timeout)
timed_out = pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout) == ETIMEDOUT;
else
while (!buf->posted) {
if (timeout) {
if (pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout) == ETIMEDOUT)
return false;
} else {
pthread_cond_wait(&buf->cond, &buf->mutex);
}
}
buf->posted = false;
pthread_mutex_unlock(&buf->mutex);
return !timed_out;
return true;
}

static void post(IPCChannelBuffer *buf) noexcept
Expand Down Expand Up @@ -172,18 +173,19 @@ static bool wait_in(IPCChannelEnd::Dir *dir, u64 timeout_ms_abs)
struct timespec timeout;
struct timespec *timeoutp = nullptr;
if (timeout_ms_abs > 0) {
// Relative time
u64 tnow = porting::getTimeMs();
if (tnow > timeout_ms_abs)
return false;
u64 timeout_ms_rel = timeout_ms_abs - tnow;
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
// Relative time
timeout.tv_sec = 0;
timeout.tv_nsec = 0;
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
// Absolute time, relative to cond_clockid
FATAL_ERROR_IF(clock_gettime(dir->buf_in->cond_clockid, &timeout) < 0,
"clock_gettime failed");
// prevent overflow
if (timeout.tv_nsec >= 1000'000'000L) {
timeout.tv_nsec -= 1000'000'000L;
timeout.tv_sec += 1;
Expand Down Expand Up @@ -339,7 +341,8 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
if (size <= IPC_CHANNEL_MSG_SIZE) {
// small msg
// (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE)
memcpy(m_large_recv.data(), m_dir.buf_in->data, size);
if (size != 0)
memcpy(m_large_recv.data(), m_dir.buf_in->data, size);

} else {
// large msg
Expand All @@ -360,7 +363,8 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
if (!wait_in(&m_dir, timeout_ms_abs))
return false;
} while (size > IPC_CHANNEL_MSG_SIZE);
memcpy(recv_data, m_dir.buf_in->data, size);
if (size != 0)
memcpy(recv_data, m_dir.buf_in->data, size);
}
return true;
}
Expand All @@ -382,18 +386,15 @@ std::pair<IPCChannelEnd, std::thread> make_test_ipc_channel(
#endif
}();

auto resources_first = std::make_unique<IPCChannelResourcesSingleProcess>();
resources_first->setFirst(resource_data);

IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first));

std::thread thread_b([=] {
auto resources_second = std::make_unique<IPCChannelResourcesSingleProcess>();
resources_second->setSecond(resource_data);
auto resources_second = IPCChannelResourcesSingleProcess::makeSecond(resource_data);
IPCChannelEnd end_b = IPCChannelEnd::makeB(std::move(resources_second));

fun(std::move(end_b));
});

auto resources_first = IPCChannelResourcesSingleProcess::makeFirst(resource_data);
IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first));

return {std::move(end_a), std::move(thread_b)};
}
54 changes: 31 additions & 23 deletions src/threading/ipc_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
* other posix: uses posix mutex and condition variable
*/

#define IPC_CHANNEL_MSG_SIZE 0x2000U
constexpr size_t IPC_CHANNEL_MSG_SIZE = 0x2000;

struct IPCChannelBuffer
{
Expand All @@ -83,6 +83,7 @@ struct IPCChannelBuffer
u8 data[IPC_CHANNEL_MSG_SIZE] = {};

IPCChannelBuffer();
DISABLE_CLASS_COPY(IPCChannelBuffer)
~IPCChannelBuffer(); // Note: only destruct once, i.e. in one process
};

Expand All @@ -96,18 +97,8 @@ struct IPCChannelShared
IPCChannelBuffer b{};
};

struct IPCChannelDirection
{
IPCChannelBuffer *buf_in;
IPCChannelBuffer *buf_out;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_in;
HANDLE sem_out;
#endif
};

// Each end holds this. One is A, one is B.
// Implementors of this struct decide how to allocate buffers (i.e. malloc or mmap).
// Interface for managing the shared resources.
// Implementors decide whether to use malloc or mmap.
struct IPCChannelResources
{
// new struct, because the win32 #if is annoying
Expand All @@ -123,6 +114,15 @@ struct IPCChannelResources

Data data;

IPCChannelResources() = default;
DISABLE_CLASS_COPY(IPCChannelResources)

// Child should call cleanup().
// (Parent destructor can not do this, because when it's called the child is
// already dead.)
virtual ~IPCChannelResources() = default;

protected:
// Used for previously unmanaged data_ (move semantics)
void setFirst(Data data_)
{
Expand Down Expand Up @@ -160,14 +160,6 @@ struct IPCChannelResources
cleanupNotLast();
}
}

IPCChannelResources() = default;
DISABLE_CLASS_COPY(IPCChannelResources)

// Child should call cleanup().
// (Parent destructor can not do this, because when it's called the child is
// already dead.)
virtual ~IPCChannelResources() = default;
};

class IPCChannelEnd
Expand All @@ -184,13 +176,15 @@ class IPCChannelEnd
#endif
};

// Unusable empty end
IPCChannelEnd() = default;

// Construct end A or end B from resources
static IPCChannelEnd makeA(std::unique_ptr<IPCChannelResources> resources);
static IPCChannelEnd makeB(std::unique_ptr<IPCChannelResources> resources);

// Note: timeouts may be for receiving any response, not a whole message.
// If send, recv, or exchange return false (=timeout), stop using the channel.
// Note: Timeouts may be for receiving any response, not a whole message.
// Therefore, if a timeout occurs, stop using the channel.

// Returns false on timeout
[[nodiscard]]
Expand Down Expand Up @@ -276,6 +270,20 @@ struct IPCChannelResourcesSingleProcess final : public IPCChannelResources
}

~IPCChannelResourcesSingleProcess() override { cleanup(); }

static std::unique_ptr<IPCChannelResourcesSingleProcess> makeFirst(Data data)
{
auto ret = std::make_unique<IPCChannelResourcesSingleProcess>();
ret->setFirst(data);
return ret;
}

static std::unique_ptr<IPCChannelResourcesSingleProcess> makeSecond(Data data)
{
auto ret = std::make_unique<IPCChannelResourcesSingleProcess>();
ret->setSecond(data);
return ret;
}
};

// For testing
Expand Down
23 changes: 16 additions & 7 deletions src/unittest/test_threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,28 +250,37 @@ void TestThreading::testIPCChannel()
{
auto [end_a, thread_b] = make_test_ipc_channel([](IPCChannelEnd end_b) {
// echos back messages. stops if "" is sent
for (;;) {
while (true) {
UASSERT(end_b.recvWithTimeout(-1));
UASSERT(end_b.sendWithTimeout(end_b.getRecvData(), end_b.getRecvSize(), -1));
if (end_b.getRecvSize() == 0)
break;
}
});

u8 buf[20000] = {};
for (int i = sizeof(buf); i > 0; i -= 100) {
buf[i - 1] = 123;
UASSERT(end_a.exchangeWithTimeout(buf, i, -1));
u8 buf1[20000] = {};
for (int i = sizeof(buf1); i > 0; i -= 100) {
buf1[i - 1] = 123;
UASSERT(end_a.exchangeWithTimeout(buf1, i, -1));
UASSERTEQ(int, end_a.getRecvSize(), i);
UASSERTEQ(int, reinterpret_cast<const u8 *>(end_a.getRecvData())[i - 1], 123);
}

u8 buf2[IPC_CHANNEL_MSG_SIZE * 3 + 10];
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3 + 10);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 2);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE - 1);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE + 1);
end_a.exchange(buf2, 1);

// stop thread_b
UASSERT(end_a.exchangeWithTimeout(buf, 0, -1));
UASSERT(end_a.exchangeWithTimeout(nullptr, 0, -1));
UASSERTEQ(int, end_a.getRecvSize(), 0);

thread_b.join();

// other side dead ==> should time out
UASSERT(!end_a.exchangeWithTimeout(buf, 0, 200));
UASSERT(!end_a.exchangeWithTimeout(nullptr, 0, 200));
}

0 comments on commit 64985bb

Please sign in to comment.