diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index 6e67772f7e..e8f3576e9f 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -11,12 +11,12 @@ import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.comet.AkkaClusterBroadcastFilter - +import scala.xml.NodeSeq import java.lang.Integer import java.nio.ByteBuffer import javax.ws.rs.core.MultivaluedMap import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} - +import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor import org.atmosphere.annotation.{Broadcast, Suspend,Cluster} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} @@ -27,16 +27,13 @@ class Boot { SupervisorConfig( RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])), Supervise( - actorOf[SimpleService], + actorOf[SimpleServiceActor], LifeCycle(Permanent)) :: Supervise( - actorOf[Chat], + actorOf[ChatActor], LifeCycle(Permanent)) :: Supervise( - actorOf[PersistentSimpleService], - LifeCycle(Permanent)) :: - Supervise( - actorOf[PubSub], + actorOf[PersistentSimpleServiceActor], LifeCycle(Permanent)) :: Nil)) factory.newInstance.start @@ -50,19 +47,25 @@ class Boot { * Or browse to the URL from a web browser. */ @Path("/scalacount") -class SimpleService extends Transactor { - - case object Tick +class SimpleService { + @GET + @Produces(Array("text/html")) + def count = { + //Fetch the first actor of type SimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption + r <- a.!![NodeSeq]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter + } +} +class SimpleServiceActor extends Transactor { private val KEY = "COUNTER" private var hasStartedTicking = false private lazy val storage = TransactionalState.newMap[String, Integer] - @GET - @Produces(Array("text/html")) - def count = (self !! Tick).getOrElse(Error in counter) - def receive = { - case Tick => if (hasStartedTicking) { + case "Tick" => if (hasStartedTicking) { val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue storage.put(KEY, new Integer(counter + 1)) self.reply(Tick:{counter + 1}) @@ -75,9 +78,7 @@ class SimpleService extends Transactor { } @Path("/pubsub/") -class PubSub extends Actor { - case class Msg(topic: String, message: String) - +class PubSub { @GET @Suspend @Produces(Array("text/plain;charset=ISO-8859-1")) @@ -90,8 +91,6 @@ class PubSub extends Actor { @Produces(Array("text/plain;charset=ISO-8859-1")) //FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "foo") def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic) - - def receive = { case _ => } } /** @@ -102,19 +101,26 @@ class PubSub extends Actor { * Or browse to the URL from a web browser. */ @Path("/persistentscalacount") -class PersistentSimpleService extends Transactor { +class PersistentSimpleService { + @GET + @Produces(Array("text/html")) + def count = { + //Fetch the first actor of type PersistentSimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption + r <- a.!![NodeSeq]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter + } +} - case object Tick +class PersistentSimpleServiceActor extends Transactor { private val KEY = "COUNTER" private var hasStartedTicking = false private lazy val storage = CassandraStorage.newMap - @GET - @Produces(Array("text/html")) - def count = (self !! Tick).getOrElse(Error in counter) - def receive = { - case Tick => if (hasStartedTicking) { + case "Tick" => if (hasStartedTicking) { val bytes = storage.get(KEY.getBytes).get val counter = ByteBuffer.wrap(bytes).getInt storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) @@ -128,16 +134,37 @@ class PersistentSimpleService extends Transactor { } @Path("/chat") -class Chat extends Actor with Logging { - case class Chat(val who: String, val what: String, val msg: String) - +class Chat { + import ChatActor.ChatMsg @Suspend @GET @Produces(Array("text/html")) def suspend = () + @POST + @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) + //FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar") + @Consumes(Array("application/x-www-form-urlencoded")) + @Produces(Array("text/html")) + def publishMessage(form: MultivaluedMap[String, String]) = { + val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message")) + //Fetch the first actor of type ChatActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[ChatActor]).headOption + r <- a.!![String](msg)} yield r + //Return either the resulting String or a default one + result getOrElse "System__error" + } +} + +object ChatActor { + case class ChatMsg(val who: String, val what: String, val msg: String) +} + +class ChatActor extends Actor with Logging { + import ChatActor.ChatMsg def receive = { - case Chat(who, what, msg) => { + case ChatMsg(who, what, msg) => { what match { case "login" => self.reply("System Message__" + who + " has joined.") case "post" => self.reply("" + who + "__" + msg) @@ -146,16 +173,6 @@ class Chat extends Actor with Logging { } case x => log.info("recieve unknown: " + x) } - - @POST - @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) - //FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar") - @Consumes(Array("application/x-www-form-urlencoded")) - @Produces(Array("text/html")) - def publishMessage(form: MultivaluedMap[String, String]) = - (self !! Chat(form.getFirst("name"), - form.getFirst("action"), - form.getFirst("message"))).getOrElse("System__error") }