Tag Archives: scala

WebSockets Echo Using Play, Scala, And Actors, Part II

The first part of this tutorial went into detail on Play’s support for WebSockets based on iteratees. You don’t need to use actors to handle WebSockets, but in a lot of real-world situations you’ll want to use actors because you’ll want to keep some longer-lived state associated with the connection.

Let’s start with a “naive” implementation of the code. This is a direct translation of our initial simpleIterateeWebSocket to use an actor:

  def naiveActorWebSocket = WebSocket.using[String] {
    val actor = Akka.system.actorOf(Props[NaiveEchoActor])
    val out = Enumerator.imperative[String]()
    actor ! NaiveStart(out)
    val in = Iteratee.foreach[String] {
      msg =>
        actor ! Message(msg)
    }
    (in, out)
  }
...
// Actor messages
case class NaiveStart(out: PushEnumerator[String])
case class Message(msg: String)

class NaiveEchoActor extends Actor {
  var out: PushEnumerator[String] = _
  
  override def receive = {
    case NaiveStart(out) => this.out = out
    case Message(msg) => this.out.push(msg)
  }
}

(You can test the examples in this tutorial by cloning the github repo. Use the echo test at websocket.org and point it to ws://localhost:9000/wsNaiveActor. Each example in this tutorial has its own URL that you can see in the routes file.)

In this implementation, we pass the out enumerator to the actor using the NaiveStart message. The NaiveStart message acts like an initializer for the actor. It takes the out enumerator and stores it in a member variable. The in iteratee now simply passes each message to the actor. The actor then handles that message by doing what the old iteratee did, pushing the message back to the client via the enumerator. Notice we haven’t used WebSocket.async yet: While the actor tell (!) invocation is asynchronous, the overall method is synchronous.

This code more or less works but it has one important bug: There’s no guarantee that the out member on NaiveEchoActor will actually be set before the first invocation of actor ! Message(msg). If it’s not set, this would result in an NPE. If you look at the WebSocket chat example that comes with Play, you’ll notice they take a slightly more sophisticated approach using the ask method on actor (an implicit requiring import akka.pattern.ask). We can fix our code by doing the same thing:

  import akka.pattern.ask
  import akka.util.duration._
  
  implicit val timeout = akka.util.Timeout(1 second)
  
  def actorWebSocket = WebSocket.async[String] {
    val actor = Akka.system.actorOf(Props[EchoActor])
    (actor ? Start()).asPromise map {
      case Connected(out) =>
        val in = Iteratee.foreach[String] {
          event => actor ! Message(event)
        }
        (in, out)
    }
  }
...
case class Start()
case class Connected(out: PushEnumerator[String])
class EchoActor extends Actor {
  var out: PushEnumerator[String] = _
  override def receive = {
    case Start() =>
      this.out = Enumerator.imperative[String]()
      sender ! Connected(out)
    case Message(msg) => this.out.push(msg)
  }
}

The ? operator is a synonym for ask. ask sends a reply back to the caller as a Future, which we immediately convert to a Promise. (This conversion will become unnecessary with Scala 2.10 and Play 2.1). Now we can wait for the Connected reply message before creating our iteratee. We also make the Start message behave more like a real constructor and let it create the enumerator for itself. The out enumerator then gets passed back in the Connected reply. The Promise.map method invoked on (echoActor ? Start()) is like an “onReply” event handler. This approach guarantees that the enumerator is set on the actor before the iteratee is created. Since ask is asynchronous it is wrapped in a Future/Promise, as is the result of map. Hence, we can use the WebSocket.async call and everything works as before except now we have actors in play!

WebSockets Echo Using Play, Scala, and Actors, Part I

There’s a perfectly good example of a simple chat room server using the Play Framework, WebSockets, and Actors.  But other than the example code and one page in the tutorial, using WebSockets with Actors isn’t terribly well-documented.  So I thought I’d walk through an even simpler example that makes use of WebSockets and Actors, and explain what’s going on in more detail.

This echo server accepts a WebSocket connection then runs indefinitely until the client closes the connection. Each WebSocket message received is echoed back to the client:

  def simpleIterateeWebSocket = WebSocket.using[String] {
    val out = Enumerator.imperative[String]()
    val in = Iteratee.foreach[String] {
      msg =>
        out.push(msg)
    }
    (in, out)
  }

You can try out all the code in this tutorial by cloning the github repo. Use the echo test at websocket.org and point it to ws://localhost:9000/wsSimple. Each example in this tutorial has its own URL that you can see in the routes file.

Iteratees

What’s going on here? First off, it’s important to understand Play iteratees. I won’t go into iteratees in great detail here other than to say that they’re a deeply interesting functional programming technique, and an incredibly useful one as well. There are already a few useful guides to them that I encourage you to read.

WebSocket.using takes a block returning an (in, out) pair. in as an Iteratee that will handle incoming messages — the framework will deliver WebSocket messages from the client to this iteratee. And out is an Enumerator that acts as an outbound channel. out can be used to send messages back to the client.

The easiest way to understand Iteratee.foreach is that it’s like an “onMessage” event handler. Iteratees are more powerful and general than event handlers. Iteratee.foreach creates a specialized iteratee that invokes the block on each message received. In this sense, in isn’t really the input stream so much as the handler on that input stream.

The out val is the Enumerator that will be used to send messages back to the client. An ordinary enumerator would be crafted by chaining messages together using various basic enumerators and combinators. After the enumerator is returned by simpleIterateeWebSocket, the framework connects the enumerator to the client and the messages on it are consumed. The framework uses its own iteratee, in fact, to consume the messages and write them to the client.

Enumerator.imperative() creates a specialized enumerator called a PushEnumerator. This enumerator allows you to write messages through the enumerator by calling PushEnumerator.push(msg). Writing the message “wakes up” the iteratee on the other side of the enumerator and invokes its message handler. As I said, this iteratee is given by the framework and writes each message out to the client. This iteratee remains alive until you send a special close message. In our example, the server will never close the connection.

Note that you can’t invoke out.push("hello world") immediately after instantiating the enumerator. That’s because the enumerator hasn’t been connected to the client yet. push must be invoked after simpleIterateeWebSocket has returned and the framework has connected the enumerator to its own iteratee. In our example, push isn’t invoked until a message is received from the client.

Asynchrony Teaser

I’ve gone into a lot of detail and we haven’t even talked about Actors yet! Before I conclude Part I, let me show you an asynchronous version of this code (that still doesn’t use actors :-):

  import play.api.libs.concurrent.Akka
  import play.api.Play.current // needed by Akka.future

  def simpleAsyncWebSocket = WebSocket.async[String] {
    Akka.future {
      val out = Enumerator.imperative[String]()
      val in = Iteratee.foreach[String] {
        msg =>
          out.push(msg)
      }
      (in, out)
    }
  }

The code turns out to be identical to the ordinary version except we use WebSocket.async instead of WebSocket.using. And we wrap the block in an Akka.future. Akka.future simply runs the block in a separate thread. It returns a Promise[T] where T is the return type of the block. WebSocket.async indicates a WebSocket handler just like WebSocket.using except that it takes a block with a result of type Promise[(Iteratee, Enumerator)] instead of simply (Iteratee, Enumerator). This code acts the same as the first code, except that the function runs asynchronously and returns immediately. This is trivial for now, but it sets us up for Part II when we introduce actors into the mix.