Skip to content

Commit

Permalink
Offload TLS negotiation to I/O threads (#1338)
Browse files Browse the repository at this point in the history
## TLS Negotiation Offloading to I/O Threads

### Overview
This PR introduces the ability to offload TLS handshake negotiations to
I/O threads, significantly improving performance under high TLS
connection loads.

### Key Changes
- Added infrastructure to offload TLS negotiations to I/O threads
- Refactored SSL event handling to allow I/O threads to modify connection flags.
- Introduced a new connection flag to identify client connections

### Performance Impact
Testing with 650 clients with SET commands and 160 new TLS connections
per second in the background:

#### Throughput Impact of new TLS connections
- **With Offloading**: Minimal impact (1050K → 990K ops/sec)
- **Without Offloading**: Significant drop (1050K → 670K ops/sec)

#### New Connection Rate
- **With Offloading**: 
  - 1,757 conn/sec
- **Without Offloading**: 
  - 477 conn/sec

### Implementation Details
1. **Main Thread**:
   - Initiates negotiation-offload jobs to I/O threads
   - Adds connections to the pending-read clients list (using the existing read
     offload mechanism)
   - Post-negotiation handling:
     - Creates read/write events if needed for incomplete negotiations
     - Calls accept handler for completed negotiations

2. **I/O Thread**:
   - Performs TLS negotiation
   - Updates connection flags based on negotiation result

Related issue: #761

---------

Signed-off-by: Uri Yagelnik <[email protected]>
Signed-off-by: ranshid <[email protected]>
Co-authored-by: ranshid <[email protected]>
Co-authored-by: Madelyn Olson <[email protected]>
  • Loading branch information
3 people authored Dec 18, 2024
1 parent e203ca3 commit 8060c86
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 70 deletions.
38 changes: 38 additions & 0 deletions .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,44 @@ jobs:
if: true && !contains(github.event.inputs.skiptests, 'cluster')
run: ./runtest-cluster --io-threads ${{github.event.inputs.cluster_test_args}}

# Job: build with BUILD_TLS=yes, then run the main test suite and the cluster
# tests with both --io-threads and --tls enabled.
# The job is skipped when the `skipjobs` input contains 'tls' or 'iothreads';
# the individual test steps are skipped via the `skiptests` input
# ('valkey' for the main suite, 'cluster' for the cluster tests).
test-ubuntu-tls-io-threads:
runs-on: ubuntu-latest
if: |
(github.event_name == 'workflow_dispatch' ||
(github.event_name == 'schedule' && github.repository == 'valkey-io/valkey') ||
(github.event_name == 'pull_request' && (contains(github.event.pull_request.labels.*.name, 'run-extra-tests') || github.event.pull_request.base.ref != 'unstable'))) &&
!contains(github.event.inputs.skipjobs, 'tls') && !contains(github.event.inputs.skipjobs, 'iothreads')
timeout-minutes: 14400
steps:
- name: prep
if: github.event_name == 'workflow_dispatch'
run: |
echo "GITHUB_REPOSITORY=${{github.event.inputs.use_repo}}" >> $GITHUB_ENV
echo "GITHUB_HEAD_REF=${{github.event.inputs.use_git_ref}}" >> $GITHUB_ENV
echo "skipjobs: ${{github.event.inputs.skipjobs}}"
echo "skiptests: ${{github.event.inputs.skiptests}}"
echo "test_args: ${{github.event.inputs.test_args}}"
echo "cluster_test_args: ${{github.event.inputs.cluster_test_args}}"
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
with:
repository: ${{ env.GITHUB_REPOSITORY }}
ref: ${{ env.GITHUB_HEAD_REF }}
- name: make
run: |
make BUILD_TLS=yes SERVER_CFLAGS='-Werror'
- name: testprep
run: |
sudo apt-get install tcl8.6 tclx tcl-tls
./utils/gen-test-certs.sh
- name: test
if: true && !contains(github.event.inputs.skiptests, 'valkey')
run: |
./runtest --io-threads --tls --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
- name: cluster tests
if: true && !contains(github.event.inputs.skiptests, 'cluster')
run: |
./runtest-cluster --io-threads --tls ${{github.event.inputs.cluster_test_args}}
test-ubuntu-reclaim-cache:
runs-on: ubuntu-latest
if: |
Expand Down
5 changes: 3 additions & 2 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ typedef enum {
CONN_STATE_ERROR
} ConnectionState;

#define CONN_FLAG_CLOSE_SCHEDULED (1 << 0) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1 << 1) /* Write barrier requested */
#define CONN_FLAG_CLOSE_SCHEDULED (1 << 0) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1 << 1) /* Write barrier requested */
#define CONN_FLAG_ALLOW_ACCEPT_OFFLOAD (1 << 2) /* Connection accept can be offloaded to IO threads. */

#define CONN_TYPE_SOCKET "tcp"
#define CONN_TYPE_UNIX "unix"
Expand Down
52 changes: 52 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,55 @@ void trySendPollJobToIOThreads(void) {
aeSetPollProtect(server.el, 1);
IOJobQueue_push(jq, IOThreadPoll, server.el);
}

/* Job callback queued by trySendAcceptToIOThreads(): performs the accept step
 * on the client's connection and then flags the client's read state as
 * completed.
 *
 * data - the client pointer that was handed to IOJobQueue_push(). */
static void ioThreadAccept(void *data) {
    client *accepted = data;

    /* No accept handler is supplied here (NULL is passed). */
    connAccept(accepted->conn, NULL);

    /* Written only after connAccept() has returned.
     * NOTE(review): presumably this is the signal the main thread waits for
     * before touching the client again - confirm against the reader side. */
    accepted->io_read_state = CLIENT_COMPLETED_IO;
}

/*
 * Try to hand the accept step of a client connection (currently used for TLS
 * accept) over to an I/O thread.
 *
 * Parameters:
 * conn - The connection to accept. Its private data is read as the owning
 *        client.
 *
 * Returns:
 * C_OK  - The accept job was queued, or the client's io_read_state was already
 *         something other than CLIENT_IDLE (nothing new is queued in that case).
 * C_ERR - Nothing was queued: fewer than two I/O threads are configured or
 *         active, the connection lacks CONN_FLAG_ALLOW_ACCEPT_OFFLOAD, or the
 *         selected job queue is full.
 */
int trySendAcceptToIOThreads(connection *conn) {
    /* Offloading needs more than one configured thread and an eligible connection. */
    if (server.io_threads_num <= 1 || !(conn->flags & CONN_FLAG_ALLOW_ACCEPT_OFFLOAD)) return C_ERR;

    client *c = connGetPrivateData(conn);

    /* The client is not idle: report success without queueing anything. */
    if (c->io_read_state != CLIENT_IDLE) return C_OK;

    if (server.active_io_threads_num <= 1) return C_ERR;

    /* Choose a queue from the client id. The result is always in the range
     * 1 .. active_io_threads_num - 1, so index 0 is never selected. */
    size_t worker = 1 + (c->id % (server.active_io_threads_num - 1));
    IOJobQueue *jq = &io_jobs[worker];
    if (IOJobQueue_isFull(jq)) return C_ERR;

    /* Mark the client as pending and link it into the pending-read list
     * before the job is pushed to the I/O thread. */
    c->io_read_state = CLIENT_PENDING_IO;
    c->flag.pending_read = 1;
    listLinkNodeTail(server.clients_pending_io_read, &c->pending_read_list_node);
    connSetPostponeUpdateState(c->conn, 1);
    server.stat_io_accept_offloaded++;
    IOJobQueue_push(jq, ioThreadAccept, c);

    return C_OK;
}
1 change: 1 addition & 0 deletions src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ int tryOffloadFreeArgvToIOThreads(client *c, int argc, robj **argv);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);
int trySendAcceptToIOThreads(connection *conn);

#endif /* IO_THREADS_H */
6 changes: 6 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ client *createClient(connection *conn) {
if (server.tcpkeepalive) connKeepAlive(conn, server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
conn->flags |= CONN_FLAG_ALLOW_ACCEPT_OFFLOAD;
}
c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
selectDb(c, 0);
Expand Down Expand Up @@ -4805,9 +4806,14 @@ int processIOThreadsReadDone(void) {
processed++;
server.stat_io_reads_processed++;

/* Save the current conn state, as connUpdateState may modify it */
int in_accept_state = (connGetState(c->conn) == CONN_STATE_ACCEPTING);
connSetPostponeUpdateState(c->conn, 0);
connUpdateState(c->conn);

/* In accept state, no client's data was read - stop here. */
if (in_accept_state) continue;

/* On read error - stop here. */
if (handleReadResult(c) == C_ERR) {
continue;
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2645,6 +2645,7 @@ void resetServerStats(void) {
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
server.stat_io_freed_objects = 0;
server.stat_io_accept_offloaded = 0;
server.stat_poll_processed_by_io_threads = 0;
server.stat_total_writes_processed = 0;
server.stat_client_qbuf_limit_disconnections = 0;
Expand Down Expand Up @@ -5915,6 +5916,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
"io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects,
"io_threaded_accept_processed:%lld\r\n", server.stat_io_accept_offloaded,
"io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads,
"io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches,
"io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries,
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1869,6 +1869,7 @@ struct valkeyServer {
long long stat_io_reads_processed; /* Number of read events processed by IO threads */
long long stat_io_writes_processed; /* Number of write events processed by IO threads */
long long stat_io_freed_objects; /* Number of objects freed by IO threads */
long long stat_io_accept_offloaded; /* Number of offloaded accepts */
long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */
long long stat_total_reads_processed; /* Total number of read events processed */
long long stat_total_writes_processed; /* Total number of write events processed */
Expand Down
Loading

0 comments on commit 8060c86

Please sign in to comment.