Skip to content

Commit

Permalink
Merge pull request #78 from rsocket/tutorial_final_app
Browse files Browse the repository at this point in the history
Tutorial application and bug fixes
  • Loading branch information
jell-o-fishi authored Nov 13, 2022
2 parents a80e094 + a3cdf0d commit 2591db6
Show file tree
Hide file tree
Showing 58 changed files with 2,283 additions and 67 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@ Changelog

v0.4.4
======
- Fragmentation fix - empty payload (either in request or response) with fragmentation enabled failed to send.
- Fragmentation fix - empty payload (either in request or response) with fragmentation enabled failed to send
- Breaking change: *on_connection_lost* was renamed to *on_close*. An *on_connection_error* method was added to handle initial connection errors
- Routing request handler:
- Throws an RSocketUnknownRoute exception which results in an error frame on the requester side
- Added error logging for response/stream/channel requests
- Added *create_response* helper method as shorthand for creating a future with a Payload
- Added *utf8_decode* helper. Decodes bytes to utf-8. If data is None, returns None.
- Refactoring client reconnect flow
- Added example code for tutorial on rsocket.io

v0.4.3
======
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ or install any of the extras:
Example:

```shell
pip install --pre rsocket[reactivex]
pip install rsocket[reactivex]
```

Alternatively, download the source code, build a package:
Expand Down
3 changes: 2 additions & 1 deletion examples/client_reconnect.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import sys
from typing import Optional

from rsocket.extensions.helpers import route, composite, authenticate_simple
from rsocket.extensions.mimetypes import WellKnownMimeTypes
Expand All @@ -21,7 +22,7 @@ async def request_response(client: RSocketClient) -> Payload:

class Handler(BaseRequestHandler):

async def on_connection_lost(self, rsocket: RSocketClient, exception: Exception):
async def on_close(self, rsocket, exception: Optional[Exception] = None):
await asyncio.sleep(5)
await rsocket.reconnect()

Expand Down
Empty file added examples/tutorial/__init__.py
Empty file.
Empty file.
192 changes: 192 additions & 0 deletions examples/tutorial/reactivex/chat_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import asyncio
import json
import logging
import resource
from asyncio import Event, Task
from typing import List, Optional

from reactivex import operators

from examples.tutorial.step5.models import Message, chat_filename_mimetype, ServerStatistics, ClientStatistics
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import DefaultSubscriber
from rsocket.extensions.helpers import composite, route, metadata_item
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import single_transport_provider, utf8_decode
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_client import ReactiveXClient
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP


def encode_dataclass(obj):
return ensure_bytes(json.dumps(obj.__dict__))


class ChatClient:
def __init__(self, rsocket: RSocketClient):
self._rsocket = rsocket
self._listen_task: Optional[Task] = None
self._statistics_task: Optional[Task] = None
self._session_id: Optional[str] = None

async def login(self, username: str):
payload = Payload(ensure_bytes(username), composite(route('login')))
self._session_id = (await self._rsocket.request_response(payload)).data
return self

async def join(self, channel_name: str):
request = Payload(ensure_bytes(channel_name), composite(route('channel.join')))
await self._rsocket.request_response(request)
return self

async def leave(self, channel_name: str):
request = Payload(ensure_bytes(channel_name), composite(route('channel.leave')))
await self._rsocket.request_response(request)
return self

def listen_for_messages(self):
def print_message(data: bytes):
message = Message(**json.loads(data))
print(f'{message.user} ({message.channel}): {message.content}')

async def listen_for_messages():
await ReactiveXClient(self._rsocket).request_stream(Payload(metadata=composite(
route('messages.incoming')
))).pipe(
operators.do_action(on_next=lambda value: print_message(value.data),
on_error=lambda exception: print(exception)))

self._listen_task = asyncio.create_task(listen_for_messages())

async def wait_for_messages(self):
messages_done = asyncio.Event()
self._listen_task.add_done_callback(lambda _: messages_done.set())
await messages_done.wait()

def stop_listening_for_messages(self):
self._listen_task.cancel()

async def send_statistics(self):
memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
payload = Payload(encode_dataclass(ClientStatistics(memory_usage=memory_usage)),
metadata=composite(route('statistics')))
await self._rsocket.fire_and_forget(payload)

def listen_for_statistics(self):
class StatisticsHandler(DefaultPublisher, DefaultSubscriber):

def __init__(self):
super().__init__()
self.done = Event()

def on_next(self, value: Payload, is_complete=False):
statistics = ServerStatistics(**json.loads(utf8_decode(value.data)))
print(statistics)

if is_complete:
self.done.set()

async def listen_for_statistics(client: RSocketClient, subscriber):
client.request_channel(Payload(metadata=composite(
route('statistics')
))).subscribe(subscriber)

await subscriber.done.wait()

statistics_handler = StatisticsHandler()
self._statistics_task = asyncio.create_task(
listen_for_statistics(self._rsocket, statistics_handler))

async def private_message(self, username: str, content: str):
print(f'Sending {content} to user {username}')
await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)),
composite(route('message'))))

async def channel_message(self, channel: str, content: str):
print(f'Sending {content} to channel {channel}')
await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)),
composite(route('message'))))

async def upload(self, file_name, content):
await self._rsocket.request_response(Payload(content, composite(
route('file.upload'),
metadata_item(ensure_bytes(file_name), chat_filename_mimetype)
)))

async def download(self, file_name):
return await self._rsocket.request_response(Payload(
metadata=composite(route('file.download'), metadata_item(ensure_bytes(file_name), chat_filename_mimetype))))

async def list_files(self) -> List[str]:
request = Payload(metadata=composite(route('files')))
return await ReactiveXClient(self._rsocket).request_stream(
request
).pipe(operators.map(lambda x: utf8_decode(x.data)),
operators.to_list())

async def list_channels(self) -> List[str]:
request = Payload(metadata=composite(route('channels')))
return await ReactiveXClient(self._rsocket).request_stream(
request
).pipe(operators.map(lambda _: utf8_decode(_.data)),
operators.to_list())


async def main():
connection1 = await asyncio.open_connection('localhost', 6565)

async with RSocketClient(single_transport_provider(TransportTCP(*connection1)),
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA,
fragment_size_bytes=1_000_000) as client1:
connection2 = await asyncio.open_connection('localhost', 6565)

async with RSocketClient(single_transport_provider(TransportTCP(*connection2)),
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA,
fragment_size_bytes=1_000_000) as client2:

user1 = ChatClient(client1)
user2 = ChatClient(client2)

await user1.login('user1')
await user2.login('user2')

user1.listen_for_messages()
user2.listen_for_messages()

await user1.join('channel1')
await user2.join('channel1')

await user1.send_statistics()
user1.listen_for_statistics()

print(f'Files: {await user1.list_files()}')
print(f'Channels: {await user1.list_channels()}')

await user1.private_message('user2', 'private message from user1')
await user1.channel_message('channel1', 'channel message from user1')

file_contents = b'abcdefg1234567'
file_name = 'file_name_1.txt'
await user1.upload(file_name, file_contents)

download = await user2.download(file_name)

if download.data != file_contents:
raise Exception('File download failed')
else:
print(f'Downloaded file: {len(download.data)} bytes')

try:
await asyncio.wait_for(user2.wait_for_messages(), 3)
except asyncio.TimeoutError:
pass

user1.stop_listening_for_messages()
user2.stop_listening_for_messages()


if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
Loading

0 comments on commit 2591db6

Please sign in to comment.