Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify and improve key generation when working in cluster mode. #206

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 24 additions & 87 deletions cluster_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,13 @@
#include "memtier_benchmark.h"
#include "obj_gen.h"
#include "shard_connection.h"

#define KEY_INDEX_QUEUE_MAX_SIZE 1000000
#include "crc16_slottable.h"

#define MOVED_MSG_PREFIX "-MOVED"
#define MOVED_MSG_PREFIX_LEN 6
#define ASK_MSG_PREFIX "-ASK"
#define ASK_MSG_PREFIX_LEN 4

#define MAX_CLUSTER_HSLOT 16383
static const uint16_t crc16tab[256]= {
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
Expand Down Expand Up @@ -100,24 +98,13 @@ static inline uint16_t crc16(const char *buf, size_t len) {
return crc;
}

static uint32_t calc_hslot_crc16_cluster(const char *str, size_t length)
{
uint32_t rv = (uint32_t) crc16(str, length) & MAX_CLUSTER_HSLOT;
return rv;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////

cluster_client::cluster_client(client_group* group) : client(group)
{
}

cluster_client::~cluster_client() {
for (unsigned int i = 0; i < m_key_index_pools.size(); i++) {
key_index_pool* key_idx_pool = m_key_index_pools[i];
delete key_idx_pool;
}
m_key_index_pools.clear();
}

int cluster_client::connect(void) {
Expand All @@ -128,11 +115,6 @@ int cluster_client::connect(void) {
// set main connection to send 'CLUSTER SLOTS' command
sc->set_cluster_slots();

// create key index pool for main connection
key_index_pool* key_idx_pool = new key_index_pool;
m_key_index_pools.push_back(key_idx_pool);
assert(m_connections.size() == m_key_index_pools.size());

// continue with base class
client::connect();

Expand Down Expand Up @@ -166,22 +148,10 @@ shard_connection* cluster_client::create_shard_connection(abstract_protocol* abs

m_connections.push_back(sc);

// create key index pool
key_index_pool* key_idx_pool = new key_index_pool;
assert(key_idx_pool != NULL);

m_key_index_pools.push_back(key_idx_pool);
assert(m_connections.size() == m_key_index_pools.size());

return sc;
}

bool cluster_client::connect_shard_connection(shard_connection* sc, char* address, char* port) {
// empty key index queue
if (m_key_index_pools[sc->get_id()]->size()) {
key_index_pool empty_queue;
std::swap(*m_key_index_pools[sc->get_id()], empty_queue);
}

// save address and port
sc->set_address_port(address, port);
Expand Down Expand Up @@ -224,9 +194,12 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
*/
unsigned long prev_connections_size = m_connections.size();
std::vector<bool> close_sc(prev_connections_size, true);
for (unsigned int i = 0; i < MAX_SLOTS; i++) {
m_conn_to_init_slot[i] = UINT16_MAX;
}

// run over response and create connections
for (unsigned int i=0; i<r->get_mbulk_value()->mbulks_elements.size(); i++) {
for (unsigned int i = 0; i < r->get_mbulk_value()->mbulks_elements.size(); i++) {
// create connection
mbulk_size_el* shard = r->get_mbulk_value()->mbulks_elements[i]->as_mbulk_size();

Expand Down Expand Up @@ -273,17 +246,26 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
connect_shard_connection(sc, addr, port);
}

// update range
unsigned int sc_id = sc->get_id();
// Set the initial slot for this shard connection
if (m_conn_to_init_slot[sc_id] == UINT16_MAX) {
m_conn_to_init_slot[sc_id] = min_slot;
}
for (int j = min_slot; j <= max_slot; j++) {
m_slot_to_shard[j] = sc->get_id();
if (j < max_slot) {
m_slot_lists[j] = j+1;
} else {
// Close the loop - point the last index to the first one owned by the shard connection
m_slot_lists[j] = m_conn_to_init_slot[sc_id];
}
}

free(addr);
free(port);
}

// check if some connections left with no slots, and need to be closed
for (unsigned int i=0; i < prev_connections_size; i++) {
for (unsigned int i = 0; i < prev_connections_size; i++) {
if ((close_sc[i] == true) &&
(m_connections[i]->get_connection_state() != conn_disconnected)) {

Expand All @@ -299,8 +281,7 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {

// don't exceed requests
if (m_config->requests) {
if (m_key_index_pools[conn_id]->empty() &&
m_reqs_generated >= m_config->requests) {
if (m_reqs_generated >= m_config->requests) {
return true;
}
}
Expand All @@ -309,53 +290,13 @@ bool cluster_client::hold_pipeline(unsigned int conn_id) {
}

bool cluster_client::get_key_for_conn(unsigned int conn_id, int iter, unsigned long long* key_index) {
// first check if we already have key in pool
if (!m_key_index_pools[conn_id]->empty()) {
*key_index = m_key_index_pools[conn_id]->front();
m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index);

m_key_index_pools[conn_id]->pop();
return true;
}

// keep generate key till it match for this connection, or requests reached
while (true) {
// generate key
*key_index = m_obj_gen->get_key_index(iter);
m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index);

unsigned int hslot = calc_hslot_crc16_cluster(m_key_buffer, m_key_len);

// check if the key match for this connection
if (m_slot_to_shard[hslot] == conn_id) {
m_reqs_generated++;
return true;
}

// handle key for other connection
unsigned int other_conn_id = m_slot_to_shard[hslot];

// in case we generated key for connection that is disconnected, 'slot to shard' map may need to be updated
if (m_connections[other_conn_id]->get_connection_state() == conn_disconnected) {
m_connections[conn_id]->set_cluster_slots();
return false;
}

// in case connection is during cluster slots command, his slots mapping not relevant
if (m_connections[other_conn_id]->get_cluster_slots_state() != setup_done)
continue;

// store key for other connection, if queue is not full
key_index_pool* key_idx_pool = m_key_index_pools[other_conn_id];
if (key_idx_pool->size() < KEY_INDEX_QUEUE_MAX_SIZE) {
key_idx_pool->push(*key_index);
m_reqs_generated++;
}

// don't exceed requests
if (m_config->requests > 0 && m_reqs_generated >= m_config->requests)
return false;
}
*key_index = m_obj_gen->get_key_index(iter);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ushachar in cluster mode we're growing the keyspace range to above the max/min range. This is because we're using the crc16 + key index, meaning we can now have #primaries * keyspace range. The PR #208 includes tests to track this.

m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s{%s}%llu",
m_obj_gen->get_key_prefix(), crc16_slot_table[m_conn_to_init_slot[conn_id]], *key_index);
m_conn_to_init_slot[conn_id] = m_slot_lists[m_conn_to_init_slot[conn_id]];
m_reqs_generated++;
return true;
}

// This function could use some urgent TLC -- but we need to do it without altering the behavior
Expand Down Expand Up @@ -432,10 +373,6 @@ void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp
if (m_connections[conn_id]->get_cluster_slots_state() != setup_done)
return;

// queue may stored uncorrected mapping indexes, empty them
key_index_pool empty_queue;
std::swap(*m_key_index_pools[conn_id], empty_queue);

// set connection to send 'CLUSTER SLOTS' command
m_connections[conn_id]->set_cluster_slots();
}
Expand Down
13 changes: 9 additions & 4 deletions cluster_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@
#include <set>
#include "client.h"

typedef std::queue<unsigned long long> key_index_pool;
#define MAX_SLOTS 16384

// forward decleration
// forward declaration
class shard_connection;

class cluster_client : public client {
protected:
std::vector<key_index_pool*> m_key_index_pools;
unsigned int m_slot_to_shard[16384];
/*
* Stores the first slot owned by the indexed connection.
* Since we connect only to primaries we can have at most 16K distinct connections...
*/
uint16_t m_conn_to_init_slot[MAX_SLOTS];
// An index-linked array used to store circular lists of slots, one for each shard returned by the SLOTS command.
uint16_t m_slot_lists[MAX_SLOTS];

char m_key_buffer[250];
int m_key_len;
Expand Down
Loading