diff --git a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala new file mode 100644 index 0000000000..1c79f09477 --- /dev/null +++ b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.comet + +import se.scalablesolutions.akka.util.{Logging} +import se.scalablesolutions.akka.actor.{Actor} +import se.scalablesolutions.akka.remote.{Cluster} +import org.atmosphere.cpr.{ClusterBroadcastFilter,Broadcaster} +import scala.reflect.{BeanProperty} + +sealed trait AkkaClusterBroadcastMessage +case class BroadcastMessage(val name : String, val msg : AnyRef) extends AkkaClusterBroadcastMessage + +class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] with Logging { + @BeanProperty var clusterName = "" + @BeanProperty var broadcaster : Broadcaster = null + + override def init = () + + def destroy = () + + def filter(o : AnyRef) : AnyRef = { + Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],BroadcastMessage(clusterName,o)) + log.info("filter invoked for message [%s], message was forwarded to cluster",o) + o + } + + def receive = { + case BroadcastMessage(clusterName,m) if broadcaster ne null => { + log.info("Receiving remote message, broadcasting it to listeners: [%s]",m) + broadcaster broadcast m + } + case x => log.info("Not a valid message for cluster[%s] = [%s]",clusterName,x) + } +} \ No newline at end of file diff --git a/akka-comet/src/main/scala/BootableCometActorService.scala b/akka-comet/src/main/scala/BootableCometActorService.scala index 84bb52a14e..90c8db9544 100644 --- a/akka-comet/src/main/scala/BootableCometActorService.scala +++ b/akka-comet/src/main/scala/BootableCometActorService.scala @@ -32,6 +32,8 @@ trait BootableCometActorService extends Bootable with Logging { val scheme = uri.getScheme if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException( "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'") + + log.info("Attempting to start REST service on uri [%s]",uri) val adapter = new ServletAdapter adapter.setHandleStaticResources(true) diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml index 93acfec28d..c32504f5a1 100755 --- a/akka-kernel/pom.xml +++ b/akka-kernel/pom.xml @@ -54,6 +54,11 @@ + + com.sun.jersey + jersey-server + ${jersey.version} + org.atmosphere atmosphere-annotations diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala index 11e8304551..20b8d4a280 100644 --- a/akka-samples-scala/src/main/scala/SimpleService.scala +++ b/akka-samples-scala/src/main/scala/SimpleService.scala @@ -8,13 +8,14 @@ import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor} import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.comet.{AkkaClusterBroadcastFilter} import java.lang.Integer import java.nio.ByteBuffer import javax.ws.rs.core.MultivaluedMap import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} -import org.atmosphere.annotation.{Broadcast, Suspend} +import org.atmosphere.annotation.{Broadcast, Suspend,Cluster} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} import org.atmosphere.jersey.Broadcastable @@ -85,6 +86,7 @@ class PubSub extends Actor { @Broadcast @Path("/topic/{topic}/{message}/") @Produces(Array("text/plain;charset=ISO-8859-1")) + @Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "foo" } def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic) def receive = { case _ => } @@ -150,9 +152,10 @@ class Chat extends Transactor { case x => log.info("recieve unknown: " + x) } - @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) - @Consumes(Array("application/x-www-form-urlencoded")) @POST + @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) + @Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "bar" } + @Consumes(Array("application/x-www-form-urlencoded")) @Produces(Array("text/html")) def publishMessage(form: MultivaluedMap[String, String]) = (this !! Chat(form.getFirst("name"),