diff --git a/akka-actors/pom.xml b/akka-actors/pom.xml index b7372a1e6e..9b4767e7fa 100644 --- a/akka-actors/pom.xml +++ b/akka-actors/pom.xml @@ -67,6 +67,11 @@ javautils 2.7.4-0.1 + + jgroups + jgroups + 2.8.0.CR7 + diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala new file mode 100644 index 0000000000..55d4970f48 --- /dev/null +++ b/akka-actors/src/main/scala/nio/Cluster.scala @@ -0,0 +1,194 @@ +package se.scalablesolutions.akka.nio + +import se.scalablesolutions.akka.Config.config +import se.scalablesolutions.akka.util.Logging +import org.jgroups.{JChannel,View,Address,Message,ExtendedMembershipListener,Receiver,SetStateEvent} +import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.config.ScalaConfig._ +import scala.collection.immutable.{Map,HashMap,HashSet} +import org.jgroups.util.Util +import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor,ActorRegistry} +import se.scalablesolutions.akka.nio.Cluster.{Node,RelayedMessage} + +/** + Interface for interacting with the cluster +**/ +trait Cluster { + def members : List[Node] + def name : String + def registerLocalNode(hostname : String, port : Int) : Unit + def deregisterLocalNode(hostname : String, port : Int) : Unit + def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit +} + +/** + Baseclass for cluster implementations +**/ +abstract class ClusterActor extends Actor with Cluster +{ + val name = config.getString("akka.remote.cluster.name") getOrElse "default" +} + +/** + A singleton representing the Cluster + Loads a specified ClusterActor and delegates to that instance +**/ +object Cluster extends Cluster { + case class Node(endpoints : List[RemoteAddress]) + case class RelayedMessage(actorClass : Class[_ <: Actor],msg : AnyRef) + + lazy val impl : Option[ClusterActor] = { + config.getString("akka.remote.cluster.actor") map ( name => { + val actor = Class.forName(name) + .newInstance + .asInstanceOf[ClusterActor] + + SupervisorFactory( + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + Supervise(actor, LifeCycle(Permanent)):: Nil + ) + ).newInstance.start + //actor !! Init(None) // FIXME for some reason the actor isn't init:ed unless we explicitly send it this Init message + actor + }) + } + + def name = impl.map(_.name).getOrElse("No cluster") + def members = impl.map(_.members).getOrElse(Nil) + def registerLocalNode(hostname : String, port : Int) : Unit = impl.map(_.registerLocalNode(hostname,port)) + def deregisterLocalNode(hostname : String, port : Int) : Unit = impl.map(_.deregisterLocalNode(hostname,port)) + def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit = impl.map(_.send(to,msg)) +} + +/** + Just a placeholder for the JGroupsClusterActor message types +**/ +object JGroupsClusterActor { + //Message types + case object PapersPlease + case class Papers(addresses : List[RemoteAddress]) + case object Block + case object Unblock + case class Zombie(address : Address) + case class RegisterLocalNode(server : RemoteAddress) + case class DeregisterLocalNode(server : RemoteAddress) +} + +/** + Clustering support via JGroups +**/ +class JGroupsClusterActor extends ClusterActor +{ + import JGroupsClusterActor._ + import org.scala_tools.javautils.Implicits._ + + private var local : Node = Node(Nil) + private var channel : Option[JChannel] = None + private var remotes : Map[Address,Node] = Map() + + override def init(config : AnyRef) = { + log info "Initiating cluster actor" + remotes = new HashMap[Address,Node] + val me = this + //Set up the JGroups local endpoint + channel = Some(new JChannel { + setReceiver(new Receiver with ExtendedMembershipListener { + def getState : Array[Byte] = null + def setState(state : Array[Byte]) : Unit = () + def receive(msg : Message) : Unit = me ! msg + def viewAccepted(view : View) : Unit = me ! view + def suspect(a : Address) : Unit = me ! Zombie(a) + def block : Unit = me ! Block + def unblock : Unit = me ! Unblock + }) + }) + channel.map(_.connect(name)) + } + + protected def serializer = Serializer.Java //FIXME make this configurable + def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes + + def registerLocalNode(hostname : String, port : Int) : Unit = this ! RegisterLocalNode(RemoteAddress(hostname,port)) + def deregisterLocalNode(hostname : String, port : Int) : Unit = this ! DeregisterLocalNode(RemoteAddress(hostname,port)) + def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit = this ! RelayedMessage(to,msg) + + private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) : Unit = + for(c <- channel; r <- recipients) c.send(new Message(r,null,serializer out msg)) + + private def broadcast[T <: AnyRef](msg : T) : Unit = + channel.map( _.send(new Message(null,null,serializer out msg))) + + override def receive = { + case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead + log info "Zombie: "+x + broadcast(x :: Nil,PapersPlease) + remotes = remotes - x + } + + case v : View => { + log info v.printDetails + //Not present in the cluster anymore = presumably zombies + //Nodes we have no prior knowledge existed = unknowns + val members = Set[Address]() ++ v.getMembers.asScala - channel.get.getAddress //Exclude ourselves + val zombies = Set[Address]() ++ remotes.keySet -- members + val unknown = members -- remotes.keySet + + log info "Updating view, zombies: " + zombies + log info " , unknown: " + unknown + log info " , members: " + members + log info " , known: " + remotes.keySet + + //Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead + broadcast(zombies ++ unknown, PapersPlease) + remotes = remotes -- zombies + } + + case m : Message => { + if(m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) //handle non-own messages only, and only if we're connected + (serializer in(m.getRawBuffer,None)) match { + case PapersPlease => { + log info "Asked for papers by " + m.getSrc + broadcast(m.getSrc :: Nil,Papers(local.endpoints)) + } + case Papers(x) => { + log info "Got papers from " + m.getSrc + " = " + x + remotes = remotes + (m.getSrc -> Node(x)) + log info "Installed nodes: " + remotes.keySet + } + case RelayedMessage(c,m) => { + log info "Relaying [" + m + "] to ["+c.getName+"]" + ActorRegistry.actorsFor(c).firstOption.map(_ ! m) + } + case unknown => log info "Unknown message: "+unknown.toString + } + } + + case rm@RelayedMessage => { + log info "Relaying message: " + rm + broadcast(rm) + } + + case RegisterLocalNode(s) => { + log info "RegisterLocalNode: "+s + local = Node(local.endpoints + s) + broadcast(Papers(local.endpoints)) + } + + case DeregisterLocalNode(s) => { + log info "DeregisterLocalNode: "+s + local = Node(local.endpoints - s) + broadcast(Papers(local.endpoints)) + } + + case Block => log info "UNSUPPORTED: block" //TODO HotSwap to a buffering body + case Unblock => log info "UNSUPPORTED: unblock" //TODO HotSwap back and flush the buffer + } + + override def shutdown = { + log info "Shutting down "+this.getClass.getName + channel.map(_.close) + remotes = Map() + channel = None + } +} \ No newline at end of file diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala index 5a542268c8..2f7cfced3f 100755 --- a/akka-actors/src/main/scala/nio/RemoteServer.scala +++ b/akka-actors/src/main/scala/nio/RemoteServer.scala @@ -109,11 +109,13 @@ class RemoteServer extends Logging { bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS) bootstrap.bind(new InetSocketAddress(hostname, port)) isRunning = true + Cluster.registerLocalNode(hostname,port) } } def shutdown = { bootstrap.releaseExternalResources + Cluster.deregisterLocalNode(hostname,port) } } diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala index 5ca2d701ee..8c9fdba4c2 100755 --- a/akka-kernel/src/main/scala/AkkaServlet.scala +++ b/akka-kernel/src/main/scala/AkkaServlet.scala @@ -12,10 +12,13 @@ import com.sun.jersey.api.core.ResourceConfig import com.sun.jersey.spi.container.servlet.ServletContainer import com.sun.jersey.spi.container.WebApplication +import java.util.{List => JList} + import javax.servlet.{ServletConfig} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} -import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport} +import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} +import org.atmosphere.container.{GrizzlyCometSupport} import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} import org.atmosphere.jersey.JerseyBroadcaster @@ -74,22 +77,28 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging override def loadConfiguration(sc: ServletConfig) { config = new AtmosphereConfig { supportSession = false } atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster)) - - loadCometSupport(sc.getInitParameter("cometSupport")) map( setCometSupport(_) ) } - private def loadCometSupport(fqn : String) = { + override def createCometSupportResolver() : CometSupportResolver = { + import org.scala_tools.javautils.Imports._ - log.info("Trying to load: " + fqn) - try { - Some(Class.forName(fqn) - .getConstructor(Array(classOf[AtmosphereConfig]): _*) - .newInstance(config) - .asInstanceOf[CometSupport[_ <: AtmosphereResource[_,_]]]) - } catch { - case e : Exception => - log.error(e, "Couldn't load comet support", fqn) - None + new DefaultCometSupportResolver(config) { + type CS = CometSupport[_ <: AtmosphereResource[_,_]] + override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = { + available.asScala.filter(_ != classOf[GrizzlyCometSupport]).toList match { + case Nil => new GrizzlyCometSupport(config) + case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]]) + case _ => super.resolveMultipleNativeSupportConflict(available) + } + } + + override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = { + val predef = config.getInitParameter("cometSupport") + if(testClassExists(predef)) + newCometSupport(predef) + else + super.resolve(useNativeIfPossible, useBlockingAsDefault) + } } } } diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index 4c4c444c08..79f256874c 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -99,7 +99,7 @@ object Kernel extends Logging { adapter.setServletInstance(new AkkaCometServlet) adapter.setContextPath(uri.getPath) //Using autodetection for now - adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") + //adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root") log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath) diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala index 3ad42a4540..98441378b8 100644 --- a/akka-samples-scala/src/main/scala/SimpleService.scala +++ b/akka-samples-scala/src/main/scala/SimpleService.scala @@ -22,7 +22,7 @@ import org.atmosphere.jersey.Broadcastable class Boot { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])), Supervise( new SimpleService, LifeCycle(Permanent)) :: @@ -39,6 +39,25 @@ class Boot { factory.newInstance.start } +@Path("/pubsub/") +class PubSub extends Actor { + case class Msg(topic: String, message: String) + + @GET + @Suspend + @Produces(Array("text/plain;charset=ISO-8859-1")) + @Path("/topic/{topic}/") + def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic) + + @GET + @Broadcast + @Path("/topic/{topic}/{message}/") + @Produces(Array("text/plain;charset=ISO-8859-1")) + def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic) + + override def receive = { case _ => } +} + /** * Try service out by invoking (multiple times): *
@@ -72,26 +91,6 @@ class SimpleService extends Actor {
   }
 }
 
-@Path("/pubsub/")
-class PubSub extends Actor {
- case class Msg(topic: String, message: String)
-
- @GET
- @Suspend
- @Produces(Array("text/plain;charset=ISO-8859-1"))
- @Path("/topic/{topic}/")
- def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic)
-
- @GET
- @Broadcast
- @Path("/topic/{topic}/{message}/")
- @Produces(Array("text/plain;charset=ISO-8859-1"))
- def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
-
- override def receive = { case _ => }
-}
-
-
 /**
  * Try service out by invoking (multiple times):
  * 
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 3fa382f252..85ee85740d 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -36,7 +36,12 @@
     compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
     zlib-compression-level = 6  # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
 
-    
+	
+        name = "default"
+        actor = "se.scalablesolutions.akka.nio.JGroupsClusterActor"
+    
+    
+      
       service = on
       hostname = "localhost"
       port = 9999
@@ -53,7 +58,7 @@
     service = on
     hostname = "localhost"
     port = 9998
-    filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]"              # List with all servlet filters to use
+    filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]"              # List with all jersey filters to use
     authenticator = "se.scalablesolutions.akka.security.samples.BasicAuthenticationService" # The authentication service to use
   
   
diff --git a/pom.xml b/pom.xml
index d89166c09d..7c8a069125 100755
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,7 @@
     ${maven.compiler.source}
     ${project.build.sourceEncoding}
     ${project.build.sourceEncoding}
-    0.4.1
+    0.5-SNAPSHOT
     1.1.4
     1.9.18-i
   
@@ -74,6 +74,16 @@
         Despot
       
     
+    
+      viktorklang
+      Viktor Klang
+      +1
+      viktor.klang [REMOVE] AT gmail DOT com
+      
+      	Sourceror
+        Ninja-Zombie
+      
+