/**
 * Copyright (C) 2009 Scalable Solutions.
 */

package sample.scala

import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig}
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging

import java.lang.Integer
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes}

import org.atmosphere.core.annotation.{Broadcast, Suspend}
import org.atmosphere.util.XSSHtmlFilter
import org.atmosphere.cpr.BroadcastFilter

/**
 * Bootstraps the sample: constructing a Boot instance builds a supervisor from
 * `factory` and starts it.
 */
class Boot {

  /** Supervisor factory describing the three supervised sample services. */
  object factory extends SupervisorFactory {
    override def getSupervisorConfig: SupervisorConfig = {
      // NOTE(review): 3 and 100 are presumably the restart limit and its time window — confirm.
      val restartStrategy = RestartStrategy(OneForOne, 3, 100)

      // The services are instantiated in this order: SimpleService, Chat, PersistentSimpleService.
      val supervised = List(
        Supervise(new SimpleService, LifeCycle(Permanent, 100)),
        Supervise(new Chat, LifeCycle(Permanent, 100)),
        Supervise(new PersistentSimpleService, LifeCycle(Permanent, 100)))

      SupervisorConfig(restartStrategy, supervised)
    }
  }

  val supervisor = factory.newSupervisor

  // Runs as part of the constructor body.
  supervisor.startSupervisor
}

/**
 * Try service out by invoking (multiple times):
 *
 * curl http://localhost:9998/scalacount
 * 
* Or browse to the URL from a web browser.
 *
 * JAX-RS resource implemented as an actor that calls `makeTransactionRequired`.
 * Each GET sends the actor a `Tick`; the first `Tick` stores 0 under `KEY`, and
 * every later `Tick` stores and reports the previously stored value plus one.
 */
@Path("/scalacount")
class SimpleService extends Actor {
  makeTransactionRequired

  case object Tick
  private val KEY = "COUNTER"
  // Plain instance flag (not kept in `storage`): false until the first Tick is handled.
  private var hasStartedTicking = false
  private val storage = TransactionalState.newMap[String, Integer]

  /**
   * Handles GET by sending `Tick` to this actor with `!!` and returning the
   * reply, or an error element when `!!` yields no value.
   */
  // NOTE(review): the markup around the three messages in this class was missing
  // from the reviewed text, which left invalid Scala; the <error>/<success>
  // element names are a reconstruction — confirm against what clients expect.
  @GET
  @Produces(Array("text/html"))
  def count = (this !! Tick).getOrElse(<error>Error in counter</error>)

  override def receive: PartialFunction[Any, Unit] = {
    case Tick =>
      // Only consult the store once ticking has started; a missing entry is
      // treated like a fresh start instead of failing on Option.get.
      val previous = if (hasStartedTicking) currentCount else None
      previous match {
        case Some(counter) =>
          storage.put(KEY, new Integer(counter + 1))
          reply(<success>Tick:{counter + 1}</success>)
        case None =>
          storage.put(KEY, new Integer(0))
          hasStartedTicking = true
          reply(<success>Tick: 0</success>)
      }
  }

  /** Value currently stored under `KEY`, or None when nothing is stored. */
  private def currentCount: Option[Int] = storage.get(KEY) match {
    case Some(value: Integer) => Some(value.intValue)
    case _ => None
  }
}

/**
 * Try service out by invoking (multiple times):
 *
 * curl http://localhost:9998/persistentscalacount
 * 
* Or browse to the URL from a web browser.
 *
 * Same counter resource as the "/scalacount" service, except that the map is
 * obtained from `PersistentState.newMap(CassandraStorageConfig())`.
 * Each GET sends the actor a `Tick`; the first `Tick` stores 0 under `KEY`, and
 * every later `Tick` stores and reports the previously stored value plus one.
 */
@Path("/persistentscalacount")
class PersistentSimpleService extends Actor {
  makeTransactionRequired

  case object Tick
  private val KEY = "COUNTER"
  // Plain instance flag (not kept in `storage`): false until the first Tick is handled.
  private var hasStartedTicking = false
  private val storage = PersistentState.newMap(CassandraStorageConfig())

  /**
   * Handles GET by sending `Tick` to this actor with `!!` and returning the
   * reply, or an error element when `!!` yields no value.
   */
  // NOTE(review): the markup around the three messages in this class was missing
  // from the reviewed text, which left invalid Scala; the <error>/<success>
  // element names are a reconstruction — confirm against what clients expect.
  @GET
  @Produces(Array("text/html"))
  def count = (this !! Tick).getOrElse(<error>Error in counter</error>)

  override def receive: PartialFunction[Any, Unit] = {
    case Tick =>
      // Only consult the store once ticking has started; a missing entry is
      // treated like a fresh start instead of failing on Option.get.
      val previous = if (hasStartedTicking) currentCount else None
      previous match {
        case Some(counter) =>
          storage.put(KEY, new Integer(counter + 1))
          reply(<success>Tick:{counter + 1}</success>)
        case None =>
          storage.put(KEY, new Integer(0))
          hasStartedTicking = true
          reply(<success>Tick: 0</success>)
      }
  }

  /**
   * Value currently stored under `KEY`, or None when nothing is stored.
   * The cast is kept so a stored value of an unexpected type still fails loudly.
   */
  private def currentCount: Option[Int] =
    storage.get(KEY).map(_.asInstanceOf[Integer].intValue)
}

/**
 * Chat resource implemented as an actor. GET is annotated with `@Suspend`;
 * POST turns the submitted form into a `Chat` message, sends it to this actor
 * and returns the reply, which is annotated for broadcast through
 * `XSSHtmlFilter` and `JsonpFilter`.
 */
@Path("/chat")
class Chat extends Actor with Logging {
  makeTransactionRequired

  /** Message built from the posted form: sender name, action and message text. */
  case class Chat(who: String, what: String, msg: String)

  /** Handles the suspended GET; returns the contents of the builder below. */
  @Suspend
  @GET
  @Produces(Array("text/html"))
  def suspend = {
    val s = new StringBuilder
    // NOTE(review): the appended literal is empty, so this returns "". The
    // original content may have been lost together with other markup — confirm.
    s append ""
    s.toString
  }

  override def receive: PartialFunction[Any, Unit] = {
    case Chat(who, what, msg) =>
      // Replies use "<name>__<text>", the format JsonpFilter splits on.
      what match {
        case "login" => reply("System Message__" + who + " has joined.")
        case "post" => reply("" + who + "__" + msg)
        // Any other action (including a missing one) is rejected with status 422.
        case _ => throw new WebApplicationException(422)
      }
    case x => log.info("recieve unknown: " + x)
  }

  /**
   * Handles the form POST: reads "name", "action" and "message" from the form,
   * sends them to this actor as a `Chat` message and returns the reply, or
   * "System__error" when `!!` yields no value.
   */
  @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
  @Consumes(Array("application/x-www-form-urlencoded"))
  @POST
  @Produces(Array("text/html"))
  def publishMessage(form: MultivaluedMap[String, String]) = {
    val message = Chat(form.getFirst("name"), form.getFirst("action"), form.getFirst("message"))
    (this !! message).getOrElse("System__error")
  }
}

/**
 * Broadcast filter that turns a "<name>__<message>" string into a script block
 * carrying both parts.
 */
class JsonpFilter extends BroadcastFilter[String] with Logging {

  /**
   * Splits the broadcast text at the first "__" (when it is not at position 0)
   * into name and message; without such a separator the whole text is the name
   * and the message is empty.
   */
  def filter(an: AnyRef) = {
    val m = an.toString
    val separatorAt = m.indexOf("__")
    val (name, message) =
      if (separatorAt > 0) (m.substring(0, separatorAt), m.substring(separatorAt + 2))
      else (m, "")

    // NOTE(review): the reviewed text returned only "\n" and discarded name and
    // message; the script wrapper below is a reconstruction of the stripped
    // markup — confirm the callback expression against the chat page.
    ("<script type='text/javascript'>\n (window.app || window.parent.app).update({ name: \"" +
      escapeJs(name) + "\", message: \"" + escapeJs(message) + "\" }); \n</script>\n")
  }

  /** Escapes backslashes, double quotes and line breaks for a double-quoted JS string. */
  private def escapeJs(text: String): String =
    text.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r")
}