diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index b27bb14a..505b1deb 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -2598,6 +2598,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None: assert rc.get_node(host=default_host, port=7001) is not None assert rc.get_node(host=default_host, port=7002) is not None + @pytest.mark.parametrize("dynamic_startup_nodes", [True, False]) + async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes): + rc = await get_mocked_valkey_client( + host="my@DNS.com", + port=7000, + cluster_slots=default_cluster_slots, + dynamic_startup_nodes=dynamic_startup_nodes, + ) + # Nodes are taken from default_cluster_slots + discovered_nodes = [ + "127.0.0.1:7000", + "127.0.0.1:7001", + "127.0.0.1:7002", + "127.0.0.1:7003", + ] + startup_nodes = list(rc.nodes_manager.startup_nodes.keys()) + if dynamic_startup_nodes is True: + assert startup_nodes.sort() == discovered_nodes.sort() + else: + assert startup_nodes == ["my@DNS.com:7000"] + class TestClusterPipeline: """Tests for the ClusterPipeline class.""" diff --git a/valkey/asyncio/cluster.py b/valkey/asyncio/cluster.py index 3386eddc..12800815 100644 --- a/valkey/asyncio/cluster.py +++ b/valkey/asyncio/cluster.py @@ -139,6 +139,14 @@ class ValkeyCluster(AbstractValkey, AbstractValkeyCluster, AsyncValkeyClusterCom | Enable read from replicas in READONLY mode. You can read possibly stale data. When set to true, read commands will be assigned between the primary and its replications in a Round-Robin manner. + :param dynamic_startup_nodes: + | Set the ValkeyCluster's startup nodes to all the discovered nodes. + If true (default value), the cluster's discovered nodes will be used to + determine the cluster nodes-slots mapping in the next topology refresh. + It will remove the initial passed startup nodes if their endpoints aren't + listed in the CLUSTER SLOTS output. + If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists + specific IP addresses, it is best to set it to false. :param reinitialize_steps: | Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs and the cluster does not @@ -237,6 +245,7 @@ def __init__( startup_nodes: Optional[List["ClusterNode"]] = None, require_full_coverage: bool = True, read_from_replicas: bool = False, + dynamic_startup_nodes: bool = True, reinitialize_steps: int = 5, cluster_error_retry_attempts: int = 3, connection_error_retry_attempts: int = 3, @@ -389,6 +398,7 @@ def __init__( startup_nodes, require_full_coverage, kwargs, + dynamic_startup_nodes=dynamic_startup_nodes, address_remap=address_remap, ) self.encoder = Encoder(encoding, encoding_errors, decode_responses) @@ -1142,6 +1152,7 @@ class NodesManager: "require_full_coverage", "slots_cache", "startup_nodes", + "_dynamic_startup_nodes", "address_remap", ) @@ -1150,11 +1161,13 @@ def __init__( startup_nodes: List["ClusterNode"], require_full_coverage: bool, connection_kwargs: Dict[str, Any], + dynamic_startup_nodes: bool = True, address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, ) -> None: self.startup_nodes = {node.name: node for node in startup_nodes} self.require_full_coverage = require_full_coverage self.connection_kwargs = connection_kwargs + self._dynamic_startup_nodes = dynamic_startup_nodes self.address_remap = address_remap self.default_node: "ClusterNode" = None @@ -1387,8 +1400,10 @@ async def initialize(self) -> None: # Set the tmp variables to the real variables self.slots_cache = tmp_slots self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True) - # Populate the startup nodes with all discovered nodes - self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True) + + if self._dynamic_startup_nodes: + # Populate the startup nodes with all discovered nodes + self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True) # Set the default node self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]