Ported samples rest scala to the new akka-http
This commit is contained in:
parent
64f4b36b74
commit
2a060c9fa3
1 changed files with 59 additions and 42 deletions
|
|
@ -11,12 +11,12 @@ import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage
|
|||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.comet.AkkaClusterBroadcastFilter
|
||||
|
||||
import scala.xml.NodeSeq
|
||||
import java.lang.Integer
|
||||
import java.nio.ByteBuffer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
|
||||
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor
|
||||
import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
|
||||
import org.atmosphere.util.XSSHtmlFilter
|
||||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
||||
|
|
@ -27,16 +27,13 @@ class Boot {
|
|||
SupervisorConfig(
|
||||
RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])),
|
||||
Supervise(
|
||||
actorOf[SimpleService],
|
||||
actorOf[SimpleServiceActor],
|
||||
LifeCycle(Permanent)) ::
|
||||
Supervise(
|
||||
actorOf[Chat],
|
||||
actorOf[ChatActor],
|
||||
LifeCycle(Permanent)) ::
|
||||
Supervise(
|
||||
actorOf[PersistentSimpleService],
|
||||
LifeCycle(Permanent)) ::
|
||||
Supervise(
|
||||
actorOf[PubSub],
|
||||
actorOf[PersistentSimpleServiceActor],
|
||||
LifeCycle(Permanent))
|
||||
:: Nil))
|
||||
factory.newInstance.start
|
||||
|
|
@ -50,19 +47,25 @@ class Boot {
|
|||
* Or browse to the URL from a web browser.
|
||||
*/
|
||||
@Path("/scalacount")
|
||||
class SimpleService extends Transactor {
|
||||
|
||||
case object Tick
|
||||
class SimpleService {
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def count = {
|
||||
//Fetch the first actor of type SimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NdeSeq back
|
||||
val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption
|
||||
r <- a.!} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
}
|
||||
}
|
||||
class SimpleServiceActor extends Transactor {
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private lazy val storage = TransactionalState.newMap[String, Integer]
|
||||
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def count = (self !! Tick).getOrElse(<error>Error in counter</error>)
|
||||
|
||||
def receive = {
|
||||
case Tick => if (hasStartedTicking) {
|
||||
case "Tick" => if (hasStartedTicking) {
|
||||
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
|
||||
storage.put(KEY, new Integer(counter + 1))
|
||||
self.reply(<success>Tick:{counter + 1}</success>)
|
||||
|
|
@ -75,9 +78,7 @@ class SimpleService extends Transactor {
|
|||
}
|
||||
|
||||
@Path("/pubsub/")
|
||||
class PubSub extends Actor {
|
||||
case class Msg(topic: String, message: String)
|
||||
|
||||
class PubSub {
|
||||
@GET
|
||||
@Suspend
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
|
|
@ -90,8 +91,6 @@ class PubSub extends Actor {
|
|||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
//FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "foo")
|
||||
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
|
||||
|
||||
def receive = { case _ => }
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -102,19 +101,26 @@ class PubSub extends Actor {
|
|||
* Or browse to the URL from a web browser.
|
||||
*/
|
||||
@Path("/persistentscalacount")
|
||||
class PersistentSimpleService extends Transactor {
|
||||
class PersistentSimpleService {
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def count = {
|
||||
//Fetch the first actor of type PersistentSimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NdeSeq back
|
||||
val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption
|
||||
r <- a.!} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
}
|
||||
}
|
||||
|
||||
case object Tick
|
||||
class PersistentSimpleServiceActor extends Transactor {
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private lazy val storage = CassandraStorage.newMap
|
||||
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def count = (self !! Tick).getOrElse(<error>Error in counter</error>)
|
||||
|
||||
def receive = {
|
||||
case Tick => if (hasStartedTicking) {
|
||||
case "Tick" => if (hasStartedTicking) {
|
||||
val bytes = storage.get(KEY.getBytes).get
|
||||
val counter = ByteBuffer.wrap(bytes).getInt
|
||||
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
|
||||
|
|
@ -128,16 +134,37 @@ class PersistentSimpleService extends Transactor {
|
|||
}
|
||||
|
||||
@Path("/chat")
|
||||
class Chat extends Actor with Logging {
|
||||
case class Chat(val who: String, val what: String, val msg: String)
|
||||
|
||||
class Chat {
|
||||
import ChatActor.ChatMsg
|
||||
@Suspend
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def suspend = ()
|
||||
|
||||
@POST
|
||||
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
|
||||
//FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar")
|
||||
@Consumes(Array("application/x-www-form-urlencoded"))
|
||||
@Produces(Array("text/html"))
|
||||
def publishMessage(form: MultivaluedMap[String, String]) = {
|
||||
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
|
||||
//Fetch the first actor of type ChatActor
|
||||
//Send it the "Tick" message and expect a NdeSeq back
|
||||
val result = for{a <- actorsFor(classOf[ChatActor]).headOption
|
||||
r <- a.!} yield r
|
||||
//Return either the resulting String or a default one
|
||||
result getOrElse "System__error"
|
||||
}
|
||||
}
|
||||
|
||||
object ChatActor {
|
||||
case class ChatMsg(val who: String, val what: String, val msg: String)
|
||||
}
|
||||
|
||||
class ChatActor extends Actor with Logging {
|
||||
import ChatActor.ChatMsg
|
||||
def receive = {
|
||||
case Chat(who, what, msg) => {
|
||||
case ChatMsg(who, what, msg) => {
|
||||
what match {
|
||||
case "login" => self.reply("System Message__" + who + " has joined.")
|
||||
case "post" => self.reply("" + who + "__" + msg)
|
||||
|
|
@ -146,16 +173,6 @@ class Chat extends Actor with Logging {
|
|||
}
|
||||
case x => log.info("recieve unknown: " + x)
|
||||
}
|
||||
|
||||
@POST
|
||||
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
|
||||
//FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar")
|
||||
@Consumes(Array("application/x-www-form-urlencoded"))
|
||||
@Produces(Array("text/html"))
|
||||
def publishMessage(form: MultivaluedMap[String, String]) =
|
||||
(self !! Chat(form.getFirst("name"),
|
||||
form.getFirst("action"),
|
||||
form.getFirst("message"))).getOrElse("System__error")
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue