diff --git a/akka-actors/src/main/scala/remote/Cluster.scala b/akka-actors/src/main/scala/remote/Cluster.scala new file mode 100644 index 0000000000..58be373b4b --- /dev/null +++ b/akka-actors/src/main/scala/remote/Cluster.scala @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.remote + +import org.jgroups.{JChannel, View, Address, Message, ExtendedMembershipListener, Receiver, SetStateEvent} +import org.jgroups.util.Util + +import se.scalablesolutions.akka.Config.config +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRegistry} +import se.scalablesolutions.akka.remote.Cluster.{Node, RelayedMessage} + +import scala.collection.immutable.{Map, HashMap, HashSet} + +/** + * Interface for interacting with the cluster. + */ +trait Cluster { + + def name: String + + def registerLocalNode(hostname: String, port: Int): Unit + + def deregisterLocalNode(hostname: String, port: Int): Unit + + def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit + + def lookup[T](pf : PartialFunction[RemoteAddress,T]) : Option[T] +} + +/** + * Baseclass for cluster implementations + */ +abstract class ClusterActor extends Actor with Cluster { + val name = config.getString("akka.remote.cluster.name") getOrElse "default" +} + +/** + * A singleton representing the Cluster + * Loads a specified ClusterActor and delegates to that instance + */ +object Cluster extends Cluster { + case class Node(endpoints: List[RemoteAddress]) + case class RelayedMessage(actorClassFQN:String, msg: AnyRef) + + lazy val impl: Option[ClusterActor] = { + config.getString("akka.remote.cluster.actor") map (name => { + val actor = Class.forName(name) + .newInstance + .asInstanceOf[ClusterActor] + + SupervisorFactory( + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + Supervise(actor, LifeCycle(Permanent)) :: Nil + ) + ).newInstance.start + + actor + }) + } + + def name = impl.map(_.name).getOrElse("No cluster") + + def lookup[T](pf : PartialFunction[RemoteAddress,T]) : Option[T] = impl.flatMap(_.lookup(pf)) + + def registerLocalNode(hostname: String, port: Int): Unit = impl.map(_.registerLocalNode(hostname, port)) + + def deregisterLocalNode(hostname: String, port: Int): Unit = impl.map(_.deregisterLocalNode(hostname, port)) + + def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = impl.map(_.relayMessage(to, msg)) +} + +/** + * Just a placeholder for the JGroupsClusterActor message types + */ +object JGroupsClusterActor { + //Message types + sealed trait ClusterMessage + case object PapersPlease extends ClusterMessage + case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage + case object Block extends ClusterMessage + case object Unblock extends ClusterMessage + case class Zombie(address: Address) extends ClusterMessage + case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage + case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage +} + +/** + * Clustering support via JGroups + */ +class JGroupsClusterActor extends ClusterActor { + import JGroupsClusterActor._ + import org.scala_tools.javautils.Implicits._ + + @volatile private var local: Node = Node(Nil) + @volatile private var channel: Option[JChannel] = None + @volatile private var remotes: Map[Address, Node] = Map() + + override def init = { + log debug "Initiating cluster actor" + remotes = new HashMap[Address, Node] + val me = this + //Set up the JGroups local endpoint + channel = Some(new JChannel { + setReceiver(new Receiver with ExtendedMembershipListener { + def getState: Array[Byte] = null + + def setState(state: Array[Byte]): Unit = () + + def receive(msg: Message): Unit = me send msg + + def viewAccepted(view: View): Unit = me send view + + def suspect(a: Address): Unit = me send Zombie(a) + + def block: Unit = me send Block + + def unblock: Unit = me send Unblock + }) + }) + channel.map(_.connect(name)) + } + + protected def serializer = Serializer.Java //FIXME make this configurable + + def lookup[T](pf : PartialFunction[RemoteAddress,T]) : Option[T] = remotes.values.toList.flatMap(_.endpoints).find(pf isDefinedAt _).map(pf) + + def registerLocalNode(hostname: String, port: Int): Unit = this send RegisterLocalNode(RemoteAddress(hostname, port)) + + def deregisterLocalNode(hostname: String, port: Int): Unit = this send DeregisterLocalNode(RemoteAddress(hostname, port)) + + def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = this send RelayedMessage(to.getName, msg) + + private def broadcast[T <: AnyRef](recipients: Iterable[Address], msg: T): Unit = { + lazy val m = serializer out msg + for (c <- channel; r <- recipients) c.send(new Message(r, null, m)) + + } + + private def broadcast[T <: AnyRef](msg: T): Unit = { + if(!remotes.isEmpty) //Don't broadcast if we are not connected anywhere... + channel.map(_.send(new Message(null, null, serializer out msg))) + } + + def receive = { + case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead + log debug ("Zombie: %s", x) + broadcast(x :: Nil, PapersPlease) + remotes = remotes - x + } + + case v: View => { + //Not present in the cluster anymore = presumably zombies + //Nodes we have no prior knowledge existed = unknowns + val members = Set[Address]() ++ v.getMembers.asScala - channel.get.getAddress //Exclude ourselves + val zombies = Set[Address]() ++ remotes.keySet -- members + val unknown = members -- remotes.keySet + + log debug v.printDetails + + //Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead + broadcast(zombies ++ unknown, PapersPlease) + remotes = remotes -- zombies + } + + case m: Message => { + if (m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) //handle non-own messages only, and only if we're connected + (serializer in (m.getRawBuffer, None)) match { + + case PapersPlease => { + log debug ("Asked for papers by %s", m.getSrc) + broadcast(m.getSrc :: Nil, Papers(local.endpoints)) + + if(remotes.get(m.getSrc).isEmpty) //If we were asked for papers from someone we don't know, ask them! + broadcast(m.getSrc :: Nil, PapersPlease) + } + + case Papers(x) => remotes = remotes + (m.getSrc -> Node(x)) + + case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).map(_ send m) + + case unknown => log debug ("Unknown message: %s", unknown.toString) + } + } + + case rm @ RelayedMessage(_,_) => { + log debug ("Relaying message: %s", rm) + broadcast(rm) + } + + case RegisterLocalNode(s) => { + log debug ("RegisterLocalNode: %s", s) + local = Node(local.endpoints + s) + broadcast(Papers(local.endpoints)) + } + + case DeregisterLocalNode(s) => { + log debug ("DeregisterLocalNode: %s", s) + local = Node(local.endpoints - s) + broadcast(Papers(local.endpoints)) + } + + case Block => log debug "UNSUPPORTED: block" //TODO HotSwap to a buffering body + case Unblock => log debug "UNSUPPORTED: unblock" //TODO HotSwap back and flush the buffer + } + + override def shutdown = { + log debug ("Shutting down %s", this.getClass.getName) + channel.map(_.shutdown) + remotes = Map() + channel = None + } +} \ No newline at end of file diff --git a/akka-core/pom.xml b/akka-core/pom.xml index 910f792d5f..b928182607 100644 --- a/akka-core/pom.xml +++ b/akka-core/pom.xml @@ -67,6 +67,11 @@ javautils 2.7.4-0.1 + + jgroups + jgroups + 2.8.0.CR7 + diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 1145689683..691f28b0ee 100755 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -118,12 +118,14 @@ class RemoteServer extends Logging { bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS) openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) isRunning = true + Cluster.registerLocalNode(hostname,port) } } def shutdown = { openChannels.close.awaitUninterruptibly() bootstrap.releaseExternalResources + Cluster.deregisterLocalNode(hostname,port) } } diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala index 5ca2d701ee..8c9fdba4c2 100755 --- a/akka-kernel/src/main/scala/AkkaServlet.scala +++ b/akka-kernel/src/main/scala/AkkaServlet.scala @@ -12,10 +12,13 @@ import com.sun.jersey.api.core.ResourceConfig import com.sun.jersey.spi.container.servlet.ServletContainer import com.sun.jersey.spi.container.WebApplication +import java.util.{List => JList} + import javax.servlet.{ServletConfig} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} -import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport} +import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} +import org.atmosphere.container.{GrizzlyCometSupport} import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} import org.atmosphere.jersey.JerseyBroadcaster @@ -74,22 +77,28 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging override def loadConfiguration(sc: ServletConfig) { config = new AtmosphereConfig { supportSession = false } atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster)) - - loadCometSupport(sc.getInitParameter("cometSupport")) map( setCometSupport(_) ) } - private def loadCometSupport(fqn : String) = { + override def createCometSupportResolver() : CometSupportResolver = { + import org.scala_tools.javautils.Imports._ - log.info("Trying to load: " + fqn) - try { - Some(Class.forName(fqn) - .getConstructor(Array(classOf[AtmosphereConfig]): _*) - .newInstance(config) - .asInstanceOf[CometSupport[_ <: AtmosphereResource[_,_]]]) - } catch { - case e : Exception => - log.error(e, "Couldn't load comet support", fqn) - None + new DefaultCometSupportResolver(config) { + type CS = CometSupport[_ <: AtmosphereResource[_,_]] + override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = { + available.asScala.filter(_ != classOf[GrizzlyCometSupport]).toList match { + case Nil => new GrizzlyCometSupport(config) + case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]]) + case _ => super.resolveMultipleNativeSupportConflict(available) + } + } + + override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = { + val predef = config.getInitParameter("cometSupport") + if(testClassExists(predef)) + newCometSupport(predef) + else + super.resolve(useNativeIfPossible, useBlockingAsDefault) + } } } } diff --git a/akka-rest/src/main/scala/ActorComponentProvider.scala b/akka-rest/src/main/scala/ActorComponentProvider.scala index 4985bc48de..6e924b16da 100755 --- a/akka-rest/src/main/scala/ActorComponentProvider.scala +++ b/akka-rest/src/main/scala/ActorComponentProvider.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.rest +import com.sun.jersey.core.spi.component.ComponentScope import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider import config.Configurator @@ -11,6 +12,8 @@ import util.Logging class ActorComponentProvider(val clazz: Class[_], val configurators: List[Configurator]) extends IoCFullyManagedComponentProvider with Logging { + + override def getScope = ComponentScope.Singleton override def getInstance: AnyRef = { val instances = for { diff --git a/akka-rest/src/main/scala/NodeWriter.scala b/akka-rest/src/main/scala/NodeWriter.scala deleted file mode 100755 index 58c127b411..0000000000 --- a/akka-rest/src/main/scala/NodeWriter.scala +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.rest - -import java.io.OutputStream -import java.lang.annotation.Annotation -import java.lang.{String, Class} - -import javax.ws.rs.core.{MultivaluedMap, MediaType} -import javax.ws.rs.ext.{MessageBodyWriter, Provider} -import java.lang.reflect.Type - -import scala.xml.NodeSeq - -@Provider -class NodeWriter extends MessageBodyWriter[NodeSeq] { - - def isWriteable(aClass: Class[_], aType: Type, annotations: Array[Annotation], mediaType: MediaType) = { - classOf[NodeSeq].isAssignableFrom(aClass) - } - - def getSize(nodes: NodeSeq, aClass: Class[_], aType: Type, annotations: Array[Annotation], mediaType: MediaType) = -1L - - def writeTo(nodes: NodeSeq, - aClass: Class[_], - aType: Type, - annotations: Array[Annotation], - mediaType: MediaType, - stringObjectMultivaluedMap: MultivaluedMap[String, Object], - outputStream: OutputStream) : Unit = { - var answer = nodes.toString(); - outputStream.write(answer.getBytes()); - } -} \ No newline at end of file diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala index 2ccd8f13a4..11e8304551 100644 --- a/akka-samples-scala/src/main/scala/SimpleService.scala +++ b/akka-samples-scala/src/main/scala/SimpleService.scala @@ -22,7 +22,7 @@ import org.atmosphere.jersey.Broadcastable class Boot { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])), Supervise( new SimpleService, LifeCycle(Permanent)) :: @@ -90,7 +90,6 @@ class PubSub extends Actor { def receive = { case _ => } } - /** * Try service out by invoking (multiple times): *
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 3fa382f252..5a04981390 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -36,7 +36,12 @@
     compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
     zlib-compression-level = 6  # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
 
-    
+	
+        name = "default"
+        actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor"
+    
+    
+      
       service = on
       hostname = "localhost"
       port = 9999
@@ -53,7 +58,7 @@
     service = on
     hostname = "localhost"
     port = 9998
-    filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]"              # List with all servlet filters to use
+    filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]"              # List with all jersey filters to use
     authenticator = "se.scalablesolutions.akka.security.samples.BasicAuthenticationService" # The authentication service to use
   
   
diff --git a/pom.xml b/pom.xml
index 0664840d16..4ab3b9ac74 100755
--- a/pom.xml
+++ b/pom.xml
@@ -42,8 +42,8 @@
     ${maven.compiler.source}
     ${project.build.sourceEncoding}
     ${project.build.sourceEncoding}
-    0.4.1
-    1.1.4
+    0.5-SNAPSHOT
+    1.1.5-ea-SNAPSHOT
     1.9.18-i
   
 
@@ -89,6 +89,16 @@
         Despot
       
     
+    
+      viktorklang
+      Viktor Klang
+      +1
+      viktor.klang [REMOVE] AT gmail DOT com
+      
+      	Sourceror
+        Ninja-Zombie
+      
+