From fd277f7ee129ac98a0743103bf10000ccfbdfe59 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 25 Nov 2009 01:58:35 +0100 Subject: [PATCH 01/22] Atmosphere5.0 --- akka-kernel/pom.xml | 11 ++++-- akka-kernel/src/main/scala/AkkaServlet.scala | 39 ++++++++++++------- akka-kernel/src/main/scala/Kernel.scala | 2 +- .../src/main/scala/SimpleService.scala | 2 +- pom.xml | 10 ++--- 5 files changed, 39 insertions(+), 25 deletions(-) diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml index b964967624..9ed31850e0 100755 --- a/akka-kernel/pom.xml +++ b/akka-kernel/pom.xml @@ -232,12 +232,17 @@ org.atmosphere atmosphere-runtime - 0.4.1 + 0.5-SNAPSHOT org.atmosphere - atmosphere-core - 0.4-SNAPSHOT + atmosphere-jersey + 0.5-SNAPSHOT + + + org.atmosphere + atmosphere-annotations + 0.5-SNAPSHOT diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala index f633835deb..3f18fc9dad 100755 --- a/akka-kernel/src/main/scala/AkkaServlet.scala +++ b/akka-kernel/src/main/scala/AkkaServlet.scala @@ -12,12 +12,15 @@ import com.sun.jersey.api.core.ResourceConfig import com.sun.jersey.spi.container.servlet.ServletContainer import com.sun.jersey.spi.container.WebApplication +import java.util.{List => JList} + import javax.servlet.{ServletConfig} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} -import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport} +import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver} +import org.atmosphere.container.{GrizzlyCometSupport,GlassFishv3CometSupport} import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} -import org.atmosphere.core.JerseyBroadcaster +import org.atmosphere.jersey.JerseyBroadcaster /** * Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container, @@ -73,22 +76,28 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging override def loadConfiguration(sc: ServletConfig) { atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster)) - - loadCometSupport(sc.getInitParameter("cometSupport")) map( setCometSupport(_) ) } - private def loadCometSupport(fqn : String) = { + override def createCometSupportResolver() : CometSupportResolver = { + import org.scala_tools.javautils.Imports._ - log.info("Trying to load: " + fqn) - try { - Some(Class.forName(fqn) - .getConstructor(Array(classOf[AtmosphereConfig]): _*) - .newInstance(config) - .asInstanceOf[CometSupport[_ <: AtmosphereResource[_,_]]]) - } catch { - case e : Exception => - log.error(e, "Couldn't load comet support", fqn) - None + new CometSupportResolver(config) { + type CS = CometSupport[_ <: AtmosphereResource[_,_]] + override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = { + available.asScala.filter(c => c != classOf[GrizzlyCometSupport] && c != classOf[GlassFishv3CometSupport]).toList match { + case Nil => new GrizzlyCometSupport(config) + case x :: Nil => newCometSupport(x) + case _ => super.resolveMultipleNativeSupportConflict(available) + } + } + + override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = { + val predef = config.getInitParameter("cometSupport") + if(testClassExists(predef)) + newCometSupport(predef) + else + super.resolve(useNativeIfPossible, useBlockingAsDefault) + } } } } diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index e4f66a9050..69813fdd18 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -70,7 +70,7 @@ object Kernel extends Logging { adapter.setServletInstance(new AkkaCometServlet) adapter.setContextPath(uri.getPath) //Using autodetection for now - adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") + //adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root") log.info("REST service root path: [" + adapter.getRootFolder + "] and context path [" + adapter.getContextPath + "] ") diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala index 6cb6ed0b8f..bbed4f9d75 100644 --- a/akka-samples-scala/src/main/scala/SimpleService.scala +++ b/akka-samples-scala/src/main/scala/SimpleService.scala @@ -13,7 +13,7 @@ import java.lang.Integer import javax.ws.rs.core.MultivaluedMap import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes} -import org.atmosphere.core.annotation.{Broadcast, Suspend} +import org.atmosphere.annotation.{Broadcast, Suspend} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.BroadcastFilter diff --git a/pom.xml b/pom.xml index 7361426e9a..c6f161a0e4 100755 --- a/pom.xml +++ b/pom.xml @@ -67,16 +67,16 @@ - - repo1.maven - Maven Main Repository - http://repo1.maven.org/maven2 - project.embedded.module Project Embedded Repository file://${basedir}/../embedded-repo + + repo1.maven + Maven Main Repository + http://repo1.maven.org/maven2 + scala-tools-snapshots Scala-Tools Maven2 Snapshot Repository From a78cea603f64f6a89a3625cf6829fcc001768c9b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 2 Dec 2009 21:08:29 +0100 Subject: [PATCH 02/22] Added version --- akka-kernel/src/main/scala/AkkaServlet.scala | 4 +-- .../src/main/scala/SimpleService.scala | 32 ++++++++++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala index 3f18fc9dad..215cab72cd 100755 --- a/akka-kernel/src/main/scala/AkkaServlet.scala +++ b/akka-kernel/src/main/scala/AkkaServlet.scala @@ -17,7 +17,7 @@ import java.util.{List => JList} import javax.servlet.{ServletConfig} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} -import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver} +import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} import org.atmosphere.container.{GrizzlyCometSupport,GlassFishv3CometSupport} import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} import org.atmosphere.jersey.JerseyBroadcaster @@ -81,7 +81,7 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging override def createCometSupportResolver() : CometSupportResolver = { import org.scala_tools.javautils.Imports._ - new CometSupportResolver(config) { + new DefaultCometSupportResolver(config) { type CS = CometSupport[_ <: AtmosphereResource[_,_]] override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = { available.asScala.filter(c => c != classOf[GrizzlyCometSupport] && c != classOf[GlassFishv3CometSupport]).toList match { diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala index 5fa1fc6666..e16d6691b9 100644 --- a/akka-samples-scala/src/main/scala/SimpleService.scala +++ b/akka-samples-scala/src/main/scala/SimpleService.scala @@ -11,16 +11,18 @@ import se.scalablesolutions.akka.util.Logging import java.lang.Integer import javax.ws.rs.core.MultivaluedMap -import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes} +import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} import org.atmosphere.annotation.{Broadcast, Suspend} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.BroadcastFilter +import org.atmosphere.cpr.Broadcaster +import org.atmosphere.jersey.Broadcastable class Boot { val factory = SupervisorFactory( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])), Supervise( new SimpleService, LifeCycle(Permanent)) :: @@ -29,11 +31,33 @@ class Boot { LifeCycle(Permanent)) :: Supervise( new PersistentSimpleService, - LifeCycle(Permanent)) - :: Nil)) + LifeCycle(Permanent)) :: + Supervise( + new PubSub, + LifeCycle(Permanent)) :: + Nil) ) factory.newInstance.start } +@Path("/pubsub/") +class PubSub extends Actor { + case class Msg(topic: String, message: String) + + @GET + @Suspend + @Produces(Array("text/plain;charset=ISO-8859-1")) + @Path("/topic/{topic}/") + def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic) + + @GET + @Broadcast + @Path("/topic/{topic}/{message}/") + @Produces(Array("text/plain;charset=ISO-8859-1")) + def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic) + + override def receive = { case _ => } +} + /** * Try service out by invoking (multiple times): *

From 6090de69068dfba3a180d18fa9f539fc3d293fda Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Wed, 2 Dec 2009 21:29:34 +0100
Subject: [PATCH 03/22] Fixed JErsey broadcaster issue

---
 akka-kernel/src/main/scala/AkkaServlet.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala
index 215cab72cd..32a9b18295 100755
--- a/akka-kernel/src/main/scala/AkkaServlet.scala
+++ b/akka-kernel/src/main/scala/AkkaServlet.scala
@@ -75,6 +75,7 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging
   }
 
   override def loadConfiguration(sc: ServletConfig) {
+    config = new AtmosphereConfig { supportSession = false }
     atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster))
   }
 

From 037c3d7bb4f38f9da8bd6aef8f3465f09ff9b21b Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Wed, 2 Dec 2009 23:02:37 +0100
Subject: [PATCH 04/22] Tweaked Jersey version

---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 2e1cc89d73..dae9d62afb 100755
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
     ${project.build.sourceEncoding}
     ${project.build.sourceEncoding}
     0.5-SNAPSHOT
-    1.1.5-ea-SNAPSHOT
+    1.1.4
   
 
   

From 33e7bc2f01ccf96082defa67d5755024e7543450 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Wed, 9 Dec 2009 20:52:30 +0100
Subject: [PATCH 05/22] Create and link new cluster module

---
 akka-cluster/pom.xml | 72 ++++++++++++++++++++++++++++++++++++++++++++
 pom.xml              | 12 ++++++++
 2 files changed, 84 insertions(+)
 create mode 100644 akka-cluster/pom.xml

diff --git a/akka-cluster/pom.xml b/akka-cluster/pom.xml
new file mode 100644
index 0000000000..24be269969
--- /dev/null
+++ b/akka-cluster/pom.xml
@@ -0,0 +1,72 @@
+
+  4.0.0
+
+  akka-cluster
+  Akka Cluster Module
+
+  jar
+
+  
+    akka
+    se.scalablesolutions.akka
+    0.6
+    ../pom.xml
+  
+
+  
+    
+      org.scala-lang
+      scala-library
+      ${scala.version}
+    
+    
+    
+      akka-actors
+      ${project.groupId}
+      ${project.version}
+    
+    
+      akka-util
+      ${project.groupId}
+      ${project.version}
+    
+    
+      njgroups
+      jgroups
+      2.8.0.CR7
+    
+    
+      net.liftweb
+      lift-util
+      1.1-M6
+    
+
+    
+    
+      org.scalatest
+      scalatest
+      1.0
+      test
+    
+    
+      junit
+      junit
+      4.5
+      test
+    
+    
+      org.mockito
+      mockito-all
+      1.8.0
+      test
+    
+
+
+  
+
+
diff --git a/pom.xml b/pom.xml
index d89166c09d..03a88bb56e 100755
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,7 @@
     akka-persistence
     akka-rest
     akka-camel
+    akka-cluster
     akka-amqp
     akka-security
     akka-kernel
@@ -74,6 +75,17 @@
         Despot
       
     
+    
+      viktorklang
+      Viktor Klang
+      +1
+      viktor.klang [REMOVE] AT gmail DOT com
+      
+        Crazy hermit
+        Tinkerer
+        Visionary
+      
+    
   
 
   

From d73409f4e4f77e821259e42411f8876448d63dc3 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Wed, 9 Dec 2009 20:57:04 +0100
Subject: [PATCH 06/22] Updated conf docs

---
 config/akka-reference.conf | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 3fa382f252..97d7a86a0f 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -53,7 +53,7 @@
     service = on
     hostname = "localhost"
     port = 9998
-    filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]"              # List with all servlet filters to use
+    filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]"              # List with all jersey filters to use
     authenticator = "se.scalablesolutions.akka.security.samples.BasicAuthenticationService" # The authentication service to use
   
   

From a21dcc8006e6e77421e12c21542979364039478d Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Wed, 9 Dec 2009 23:30:59 +0100
Subject: [PATCH 07/22] Adding the cluster module skeleton

---
 pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pom.xml b/pom.xml
index 2ff2ad0ac5..57f6be651c 100755
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
     akka-util
     akka-actors
     akka-persistence
+    akka-cluster
     akka-rest
     akka-camel
     akka-amqp

From d89b30c0b0a0350efa2dfd6b6015f0b7fd360ba6 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sat, 12 Dec 2009 00:03:36 +0100
Subject: [PATCH 08/22] Atleast compiles

---
 akka-cluster/pom.xml                      |  12 +-
 akka-cluster/src/main/scala/Cluster.scala | 130 ++++++++++++++++++++++
 config/akka-reference.conf                |   4 +
 3 files changed, 140 insertions(+), 6 deletions(-)
 create mode 100644 akka-cluster/src/main/scala/Cluster.scala

diff --git a/akka-cluster/pom.xml b/akka-cluster/pom.xml
index 24be269969..abf62fe317 100644
--- a/akka-cluster/pom.xml
+++ b/akka-cluster/pom.xml
@@ -20,11 +20,11 @@
       scala-library
       ${scala.version}
     
-    
+    
+      org.scala-tools
+      javautils
+      2.7.4-0.1
+    
     
       akka-actors
       ${project.groupId}
@@ -36,7 +36,7 @@
       ${project.version}
     
     
-      njgroups
+      jgroups
       jgroups
       2.8.0.CR7
     
diff --git a/akka-cluster/src/main/scala/Cluster.scala b/akka-cluster/src/main/scala/Cluster.scala
new file mode 100644
index 0000000000..88987a79a6
--- /dev/null
+++ b/akka-cluster/src/main/scala/Cluster.scala
@@ -0,0 +1,130 @@
+package se.scalablesolutions.akka.cluster
+
+import se.scalablesolutions.akka.Config.config
+import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.util.Logging
+import org.jgroups.{JChannel,View,Address,Message,ExtendedMembershipListener,Receiver,SetStateEvent}
+import se.scalablesolutions.akka.serialization.Serializer.Protobuf
+import se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress
+import scala.collection.immutable.{Map,HashMap,HashSet}
+import org.jgroups.util.Util
+import se.scalablesolutions.akka.nio.RemoteServer
+import se.scalablesolutions.akka.cluster.Cluster.{DeregisterLocalNode, RegisterLocalNode, Node}
+
+abstract class ClusterActor(name : String) extends Actor {
+  // def clusterSpawn[T <: Actor](clazz : Class[T]) : T
+   def members : List[Node]
+}
+
+object Cluster {
+
+      case class RegisterLocalNode(server : RemoteAddress)
+      case class DeregisterLocalNode(server : RemoteAddress)
+      case class Node(endpoints : List[RemoteAddress])
+
+      lazy val impl : Option[ClusterActor] = {
+        config.getString("akka.remote.cluster.actor") map ( name => {
+             Class.forName(name)
+                       .getDeclaredConstructor(Array(classOf[String]): _*)
+                       .newInstance(config.getString("akka.remote.cluster.name") getOrElse "default")
+                       .asInstanceOf[ClusterActor]
+        })
+      }
+
+      //def registerLocalNode(server : RemoteAddress)   : Unit = impl.foreach(_ ! RegisterLocalNode(server))
+      //def deregisterLocalNode(server : RemoteAddress) : Unit = impl.foreach(_ ! DeregisterLocalNode(server))
+}
+
+object JGroupsClusterActor {
+    //Message types
+    case object PapersPlease
+    case class Papers(addresses : List[RemoteAddress])
+    case object Block
+    case object Unblock
+    case class Zombie(address : Address)
+}
+
+class JGroupsClusterActor(name : String) extends ClusterActor(name)
+{
+    import JGroupsClusterActor._
+    import org.scala_tools.javautils.Implicits._
+    private var local   : Node              = Node(Nil)
+    private var channel : JChannel          = null
+    private var remotes : Map[Address,Node] = Map()
+  
+    override def init(config : AnyRef) = {
+      remotes = new HashMap[Address,Node]
+      val me  = this
+      channel = new JChannel {
+        setReceiver(new Receiver with ExtendedMembershipListener {
+          def getState : Array[Byte]               = null
+          def setState(state : Array[Byte]) : Unit = ()
+          def receive(msg : Message)        : Unit = me ! msg
+          def viewAccepted(view : View)     : Unit = me ! view
+          def suspect(a : Address)          : Unit = me ! Zombie(a)
+          def block                         : Unit = me ! Block
+          def unblock                       : Unit = me ! Unblock
+        })
+      }
+      channel connect name
+    }
+
+    protected def serializer = Protobuf
+
+    private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) : Unit = {
+        for(r <- recipients)
+            channel.send(new Message(r,null,serializer out msg))
+    }
+
+    private def broadcast[T <: AnyRef](msg : T) : Unit = channel.send(new Message(null,null,serializer out msg))
+
+   override def receive = {
+      case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
+                          broadcast(x :: Nil,PapersPlease)
+                          remotes = remotes - x
+                        }
+
+      case v : View  => {
+           log debug v.printDetails
+           //Not present in the cluster anymore = presumably zombies
+           //Nodes we have no prior knowledge existed = unknowns
+           val members = Set[Address]() ++ v.getMembers.asScala
+           val zombies = Set[Address]() ++ remotes.keySet.filter( members contains _ )
+           val unknown : Set[Address] = members -- remotes.keySet
+
+           //Tell the zombies and unknowns to provide papers and prematurely treat them as dead
+           broadcast(zombies ++ unknown, PapersPlease)
+           remotes = remotes -- zombies
+          }
+
+      case m : Message if m.getSrc != channel.getAddress => {
+            ( serializer in(m.getRawBuffer,None) ) match {
+                case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints))
+                case Papers(x)    => remotes = remotes + (m.getSrc -> Node(x))
+                case unknown      => log debug unknown.toString
+            }
+          }
+
+      case DeregisterLocalNode(s) => {
+           log debug "DeregisterLocalNode"+s
+           local = Node(local.endpoints - s)
+           broadcast(Papers(local.endpoints))
+          }
+
+      case RegisterLocalNode(s)   => {
+           log debug "RegisterLocalNode"+s
+           local = Node(local.endpoints + s)
+           broadcast(Nil,Papers(local.endpoints))
+          }
+      
+      case Block                    => log debug "Asked to block" //TODO HotSwap to a buffering body
+      case Unblock                  => log debug "Asked to unblock" //TODO HotSwap back and flush the buffer
+    }
+
+    def members = remotes.values.toList
+
+    override def shutdown = {
+      remotes = Map()
+      channel.close
+    }
+}
\ No newline at end of file
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 97d7a86a0f..04c8f37128 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -37,6 +37,10 @@
     zlib-compression-level = 6  # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
 
     
+      
+        name = "default"
+        actor = "se.scalablesolutions.akka.cluster.JGroupsClusterActor"
+      
       service = on
       hostname = "localhost"
       port = 9999

From 4cf7aaa002f8d61cbf66cab17468a8d472d3e730 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sat, 12 Dec 2009 00:36:38 +0100
Subject: [PATCH 09/22] Tidying some code

---
 akka-cluster/src/main/scala/Cluster.scala | 34 ++++++++++++++---------
 1 file changed, 21 insertions(+), 13 deletions(-)

diff --git a/akka-cluster/src/main/scala/Cluster.scala b/akka-cluster/src/main/scala/Cluster.scala
index 88987a79a6..91b5ac78f6 100644
--- a/akka-cluster/src/main/scala/Cluster.scala
+++ b/akka-cluster/src/main/scala/Cluster.scala
@@ -9,17 +9,18 @@ import se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress
 import scala.collection.immutable.{Map,HashMap,HashSet}
 import org.jgroups.util.Util
 import se.scalablesolutions.akka.nio.RemoteServer
-import se.scalablesolutions.akka.cluster.Cluster.{DeregisterLocalNode, RegisterLocalNode, Node}
+import se.scalablesolutions.akka.cluster.Cluster.Node
 
-abstract class ClusterActor(name : String) extends Actor {
-  // def clusterSpawn[T <: Actor](clazz : Class[T]) : T
-   def members : List[Node]
+trait Cluster {
+    def members : List[Node]
+    def name : String
+    def registerLocalNode(server : RemoteAddress)   : Unit
+    def deregisterLocalNode(server : RemoteAddress) : Unit
 }
 
-object Cluster {
+abstract class ClusterActor(val name : String) extends Actor with Cluster
 
-      case class RegisterLocalNode(server : RemoteAddress)
-      case class DeregisterLocalNode(server : RemoteAddress)
+object Cluster extends Cluster {
       case class Node(endpoints : List[RemoteAddress])
 
       lazy val impl : Option[ClusterActor] = {
@@ -30,9 +31,11 @@ object Cluster {
                        .asInstanceOf[ClusterActor]
         })
       }
-
-      //def registerLocalNode(server : RemoteAddress)   : Unit = impl.foreach(_ ! RegisterLocalNode(server))
-      //def deregisterLocalNode(server : RemoteAddress) : Unit = impl.foreach(_ ! DeregisterLocalNode(server))
+      
+      def name = impl.map(_.name).getOrElse("No cluster")
+      def members = impl.map(_.members).getOrElse(Nil)
+      def registerLocalNode(server : RemoteAddress)   : Unit = impl.map(_.registerLocalNode(server))
+      def deregisterLocalNode(server : RemoteAddress) : Unit = impl.map(_.deregisterLocalNode(server))
 }
 
 object JGroupsClusterActor {
@@ -42,12 +45,15 @@ object JGroupsClusterActor {
     case object Block
     case object Unblock
     case class Zombie(address : Address)
+    case class RegisterLocalNode(server : RemoteAddress)
+    case class DeregisterLocalNode(server : RemoteAddress)
 }
 
 class JGroupsClusterActor(name : String) extends ClusterActor(name)
 {
     import JGroupsClusterActor._
     import org.scala_tools.javautils.Implicits._
+    
     private var local   : Node              = Node(Nil)
     private var channel : JChannel          = null
     private var remotes : Map[Address,Node] = Map()
@@ -89,10 +95,10 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
            //Not present in the cluster anymore = presumably zombies
            //Nodes we have no prior knowledge existed = unknowns
            val members = Set[Address]() ++ v.getMembers.asScala
-           val zombies = Set[Address]() ++ remotes.keySet.filter( members contains _ )
+           val zombies = Set[Address]() ++ remotes.keySet -- members
            val unknown : Set[Address] = members -- remotes.keySet
 
-           //Tell the zombies and unknowns to provide papers and prematurely treat them as dead
+           //Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead
            broadcast(zombies ++ unknown, PapersPlease)
            remotes = remotes -- zombies
           }
@@ -114,7 +120,7 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
       case RegisterLocalNode(s)   => {
            log debug "RegisterLocalNode"+s
            local = Node(local.endpoints + s)
-           broadcast(Nil,Papers(local.endpoints))
+           broadcast(Papers(local.endpoints))
           }
       
       case Block                    => log debug "Asked to block" //TODO HotSwap to a buffering body
@@ -122,6 +128,8 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
     }
 
     def members = remotes.values.toList
+    def registerLocalNode(server : RemoteAddress)   : Unit = this ! RegisterLocalNode(server)
+    def deregisterLocalNode(server : RemoteAddress) : Unit = this ! DeregisterLocalNode(server)
 
     override def shutdown = {
       remotes = Map()

From 18b2945ac7b719a91ac50da23969ec9e10ee02df Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sat, 12 Dec 2009 00:39:55 +0100
Subject: [PATCH 10/22] Moved cluster into akka-actor

---
 .../src/main/scala/nio}/Cluster.scala         |  3 +-
 akka-cluster/pom.xml                          | 72 -------------------
 pom.xml                                       |  1 -
 3 files changed, 1 insertion(+), 75 deletions(-)
 rename {akka-cluster/src/main/scala => akka-actors/src/main/scala/nio}/Cluster.scala (98%)
 delete mode 100644 akka-cluster/pom.xml

diff --git a/akka-cluster/src/main/scala/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
similarity index 98%
rename from akka-cluster/src/main/scala/Cluster.scala
rename to akka-actors/src/main/scala/nio/Cluster.scala
index 91b5ac78f6..7afd7b9d4d 100644
--- a/akka-cluster/src/main/scala/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -1,7 +1,6 @@
-package se.scalablesolutions.akka.cluster
+package se.scalablesolutions.akka.actor
 
 import se.scalablesolutions.akka.Config.config
-import se.scalablesolutions.akka.actor.Actor
 import se.scalablesolutions.akka.util.Logging
 import org.jgroups.{JChannel,View,Address,Message,ExtendedMembershipListener,Receiver,SetStateEvent}
 import se.scalablesolutions.akka.serialization.Serializer.Protobuf
diff --git a/akka-cluster/pom.xml b/akka-cluster/pom.xml
deleted file mode 100644
index abf62fe317..0000000000
--- a/akka-cluster/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-
-  4.0.0
-
-  akka-cluster
-  Akka Cluster Module
-
-  jar
-
-  
-    akka
-    se.scalablesolutions.akka
-    0.6
-    ../pom.xml
-  
-
-  
-    
-      org.scala-lang
-      scala-library
-      ${scala.version}
-    
-    
-      org.scala-tools
-      javautils
-      2.7.4-0.1
-    
-    
-      akka-actors
-      ${project.groupId}
-      ${project.version}
-    
-    
-      akka-util
-      ${project.groupId}
-      ${project.version}
-    
-    
-      jgroups
-      jgroups
-      2.8.0.CR7
-    
-    
-      net.liftweb
-      lift-util
-      1.1-M6
-    
-
-    
-    
-      org.scalatest
-      scalatest
-      1.0
-      test
-    
-    
-      junit
-      junit
-      4.5
-      test
-    
-    
-      org.mockito
-      mockito-all
-      1.8.0
-      test
-    
-
-
-  
-
-
diff --git a/pom.xml b/pom.xml
index 03a88bb56e..f7cca33ba2 100755
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,6 @@
     akka-persistence
     akka-rest
     akka-camel
-    akka-cluster
     akka-amqp
     akka-security
     akka-kernel

From 1cd01e72c5a5c20c9f5872ba56d420265e913ad2 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sat, 12 Dec 2009 01:11:35 +0100
Subject: [PATCH 11/22] Moved Cluster to akka-actors

---
 akka-actors/pom.xml                          | 5 +++++
 akka-actors/src/main/scala/nio/Cluster.scala | 2 +-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/akka-actors/pom.xml b/akka-actors/pom.xml
index b7372a1e6e..9b4767e7fa 100644
--- a/akka-actors/pom.xml
+++ b/akka-actors/pom.xml
@@ -67,6 +67,11 @@
       javautils
       2.7.4-0.1
     
+    
+      jgroups
+      jgroups
+      2.8.0.CR7
+    
 
     
     
diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index 7afd7b9d4d..185e380adf 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -8,7 +8,7 @@ import se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress
 import scala.collection.immutable.{Map,HashMap,HashSet}
 import org.jgroups.util.Util
 import se.scalablesolutions.akka.nio.RemoteServer
-import se.scalablesolutions.akka.cluster.Cluster.Node
+import se.scalablesolutions.akka.actor.Cluster.Node
 
 trait Cluster {
     def members : List[Node]

From 7ab1b309dc1bfb8ef4a80fb895f481ed5f4367d0 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sat, 12 Dec 2009 15:53:57 +0100
Subject: [PATCH 12/22] Tweaked logging

---
 akka-actors/src/main/scala/nio/Cluster.scala | 50 ++++++++++----------
 1 file changed, 25 insertions(+), 25 deletions(-)

diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index 185e380adf..be59219400 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -62,40 +62,40 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
       val me  = this
       channel = new JChannel {
         setReceiver(new Receiver with ExtendedMembershipListener {
-          def getState : Array[Byte]               = null
-          def setState(state : Array[Byte]) : Unit = ()
-          def receive(msg : Message)        : Unit = me ! msg
-          def viewAccepted(view : View)     : Unit = me ! view
-          def suspect(a : Address)          : Unit = me ! Zombie(a)
-          def block                         : Unit = me ! Block
-          def unblock                       : Unit = me ! Unblock
-        })
+                      def getState : Array[Byte]               = null
+                      def setState(state : Array[Byte]) : Unit = ()
+                      def receive(msg : Message)        : Unit = me ! msg
+                      def viewAccepted(view : View)     : Unit = me ! view
+                      def suspect(a : Address)          : Unit = me ! Zombie(a)
+                      def block                         : Unit = me ! Block
+                      def unblock                       : Unit = me ! Unblock
+                   })
       }
       channel connect name
     }
 
-    protected def serializer = Protobuf
+    protected def serializer = Protobuf //FIXME make this configurable
 
-    private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) : Unit = {
-        for(r <- recipients)
-            channel.send(new Message(r,null,serializer out msg))
+    private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) = {
+        recipients.foreach( to => channel.send(new Message(to,null,serializer out msg)))
     }
 
     private def broadcast[T <: AnyRef](msg : T) : Unit = channel.send(new Message(null,null,serializer out msg))
 
    override def receive = {
       case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
+                          log info "Zombie: "+x
                           broadcast(x :: Nil,PapersPlease)
                           remotes = remotes - x
                         }
 
       case v : View  => {
-           log debug v.printDetails
+           log info v.printDetails
            //Not present in the cluster anymore = presumably zombies
            //Nodes we have no prior knowledge existed = unknowns
            val members = Set[Address]() ++ v.getMembers.asScala
            val zombies = Set[Address]() ++ remotes.keySet -- members
-           val unknown : Set[Address] = members -- remotes.keySet
+           val unknown = members -- remotes.keySet
 
            //Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead
            broadcast(zombies ++ unknown, PapersPlease)
@@ -106,27 +106,27 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
             ( serializer in(m.getRawBuffer,None) ) match {
                 case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints))
                 case Papers(x)    => remotes = remotes + (m.getSrc -> Node(x))
-                case unknown      => log debug unknown.toString
+                case unknown      => log info unknown.toString
             }
           }
 
-      case DeregisterLocalNode(s) => {
-           log debug "DeregisterLocalNode"+s
-           local = Node(local.endpoints - s)
-           broadcast(Papers(local.endpoints))
-          }
-
       case RegisterLocalNode(s)   => {
-           log debug "RegisterLocalNode"+s
+           log info "RegisterLocalNode: "+s
            local = Node(local.endpoints + s)
            broadcast(Papers(local.endpoints))
           }
+
+      case DeregisterLocalNode(s) => {
+           log info "DeregisterLocalNode: "+s
+           local = Node(local.endpoints - s)
+           broadcast(Papers(local.endpoints))
+          }
       
-      case Block                    => log debug "Asked to block" //TODO HotSwap to a buffering body
-      case Unblock                  => log debug "Asked to unblock" //TODO HotSwap back and flush the buffer
+      case Block                    => log info "Asked to block" //TODO HotSwap to a buffering body
+      case Unblock                  => log info "Asked to unblock" //TODO HotSwap back and flush the buffer
     }
 
-    def members = remotes.values.toList
+    def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes
     def registerLocalNode(server : RemoteAddress)   : Unit = this ! RegisterLocalNode(server)
     def deregisterLocalNode(server : RemoteAddress) : Unit = this ! DeregisterLocalNode(server)
 

From 76ed3d6e55e3066249bb30fb1560665785608ca6 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sat, 12 Dec 2009 15:57:06 +0100
Subject: [PATCH 13/22] Hooked the clustering into RemoteServer

---
 akka-actors/src/main/scala/nio/Cluster.scala      | 4 ++--
 akka-actors/src/main/scala/nio/RemoteServer.scala | 2 ++
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index be59219400..aa8a90cf59 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -33,8 +33,8 @@ object Cluster extends Cluster {
       
       def name = impl.map(_.name).getOrElse("No cluster")
       def members = impl.map(_.members).getOrElse(Nil)
-      def registerLocalNode(server : RemoteAddress)   : Unit = impl.map(_.registerLocalNode(server))
-      def deregisterLocalNode(server : RemoteAddress) : Unit = impl.map(_.deregisterLocalNode(server))
+      def registerLocalNode(hostname : String, port : Int)   : Unit = impl.map(_.registerLocalNode(RemoteAddress(hostname,port)))
+      def deregisterLocalNode(hostname : String, port : Int) : Unit = impl.map(_.deregisterLocalNode(RemoteAddress(hostname,port)))
 }
 
 object JGroupsClusterActor {
diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala
index 5a542268c8..2f7cfced3f 100755
--- a/akka-actors/src/main/scala/nio/RemoteServer.scala
+++ b/akka-actors/src/main/scala/nio/RemoteServer.scala
@@ -109,11 +109,13 @@ class RemoteServer extends Logging {
       bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
       bootstrap.bind(new InetSocketAddress(hostname, port))
       isRunning = true
+      Cluster.registerLocalNode(hostname,port)
     }
   }
 
   def shutdown = {
     bootstrap.releaseExternalResources
+    Cluster.deregisterLocalNode(hostname,port)
   }
 }
 

From 5ec2ab1ecfa4f0be308c5dcd4a9ccc419af76e62 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sat, 12 Dec 2009 17:52:23 +0100
Subject: [PATCH 14/22] Working on one node anyways...

---
 akka-actors/src/main/scala/nio/Cluster.scala | 80 ++++++++++++--------
 config/akka-reference.conf                   |  8 +-
 2 files changed, 51 insertions(+), 37 deletions(-)

diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index aa8a90cf59..663b7785ad 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -1,20 +1,20 @@
-package se.scalablesolutions.akka.actor
+package se.scalablesolutions.akka.nio
 
 import se.scalablesolutions.akka.Config.config
 import se.scalablesolutions.akka.util.Logging
 import org.jgroups.{JChannel,View,Address,Message,ExtendedMembershipListener,Receiver,SetStateEvent}
-import se.scalablesolutions.akka.serialization.Serializer.Protobuf
-import se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress
+import se.scalablesolutions.akka.serialization.Serializer
+import se.scalablesolutions.akka.config.ScalaConfig._
 import scala.collection.immutable.{Map,HashMap,HashSet}
 import org.jgroups.util.Util
-import se.scalablesolutions.akka.nio.RemoteServer
-import se.scalablesolutions.akka.actor.Cluster.Node
+import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor}
+import se.scalablesolutions.akka.nio.Cluster.Node
 
 trait Cluster {
     def members : List[Node]
     def name : String
-    def registerLocalNode(server : RemoteAddress)   : Unit
-    def deregisterLocalNode(server : RemoteAddress) : Unit
+    def registerLocalNode(hostname : String, port : Int)   : Unit
+    def deregisterLocalNode(hostname : String, port : Int) : Unit
 }
 
 abstract class ClusterActor(val name : String) extends Actor with Cluster
@@ -24,17 +24,26 @@ object Cluster extends Cluster {
 
       lazy val impl : Option[ClusterActor] = {
         config.getString("akka.remote.cluster.actor") map ( name => {
-             Class.forName(name)
-                       .getDeclaredConstructor(Array(classOf[String]): _*)
-                       .newInstance(config.getString("akka.remote.cluster.name") getOrElse "default")
-                       .asInstanceOf[ClusterActor]
+             val actor = Class.forName(name)
+                              .getDeclaredConstructor(Array(classOf[String]): _*)
+                              .newInstance(config.getString("akka.remote.cluster.name") getOrElse "default")
+                              .asInstanceOf[ClusterActor]
+
+            SupervisorFactory(
+                    SupervisorConfig(
+                           RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
+                                Supervise(actor, LifeCycle(Permanent)):: Nil
+                          )
+                     ).newInstance.start
+            actor !! Init(None)
+            actor
         })
       }
       
       def name = impl.map(_.name).getOrElse("No cluster")
       def members = impl.map(_.members).getOrElse(Nil)
-      def registerLocalNode(hostname : String, port : Int)   : Unit = impl.map(_.registerLocalNode(RemoteAddress(hostname,port)))
-      def deregisterLocalNode(hostname : String, port : Int) : Unit = impl.map(_.deregisterLocalNode(RemoteAddress(hostname,port)))
+      def registerLocalNode(hostname : String, port : Int)   : Unit = impl.map(_.registerLocalNode(hostname,port))
+      def deregisterLocalNode(hostname : String, port : Int) : Unit = impl.map(_.deregisterLocalNode(hostname,port))
 }
 
 object JGroupsClusterActor {
@@ -54,13 +63,14 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
     import org.scala_tools.javautils.Implicits._
     
     private var local   : Node              = Node(Nil)
-    private var channel : JChannel          = null
+    private var channel : Option[JChannel]  = None
     private var remotes : Map[Address,Node] = Map()
   
     override def init(config : AnyRef) = {
+      log info "Initiating cluster actor"
       remotes = new HashMap[Address,Node]
       val me  = this
-      channel = new JChannel {
+      channel = Some(new JChannel {
         setReceiver(new Receiver with ExtendedMembershipListener {
                       def getState : Array[Byte]               = null
                       def setState(state : Array[Byte]) : Unit = ()
@@ -70,17 +80,20 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
                       def block                         : Unit = me ! Block
                       def unblock                       : Unit = me ! Unblock
                    })
-      }
-      channel connect name
+      })
+      channel.map(_.connect(name))
     }
 
-    protected def serializer = Protobuf //FIXME make this configurable
+    protected def serializer = Serializer.Java //FIXME make this configurable
+    def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes
+    def registerLocalNode(hostname : String, port : Int)   : Unit = this ! RegisterLocalNode(RemoteAddress(hostname,port))
+    def deregisterLocalNode(hostname : String, port : Int) : Unit = this ! DeregisterLocalNode(RemoteAddress(hostname,port))
 
-    private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) = {
-        recipients.foreach( to => channel.send(new Message(to,null,serializer out msg)))
-    }
+    private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) : Unit =
+        for(c <- channel; r <- recipients) c.send(new Message(r,null,serializer out msg))
 
-    private def broadcast[T <: AnyRef](msg : T) : Unit = channel.send(new Message(null,null,serializer out msg))
+    private def broadcast[T <: AnyRef](msg : T) : Unit =
+        channel.map( _.send(new Message(null,null,serializer out msg)))
 
    override def receive = {
       case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
@@ -102,12 +115,16 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
            remotes = remotes -- zombies
           }
 
-      case m : Message if m.getSrc != channel.getAddress => {
-            ( serializer in(m.getRawBuffer,None) ) match {
-                case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints))
-                case Papers(x)    => remotes = remotes + (m.getSrc -> Node(x))
-                case unknown      => log info unknown.toString
-            }
+      case m : Message => {
+
+            if(m.getSrc != channel.map(_.getAddress).getOrElse(null))
+                ( serializer in(m.getRawBuffer,None) ) match {
+                    case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints))
+                    case Papers(x)    => remotes = remotes + (m.getSrc -> Node(x))
+                    case unknown      => log info unknown.toString
+                }
+            else
+                log info "Self-originating message: " + m
           }
 
       case RegisterLocalNode(s)   => {
@@ -126,12 +143,9 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
       case Unblock                  => log info "Asked to unblock" //TODO HotSwap back and flush the buffer
     }
 
-    def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes
-    def registerLocalNode(server : RemoteAddress)   : Unit = this ! RegisterLocalNode(server)
-    def deregisterLocalNode(server : RemoteAddress) : Unit = this ! DeregisterLocalNode(server)
-
     override def shutdown = {
+      channel.map(_.close)
       remotes = Map()
-      channel.close
+      channel = None
     }
 }
\ No newline at end of file
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 04c8f37128..8c6601d25b 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -36,11 +36,11 @@
     compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
     zlib-compression-level = 6  # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
 
-    
-      
+	
         name = "default"
-        actor = "se.scalablesolutions.akka.cluster.JGroupsClusterActor"
-      
+        actor = "se.scalablesolutions.akka.nio.JGroupsClusterActor"
+    
+      
       service = on
       hostname = "localhost"
       port = 9999

From c6455cbaddc8a9127a6c03c694fac5575f1f0141 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sun, 13 Dec 2009 15:40:01 +0100
Subject: [PATCH 15/22] Sprinkling extra output for debugging

---
 akka-actors/src/main/scala/nio/Cluster.scala | 50 +++++++++++++++-----
 config/akka-reference.conf                   |  5 +-
 2 files changed, 41 insertions(+), 14 deletions(-)

diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index 663b7785ad..15a941d1b0 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -7,20 +7,22 @@ import se.scalablesolutions.akka.serialization.Serializer
 import se.scalablesolutions.akka.config.ScalaConfig._
 import scala.collection.immutable.{Map,HashMap,HashSet}
 import org.jgroups.util.Util
-import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor}
-import se.scalablesolutions.akka.nio.Cluster.Node
+import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor,ActorRegistry}
+import se.scalablesolutions.akka.nio.Cluster.{Node,RelayedMessage}
 
 trait Cluster {
     def members : List[Node]
     def name : String
     def registerLocalNode(hostname : String, port : Int)   : Unit
     def deregisterLocalNode(hostname : String, port : Int) : Unit
+    def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit
 }
 
 abstract class ClusterActor(val name : String) extends Actor with Cluster
 
 object Cluster extends Cluster {
       case class Node(endpoints : List[RemoteAddress])
+      case class RelayedMessage(actorClass : Class[_ <: Actor],msg : AnyRef)
 
       lazy val impl : Option[ClusterActor] = {
         config.getString("akka.remote.cluster.actor") map ( name => {
@@ -35,7 +37,7 @@ object Cluster extends Cluster {
                                 Supervise(actor, LifeCycle(Permanent)):: Nil
                           )
                      ).newInstance.start
-            actor !! Init(None)
+            actor !! Init(None) // FIXME for some reason the actor isn't init:ed here
             actor
         })
       }
@@ -44,6 +46,7 @@ object Cluster extends Cluster {
       def members = impl.map(_.members).getOrElse(Nil)
       def registerLocalNode(hostname : String, port : Int)   : Unit = impl.map(_.registerLocalNode(hostname,port))
       def deregisterLocalNode(hostname : String, port : Int) : Unit = impl.map(_.deregisterLocalNode(hostname,port))
+      def relayMessage(to : Class[_ <: Actor],msg : AnyRef)  : Unit = impl.map(_.send(to,msg))
 }
 
 object JGroupsClusterActor {
@@ -86,9 +89,11 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
 
     protected def serializer = Serializer.Java //FIXME make this configurable
     def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes
+
     def registerLocalNode(hostname : String, port : Int)   : Unit = this ! RegisterLocalNode(RemoteAddress(hostname,port))
     def deregisterLocalNode(hostname : String, port : Int) : Unit = this ! DeregisterLocalNode(RemoteAddress(hostname,port))
-
+    def relayMessage(to : Class[_ <: Actor],msg : AnyRef)  : Unit = this ! RelayedMessage(to,msg)
+    
     private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) : Unit =
         for(c <- channel; r <- recipients) c.send(new Message(r,null,serializer out msg))
 
@@ -110,21 +115,42 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
            val zombies = Set[Address]() ++ remotes.keySet -- members
            val unknown = members -- remotes.keySet
 
+           log info "Updating view, zombies: " + zombies
+           log info "             , unknown: " + unknown
+           log info "             , members: " + members
+           log info "             ,   known: " + remotes.keySet
+
            //Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead
            broadcast(zombies ++ unknown, PapersPlease)
            remotes = remotes -- zombies
           }
 
       case m : Message => {
-
-            if(m.getSrc != channel.map(_.getAddress).getOrElse(null))
-                ( serializer in(m.getRawBuffer,None) ) match {
-                    case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints))
-                    case Papers(x)    => remotes = remotes + (m.getSrc -> Node(x))
-                    case unknown      => log info unknown.toString
+            val payload = serializer in(m.getRawBuffer,None)
+            if(m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) //handle non-own messages only, and only if we're connected
+                payload match {
+                    case PapersPlease        => {
+                            log info "Asked for papers by " + m.getSrc
+                            broadcast(m.getSrc :: Nil,Papers(local.endpoints))
+                        }
+                    case Papers(x)           => {
+                            log info "Got papers from " + m.getSrc
+                            remotes = remotes + (m.getSrc -> Node(x))
+                            log info "Installed nodes: " + remotes.keySet
+                        }
+                    case RelayedMessage(c,m) => {
+                            log info "Relaying [" + m + "] to ["+c.getName+"]"
+                            ActorRegistry.actorsFor(c).firstOption.map(_ ! m)
+                        }
+                    case unknown             => log info "Unknown message: "+unknown.toString
                 }
-            else
-                log info "Self-originating message: " + m
+            //else
+            //    log info "Self-originating message: " + m + " msg: " + payload
+          }
+
+      case rm@RelayedMessage => {
+            log info "Relaying message: " + rm
+            broadcast(rm)
           }
 
       case RegisterLocalNode(s)   => {
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 8c6601d25b..381fc55496 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -40,10 +40,11 @@
         name = "default"
         actor = "se.scalablesolutions.akka.nio.JGroupsClusterActor"
     
+    
       
       service = on
       hostname = "localhost"
-      port = 9999
+      port = 9991
       connection-timeout = 1000 # in millis (1 sec default)
     
 
@@ -56,7 +57,7 @@
   
     service = on
     hostname = "localhost"
-    port = 9998
+    port = 9992
     filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]"              # List with all jersey filters to use
     authenticator = "se.scalablesolutions.akka.security.samples.BasicAuthenticationService" # The authentication service to use
   

From d3e7e5bc19889bdb32dd11e316598a175bfd22d9 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sun, 13 Dec 2009 15:40:32 +0100
Subject: [PATCH 16/22] Ack, fixing the conf

---
 config/akka-reference.conf | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 381fc55496..85ee85740d 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -44,7 +44,7 @@
       
       service = on
       hostname = "localhost"
-      port = 9991
+      port = 9999
       connection-timeout = 1000 # in millis (1 sec default)
     
 
@@ -57,7 +57,7 @@
   
     service = on
     hostname = "localhost"
-    port = 9992
+    port = 9998
     filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]"              # List with all jersey filters to use
     authenticator = "se.scalablesolutions.akka.security.samples.BasicAuthenticationService" # The authentication service to use
   

From 6bb993ddda84b6f520ed19d86c332374d40bbddc Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sun, 13 Dec 2009 15:44:15 +0100
Subject: [PATCH 17/22]  Added additional logging and did some slight tweaks.

---
 akka-actors/src/main/scala/nio/Cluster.scala | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index 15a941d1b0..9df357f8b1 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -126,9 +126,8 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
           }
 
       case m : Message => {
-            val payload = serializer in(m.getRawBuffer,None)
             if(m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) //handle non-own messages only, and only if we're connected
-                payload match {
+                (serializer in(m.getRawBuffer,None)) match {
                     case PapersPlease        => {
                             log info "Asked for papers by " + m.getSrc
                             broadcast(m.getSrc :: Nil,Papers(local.endpoints))
@@ -144,8 +143,6 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
                         }
                     case unknown             => log info "Unknown message: "+unknown.toString
                 }
-            //else
-            //    log info "Self-originating message: " + m + " msg: " + payload
           }
 
       case rm@RelayedMessage => {
@@ -165,11 +162,12 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
            broadcast(Papers(local.endpoints))
           }
       
-      case Block                    => log info "Asked to block" //TODO HotSwap to a buffering body
-      case Unblock                  => log info "Asked to unblock" //TODO HotSwap back and flush the buffer
+      case Block                    => log info "UNSUPPORTED: block" //TODO HotSwap to a buffering body
+      case Unblock                  => log info "UNSUPPORTED: unblock" //TODO HotSwap back and flush the buffer
     }
 
     override def shutdown = {
+      log info "Shutting down "+this.getClass.getName
       channel.map(_.close)
       remotes = Map()
       channel = None

From 3fcbd8f43c734d93f13728b7ea97e93ba3bce15a Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sun, 13 Dec 2009 16:04:04 +0100
Subject: [PATCH 18/22] Excluding self node from member list

---
 akka-actors/src/main/scala/nio/Cluster.scala | 2 +-
 pom.xml                                      | 6 ++----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index 9df357f8b1..1f91329c0a 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -111,7 +111,7 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
            log info v.printDetails
            //Not present in the cluster anymore = presumably zombies
            //Nodes we have no prior knowledge existed = unknowns
-           val members = Set[Address]() ++ v.getMembers.asScala
+           val members = Set[Address]() ++ v.getMembers.asScala - channel.get.getAddress //Exclude ourselves
            val zombies = Set[Address]() ++ remotes.keySet -- members
            val unknown = members -- remotes.keySet
 
diff --git a/pom.xml b/pom.xml
index 0d90ca56c0..7c8a069125 100755
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,6 @@
     akka-util
     akka-actors
     akka-persistence
-    akka-cluster
     akka-rest
     akka-camel
     akka-amqp
@@ -81,9 +80,8 @@
       +1
       viktor.klang [REMOVE] AT gmail DOT com
       
-        Crazy hermit
-        Tinkerer
-        Visionary
+      	Sourceror
+        Ninja-Zombie
       
     
   

From 3d72244009b52b44bd11704dac4672efddd4258c Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sun, 13 Dec 2009 16:39:10 +0100
Subject: [PATCH 19/22] Adding more comments

---
 akka-actors/src/main/scala/nio/Cluster.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index 1f91329c0a..50e313bb33 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -10,6 +10,9 @@ import org.jgroups.util.Util
 import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor,ActorRegistry}
 import se.scalablesolutions.akka.nio.Cluster.{Node,RelayedMessage}
 
+/**
+ Interface for interacting with the cluster
+**/
 trait Cluster {
     def members : List[Node]
     def name : String
@@ -18,8 +21,16 @@ trait Cluster {
     def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit
 }
 
+/**
+ Extend this class (you have to provide the same signature constructor i.e. XX(name : String) så we can construct it
+ Perhaps we'll change this to use ActorRegistry for lookup an instance instead.
+**/
 abstract class ClusterActor(val name : String) extends Actor with Cluster
 
+/**
+ A singleton representing the Cluster
+ Loads a specified ClusterActor and delegates to that instance
+**/
 object Cluster extends Cluster {
       case class Node(endpoints : List[RemoteAddress])
       case class RelayedMessage(actorClass : Class[_ <: Actor],msg : AnyRef)
@@ -49,6 +60,9 @@ object Cluster extends Cluster {
       def relayMessage(to : Class[_ <: Actor],msg : AnyRef)  : Unit = impl.map(_.send(to,msg))
 }
 
+/**
+ Just a placeholder for the JGroupsClusterActor message types
+**/
 object JGroupsClusterActor {
     //Message types
     case object PapersPlease
@@ -60,6 +74,9 @@ object JGroupsClusterActor {
     case class DeregisterLocalNode(server : RemoteAddress)
 }
 
+/**
+ Clustering support via JGroups
+**/
 class JGroupsClusterActor(name : String) extends ClusterActor(name)
 {
     import JGroupsClusterActor._

From e8ace9e1ac00359d14e089e71692f90633191695 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sun, 13 Dec 2009 19:25:47 +0100
Subject: [PATCH 20/22] Minor tweaks

---
 akka-actors/src/main/scala/nio/Cluster.scala | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index 50e313bb33..54dc85dd8e 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -22,10 +22,12 @@ trait Cluster {
 }
 
 /**
- Extend this class (you have to provide the same signature constructor i.e. XX(name : String) så we can construct it
- Perhaps we'll change this to use ActorRegistry for lookup an instance instead.
+ Baseclass for cluster implementations
 **/
-abstract class ClusterActor(val name : String) extends Actor with Cluster
+abstract class ClusterActor extends Actor with Cluster
+{
+    val name = config.getString("akka.remote.cluster.name") getOrElse "default"
+}
 
 /**
  A singleton representing the Cluster
@@ -38,8 +40,7 @@ object Cluster extends Cluster {
       lazy val impl : Option[ClusterActor] = {
         config.getString("akka.remote.cluster.actor") map ( name => {
              val actor = Class.forName(name)
-                              .getDeclaredConstructor(Array(classOf[String]): _*)
-                              .newInstance(config.getString("akka.remote.cluster.name") getOrElse "default")
+                              .newInstance
                               .asInstanceOf[ClusterActor]
 
             SupervisorFactory(
@@ -48,7 +49,7 @@ object Cluster extends Cluster {
                                 Supervise(actor, LifeCycle(Permanent)):: Nil
                           )
                      ).newInstance.start
-            actor !! Init(None) // FIXME for some reason the actor isn't init:ed here
+            actor !! Init(None) // FIXME for some reason the actor isn't init:ed unless we explicitly send it this Init message
             actor
         })
       }
@@ -77,7 +78,7 @@ object JGroupsClusterActor {
 /**
  Clustering support via JGroups
 **/
-class JGroupsClusterActor(name : String) extends ClusterActor(name)
+class JGroupsClusterActor extends ClusterActor
 {
     import JGroupsClusterActor._
     import org.scala_tools.javautils.Implicits._
@@ -90,6 +91,7 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
       log info "Initiating cluster actor"
       remotes = new HashMap[Address,Node]
       val me  = this
+      //Set up the JGroups local endpoint
       channel = Some(new JChannel {
         setReceiver(new Receiver with ExtendedMembershipListener {
                       def getState : Array[Byte]               = null
@@ -150,7 +152,7 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
                             broadcast(m.getSrc :: Nil,Papers(local.endpoints))
                         }
                     case Papers(x)           => {
-                            log info "Got papers from " + m.getSrc
+                            log info "Got papers from " + m.getSrc + " = " + x
                             remotes = remotes + (m.getSrc -> Node(x))
                             log info "Installed nodes: " + remotes.keySet
                         }

From 5d9a8c52255ed8ab8b75391fde37505d10d923be Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sun, 13 Dec 2009 20:01:33 +0100
Subject: [PATCH 21/22] Updated to latest Atmosphere API

---
 akka-actors/src/main/scala/nio/Cluster.scala | 2 +-
 akka-kernel/src/main/scala/AkkaServlet.scala | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala
index 54dc85dd8e..55d4970f48 100644
--- a/akka-actors/src/main/scala/nio/Cluster.scala
+++ b/akka-actors/src/main/scala/nio/Cluster.scala
@@ -49,7 +49,7 @@ object Cluster extends Cluster {
                                 Supervise(actor, LifeCycle(Permanent)):: Nil
                           )
                      ).newInstance.start
-            actor !! Init(None) // FIXME for some reason the actor isn't init:ed unless we explicitly send it this Init message
+            //actor !! Init(None) // FIXME for some reason the actor isn't init:ed unless we explicitly send it this Init message
             actor
         })
       }
diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala
index 32a9b18295..ae69fc21a7 100755
--- a/akka-kernel/src/main/scala/AkkaServlet.scala
+++ b/akka-kernel/src/main/scala/AkkaServlet.scala
@@ -18,7 +18,7 @@ import javax.servlet.{ServletConfig}
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
 import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
-import org.atmosphere.container.{GrizzlyCometSupport,GlassFishv3CometSupport}
+import org.atmosphere.container.{GrizzlyCometSupport}
 import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler}
 import org.atmosphere.jersey.JerseyBroadcaster
 
@@ -85,9 +85,9 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging
       new DefaultCometSupportResolver(config) {
          type CS = CometSupport[_ <: AtmosphereResource[_,_]]
          override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = {
-             available.asScala.filter(c => c != classOf[GrizzlyCometSupport] && c != classOf[GlassFishv3CometSupport]).toList match {
+             available.asScala.filter(_ != classOf[GrizzlyCometSupport]).toList match {
                  case Nil      => new GrizzlyCometSupport(config)
-                 case x :: Nil => newCometSupport(x)
+                 case (x:AnyRef) :: Nil => newCometSupport(x)
                  case _        => super.resolveMultipleNativeSupportConflict(available)
              }
         }

From 1da198db0c6e23a8f7e17a897839b7cfffa2c060 Mon Sep 17 00:00:00 2001
From: Viktor Klang 
Date: Sun, 13 Dec 2009 20:27:22 +0100
Subject: [PATCH 22/22] A better solution for comet conflict resolve

---
 akka-kernel/src/main/scala/AkkaServlet.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala
index ae69fc21a7..8c9fdba4c2 100755
--- a/akka-kernel/src/main/scala/AkkaServlet.scala
+++ b/akka-kernel/src/main/scala/AkkaServlet.scala
@@ -87,7 +87,7 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging
          override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = {
              available.asScala.filter(_ != classOf[GrizzlyCometSupport]).toList match {
                  case Nil      => new GrizzlyCometSupport(config)
-                 case (x:AnyRef) :: Nil => newCometSupport(x)
+                 case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]])
                  case _        => super.resolveMultipleNativeSupportConflict(available)
              }
         }