Skip to content

Commit

Permalink
Fix redis events outbound transit (#33)
Browse files Browse the repository at this point in the history
* Fix redis events outbound transit

Signed-off-by: jamshale <[email protected]>

* Fix linting

Signed-off-by: jamshale <[email protected]>

---------

Signed-off-by: jamshale <[email protected]>
  • Loading branch information
jamshale authored Nov 15, 2023
1 parent 4a547ed commit edfbd8f
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 45 deletions.
3 changes: 3 additions & 0 deletions redis_events/docker/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ wallet-key: "test-wallet-key"

no-ledger: true

debug-connections: true
auto-accept-invites: true
auto-respond-messages: true

log-level: info
25 changes: 20 additions & 5 deletions redis_events/integration/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ services:
'/wait && python -m redis_events.v1_0.services.deliverer.deliver "$@"',
"--",
]
extra_hosts:
- "alice:host-gateway"
- "faber:host-gateway"

faber:
image: plugin-image
Expand All @@ -170,19 +173,22 @@ services:
- install_flags=--no-interaction --with integration
ports:
- 3001:3001
- 3000:3000
depends_on:
- redis-cluster
- relay
- deliverer
command: start --arg-file integration.yml -e http://faber:3000 --label faber --log-level debug
command: start --arg-file integration.yml --label faber
environment:
- WAIT_BEFORE_HOSTS=10
- WAIT_HOSTS=redis-node-3:6379, relay:8071, relay:8081
- WAIT_HOSTS=redis-node-3:6379, relay:8071
- WAIT_HOSTS_TIMEOUT=120
- WAIT_SLEEP_INTERVAL=1
- WAIT_HOST_CONNECT_TIMEOUT=60
networks:
- acapy_default
extra_hosts:
- "alice:host-gateway"

alice:
image: plugin-image
Expand All @@ -191,18 +197,23 @@ services:
dockerfile: docker/Dockerfile
args:
- install_flags=--no-interaction --with integration
command: start -it http 0.0.0.0 3000 -ot http -e http://alice:3000 --auto-accept-invites --auto-respond-messages --admin 0.0.0.0 3001 --admin-insecure-mode --label alice --no-ledger --log-level debug
ports:
- 8021:8021
- 8020:8020
command: start -it http 0.0.0.0 8020 -ot http -e http://alice:8020 --auto-accept-invites --auto-respond-messages --admin 0.0.0.0 8021 --admin-insecure-mode --label alice --no-ledger --log-level debug --debug-connections
networks:
- acapy_default
extra_hosts:
- "relay:host-gateway"

tests:
container_name: juggernaut
build:
context: .
dockerfile: Dockerfile.test.runner
environment:
- WAIT_BEFORE_HOSTS=10
- WAIT_HOSTS=faber:3001, alice:3000
- WAIT_BEFORE_HOSTS=15
- WAIT_HOSTS=faber:3001, alice:8020, relay:8071
- WAIT_HOSTS_TIMEOUT=60
- WAIT_SLEEP_INTERVAL=1
- WAIT_HOST_CONNECT_TIMEOUT=30
Expand All @@ -212,6 +223,10 @@ services:
- alice
networks:
- acapy_default
extra_hosts:
- "faber:host-gateway"
- "relay:host-gateway"
- "alice:host-gateway"

networks:
acapy_default:
Expand Down
31 changes: 12 additions & 19 deletions redis_events/integration/tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest

from . import FABER, ALICE, RELAY, Agent, post
from . import FABER, ALICE, RELAY, Agent


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -45,13 +45,6 @@ def relay():
ZVFiakktVmpkN21hcWdTNElGTlEifQ==
"""

PAYLOAD_JSON = {
"protected": "eyJlbmMiOiAieGNoYWNoYTIwcG9seTEzMDVfaWV0ZiIsICJ0eXAiOiAiSldNLzEuMCIsICJhbGciOiAiQXV0aGNyeXB0IiwgInJlY2lwaWVudHMiOiBbeyJlbmNyeXB0ZWRfa2V5IjogIjFjZ3l0Qm13M3ExaGdiVzBJbjNtSG82WWhLT2tpRzVEeThrRjJIWjYxcTJvWWM3bmtuSzlOSUc2SEhlU2NoeWEiLCAiaGVhZGVyIjogeyJraWQiOiAiNERCSjRacDg1MWdqek50Sm1tb0U5N1dxVnJXTjM2eVpSYUdpZjRBR3J4d1EiLCAic2VuZGVyIjogIjNWcHlScUFZTWsyNk5Fc0QzNmNfZ2g0VHk0ZjgwMGxDRGEwM1llRW5mRXBmV2hJLWdzZEctVGRkMWVNaDlZSXo3NHRFSzJsR1VaTXpfNGt1d0JTUko0TF9hd1RKQVVVd2tTVnhrMzRnUVVfNWdrd1RkOEY1TkFlSU5QVSIsICJpdiI6ICJqVVJCQmNiT3g3NkNsVl8xazhRM29rUnJtRGQ1a3BpQiJ9fV19",
"iv": "MWgGuQ4_ZogqURTn",
"ciphertext": "UMLaP9Mwd_p8TumgpqVVAfSIfWsX7kIV-DxFw_TtSCjVu5SlnQbkdMRKwurdb5vgzCKT5ArlUtXAL2mlIIiPjRc5fK8KsMwKGEzi2pKkvlC7Q3QtJY19ZeSJ9X0iT9lNjcD3nJKJ5o9dJ8UXfi5O4dKZ-leW-j8ysLASI8uxFXUShRle7-7nnGfFgFVAF3ZYZj6TWQBkvGRROzO30LsDXpsjSj1f_wNzEgqNjO0DYzdJkIA6mACP",
"tag": "eAeQbjI-Vjd7maqgS4IFNQ"
}


@pytest.fixture(scope="session", autouse=True)
def established_connection(faber, alice):
Expand All @@ -69,20 +62,22 @@ async def test_base_redis_keys_are_set(redis):


@pytest.mark.asyncio
async def test_outbound_queue_removes_messages_from_queue(faber: Agent, established_connection: str, redis):
async def test_outbound_queue_removes_messages_from_queue_and_deliver_sends_them(faber: Agent, established_connection: str, redis):
faber.send_message(established_connection, "Hello Alice")
faber.send_message(established_connection, "Another Alice")
msg_received = False
retry_pop_count = 0
while not msg_received:
msg = await redis.blpop("acapy_outbound", 10)
msg = await redis.blpop("acapy_outbound", 2)
if not msg:
if retry_pop_count > 3:
raise Exception("blpop call failed to retrieve message")
retry_pop_count = retry_pop_count + 1
time.sleep(1)
msg_received = True
assert "Hello Alice" in (msg['content']
for msg in faber.retrieve_basicmessages()['results'])
messages = faber.retrieve_basicmessages()['results']
assert "Hello Alice" in (msg['content'] for msg in messages)
assert "Another Alice" in (msg['content'] for msg in messages)


@pytest.mark.asyncio
Expand All @@ -105,18 +100,16 @@ async def test_deliverer_pulls_messages_from_queue_and_sends_them(
messages = faber.retrieve_basicmessages()['results']
matching_msgs = [
msg for msg in messages if msg['content'] == "test-msg"]
assert matching_msgs.__len__() == 1
assert matching_msgs.__len__() == 2 # 1 for sent, 1 for received
assert await redis.lrange("acapy_outbound", 0, -1) == []


@pytest.mark.asyncio
async def test_relay_sets_redis_keys_for_queue(redis, relay: Agent):
post(relay.url, "/", json=PAYLOAD_JSON)
async def test_relay_has_keys_in_recip_key_uid_map(redis, relay: Agent):
time.sleep(1)
uid = await redis.hget("recip_key_uid_map", "4DBJ4Zp851gjzNtJmmoE97WqVrWN36yZRaGif4AGrxwQ")
assert uid

msg_count = await redis.hget("uid_recip_key_pending_msg_count", uid.decode() + "_4DBJ4Zp851gjzNtJmmoE97WqVrWN36yZRaGif4AGrxwQ")
recip_keys = await redis.hgetall("recip_key_uid_map")
assert recip_keys
msg_count = await redis.hgetall("uid_recip_key_pending_msg_count")
assert msg_count


Expand Down
2 changes: 2 additions & 0 deletions redis_events/redis_events/v1_0/redis_queue/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

async def setup(context: InjectionContext):
"""Setup the plugin."""
LOGGER.info("> plugin setup...")
config = get_config(context.settings).event or EventConfig.default()

bus = context.inject(EventBus)
Expand All @@ -34,6 +35,7 @@ async def setup(context: InjectionContext):

bus.subscribe(STARTUP_EVENT_PATTERN, on_startup)
bus.subscribe(SHUTDOWN_EVENT_PATTERN, on_shutdown)
LOGGER.info("< plugin setup.")


RECORD_RE = re.compile(r"acapy::record::([^:]*)(?:::(.*))?")
Expand Down
7 changes: 3 additions & 4 deletions redis_events/redis_events/v1_0/redis_queue/outbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import logging

from aries_cloudagent.transport.wire_format import BaseWireFormat
from aries_cloudagent.core.profile import Profile
from aries_cloudagent.transport.outbound.base import (
BaseOutboundTransport,
Expand Down Expand Up @@ -36,11 +35,11 @@ class RedisOutboundQueue(BaseOutboundTransport):

def __init__(
self,
wire_format: BaseWireFormat,
root_profile: Profile,
root_profile,
**kwargs,
):
"""Initialize base queue type."""
super().__init__(root_profile, wire_format)
super().__init__(**kwargs)
self.outbound_config = (
get_config(root_profile.context.settings).outbound
or OutboundConfig.default()
Expand Down
30 changes: 13 additions & 17 deletions redis_events/redis_events/v1_0/redis_queue/tests/test_outbound.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import base64
import datetime
import redis
import time
import json
import time

import redis
from aiohttp.test_utils import unused_port
from aries_cloudagent.core.in_memory import InMemoryProfile
from aries_cloudagent.transport.outbound.base import (ConnectionTarget,
OutboundMessage,
QueuedOutboundMessage)
from aries_cloudagent.transport.wire_format import BaseWireFormat
from aries_cloudagent.transport.outbound.base import (
QueuedOutboundMessage,
OutboundMessage,
ConnectionTarget,
)
from aiohttp.test_utils import unused_port
from asynctest import TestCase as AsyncTestCase, mock as async_mock, PropertyMock
from asynctest import PropertyMock
from asynctest import TestCase as AsyncTestCase
from asynctest import mock as async_mock

from .. import config as test_config
from .. import outbound as test_outbound
from .. import utils as test_util
from .. import config as test_config
from ..outbound import RedisOutboundQueue

SETTINGS = {
Expand Down Expand Up @@ -340,9 +340,7 @@ async def test_handle_message_x(self):
async_mock.MagicMock(),
)
wire_format = BaseWireFormat()
redis_outbound_inst = RedisOutboundQueue(
root_profile=self.profile, wire_format=wire_format
)
redis_outbound_inst = RedisOutboundQueue(self.profile)
q_out_msg = QueuedOutboundMessage(
profile=self.profile,
message=OutboundMessage(payload="test-message"),
Expand Down Expand Up @@ -391,9 +389,8 @@ async def test_handle_message(self):
)
),
):
wire_format = BaseWireFormat()
redis_outbound_inst = RedisOutboundQueue(
root_profile=self.profile, wire_format=wire_format
root_profile=self.profile
)
q_out_msg = QueuedOutboundMessage(
profile=self.profile,
Expand Down Expand Up @@ -438,9 +435,8 @@ async def test_handle_message_mediator(self):
)
),
):
wire_format = BaseWireFormat()
redis_outbound_inst = RedisOutboundQueue(
root_profile=self.profile, wire_format=wire_format
root_profile=self.profile
)
q_out_msg = QueuedOutboundMessage(
profile=self.profile,
Expand Down

0 comments on commit edfbd8f

Please sign in to comment.