Experimenting with Comet cluster support

This commit is contained in:
Viktor Klang 2009-12-23 21:37:39 +01:00
parent 194fc86062
commit 429ce066c4
4 changed files with 50 additions and 3 deletions

View file

@ -0,0 +1,37 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.comet
import se.scalablesolutions.akka.util.{Logging}
import se.scalablesolutions.akka.actor.{Actor}
import se.scalablesolutions.akka.remote.{Cluster}
import org.atmosphere.cpr.{ClusterBroadcastFilter,Broadcaster}
import scala.reflect.{BeanProperty}
sealed trait AkkaClusterBroadcastMessage
case class BroadcastMessage(val name : String, val msg : AnyRef) extends AkkaClusterBroadcastMessage
class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] with Logging {
@BeanProperty var clusterName = ""
@BeanProperty var broadcaster : Broadcaster = null
override def init = ()
def destroy = ()
def filter(o : AnyRef) : AnyRef = {
Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],BroadcastMessage(clusterName,o))
log.info("filter invoked for message [%s], message was forwarded to cluster",o)
o
}
def receive = {
case BroadcastMessage(clusterName,m) if broadcaster ne null => {
log.info("Receiving remote message, broadcasting it to listeners: [%s]",m)
broadcaster broadcast m
}
case x => log.info("Not a valid message for cluster[%s] = [%s]",clusterName,x)
}
}

View file

@ -32,6 +32,8 @@ trait BootableCometActorService extends Bootable with Logging {
val scheme = uri.getScheme
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
"The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
log.info("Attempting to start REST service on uri [%s]",uri)
val adapter = new ServletAdapter
adapter.setHandleStaticResources(true)

View file

@ -54,6 +54,11 @@
</dependency>
<!-- For Atmosphere -->
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-annotations</artifactId>

View file

@ -8,13 +8,14 @@ import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor}
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.comet.{AkkaClusterBroadcastFilter}
import java.lang.Integer
import java.nio.ByteBuffer
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
import org.atmosphere.annotation.{Broadcast, Suspend}
import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
import org.atmosphere.util.XSSHtmlFilter
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
import org.atmosphere.jersey.Broadcastable
@ -85,6 +86,7 @@ class PubSub extends Actor {
@Broadcast
@Path("/topic/{topic}/{message}/")
@Produces(Array("text/plain;charset=ISO-8859-1"))
@Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "foo" }
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
def receive = { case _ => }
@ -150,9 +152,10 @@ class Chat extends Transactor {
case x => log.info("recieve unknown: " + x)
}
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
@Consumes(Array("application/x-www-form-urlencoded"))
@POST
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
@Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "bar" }
@Consumes(Array("application/x-www-form-urlencoded"))
@Produces(Array("text/html"))
def publishMessage(form: MultivaluedMap[String, String]) =
(this !! Chat(form.getFirst("name"),