diff --git a/docs/examples/advanced/websocket-server.md b/docs/examples/advanced/websocket-server.md deleted file mode 100644 index 2534d5e9d..000000000 --- a/docs/examples/advanced/websocket-server.md +++ /dev/null @@ -1,61 +0,0 @@ ---- -id: websocket-server -title: "WebSocket Server Example" -sidebar_label: "WebSocket Server" ---- - -```scala mdoc:silent -import zio._ - -import zio.http.ChannelEvent.{ExceptionCaught, Read, UserEvent, UserEventTriggered} -import zio.http._ -import zio.http.codec.PathCodec.string - -object WebSocketAdvanced extends ZIOAppDefault { - - val socketApp: WebSocketApp[Any] = - Handler.webSocket { channel => - channel.receiveAll { - case Read(WebSocketFrame.Text("end")) => - channel.shutdown - - // Send a "bar" if the server sends a "foo" - case Read(WebSocketFrame.Text("foo")) => - channel.send(Read(WebSocketFrame.text("bar"))) - - // Send a "foo" if the server sends a "bar" - case Read(WebSocketFrame.Text("bar")) => - channel.send(Read(WebSocketFrame.text("foo"))) - - // Echo the same message 10 times if it's not "foo" or "bar" - case Read(WebSocketFrame.Text(text)) => - channel.send(Read(WebSocketFrame.text(text))).repeatN(10) - - // Send a "greeting" message to the server once the connection is established - case UserEventTriggered(UserEvent.HandshakeComplete) => - channel.send(Read(WebSocketFrame.text("Greetings!"))) - - // Log when the channel is getting closed - case Read(WebSocketFrame.Close(status, reason)) => - Console.printLine("Closing channel with status: " + status + " and reason: " + reason) - - // Print the exception if it's not a normal close - case ExceptionCaught(cause) => - Console.printLine(s"Channel error!: ${cause.getMessage}") - - case _ => - ZIO.unit - } - } - - val app: HttpApp[Any] = - Routes( - Method.GET / "greet" / string("name") -> handler { (name: String, req: Request) => - Response.text(s"Greetings ${name}!") - }, - Method.GET / "subscriptions" -> handler(socketApp.toResponse), - ).toHttpApp - - override val run = Server.serve(app).provide(Server.default) -} -``` \ No newline at end of file diff --git a/docs/examples/advanced/websocket.md b/docs/examples/advanced/websocket.md new file mode 100644 index 000000000..174f330b8 --- /dev/null +++ b/docs/examples/advanced/websocket.md @@ -0,0 +1,138 @@ +--- +id: websocket +title: "WebSocket Example" +sidebar_label: "WebSocket Server & Client" +--- + +This example shows how to create a WebSocket server using ZIO Http and how to write a client to connect to it. + +## Server + +First we define a `WebSocketApp` that will handle the WebSocket connection. +The `Handler.webSocket` constructor gives access to the `WebSocketChannel`. The channel can be used to receive messages from the client and send messages back. +We use the `receiveAll` method, to pattern match on the different channel events that could occur. +The most important events are `Read` and `UserEventTriggered`. The `Read` event is triggered when the client sends a message to the server. The `UserEventTriggered` event is triggered when the connection is established. +We can identify the successful connection of a client by receiving a `UserEventTriggered(UserEvent.HandshakeComplete)` event. And if the client sends us a text message, we will receive a `Read(WebSocketFrame.Text())` event. + +Our WebSocketApp will handle the following events send by the client: +* If the client connects to the server, we will send a "Greetings!" message to the client. +* If the client sends "foo", we will send a "bar" message back to the client. +* If the client sends "bar", we will send a "foo" message back to the client. +* If the client sends "end", we will close the connection. +* If the client sends any other message, we will send the same message back to the client 10 times. + +For the client to establish a connection with the server, we offer the `/subscriptions` endpoint. + +```scala mdoc:silent + +import zio._ + +import zio.http.ChannelEvent.{ExceptionCaught, Read, UserEvent, UserEventTriggered} +import zio.http._ +import zio.http.codec.PathCodec.string + +object WebSocketAdvanced extends ZIOAppDefault { + + val socketApp: WebSocketApp[Any] = + Handler.webSocket { channel => + channel.receiveAll { + case Read(WebSocketFrame.Text("end")) => + channel.shutdown + + // Send a "bar" if the client sends a "foo" + case Read(WebSocketFrame.Text("foo")) => + channel.send(Read(WebSocketFrame.text("bar"))) + + // Send a "foo" if the client sends a "bar" + case Read(WebSocketFrame.Text("bar")) => + channel.send(Read(WebSocketFrame.text("foo"))) + + // Echo the same message 10 times if it's not "foo" or "bar" + case Read(WebSocketFrame.Text(text)) => + channel.send(Read(WebSocketFrame.text(text))).repeatN(10) + + // Send a "greeting" message to the client once the connection is established + case UserEventTriggered(UserEvent.HandshakeComplete) => + channel.send(Read(WebSocketFrame.text("Greetings!"))) + + // Log when the channel is getting closed + case Read(WebSocketFrame.Close(status, reason)) => + Console.printLine("Closing channel with status: " + status + " and reason: " + reason) + + // Print the exception if it's not a normal close + case ExceptionCaught(cause) => + Console.printLine(s"Channel error!: ${cause.getMessage}") + + case _ => + ZIO.unit + } + } + + val app: HttpApp[Any] = + Routes(Method.GET / "subscriptions" -> handler(socketApp.toResponse)).toHttpApp + + override val run = Server.serve(app).provide(Server.default) +} +``` + +A few things worth noting: + * `Server.default` starts a server on port 8080. + * `socketApp.toResponse` converts the `WebSocketApp` to a `Response`, so we can serve it with `handler`. + + +## Client + +The client will connect to the server and send a message to the server every time the user enters a message in the console. +For this we will use the `Console.readLine` method to read a line from the console. We will then send the message to the server using the `WebSocketChannel.send` method. +But since we don't want to reconnect to the server every time the user enters a message, we will use a `Queue` to store the messages. We will then use the `Queue.take` method to take a message from the queue and send it to the server, whenever a new message is available. +Adding a new message to the queue, as well as sending the messages to the server, should happen in a loop in the background. For this we will use the operators `forever` (looping) and `forkDaemon` (fork to a background fiber). + +Again we will use the `Handler.webSocket` constructor to define how to handle messages and create a `WebSocketApp`. But this time, instead of serving the `WebSocketApp` we will use the `connect` method to establish a connection to the server. +All we need for that, is the URL of the server. In our case it's `"ws://localhost:8080/subscriptions"`. + +```scala mdoc:silent +import zio._ + +import zio.http._ + +object WebSocketAdvancedClient extends ZIOAppDefault { + + def sendChatMessage(message: String): ZIO[Queue[String], Throwable, Unit] = + ZIO.serviceWithZIO[Queue[String]](_.offer(message).unit) + + def processQueue(channel: WebSocketChannel): ZIO[Queue[String], Throwable, Unit] = { + for { + queue <- ZIO.service[Queue[String]] + msg <- queue.take + _ <- channel.send(Read(WebSocketFrame.Text(msg))) + } yield () + }.forever.forkDaemon.unit + + private def webSocketHandler: ZIO[Queue[String] with Client with Scope, Throwable, Response] = + Handler.webSocket { channel => + for { + _ <- processQueue(channel) + _ <- channel.receiveAll { + case Read(WebSocketFrame.Text(text)) => + Console.printLine(s"Server: $text") + case _ => + ZIO.unit + } + } yield () + }.connect("ws://localhost:8080/subscriptions") + + override val run = + (for { + _ <- webSocketHandler + _ <- Console.readLine.flatMap(sendChatMessage).forever.forkDaemon + _ <- ZIO.never + } yield ()) + .provideSome[Scope]( + Client.default, + ZLayer(Queue.bounded[String](100)), + ) + +} +``` +While we access here `Queue[String]` via the ZIO environment, you should use a service in a real world application, that requires a queue as one of its constructor dependencies. +See [ZIO Services](https://zio.dev/reference/service-pattern/) for more information. diff --git a/docs/sidebars.js b/docs/sidebars.js index fb7f029c3..4215e9e00 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -63,7 +63,7 @@ const sidebars = { "examples/advanced/server", "examples/advanced/streaming-file", "examples/advanced/streaming-response", - "examples/advanced/websocket-server" + "examples/advanced/websocket" ] } ] diff --git a/zio-http-example/src/main/scala/example/WebSocketAdvanced.scala b/zio-http-example/src/main/scala/example/WebSocketAdvanced.scala index 382fcdbc7..77f2ab118 100644 --- a/zio-http-example/src/main/scala/example/WebSocketAdvanced.scala +++ b/zio-http-example/src/main/scala/example/WebSocketAdvanced.scala @@ -14,11 +14,11 @@ object WebSocketAdvanced extends ZIOAppDefault { case Read(WebSocketFrame.Text("end")) => channel.shutdown - // Send a "bar" if the server sends a "foo" + // Send a "bar" if the client sends a "foo" case Read(WebSocketFrame.Text("foo")) => channel.send(Read(WebSocketFrame.text("bar"))) - // Send a "foo" if the server sends a "bar" + // Send a "foo" if the client sends a "bar" case Read(WebSocketFrame.Text("bar")) => channel.send(Read(WebSocketFrame.text("foo"))) @@ -31,7 +31,7 @@ object WebSocketAdvanced extends ZIOAppDefault { ZIO.logErrorCause(s"failed sending", cause) } - // Send a "greeting" message to the server once the connection is established + // Send a "greeting" message to the client once the connection is established case UserEventTriggered(UserEvent.HandshakeComplete) => channel.send(Read(WebSocketFrame.text("Greetings!"))) @@ -58,3 +58,42 @@ object WebSocketAdvanced extends ZIOAppDefault { override val run = Server.serve(app).provide(Server.default) } + +object WebSocketAdvancedClient extends ZIOAppDefault { + + def sendChatMessage(message: String): ZIO[Queue[String], Throwable, Unit] = + ZIO.serviceWithZIO[Queue[String]](_.offer(message).unit) + + def processQueue(channel: WebSocketChannel): ZIO[Queue[String], Throwable, Unit] = { + for { + queue <- ZIO.service[Queue[String]] + msg <- queue.take + _ <- channel.send(Read(WebSocketFrame.Text(msg))) + } yield () + }.forever.forkDaemon.unit + + private def webSocketHandler: ZIO[Queue[String] with Client with Scope, Throwable, Response] = + Handler.webSocket { channel => + for { + _ <- processQueue(channel) + _ <- channel.receiveAll { + case Read(WebSocketFrame.Text(text)) => + Console.printLine(s"Server: $text") + case _ => + ZIO.unit + } + } yield () + }.connect("ws://localhost:8080/subscriptions") + + override val run = + (for { + _ <- webSocketHandler + _ <- Console.readLine.flatMap(sendChatMessage).forever.forkDaemon + _ <- ZIO.never + } yield ()) + .provideSome[Scope]( + Client.default, + ZLayer(Queue.bounded[String](100)), + ) + +}