diff --git a/CHANGES b/CHANGES index e0959b0e..82c5b6db 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Update `ResponseT` type hint * Allow to control the minimum SSL version * Add an optional lock_name attribute to LockError. * Fix return types for `get`, `set_path` and `strappend` in JSONCommands diff --git a/setup.py b/setup.py index e6e25520..69a0d35a 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ long_description_content_type="text/markdown", keywords=["Valkey", "key-value store", "database"], license="MIT", - version="5.1.0b5", + version="5.1.0b6", packages=find_packages( include=[ "valkey", @@ -33,7 +33,7 @@ "Issue tracker": "https://github.com/valkey-io/valkey-py/issues", }, author="valkey-py authors", - author_email="placeholder@valkey.io", + author_email="valkey-py@lists.valkey.io", python_requires=">=3.8", install_requires=[ 'async-timeout>=4.0.3; python_full_version<"3.11.3"', diff --git a/tests/ssl_utils.py b/tests/ssl_utils.py index ab9c2e89..1de53bbf 100644 --- a/tests/ssl_utils.py +++ b/tests/ssl_utils.py @@ -9,6 +9,6 @@ def get_ssl_filename(name): os.path.join(root, "..", "dockers", "stunnel", "keys") ) if not os.path.isdir(cert_dir): - raise IOError(f"No SSL certificates found. They should be in {cert_dir}") + raise OSError(f"No SSL certificates found. They should be in {cert_dir}") return os.path.join(cert_dir, name) diff --git a/tests/test_asyncio/compat.py b/tests/test_asyncio/compat.py index 4a9778b7..aa1dc49a 100644 --- a/tests/test_asyncio/compat.py +++ b/tests/test_asyncio/compat.py @@ -4,7 +4,7 @@ try: mock.AsyncMock except AttributeError: - import mock + from unittest import mock try: from contextlib import aclosing diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 6e85f16c..82f55c55 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -1430,7 +1430,7 @@ async def test_memory_stats(self, r: ValkeyCluster) -> None: assert isinstance(stats, dict) for key, value in stats.items(): if key.startswith("db."): - assert isinstance(value, dict) + assert not isinstance(value, list) @skip_if_server_version_lt("4.0.0") async def test_memory_help(self, r: ValkeyCluster) -> None: diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 3b570c90..b374321f 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -1352,7 +1352,7 @@ async def test_hscan(self, r: valkey.Valkey): _, dic = await r.hscan("a_notset", match="a") assert dic == {} - @skip_if_server_version_lt("7.4.0") + @skip_if_server_version_lt("7.3.240") async def test_hscan_novalues(self, r: valkey.Valkey): await r.hset("a", mapping={"a": 1, "b": 2, "c": 3}) cursor, keys = await r.hscan("a", no_values=True) @@ -1373,7 +1373,7 @@ async def test_hscan_iter(self, r: valkey.Valkey): dic = {k: v async for k, v in r.hscan_iter("a_notset", match="a")} assert dic == {} - @skip_if_server_version_lt("7.4.0") + @skip_if_server_version_lt("7.3.240") async def test_hscan_iter_novalues(self, r: valkey.Valkey): await r.hset("a", mapping={"a": 1, "b": 2, "c": 3}) keys = list([k async for k in r.hscan_iter("a", no_values=True)]) @@ -3235,7 +3235,7 @@ async def test_memory_stats(self, r: valkey.Valkey): assert isinstance(stats, dict) for key, value in stats.items(): if key.startswith("db."): - assert isinstance(value, dict) + assert not isinstance(value, list) @skip_if_server_version_lt("4.0.0") async def test_memory_usage(self, r: valkey.Valkey): diff --git a/tests/test_asyncio/test_hash.py b/tests/test_asyncio/test_hash.py new file mode 100644 index 00000000..d4f18053 --- /dev/null +++ b/tests/test_asyncio/test_hash.py @@ -0,0 +1,300 @@ +import asyncio +from datetime import datetime, timedelta + +from tests.conftest import skip_if_server_version_lt + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpire_basic(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + assert await r.hexpire("test:hash", 1, "field1") == [1] + await asyncio.sleep(1.1) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpire_with_timedelta(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + assert await r.hexpire("test:hash", timedelta(seconds=1), "field1") == [1] + await asyncio.sleep(1.1) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpire_conditions(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1"}) + assert await r.hexpire("test:hash", 2, "field1", xx=True) == [0] + assert await r.hexpire("test:hash", 2, "field1", nx=True) == [1] + assert await r.hexpire("test:hash", 1, "field1", xx=True) == [1] + assert await r.hexpire("test:hash", 2, "field1", nx=True) == [0] + await asyncio.sleep(1.1) + assert await r.hexists("test:hash", "field1") is False + await r.hset("test:hash", "field1", "value1") + await r.hexpire("test:hash", 2, "field1") + assert await r.hexpire("test:hash", 1, "field1", gt=True) == [0] + assert await r.hexpire("test:hash", 1, "field1", lt=True) == [1] + await asyncio.sleep(1.1) + assert await r.hexists("test:hash", "field1") is False + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpire_nonexistent_key_or_field(r): + await r.delete("test:hash") + assert await r.hexpire("test:hash", 1, "field1") == [] + await r.hset("test:hash", "field1", "value1") + assert await r.hexpire("test:hash", 1, "nonexistent_field") == [-2] + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpire_multiple_fields(r): + await r.delete("test:hash") + await r.hset( + "test:hash", + mapping={"field1": "value1", "field2": "value2", "field3": "value3"}, + ) + assert await r.hexpire("test:hash", 1, "field1", "field2") == [1, 1] + await asyncio.sleep(1.1) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is False + assert await r.hexists("test:hash", "field3") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpire_basic(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + assert await r.hpexpire("test:hash", 500, "field1") == [1] + await asyncio.sleep(0.6) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpire_with_timedelta(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + assert await r.hpexpire("test:hash", timedelta(milliseconds=500), "field1") == [1] + await asyncio.sleep(0.6) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpire_conditions(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1"}) + assert await r.hpexpire("test:hash", 1500, "field1", xx=True) == [0] + assert await r.hpexpire("test:hash", 1500, "field1", nx=True) == [1] + assert await r.hpexpire("test:hash", 500, "field1", xx=True) == [1] + assert await r.hpexpire("test:hash", 1500, "field1", nx=True) == [0] + await asyncio.sleep(0.6) + assert await r.hexists("test:hash", "field1") is False + await r.hset("test:hash", "field1", "value1") + await r.hpexpire("test:hash", 1000, "field1") + assert await r.hpexpire("test:hash", 500, "field1", gt=True) == [0] + assert await r.hpexpire("test:hash", 500, "field1", lt=True) == [1] + await asyncio.sleep(0.6) + assert await r.hexists("test:hash", "field1") is False + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpire_nonexistent_key_or_field(r): + await r.delete("test:hash") + assert await r.hpexpire("test:hash", 500, "field1") == [] + await r.hset("test:hash", "field1", "value1") + assert await r.hpexpire("test:hash", 500, "nonexistent_field") == [-2] + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpire_multiple_fields(r): + await r.delete("test:hash") + await r.hset( + "test:hash", + mapping={"field1": "value1", "field2": "value2", "field3": "value3"}, + ) + assert await r.hpexpire("test:hash", 500, "field1", "field2") == [1, 1] + await asyncio.sleep(0.6) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is False + assert await r.hexists("test:hash", "field3") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpireat_basic(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp()) + assert await r.hexpireat("test:hash", exp_time, "field1") == [1] + await asyncio.sleep(1.1) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpireat_with_datetime(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + exp_time = datetime.now() + timedelta(seconds=1) + assert await r.hexpireat("test:hash", exp_time, "field1") == [1] + await asyncio.sleep(1.1) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpireat_conditions(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1"}) + future_exp_time = int((datetime.now() + timedelta(seconds=2)).timestamp()) + past_exp_time = int((datetime.now() - timedelta(seconds=1)).timestamp()) + assert await r.hexpireat("test:hash", future_exp_time, "field1", xx=True) == [0] + assert await r.hexpireat("test:hash", future_exp_time, "field1", nx=True) == [1] + assert await r.hexpireat("test:hash", past_exp_time, "field1", gt=True) == [0] + assert await r.hexpireat("test:hash", past_exp_time, "field1", lt=True) == [2] + assert await r.hexists("test:hash", "field1") is False + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpireat_nonexistent_key_or_field(r): + await r.delete("test:hash") + future_exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp()) + assert await r.hexpireat("test:hash", future_exp_time, "field1") == [] + await r.hset("test:hash", "field1", "value1") + assert await r.hexpireat("test:hash", future_exp_time, "nonexistent_field") == [-2] + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpireat_multiple_fields(r): + await r.delete("test:hash") + await r.hset( + "test:hash", + mapping={"field1": "value1", "field2": "value2", "field3": "value3"}, + ) + exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp()) + assert await r.hexpireat("test:hash", exp_time, "field1", "field2") == [1, 1] + await asyncio.sleep(1.1) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is False + assert await r.hexists("test:hash", "field3") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpireat_basic(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + exp_time = int((datetime.now() + timedelta(milliseconds=400)).timestamp() * 1000) + assert await r.hpexpireat("test:hash", exp_time, "field1") == [1] + await asyncio.sleep(0.5) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpireat_with_datetime(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + exp_time = datetime.now() + timedelta(milliseconds=400) + assert await r.hpexpireat("test:hash", exp_time, "field1") == [1] + await asyncio.sleep(0.5) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpireat_conditions(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1"}) + future_exp_time = int( + (datetime.now() + timedelta(milliseconds=500)).timestamp() * 1000 + ) + past_exp_time = int( + (datetime.now() - timedelta(milliseconds=500)).timestamp() * 1000 + ) + assert await r.hpexpireat("test:hash", future_exp_time, "field1", xx=True) == [0] + assert await r.hpexpireat("test:hash", future_exp_time, "field1", nx=True) == [1] + assert await r.hpexpireat("test:hash", past_exp_time, "field1", gt=True) == [0] + assert await r.hpexpireat("test:hash", past_exp_time, "field1", lt=True) == [2] + assert await r.hexists("test:hash", "field1") is False + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpireat_nonexistent_key_or_field(r): + await r.delete("test:hash") + future_exp_time = int( + (datetime.now() + timedelta(milliseconds=500)).timestamp() * 1000 + ) + assert await r.hpexpireat("test:hash", future_exp_time, "field1") == [] + await r.hset("test:hash", "field1", "value1") + assert await r.hpexpireat("test:hash", future_exp_time, "nonexistent_field") == [-2] + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpireat_multiple_fields(r): + await r.delete("test:hash") + await r.hset( + "test:hash", + mapping={"field1": "value1", "field2": "value2", "field3": "value3"}, + ) + exp_time = int((datetime.now() + timedelta(milliseconds=400)).timestamp() * 1000) + assert await r.hpexpireat("test:hash", exp_time, "field1", "field2") == [1, 1] + await asyncio.sleep(0.5) + assert await r.hexists("test:hash", "field1") is False + assert await r.hexists("test:hash", "field2") is False + assert await r.hexists("test:hash", "field3") is True + + +@skip_if_server_version_lt("7.3.240") +async def test_hpersist_multiple_fields_mixed_conditions(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + await r.hexpire("test:hash", 5000, "field1") + assert await r.hpersist("test:hash", "field1", "field2", "field3") == [1, -1, -2] + + +@skip_if_server_version_lt("7.3.240") +async def test_hexpiretime_multiple_fields_mixed_conditions(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + future_time = int((datetime.now() + timedelta(minutes=30)).timestamp()) + await r.hexpireat("test:hash", future_time, "field1") + result = await r.hexpiretime("test:hash", "field1", "field2", "field3") + assert future_time - 10 < result[0] <= future_time + assert result[1:] == [-1, -2] + + +@skip_if_server_version_lt("7.3.240") +async def test_hpexpiretime_multiple_fields_mixed_conditions(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + future_time = int((datetime.now() + timedelta(minutes=30)).timestamp()) + await r.hexpireat("test:hash", future_time, "field1") + result = await r.hpexpiretime("test:hash", "field1", "field2", "field3") + assert future_time * 1000 - 10000 < result[0] <= future_time * 1000 + assert result[1:] == [-1, -2] + + +@skip_if_server_version_lt("7.3.240") +async def test_ttl_multiple_fields_mixed_conditions(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + future_time = int((datetime.now() + timedelta(minutes=30)).timestamp()) + await r.hexpireat("test:hash", future_time, "field1") + result = await r.httl("test:hash", "field1", "field2", "field3") + assert 30 * 60 - 10 < result[0] <= 30 * 60 + assert result[1:] == [-1, -2] + + +@skip_if_server_version_lt("7.3.240") +async def test_pttl_multiple_fields_mixed_conditions(r): + await r.delete("test:hash") + await r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + future_time = int((datetime.now() + timedelta(minutes=30)).timestamp()) + await r.hexpireat("test:hash", future_time, "field1") + result = await r.hpttl("test:hash", "field1", "field2", "field3") + assert 30 * 60000 - 10000 < result[0] <= 30 * 60000 + assert result[1:] == [-1, -2] diff --git a/tests/test_asyncio/test_search.py b/tests/test_asyncio/test_search.py index b4661cf5..ec6f1812 100644 --- a/tests/test_asyncio/test_search.py +++ b/tests/test_asyncio/test_search.py @@ -1491,14 +1491,14 @@ async def test_withsuffixtrie(decoded_r: valkey.Valkey): assert await decoded_r.ft().dropindex("idx") # create withsuffixtrie index (text field) - assert await decoded_r.ft().create_index((TextField("t", withsuffixtrie=True))) + assert await decoded_r.ft().create_index(TextField("t", withsuffixtrie=True)) await waitForIndex(decoded_r, getattr(decoded_r.ft(), "index_name", "idx")) info = await decoded_r.ft().info() assert "WITHSUFFIXTRIE" in info["attributes"][0] assert await decoded_r.ft().dropindex("idx") # create withsuffixtrie index (tag field) - assert await decoded_r.ft().create_index((TagField("t", withsuffixtrie=True))) + assert await decoded_r.ft().create_index(TagField("t", withsuffixtrie=True)) await waitForIndex(decoded_r, getattr(decoded_r.ft(), "index_name", "idx")) info = await decoded_r.ft().info() assert "WITHSUFFIXTRIE" in info["attributes"][0] @@ -1508,14 +1508,14 @@ async def test_withsuffixtrie(decoded_r: valkey.Valkey): assert await decoded_r.ft().dropindex("idx") # create withsuffixtrie index (text fields) - assert await decoded_r.ft().create_index((TextField("t", withsuffixtrie=True))) + assert await decoded_r.ft().create_index(TextField("t", withsuffixtrie=True)) waitForIndex(decoded_r, getattr(decoded_r.ft(), "index_name", "idx")) info = await decoded_r.ft().info() assert "WITHSUFFIXTRIE" in info["attributes"][0]["flags"] assert await decoded_r.ft().dropindex("idx") # create withsuffixtrie index (tag field) - assert await decoded_r.ft().create_index((TagField("t", withsuffixtrie=True))) + assert await decoded_r.ft().create_index(TagField("t", withsuffixtrie=True)) waitForIndex(decoded_r, getattr(decoded_r.ft(), "index_name", "idx")) info = await decoded_r.ft().info() assert "WITHSUFFIXTRIE" in info["attributes"][0]["flags"] diff --git a/tests/test_cluster.py b/tests/test_cluster.py index b78418f2..38c95b5b 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -1548,7 +1548,7 @@ def test_memory_stats(self, r): assert isinstance(stats, dict) for key, value in stats.items(): if key.startswith("db."): - assert isinstance(value, dict) + assert not isinstance(value, list) @skip_if_server_version_lt("4.0.0") def test_memory_help(self, r): diff --git a/tests/test_commands.py b/tests/test_commands.py index 9f35b5d7..38bfa422 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -692,11 +692,12 @@ def test_client_kill_filter_by_user(self, r, request): assert c["user"] != killuser r.acl_deluser(killuser) - @skip_if_server_version_lt("7.4.0") + @skip_if_server_version_lt("7.3.240") + @pytest.mark.onlynoncluster def test_client_kill_filter_by_maxage(self, r, request): _get_client(valkey.Valkey, request, flushdb=False) time.sleep(4) - assert len(r.client_list()) == 2 + assert len(r.client_list()) >= 2 r.client_kill_filter(maxage=2) assert len(r.client_list()) == 1 @@ -2133,7 +2134,7 @@ def test_hscan(self, r): _, dic = r.hscan("a_notset") assert dic == {} - @skip_if_server_version_lt("7.4.0") + @skip_if_server_version_lt("7.3.240") def test_hscan_novalues(self, r): r.hset("a", mapping={"a": 1, "b": 2, "c": 3}) cursor, keys = r.hscan("a", no_values=True) @@ -2154,7 +2155,7 @@ def test_hscan_iter(self, r): dic = dict(r.hscan_iter("a_notset")) assert dic == {} - @skip_if_server_version_lt("7.4.0") + @skip_if_server_version_lt("7.3.240") def test_hscan_iter_novalues(self, r): r.hset("a", mapping={"a": 1, "b": 2, "c": 3}) keys = list(r.hscan_iter("a", no_values=True)) @@ -4842,7 +4843,7 @@ def test_memory_stats(self, r): assert isinstance(stats, dict) for key, value in stats.items(): if key.startswith("db."): - assert isinstance(value, dict) + assert not isinstance(value, list) @skip_if_server_version_lt("4.0.0") def test_memory_usage(self, r): diff --git a/tests/test_hash.py b/tests/test_hash.py new file mode 100644 index 00000000..7145b10a --- /dev/null +++ b/tests/test_hash.py @@ -0,0 +1,369 @@ +import time +from datetime import datetime, timedelta + +import pytest +from tests.conftest import skip_if_server_version_lt + + +@skip_if_server_version_lt("7.3.240") +def test_hexpire_basic(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + assert r.hexpire("test:hash", 1, "field1") == [1] + time.sleep(1.1) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hexpire_with_timedelta(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + assert r.hexpire("test:hash", timedelta(seconds=1), "field1") == [1] + time.sleep(1.1) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hexpire_conditions(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1"}) + assert r.hexpire("test:hash", 2, "field1", xx=True) == [0] + assert r.hexpire("test:hash", 2, "field1", nx=True) == [1] + assert r.hexpire("test:hash", 1, "field1", xx=True) == [1] + assert r.hexpire("test:hash", 2, "field1", nx=True) == [0] + time.sleep(1.1) + assert r.hexists("test:hash", "field1") is False + r.hset("test:hash", "field1", "value1") + r.hexpire("test:hash", 2, "field1") + assert r.hexpire("test:hash", 1, "field1", gt=True) == [0] + assert r.hexpire("test:hash", 1, "field1", lt=True) == [1] + time.sleep(1.1) + assert r.hexists("test:hash", "field1") is False + + +@skip_if_server_version_lt("7.3.240") +def test_hexpire_nonexistent_key_or_field(r): + r.delete("test:hash") + assert r.hexpire("test:hash", 1, "field1") == [] + r.hset("test:hash", "field1", "value1") + assert r.hexpire("test:hash", 1, "nonexistent_field") == [-2] + + +@skip_if_server_version_lt("7.3.240") +def test_hexpire_multiple_fields(r): + r.delete("test:hash") + r.hset( + "test:hash", + mapping={"field1": "value1", "field2": "value2", "field3": "value3"}, + ) + assert r.hexpire("test:hash", 1, "field1", "field2") == [1, 1] + time.sleep(1.1) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is False + assert r.hexists("test:hash", "field3") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hexpire_multiple_condition_flags_error(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1"}) + with pytest.raises(ValueError) as e: + r.hexpire("test:hash", 1, "field1", nx=True, xx=True) + assert "Only one of" in str(e) + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpire_basic(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + assert r.hpexpire("test:hash", 500, "field1") == [1] + time.sleep(0.6) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpire_with_timedelta(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + assert r.hpexpire("test:hash", timedelta(milliseconds=500), "field1") == [1] + time.sleep(0.6) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpire_conditions(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1"}) + assert r.hpexpire("test:hash", 1500, "field1", xx=True) == [0] + assert r.hpexpire("test:hash", 1500, "field1", nx=True) == [1] + assert r.hpexpire("test:hash", 500, "field1", xx=True) == [1] + assert r.hpexpire("test:hash", 1500, "field1", nx=True) == [0] + time.sleep(0.6) + assert r.hexists("test:hash", "field1") is False + r.hset("test:hash", "field1", "value1") + r.hpexpire("test:hash", 1000, "field1") + assert r.hpexpire("test:hash", 500, "field1", gt=True) == [0] + assert r.hpexpire("test:hash", 500, "field1", lt=True) == [1] + time.sleep(0.6) + assert r.hexists("test:hash", "field1") is False + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpire_nonexistent_key_or_field(r): + r.delete("test:hash") + assert r.hpexpire("test:hash", 500, "field1") == [] + r.hset("test:hash", "field1", "value1") + assert r.hpexpire("test:hash", 500, "nonexistent_field") == [-2] + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpire_multiple_fields(r): + r.delete("test:hash") + r.hset( + "test:hash", + mapping={"field1": "value1", "field2": "value2", "field3": "value3"}, + ) + assert r.hpexpire("test:hash", 500, "field1", "field2") == [1, 1] + time.sleep(0.6) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is False + assert r.hexists("test:hash", "field3") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpire_multiple_condition_flags_error(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1"}) + with pytest.raises(ValueError) as e: + r.hpexpire("test:hash", 500, "field1", nx=True, xx=True) + assert "Only one of" in str(e) + + +@skip_if_server_version_lt("7.3.240") +def test_hexpireat_basic(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp()) + assert r.hexpireat("test:hash", exp_time, "field1") == [1] + time.sleep(1.1) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hexpireat_with_datetime(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + exp_time = datetime.now() + timedelta(seconds=1) + assert r.hexpireat("test:hash", exp_time, "field1") == [1] + time.sleep(1.1) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hexpireat_conditions(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1"}) + future_exp_time = int((datetime.now() + timedelta(seconds=2)).timestamp()) + past_exp_time = int((datetime.now() - timedelta(seconds=1)).timestamp()) + assert r.hexpireat("test:hash", future_exp_time, "field1", xx=True) == [0] + assert r.hexpireat("test:hash", future_exp_time, "field1", nx=True) == [1] + assert r.hexpireat("test:hash", past_exp_time, "field1", gt=True) == [0] + assert r.hexpireat("test:hash", past_exp_time, "field1", lt=True) == [2] + assert r.hexists("test:hash", "field1") is False + + +@skip_if_server_version_lt("7.3.240") +def test_hexpireat_nonexistent_key_or_field(r): + r.delete("test:hash") + future_exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp()) + assert r.hexpireat("test:hash", future_exp_time, "field1") == [] + r.hset("test:hash", "field1", "value1") + assert r.hexpireat("test:hash", future_exp_time, "nonexistent_field") == [-2] + + +@skip_if_server_version_lt("7.3.240") +def test_hexpireat_multiple_fields(r): + r.delete("test:hash") + r.hset( + "test:hash", + mapping={"field1": "value1", "field2": "value2", "field3": "value3"}, + ) + exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp()) + assert r.hexpireat("test:hash", exp_time, "field1", "field2") == [1, 1] + time.sleep(1.1) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is False + assert r.hexists("test:hash", "field3") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hexpireat_multiple_condition_flags_error(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1"}) + exp_time = int((datetime.now() + timedelta(seconds=1)).timestamp()) + with pytest.raises(ValueError) as e: + r.hexpireat("test:hash", exp_time, "field1", nx=True, xx=True) + assert "Only one of" in str(e) + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpireat_basic(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + exp_time = int((datetime.now() + timedelta(milliseconds=400)).timestamp() * 1000) + assert r.hpexpireat("test:hash", exp_time, "field1") == [1] + time.sleep(0.5) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpireat_with_datetime(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + exp_time = datetime.now() + timedelta(milliseconds=400) + assert r.hpexpireat("test:hash", exp_time, "field1") == [1] + time.sleep(0.5) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpireat_conditions(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1"}) + future_exp_time = int( + (datetime.now() + timedelta(milliseconds=500)).timestamp() * 1000 + ) + past_exp_time = int( + (datetime.now() - timedelta(milliseconds=500)).timestamp() * 1000 + ) + assert r.hpexpireat("test:hash", future_exp_time, "field1", xx=True) == [0] + assert r.hpexpireat("test:hash", future_exp_time, "field1", nx=True) == [1] + assert r.hpexpireat("test:hash", past_exp_time, "field1", gt=True) == [0] + assert r.hpexpireat("test:hash", past_exp_time, "field1", lt=True) == [2] + assert r.hexists("test:hash", "field1") is False + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpireat_nonexistent_key_or_field(r): + r.delete("test:hash") + future_exp_time = int( + (datetime.now() + timedelta(milliseconds=500)).timestamp() * 1000 + ) + assert r.hpexpireat("test:hash", future_exp_time, "field1") == [] + r.hset("test:hash", "field1", "value1") + assert r.hpexpireat("test:hash", future_exp_time, "nonexistent_field") == [-2] + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpireat_multiple_fields(r): + r.delete("test:hash") + r.hset( + "test:hash", + mapping={"field1": "value1", "field2": "value2", "field3": "value3"}, + ) + exp_time = int((datetime.now() + timedelta(milliseconds=400)).timestamp() * 1000) + assert r.hpexpireat("test:hash", exp_time, "field1", "field2") == [1, 1] + time.sleep(0.5) + assert r.hexists("test:hash", "field1") is False + assert r.hexists("test:hash", "field2") is False + assert r.hexists("test:hash", "field3") is True + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpireat_multiple_condition_flags_error(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1"}) + exp_time = int((datetime.now() + timedelta(milliseconds=500)).timestamp()) + with pytest.raises(ValueError) as e: + r.hpexpireat("test:hash", exp_time, "field1", nx=True, xx=True) + assert "Only one of" in str(e) + + +@skip_if_server_version_lt("7.3.240") +def test_hpersist_multiple_fields(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + r.hexpire("test:hash", 5000, "field1") + assert r.hpersist("test:hash", "field1", "field2", "field3") == [1, -1, -2] + + +@skip_if_server_version_lt("7.3.240") +def test_hpersist_nonexistent_key(r): + r.delete("test:hash") + assert r.hpersist("test:hash", "field1", "field2", "field3") == [] + + +@skip_if_server_version_lt("7.3.240") +def test_hexpiretime_multiple_fields_mixed_conditions(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + future_time = int((datetime.now() + timedelta(minutes=30)).timestamp()) + r.hexpireat("test:hash", future_time, "field1") + result = r.hexpiretime("test:hash", "field1", "field2", "field3") + assert future_time - 10 < result[0] <= future_time + assert result[1:] == [-1, -2] + + +@skip_if_server_version_lt("7.3.240") +def test_hexpiretime_nonexistent_key(r): + r.delete("test:hash") + assert r.hexpiretime("test:hash", "field1", "field2", "field3") == [] + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpiretime_multiple_fields_mixed_conditions(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + future_time = int((datetime.now() + timedelta(minutes=30)).timestamp()) + r.hexpireat("test:hash", future_time, "field1") + result = r.hpexpiretime("test:hash", "field1", "field2", "field3") + assert future_time * 1000 - 10000 < result[0] <= future_time * 1000 + assert result[1:] == [-1, -2] + + +@skip_if_server_version_lt("7.3.240") +def test_hpexpiretime_nonexistent_key(r): + r.delete("test:hash") + assert r.hpexpiretime("test:hash", "field1", "field2", "field3") == [] + + +@skip_if_server_version_lt("7.3.240") +def test_httl_multiple_fields_mixed_conditions(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + future_time = int((datetime.now() + timedelta(minutes=30)).timestamp()) + r.hexpireat("test:hash", future_time, "field1") + result = r.httl("test:hash", "field1", "field2", "field3") + assert 30 * 60 - 10 < result[0] <= 30 * 60 + assert result[1:] == [-1, -2] + + +@skip_if_server_version_lt("7.3.240") +def test_httl_nonexistent_key(r): + r.delete("test:hash") + assert r.httl("test:hash", "field1", "field2", "field3") == [] + + +@skip_if_server_version_lt("7.3.240") +def test_hpttl_multiple_fields_mixed_conditions(r): + r.delete("test:hash") + r.hset("test:hash", mapping={"field1": "value1", "field2": "value2"}) + future_time = int((datetime.now() + timedelta(minutes=30)).timestamp()) + r.hexpireat("test:hash", future_time, "field1") + result = r.hpttl("test:hash", "field1", "field2", "field3") + assert 30 * 60000 - 10000 < result[0] <= 30 * 60000 + assert result[1:] == [-1, -2] + + +@skip_if_server_version_lt("7.3.240") +def test_hpttl_nonexistent_key(r): + r.delete("test:hash") + assert r.hpttl("test:hash", "field1", "field2", "field3") == [] diff --git a/tests/test_parsers/test_helpers.py b/tests/test_parsers/test_helpers.py new file mode 100644 index 00000000..5986370a --- /dev/null +++ b/tests/test_parsers/test_helpers.py @@ -0,0 +1,35 @@ +from valkey._parsers.helpers import parse_info + + +def test_parse_info(): + info_output = """ +# Modules +module:name=search,ver=999999,api=1,filters=0,usedby=[],using=[ReJSON],options=[handle-io-errors] + +# search_fields_statistics +search_fields_text:Text=3 +search_fields_tag:Tag=2,Sortable=1 + +# search_version +search_version:99.99.99 +search_redis_version:7.2.2 - oss + +# search_runtime_configurations +search_query_timeout_ms:500 + """ + info = parse_info(info_output) + + assert isinstance(info["modules"], list) + assert isinstance(info["modules"][0], dict) + assert info["modules"][0]["name"] == "search" + + assert isinstance(info["search_fields_text"], dict) + assert info["search_fields_text"]["Text"] == 3 + + assert isinstance(info["search_fields_tag"], dict) + assert info["search_fields_tag"]["Tag"] == 2 + assert info["search_fields_tag"]["Sortable"] == 1 + + assert info["search_version"] == "99.99.99" + assert info["search_redis_version"] == "7.2.2 - oss" + assert info["search_query_timeout_ms"] == 500 diff --git a/tests/test_search.py b/tests/test_search.py index aa760055..5b645096 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -1383,16 +1383,14 @@ def test_aggregations_apply(client): ) res = client.ft().aggregate(req) if is_resp2_connection(client): - res_set = set([res.rows[0][1], res.rows[1][1]]) - assert res_set == set(["6373878785249699840", "6373878758592700416"]) + res_set = {res.rows[0][1], res.rows[1][1]} + assert res_set == {"6373878785249699840", "6373878758592700416"} else: - res_set = set( - [ - res["results"][0]["extra_attributes"]["CreatedDateTimeUTC"], - res["results"][1]["extra_attributes"]["CreatedDateTimeUTC"], - ], - ) - assert res_set == set(["6373878785249699840", "6373878758592700416"]) + res_set = { + res["results"][0]["extra_attributes"]["CreatedDateTimeUTC"], + res["results"][1]["extra_attributes"]["CreatedDateTimeUTC"], + } + assert res_set == {"6373878785249699840", "6373878758592700416"} @pytest.mark.valkeymod @@ -2097,7 +2095,7 @@ def test_numeric_params(client): @pytest.mark.valkeymod @skip_ifmodversion_lt("2.4.3", "search") def test_geo_params(client): - client.ft().create_index((GeoField("g"))) + client.ft().create_index(GeoField("g")) client.hset("doc1", mapping={"g": "29.69465, 34.95126"}) client.hset("doc2", mapping={"g": "29.69350, 34.94737"}) client.hset("doc3", mapping={"g": "29.68746, 34.94882"}) @@ -2225,14 +2223,14 @@ def test_withsuffixtrie(client: valkey.Valkey): assert client.ft().dropindex("idx") # create withsuffixtrie index (text fields) - assert client.ft().create_index((TextField("t", withsuffixtrie=True))) + assert client.ft().create_index(TextField("t", withsuffixtrie=True)) waitForIndex(client, getattr(client.ft(), "index_name", "idx")) info = client.ft().info() assert "WITHSUFFIXTRIE" in info["attributes"][0] assert client.ft().dropindex("idx") # create withsuffixtrie index (tag field) - assert client.ft().create_index((TagField("t", withsuffixtrie=True))) + assert client.ft().create_index(TagField("t", withsuffixtrie=True)) waitForIndex(client, getattr(client.ft(), "index_name", "idx")) info = client.ft().info() assert "WITHSUFFIXTRIE" in info["attributes"][0] @@ -2242,14 +2240,14 @@ def test_withsuffixtrie(client: valkey.Valkey): assert client.ft().dropindex("idx") # create withsuffixtrie index (text fields) - assert client.ft().create_index((TextField("t", withsuffixtrie=True))) + assert client.ft().create_index(TextField("t", withsuffixtrie=True)) waitForIndex(client, getattr(client.ft(), "index_name", "idx")) info = client.ft().info() assert "WITHSUFFIXTRIE" in info["attributes"][0]["flags"] assert client.ft().dropindex("idx") # create withsuffixtrie index (tag field) - assert client.ft().create_index((TagField("t", withsuffixtrie=True))) + assert client.ft().create_index(TagField("t", withsuffixtrie=True)) waitForIndex(client, getattr(client.ft(), "index_name", "idx")) info = client.ft().info() assert "WITHSUFFIXTRIE" in info["attributes"][0]["flags"] @@ -2268,7 +2266,7 @@ def test_query_timeout(r: valkey.Valkey): @pytest.mark.valkeymod def test_geoshape(client: valkey.Valkey): - client.ft().create_index((GeoShapeField("geom", GeoShapeField.FLAT))) + client.ft().create_index(GeoShapeField("geom", GeoShapeField.FLAT)) waitForIndex(client, getattr(client.ft(), "index_name", "idx")) client.hset("small", "geom", "POLYGON((1 1, 1 100, 100 100, 100 1, 1 1))") client.hset("large", "geom", "POLYGON((1 1, 1 200, 200 200, 200 1, 1 1))") diff --git a/valkey/__init__.py b/valkey/__init__.py index 9190f7fa..e4202fbe 100644 --- a/valkey/__init__.py +++ b/valkey/__init__.py @@ -1,4 +1,4 @@ -import sys +from importlib import metadata from valkey import asyncio # noqa from valkey.backoff import default_backoff @@ -36,11 +36,6 @@ ) from valkey.utils import from_url -if sys.version_info >= (3, 8): - from importlib import metadata -else: - import importlib_metadata as metadata - def int_or_str(value): try: diff --git a/valkey/_parsers/helpers.py b/valkey/_parsers/helpers.py index 0b30efd6..1e89c38d 100644 --- a/valkey/_parsers/helpers.py +++ b/valkey/_parsers/helpers.py @@ -38,7 +38,7 @@ def parse_info(response): response = str_if_bytes(response) def get_value(value): - if "," not in value or "=" not in value: + if "," not in value and "=" not in value: try: if "." in value: return float(value) @@ -80,7 +80,7 @@ def parse_memory_stats(response, **kwargs): """Parse the results of MEMORY STATS""" stats = pairs_to_dict(response, decode_keys=True, decode_string_values=True) for key, value in stats.items(): - if key.startswith("db."): + if key.startswith("db.") and isinstance(value, list): stats[key] = pairs_to_dict( value, decode_keys=True, decode_string_values=True ) diff --git a/valkey/asyncio/cluster.py b/valkey/asyncio/cluster.py index 4e7e3580..b240b4df 100644 --- a/valkey/asyncio/cluster.py +++ b/valkey/asyncio/cluster.py @@ -278,7 +278,7 @@ def __init__( ssl_min_version: Optional[ssl.TLSVersion] = None, ssl_ciphers: Optional[str] = None, protocol: Optional[int] = 2, - address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, + address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, cache_enabled: bool = False, client_cache: Optional[AbstractCache] = None, cache_max_size: int = 100, @@ -1155,7 +1155,7 @@ def __init__( startup_nodes: List["ClusterNode"], require_full_coverage: bool, connection_kwargs: Dict[str, Any], - address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, + address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, ) -> None: self.startup_nodes = {node.name: node for node in startup_nodes} self.require_full_coverage = require_full_coverage diff --git a/valkey/backoff.py b/valkey/backoff.py index c62e760b..f612d607 100644 --- a/valkey/backoff.py +++ b/valkey/backoff.py @@ -19,7 +19,7 @@ def reset(self): pass @abstractmethod - def compute(self, failures): + def compute(self, failures: int) -> float: """Compute backoff in seconds upon failure""" pass @@ -27,25 +27,25 @@ def compute(self, failures): class ConstantBackoff(AbstractBackoff): """Constant backoff upon failure""" - def __init__(self, backoff): + def __init__(self, backoff: float) -> None: """`backoff`: backoff time in seconds""" self._backoff = backoff - def compute(self, failures): + def compute(self, failures: int) -> float: return self._backoff class NoBackoff(ConstantBackoff): """No backoff upon failure""" - def __init__(self): + def __init__(self) -> None: super().__init__(0) class ExponentialBackoff(AbstractBackoff): """Exponential backoff upon failure""" - def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): + def __init__(self, cap: float = DEFAULT_CAP, base: float = DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -53,14 +53,14 @@ def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): self._cap = cap self._base = base - def compute(self, failures): + def compute(self, failures: int) -> float: return min(self._cap, self._base * 2**failures) class FullJitterBackoff(AbstractBackoff): """Full jitter backoff upon failure""" - def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): + def __init__(self, cap: float = DEFAULT_CAP, base: float = DEFAULT_BASE) -> None: """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -68,14 +68,14 @@ def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): self._cap = cap self._base = base - def compute(self, failures): + def compute(self, failures: int) -> float: return random.uniform(0, min(self._cap, self._base * 2**failures)) class EqualJitterBackoff(AbstractBackoff): """Equal jitter backoff upon failure""" - def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): + def __init__(self, cap: float = DEFAULT_CAP, base: float = DEFAULT_BASE) -> None: """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -83,7 +83,7 @@ def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): self._cap = cap self._base = base - def compute(self, failures): + def compute(self, failures: int) -> float: temp = min(self._cap, self._base * 2**failures) / 2 return temp + random.uniform(0, temp) @@ -91,7 +91,7 @@ def compute(self, failures): class DecorrelatedJitterBackoff(AbstractBackoff): """Decorrelated jitter backoff upon failure""" - def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): + def __init__(self, cap: float = DEFAULT_CAP, base: float = DEFAULT_BASE) -> None: """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -100,10 +100,10 @@ def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): self._base = base self._previous_backoff = 0 - def reset(self): + def reset(self) -> None: self._previous_backoff = 0 - def compute(self, failures): + def compute(self, failures: int) -> float: max_backoff = max(self._base, self._previous_backoff * 3) temp = random.uniform(self._base, max_backoff) self._previous_backoff = min(self._cap, temp) diff --git a/valkey/client.py b/valkey/client.py index 6e5dd9ae..eed48b45 100755 --- a/valkey/client.py +++ b/valkey/client.py @@ -829,7 +829,7 @@ def clean_health_check_responses(self) -> None: else: raise PubSubError( "A non health check response was cleaned by " - "execute_command: {0}".format(response) + "execute_command: {}".format(response) ) ttl -= 1 diff --git a/valkey/cluster.py b/valkey/cluster.py index 1619184b..0a4a3504 100644 --- a/valkey/cluster.py +++ b/valkey/cluster.py @@ -504,7 +504,7 @@ def __init__( read_from_replicas: bool = False, dynamic_startup_nodes: bool = True, url: Optional[str] = None, - address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, + address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, **kwargs, ): """ @@ -1344,7 +1344,7 @@ def __init__( lock=None, dynamic_startup_nodes=True, connection_pool_class=ConnectionPool, - address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, + address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, **kwargs, ): self.nodes_cache = {} @@ -1848,8 +1848,7 @@ def _sharded_message_generator(self): def _pubsubs_generator(self): while True: - for pubsub in self.node_pubsub_mapping.values(): - yield pubsub + yield from self.node_pubsub_mapping.values() def get_sharded_message( self, ignore_subscribe_messages=False, timeout=0.0, target_node=None diff --git a/valkey/commands/bf/__init__.py b/valkey/commands/bf/__init__.py index b0ca008a..27369023 100644 --- a/valkey/commands/bf/__init__.py +++ b/valkey/commands/bf/__init__.py @@ -5,7 +5,7 @@ from .info import BFInfo, CFInfo, CMSInfo, TDigestInfo, TopKInfo -class AbstractBloom(object): +class AbstractBloom: """ The client allows to interact with RedisBloom and use all of it's functionality. diff --git a/valkey/commands/bf/info.py b/valkey/commands/bf/info.py index e1f02086..1a876c16 100644 --- a/valkey/commands/bf/info.py +++ b/valkey/commands/bf/info.py @@ -1,7 +1,7 @@ from ..helpers import nativestr -class BFInfo(object): +class BFInfo: capacity = None size = None filterNum = None @@ -26,7 +26,7 @@ def __getitem__(self, item): return getattr(self, item) -class CFInfo(object): +class CFInfo: size = None bucketNum = None filterNum = None @@ -57,7 +57,7 @@ def __getitem__(self, item): return getattr(self, item) -class CMSInfo(object): +class CMSInfo: width = None depth = None count = None @@ -72,7 +72,7 @@ def __getitem__(self, item): return getattr(self, item) -class TopKInfo(object): +class TopKInfo: k = None width = None depth = None @@ -89,7 +89,7 @@ def __getitem__(self, item): return getattr(self, item) -class TDigestInfo(object): +class TDigestInfo: compression = None capacity = None merged_nodes = None diff --git a/valkey/commands/core.py b/valkey/commands/core.py index cd071c31..33b5abe4 100644 --- a/valkey/commands/core.py +++ b/valkey/commands/core.py @@ -5090,6 +5090,374 @@ def hstrlen(self, name: str, key: str) -> Union[Awaitable[int], int]: """ return self.execute_command("HSTRLEN", name, key, keys=[name]) + def hexpire( + self, + name: KeyT, + seconds: ExpiryT, + *fields: str, + nx: bool = False, + xx: bool = False, + gt: bool = False, + lt: bool = False, + ) -> ResponseT: + """ + Sets or updates the expiration time for fields within a hash key, using relative + time in seconds. + + If a field already has an expiration time, the behavior of the update can be + controlled using the `nx`, `xx`, `gt`, and `lt` parameters. + + The return value provides detailed information about the outcome for each field. + + For more information, see https://redis.io/commands/hexpire + + Args: + name: The name of the hash key. + seconds: Expiration time in seconds, relative. Can be an integer, or a + Python `timedelta` object. + fields: List of fields within the hash to apply the expiration time to. + nx: Set expiry only when the field has no expiry. + xx: Set expiry only when the field has an existing expiry. + gt: Set expiry only when the new expiry is greater than the current one. + lt: Set expiry only when the new expiry is less than the current one. + + Returns: + If the key does not exist, returns an empty list. If the key exists, returns + a list which contains for each field in the request: + - `-2` if the field does not exist. + - `0` if the specified NX | XX | GT | LT condition was not met. + - `1` if the expiration time was set or updated. + - `2` if the field was deleted because the specified expiration time is + in the past. + """ + conditions = [nx, xx, gt, lt] + if sum(conditions) > 1: + raise ValueError("Only one of 'nx', 'xx', 'gt', 'lt' can be specified.") + + if isinstance(seconds, datetime.timedelta): + seconds = int(seconds.total_seconds()) + + options = [] + if nx: + options.append("NX") + if xx: + options.append("XX") + if gt: + options.append("GT") + if lt: + options.append("LT") + + return self.execute_command( + "HEXPIRE", name, seconds, *options, "FIELDS", len(fields), *fields + ) + + def hpexpire( + self, + name: KeyT, + milliseconds: ExpiryT, + *fields: str, + nx: bool = False, + xx: bool = False, + gt: bool = False, + lt: bool = False, + ) -> ResponseT: + """ + Sets or updates the expiration time for fields within a hash key, using relative + time in milliseconds. + + If a field already has an expiration time, the behavior of the update can be + controlled using the `nx`, `xx`, `gt`, and `lt` parameters. + + The return value provides detailed information about the outcome for each field. + + For more information, see https://redis.io/commands/hpexpire + + Args: + name: The name of the hash key. + milliseconds: Expiration time in milliseconds, relative. Can be an integer, + or a Python `timedelta` object. + fields: List of fields within the hash to apply the expiration time to. + nx: Set expiry only when the field has no expiry. + xx: Set expiry only when the field has an existing expiry. + gt: Set expiry only when the new expiry is greater than the current one. + lt: Set expiry only when the new expiry is less than the current one. + + Returns: + If the key does not exist, returns an empty list. If the key exists, returns + a list which contains for each field in the request: + - `-2` if the field does not exist. + - `0` if the specified NX | XX | GT | LT condition was not met. + - `1` if the expiration time was set or updated. + - `2` if the field was deleted because the specified expiration time is + in the past. + """ + conditions = [nx, xx, gt, lt] + if sum(conditions) > 1: + raise ValueError("Only one of 'nx', 'xx', 'gt', 'lt' can be specified.") + + if isinstance(milliseconds, datetime.timedelta): + milliseconds = int(milliseconds.total_seconds() * 1000) + + options = [] + if nx: + options.append("NX") + if xx: + options.append("XX") + if gt: + options.append("GT") + if lt: + options.append("LT") + + return self.execute_command( + "HPEXPIRE", name, milliseconds, *options, "FIELDS", len(fields), *fields + ) + + def hexpireat( + self, + name: KeyT, + unix_time_seconds: AbsExpiryT, + *fields: str, + nx: bool = False, + xx: bool = False, + gt: bool = False, + lt: bool = False, + ) -> ResponseT: + """ + Sets or updates the expiration time for fields within a hash key, using an + absolute Unix timestamp in seconds. + + If a field already has an expiration time, the behavior of the update can be + controlled using the `nx`, `xx`, `gt`, and `lt` parameters. + + The return value provides detailed information about the outcome for each field. + + For more information, see https://redis.io/commands/hexpireat + + Args: + name: The name of the hash key. + unix_time_seconds: Expiration time as Unix timestamp in seconds. Can be an + integer or a Python `datetime` object. + fields: List of fields within the hash to apply the expiration time to. + nx: Set expiry only when the field has no expiry. + xx: Set expiry only when the field has an existing expiration time. + gt: Set expiry only when the new expiry is greater than the current one. + lt: Set expiry only when the new expiry is less than the current one. + + Returns: + If the key does not exist, returns an empty list. If the key exists, returns + a list which contains for each field in the request: + - `-2` if the field does not exist. + - `0` if the specified NX | XX | GT | LT condition was not met. + - `1` if the expiration time was set or updated. + - `2` if the field was deleted because the specified expiration time is + in the past. + """ + conditions = [nx, xx, gt, lt] + if sum(conditions) > 1: + raise ValueError("Only one of 'nx', 'xx', 'gt', 'lt' can be specified.") + + if isinstance(unix_time_seconds, datetime.datetime): + unix_time_seconds = int(unix_time_seconds.timestamp()) + + options = [] + if nx: + options.append("NX") + if xx: + options.append("XX") + if gt: + options.append("GT") + if lt: + options.append("LT") + + return self.execute_command( + "HEXPIREAT", + name, + unix_time_seconds, + *options, + "FIELDS", + len(fields), + *fields, + ) + + def hpexpireat( + self, + name: KeyT, + unix_time_milliseconds: AbsExpiryT, + *fields: str, + nx: bool = False, + xx: bool = False, + gt: bool = False, + lt: bool = False, + ) -> ResponseT: + """ + Sets or updates the expiration time for fields within a hash key, using an + absolute Unix timestamp in milliseconds. + + If a field already has an expiration time, the behavior of the update can be + controlled using the `nx`, `xx`, `gt`, and `lt` parameters. + + The return value provides detailed information about the outcome for each field. + + For more information, see https://redis.io/commands/hpexpireat + + Args: + name: The name of the hash key. + unix_time_milliseconds: Expiration time as Unix timestamp in milliseconds. + Can be an integer or a Python `datetime` object. + fields: List of fields within the hash to apply the expiry. + nx: Set expiry only when the field has no expiry. + xx: Set expiry only when the field has an existing expiry. + gt: Set expiry only when the new expiry is greater than the current one. + lt: Set expiry only when the new expiry is less than the current one. + + Returns: + If the key does not exist, returns an empty list. If the key exists, returns + a list which contains for each field in the request: + - `-2` if the field does not exist. + - `0` if the specified NX | XX | GT | LT condition was not met. + - `1` if the expiration time was set or updated. + - `2` if the field was deleted because the specified expiration time is + in the past. + """ + conditions = [nx, xx, gt, lt] + if sum(conditions) > 1: + raise ValueError("Only one of 'nx', 'xx', 'gt', 'lt' can be specified.") + + if isinstance(unix_time_milliseconds, datetime.datetime): + unix_time_milliseconds = int(unix_time_milliseconds.timestamp() * 1000) + + options = [] + if nx: + options.append("NX") + if xx: + options.append("XX") + if gt: + options.append("GT") + if lt: + options.append("LT") + + return self.execute_command( + "HPEXPIREAT", + name, + unix_time_milliseconds, + *options, + "FIELDS", + len(fields), + *fields, + ) + + def hpersist(self, name: KeyT, *fields: str) -> ResponseT: + """ + Removes the expiration time for each specified field in a hash. + + For more information, see https://redis.io/commands/hpersist + + Args: + name: The name of the hash key. + fields: A list of fields within the hash from which to remove the + expiration time. + + Returns: + If the key does not exist, returns an empty list. If the key exists, returns + a list which contains for each field in the request: + - `-2` if the field does not exist. + - `-1` if the field exists but has no associated expiration time. + - `1` if the expiration time was successfully removed from the field. + """ + return self.execute_command("HPERSIST", name, "FIELDS", len(fields), *fields) + + def hexpiretime(self, key: KeyT, *fields: str) -> ResponseT: + """ + Returns the expiration times of hash fields as Unix timestamps in seconds. + + For more information, see https://redis.io/commands/hexpiretime + + Args: + key: The hash key. + fields: A list of fields within the hash for which to get the expiration + time. + + Returns: + If the key does not exist, returns an empty list. If the key exists, returns + a list which contains for each field in the request: + - `-2` if the field does not exist. + - `-1` if the field exists but has no associated expire time. + - A positive integer representing the expiration Unix timestamp in + seconds, if the field has an associated expiration time. + """ + return self.execute_command( + "HEXPIRETIME", key, "FIELDS", len(fields), *fields, keys=[key] + ) + + def hpexpiretime(self, key: KeyT, *fields: str) -> ResponseT: + """ + Returns the expiration times of hash fields as Unix timestamps in milliseconds. + + For more information, see https://redis.io/commands/hpexpiretime + + Args: + key: The hash key. + fields: A list of fields within the hash for which to get the expiration + time. + + Returns: + If the key does not exist, returns an empty list. If the key exists, returns + a list which contains for each field in the request: + - `-2` if the field does not exist. + - `-1` if the field exists but has no associated expire time. + - A positive integer representing the expiration Unix timestamp in + milliseconds, if the field has an associated expiration time. + """ + return self.execute_command( + "HPEXPIRETIME", key, "FIELDS", len(fields), *fields, keys=[key] + ) + + def httl(self, key: KeyT, *fields: str) -> ResponseT: + """ + Returns the TTL (Time To Live) in seconds for each specified field within a hash + key. + + For more information, see https://redis.io/commands/httl + + Args: + key: The hash key. + fields: A list of fields within the hash for which to get the TTL. + + Returns: + If the key does not exist, returns an empty list. If the key exists, returns + a list which contains for each field in the request: + - `-2` if the field does not exist. + - `-1` if the field exists but has no associated expire time. + - A positive integer representing the TTL in seconds if the field has + an associated expiration time. + """ + return self.execute_command( + "HTTL", key, "FIELDS", len(fields), *fields, keys=[key] + ) + + def hpttl(self, key: KeyT, *fields: str) -> ResponseT: + """ + Returns the TTL (Time To Live) in milliseconds for each specified field within a + hash key. + + For more information, see https://redis.io/commands/hpttl + + Args: + key: The hash key. + fields: A list of fields within the hash for which to get the TTL. + + Returns: + If the key does not exist, returns an empty list. If the key exists, returns + a list which contains for each field in the request: + - `-2` if the field does not exist. + - `-1` if the field exists but has no associated expire time. + - A positive integer representing the TTL in milliseconds if the field + has an associated expiration time. + """ + return self.execute_command( + "HPTTL", key, "FIELDS", len(fields), *fields, keys=[key] + ) + AsyncHashCommands = HashCommands diff --git a/valkey/commands/graph/__init__.py b/valkey/commands/graph/__init__.py index ffaf1fb4..ddc0e34f 100644 --- a/valkey/commands/graph/__init__.py +++ b/valkey/commands/graph/__init__.py @@ -252,7 +252,7 @@ async def call_procedure(self, procedure, *args, read_only=False, **kwagrs): return await self.query(q, read_only=read_only) async def labels(self): - return ((await self.call_procedure(DB_LABELS, read_only=True))).result_set + return (await self.call_procedure(DB_LABELS, read_only=True)).result_set async def property_keys(self): return (await self.call_procedure(DB_PROPERTYKEYS, read_only=True)).result_set diff --git a/valkey/commands/helpers.py b/valkey/commands/helpers.py index 5e8d028f..08790637 100644 --- a/valkey/commands/helpers.py +++ b/valkey/commands/helpers.py @@ -85,7 +85,11 @@ def parse_to_dict(response): res = {} for det in response: - if isinstance(det[1], list): + if not isinstance(det, list) or not det: + continue + if len(det) == 1: + res[det[0]] = True + elif isinstance(det[1], list): res[det[0]] = parse_list_to_dict(det[1]) else: try: # try to set the attribute. may be provided without value diff --git a/valkey/commands/json/commands.py b/valkey/commands/json/commands.py index 3b7ee09a..7866b185 100644 --- a/valkey/commands/json/commands.py +++ b/valkey/commands/json/commands.py @@ -314,7 +314,7 @@ def set_file( """ - with open(file_name, "r") as fp: + with open(file_name) as fp: file_content = loads(fp.read()) return self.set(name, path, file_content, nx=nx, xx=xx, decode_keys=decode_keys) diff --git a/valkey/typing.py b/valkey/typing.py index c44e4cbb..98059061 100644 --- a/valkey/typing.py +++ b/valkey/typing.py @@ -32,7 +32,7 @@ PatternT = _StringLikeT # Patterns matched against keys, fields etc FieldT = EncodableT # Fields within hash tables, streams and geo commands KeysT = Union[KeyT, Iterable[KeyT]] -ResponseT = Union[Awaitable, Any] +ResponseT = Union[Awaitable[Any], Any] ChannelT = _StringLikeT GroupT = _StringLikeT # Consumer group ConsumerT = _StringLikeT # Consumer name diff --git a/valkey/utils.py b/valkey/utils.py index 77ad0e35..4826cd2c 100644 --- a/valkey/utils.py +++ b/valkey/utils.py @@ -1,5 +1,4 @@ import logging -import sys from contextlib import contextmanager from functools import wraps from typing import Any, Dict, Mapping, Union @@ -28,10 +27,7 @@ except ImportError: CRYPTOGRAPHY_AVAILABLE = False -if sys.version_info >= (3, 8): - from importlib import metadata -else: - import importlib_metadata as metadata +from importlib import metadata def from_url(url, **kwargs):