diff --git a/.history b/.history
new file mode 100644
index 0000000000..209db6b195
--- /dev/null
+++ b/.history
@@ -0,0 +1,2 @@
+update
+reload
diff --git a/akka-actor-tests/src/main/scala/akka/testing/Serializers.scala b/akka-actor-tests/src/main/scala/akka/testing/Serializers.scala
index 6412619963..de7d0924ea 100644
--- a/akka-actor-tests/src/main/scala/akka/testing/Serializers.scala
+++ b/akka-actor-tests/src/main/scala/akka/testing/Serializers.scala
@@ -14,7 +14,7 @@ import sjson.json._
class ProtobufSerializer extends Serializer {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
- def identifier = 2:Byte
+ def identifier = 2: Byte
def toBinary(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(
@@ -33,7 +33,7 @@ object ProtobufSerializer extends ProtobufSerializer
class JavaJSONSerializer extends Serializer {
private val mapper = new ObjectMapper
- def identifier = 3:Byte
+ def identifier = 3: Byte
def toBinary(obj: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
@@ -58,7 +58,7 @@ object JavaJSONSerializer extends JavaJSONSerializer
class SJSONSerializer extends Serializer {
- def identifier = 4:Byte
+ def identifier = 4: Byte
def toBinary(obj: AnyRef): Array[Byte] = sjson.json.Serializer.SJSON.out(obj)
diff --git a/akka-actor-tests/src/test/scala/akka/AkkaExceptionSpec.scala b/akka-actor-tests/src/test/scala/akka/AkkaExceptionSpec.scala
index ac0b2b75cf..5e51e2366a 100644
--- a/akka-actor-tests/src/test/scala/akka/AkkaExceptionSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/AkkaExceptionSpec.scala
@@ -22,7 +22,7 @@ class AkkaExceptionSpec extends WordSpec with MustMatchers {
}
}
- def verify(clazz:java.lang.Class[_]):Unit = {
+ def verify(clazz: java.lang.Class[_]): Unit = {
clazz.getConstructor(Array(classOf[String]): _*)
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala
index 315798cc19..9ac58d284a 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala
@@ -91,7 +91,7 @@ object Chameneos {
self.stop()
}
- case msg@Meet(a, c) ⇒
+ case msg @ Meet(a, c) ⇒
if (n > 0) {
waitingChameneo match {
case Some(chameneo) ⇒
diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
index 423098a49a..b2e41a02c7 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
@@ -18,6 +18,7 @@ class DeployerSpec extends WordSpec with MustMatchers {
deployment must equal(Some(
Deploy(
"service-ping",
+ None,
LeastCPU,
Clustered(
Vector(Node("node1")),
diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala
index c39d51f6f3..3596c96b40 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala
@@ -14,7 +14,7 @@ import org.scalatest.matchers.MustMatchers
class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
import Ticket669Spec._
- override def beforeAll = Thread.interrupted() //remove interrupted status.
+ override def beforeAll = Thread.interrupted() //remove interrupted status.
override def afterAll = {
Actor.registry.local.shutdownAll
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala
index 327d6dda10..3fa4d16f2e 100644
--- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala
@@ -9,12 +9,12 @@ import akka.testkit.Testing
import akka.dispatch._
import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
+import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.Switch
-import akka.actor.{ActorKilledException, PoisonPill, ActorRef, Actor}
+import akka.actor.{ ActorKilledException, PoisonPill, ActorRef, Actor }
import java.rmi.RemoteException
-import org.junit.{After, Test}
+import org.junit.{ After, Test }
@@ -46,7 +46,6 @@ object ActorModelSpec {
case class ThrowException(e: Throwable) extends ActorModelMessage
-
val Ping = "Ping"
val Pong = "Pong"
@@ -68,19 +67,19 @@ object ActorModelSpec {
}
def receive = {
- case Await(latch) ⇒ ack; latch.await(); busy.switchOff()
- case Meet(sign, wait) ⇒ ack; sign.countDown(); wait.await(); busy.switchOff()
- case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff()
- case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff()
- case Reply(msg) ⇒ ack; self.reply(msg); busy.switchOff()
- case TryReply(msg) ⇒ ack; self.tryReply(msg); busy.switchOff()
- case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff()
- case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff()
- case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff()
- case CountDownNStop(l) ⇒ ack; l.countDown(); self.stop(); busy.switchOff()
- case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested")
- case Interrupt => ack; busy.switchOff(); throw new InterruptedException("Ping!")
- case ThrowException(e: Throwable) => ack; busy.switchOff(); throw e
+ case Await(latch)                 ⇒ ack; latch.await(); busy.switchOff()
+ case Meet(sign, wait)             ⇒ ack; sign.countDown(); wait.await(); busy.switchOff()
+ case Wait(time)                   ⇒ ack; Thread.sleep(time); busy.switchOff()
+ case WaitAck(time, l)             ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff()
+ case Reply(msg)                   ⇒ ack; self.reply(msg); busy.switchOff()
+ case TryReply(msg)                ⇒ ack; self.tryReply(msg); busy.switchOff()
+ case Forward(to, msg)             ⇒ ack; to.forward(msg); busy.switchOff()
+ case CountDown(latch)             ⇒ ack; latch.countDown(); busy.switchOff()
+ case Increment(count)             ⇒ ack; count.incrementAndGet(); busy.switchOff()
+ case CountDownNStop(l)            ⇒ ack; l.countDown(); self.stop(); busy.switchOff()
+ case Restart                      ⇒ ack; busy.switchOff(); throw new Exception("Restart requested")
+ case Interrupt                    ⇒ ack; busy.switchOff(); throw new InterruptedException("Ping!")
+ case ThrowException(e: Throwable) ⇒ ack; busy.switchOff(); throw e
}
}
@@ -368,9 +367,8 @@ abstract class ActorModelSpec extends JUnitSuite {
def flood(num: Int) {
val cachedMessage = CountDownNStop(new CountDownLatch(num))
- (1 to num) foreach {
- _ ⇒
- newTestActor.start() ! cachedMessage
+ (1 to num) foreach { _ ⇒
+ newTestActor.start() ! cachedMessage
}
assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns")
}
@@ -454,4 +452,3 @@ class BalancingDispatcherModelTest extends ActorModelSpec {
new BalancingDispatcher("foo") with MessageDispatcherInterceptor
}
-
diff --git a/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala
index 7c6a4326a3..bd988aa190 100644
--- a/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala
@@ -9,12 +9,12 @@ import Actor._
import akka.config.Supervision._
import org.multiverse.api.latches.StandardLatch
import org.junit.{ Test, Before, After }
-import java.util.concurrent.{ScheduledFuture, ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
+import java.util.concurrent.{ ScheduledFuture, ConcurrentLinkedQueue, CountDownLatch, TimeUnit }
class SchedulerSpec extends JUnitSuite {
private val futures = new ConcurrentLinkedQueue[ScheduledFuture[AnyRef]]()
- def collectFuture(f: => ScheduledFuture[AnyRef]): ScheduledFuture[AnyRef] = {
+ def collectFuture(f: ⇒ ScheduledFuture[AnyRef]): ScheduledFuture[AnyRef] = {
val future = f
futures.add(future)
future
diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala
index 869c186524..ae8e879421 100755
--- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala
+++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala
@@ -34,7 +34,7 @@ class AkkaOrderReceiver(disp: Option[MessageDispatcher])
}
def receive = {
- case routing@MatchingEngineRouting(mapping) ⇒
+ case routing @ MatchingEngineRouting(mapping) ⇒
refreshMatchingEnginePartitions(routing.asInstanceOf[MatchingEngineRouting[ActorRef]])
case order: Order ⇒ placeOrder(order)
case unknown ⇒ EventHandler.warning(this, "Received unknown message: " + unknown)
diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala
index 0c8e5f0cb2..048606d322 100644
--- a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala
+++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala
@@ -12,7 +12,7 @@ import java.io.PrintWriter
import java.text.SimpleDateFormat
import java.util.Date
-import scala.collection.mutable.{Map => MutableMap}
+import scala.collection.mutable.{ Map ⇒ MutableMap }
import akka.event.EventHandler
diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
index 6654943886..8fd5d31bf0 100644
--- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
@@ -109,7 +109,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
t1.dispatcher.suspend(t1)
- for (i <- 1 to 2501) t1 ! i
+ for (i ← 1 to 2501) t1 ! i
val t2 = actorOf(new Actor {
def receive = {
@@ -121,7 +121,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil))
//Will pick the last with the smallest mailbox, so make sure t1 is last
- for (i ← 1 to 2499 ) d ! i
+ for (i ← 1 to 2499) d ! i
latch2.await(20 seconds)
@@ -529,12 +529,12 @@ class RoutingSpec extends WordSpec with MustMatchers {
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
- if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
+ if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
- case akka.Die ⇒
+ case akka.Die ⇒
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
- case _ => pingCount.incrementAndGet
+ case _ ⇒ pingCount.incrementAndGet
}
}).start()
}).start()
@@ -554,12 +554,12 @@ class RoutingSpec extends WordSpec with MustMatchers {
def factory = actorOf(new Actor {
self.lifeCycle = Permanent
if (deathCount.get > 5) deathCount.set(0)
- if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
+ if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
- case akka.Die ⇒
+ case akka.Die ⇒
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
- case _ => pingCount.incrementAndGet
+ case _ ⇒ pingCount.incrementAndGet
}
}).start()
}).start()
@@ -579,12 +579,12 @@ class RoutingSpec extends WordSpec with MustMatchers {
def factory = actorOf(new Actor {
self.lifeCycle = Temporary
if (deathCount.get > 5) deathCount.set(0)
- if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
+ if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
- case akka.Die ⇒
+ case akka.Die ⇒
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
- case _ => pingCount.incrementAndGet
+ case _ ⇒ pingCount.incrementAndGet
}
}).start()
}).start()
@@ -598,12 +598,12 @@ class RoutingSpec extends WordSpec with MustMatchers {
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be (1)
+ pingCount.get must be(1)
// default lifecycle
// actor dies completely
pingCount.set(0)
- keepDying = true
+ keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
@@ -611,35 +611,35 @@ class RoutingSpec extends WordSpec with MustMatchers {
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be (2)
+ pingCount.get must be(2)
// permanent lifecycle
// actor comes back right away
pingCount.set(0)
- keepDying = false
+ keepDying = false
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be (1)
+ pingCount.get must be(1)
// permanent lifecycle
// actor dies completely
pingCount.set(0)
- keepDying = true
+ keepDying = true
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
- (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be (2)
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be(2)
// temporary lifecycle
pingCount.set(0)
- keepDying = false
+ keepDying = false
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
@@ -649,7 +649,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
pool3 ! "ping"
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be (4)
+ pingCount.get must be(4)
}
"support customizable supervision config of pooled actors" in {
@@ -678,19 +678,18 @@ class RoutingSpec extends WordSpec with MustMatchers {
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
- if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
+ if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
- case BadState ⇒
+ case BadState ⇒
if (keepDying) deathCount.incrementAndGet
throw new IllegalStateException
- case akka.Die =>
+ case akka.Die ⇒
throw new RuntimeException
- case _ => pingCount.incrementAndGet
+ case _ ⇒ pingCount.incrementAndGet
}
}).start()
}).start()
-
// actor comes back right away
pingCount.set(0)
keepDying = false
@@ -699,11 +698,11 @@ class RoutingSpec extends WordSpec with MustMatchers {
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be (1)
+ pingCount.get must be(1)
// actor dies completely
pingCount.set(0)
- keepDying = true
+ keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
@@ -711,7 +710,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
- pingCount.get must be (2)
+ pingCount.get must be(2)
// kill it
intercept[RuntimeException](pool1.?(akka.Die).get)
diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala
index 9fbc5fd7ac..78d74cfdc5 100644
--- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala
@@ -5,7 +5,7 @@ package akka.testkit
import akka.actor.dispatch.ActorModelSpec
import java.util.concurrent.CountDownLatch
-import org.junit.{After, Test}
+import org.junit.{ After, Test }
class CallingThreadDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
@@ -43,7 +43,6 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec {
//Can't handle this...
}
-
@After
def after {
//remove the interrupted status since we are messing with interrupted exceptions.
diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala
index 24831ff589..f623c3734f 100644
--- a/akka-actor/src/main/scala/akka/AkkaException.scala
+++ b/akka-actor/src/main/scala/akka/AkkaException.scala
@@ -6,7 +6,7 @@ package akka
import akka.actor.newUuid
import java.net.{ InetAddress, UnknownHostException }
-
+
/**
* Akka base Exception. Each Exception gets:
*
* reply(..) version.
*/
- def tryReply(message: Any): Boolean = channel.safe_!(message)(this)
+ def tryReply(message: Any): Boolean = channel.safe_!(message)(this)
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
@@ -444,7 +444,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
*
* @author Jonas Bonér
*/
-class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, val address: String)
+class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, val address: String)
extends ActorRef with ScalaActorRef {
protected[akka] val guard = new ReentrantGuard
@@ -480,12 +480,12 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
"]")
private val serializer: Serializer =
- try { Serialization.serializerFor(this.getClass) } catch { case e: Exception => serializerErrorDueTo(e.toString)}
+ try { Serialization.serializerFor(this.getClass) } catch { case e: Exception ⇒ serializerErrorDueTo(e.toString) }
private lazy val replicationStorage: Option[TransactionLog] = {
import DeploymentConfig._
val replicationScheme = replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient)
- if(isReplicated(replicationScheme)) {
+ if (isReplicated(replicationScheme)) {
if (isReplicatedWithTransactionLog(replicationScheme)) {
EventHandler.debug(this, "Creating a transaction log for Actor [%s] with replication strategy [%s]".format(address, replicationScheme))
@@ -503,16 +503,16 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
// used only for deserialization
private[akka] def this(
- __uuid: Uuid,
- __address: String,
- __timeout: Long,
- __receiveTimeout: Option[Long],
- __lifeCycle: LifeCycle,
- __supervisor: Option[ActorRef],
- __hotswap: Stack[PartialFunction[Any, Unit]],
- __factory: () ⇒ Actor) = {
+ __uuid: Uuid,
+ __address: String,
+ __timeout: Long,
+ __receiveTimeout: Option[Long],
+ __lifeCycle: LifeCycle,
+ __supervisor: Option[ActorRef],
+ __hotswap: Stack[PartialFunction[Any, Unit]],
+ __factory: () ⇒ Actor) = {
- this (__factory, __address)
+ this(__factory, __address)
_uuid = __uuid
timeout = __timeout
@@ -704,8 +704,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
throw e
case e ⇒
handleExceptionInDispatch(e, messageHandle.message)
- }
- finally {
+ } finally {
checkReceiveTimeout // Reschedule receive timeout
}
} catch {
@@ -810,8 +809,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
case e ⇒
EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString))
false // an error or exception here should trigger a retry
- }
- finally {
+ } finally {
currentMessage = null
}
@@ -852,7 +850,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
// ========= PRIVATE FUNCTIONS =========
private[this] def newActor: Actor = {
- import Actor.{actorRefInCreation ⇒ refStack}
+ import Actor.{ actorRefInCreation ⇒ refStack }
val stackBefore = refStack.get
refStack.set(stackBefore.push(this))
try {
@@ -876,7 +874,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
refStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) //pop null marker plus self
}
} match {
- case null ⇒ throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
+ case null ⇒ throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
case valid ⇒ valid
}
@@ -909,21 +907,20 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
private def notifySupervisorWithMessage(notification: LifeCycleMessage) {
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
- _supervisor.foreach {
- sup ⇒
- if (sup.isShutdown) {
- // if supervisor is shut down, game over for all linked actors
- //Scoped stop all linked actors, to avoid leaking the 'i' val
- {
- val i = _linkedActors.values.iterator
- while (i.hasNext) {
- i.next.stop()
- i.remove
- }
+ _supervisor.foreach { sup ⇒
+ if (sup.isShutdown) {
+ // if supervisor is shut down, game over for all linked actors
+ //Scoped stop all linked actors, to avoid leaking the 'i' val
+ {
+ val i = _linkedActors.values.iterator
+ while (i.hasNext) {
+ i.next.stop()
+ i.remove
}
- //Stop the actor itself
- stop
- } else sup ! notification // else notify supervisor
+ }
+ //Stop the actor itself
+ stop
+ } else sup ! notification // else notify supervisor
}
}
@@ -993,11 +990,11 @@ object RemoteActorSystemMessage {
*
* @author Jonas Bonér
*/
-private[akka] case class RemoteActorRef private[akka](
- val remoteAddress: InetSocketAddress,
- val address: String,
- _timeout: Long,
- loader: Option[ClassLoader])
+private[akka] case class RemoteActorRef private[akka] (
+ val remoteAddress: InetSocketAddress,
+ val address: String,
+ _timeout: Long,
+ loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
ClusterModule.ensureEnabled()
@@ -1009,7 +1006,7 @@ private[akka] case class RemoteActorRef private[akka](
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val chSender = channel match {
case ref: ActorRef ⇒ Some(ref)
- case _ ⇒ None
+ case _ ⇒ None
}
Actor.remote.send[Any](message, chSender, None, remoteAddress, timeout, true, this, loader)
}
@@ -1024,7 +1021,7 @@ private[akka] case class RemoteActorRef private[akka](
}
val chFuture = channel match {
case f: Promise[_] ⇒ Some(f.asInstanceOf[Promise[Any]])
- case _ ⇒ None
+ case _ ⇒ None
}
val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout.duration.toMillis, false, this, loader)
if (future.isDefined) ActorPromise(future.get)
@@ -1174,7 +1171,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel {
if (msg eq null) None
else msg.channel match {
case ref: ActorRef ⇒ Some(ref)
- case _ ⇒ None
+ case _ ⇒ None
}
}
@@ -1188,7 +1185,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel {
if (msg eq null) None
else msg.channel match {
case f: ActorPromise ⇒ Some(f)
- case _ ⇒ None
+ case _ ⇒ None
}
}
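Note (illustration, not part of the patch): tryReply, touched at the top of this file's hunks, is the non-throwing counterpart of reply — it returns false instead of throwing when there is no sender channel to answer. A minimal sketch of the intended usage against the 1.x-era API shown above (Pinger is a hypothetical actor):

    import akka.actor.Actor
    import akka.event.EventHandler

    class Pinger extends Actor {
      def receive = {
        case "ping" ⇒
          // tryReply returns false, rather than throwing, when nobody can receive the answer
          if (!self.tryReply("pong"))
            EventHandler.warning(this, "no sender channel, dropping pong")
      }
    }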
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
index e4fe387fcf..29da2fac04 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
@@ -32,8 +32,6 @@ case class ActorUnregistered(address: String, actor: ActorRef, typedActor: Optio
* @author Jonas Bonér
*/
private[actor] final class ActorRegistry private[actor] () extends ListenerManagement {
-
- //private val isClusterEnabled = ReflectiveAccess.isClusterEnabled
private val actorsByAddress = new ConcurrentHashMap[String, ActorRef]
private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef]
private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef]
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index 3e98716722..55f1ab23a1 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -10,67 +10,53 @@ import java.util.concurrent.ConcurrentHashMap
import akka.event.EventHandler
import akka.actor.DeploymentConfig._
-import akka.config.{ ConfigurationException, Config }
import akka.util.ReflectiveAccess._
import akka.AkkaException
+import akka.serialization.{ Serializer, Serialization }
+import akka.util.ReflectiveAccess
+import akka.config.{ Configuration, ConfigurationException, Config }
+
+trait ActorDeployer {
+ private[akka] def init(deployments: Seq[Deploy]): Unit
+ private[akka] def shutdown(): Unit //TODO Why should we have "shutdown", should be crash only?
+ private[akka] def deploy(deployment: Deploy): Unit
+ private[akka] def lookupDeploymentFor(address: String): Option[Deploy]
+ private[akka] def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_))
+}
/**
* Deployer maps actor deployments to actor addresses.
*
* @author Jonas Bonér
*/
-object Deployer {
+object Deployer extends ActorDeployer {
val defaultAddress = Host(Config.hostname)
- lazy val instance: ClusterModule.ClusterDeployer = {
- val deployer =
- if (ClusterModule.isEnabled) ClusterModule.clusterDeployer
- else LocalDeployer
+ lazy val instance: ActorDeployer = {
+ val deployer = if (ClusterModule.isEnabled) ClusterModule.clusterDeployer else LocalDeployer
deployer.init(deploymentsInConfig)
deployer
}
- def start() {
- instance.toString
- }
+ def start(): Unit = instance.toString //Force evaluation
- def shutdown() {
- instance.shutdown()
- }
+ private[akka] def init(deployments: Seq[Deploy]) = instance.init(deployments)
- def deploy(deployment: Deploy) {
- if (deployment eq null) throw new IllegalArgumentException("Deploy can not be null")
- val address = deployment.address
- Address.validate(address)
- instance.deploy(deployment)
- }
+ def shutdown(): Unit = instance.shutdown() //TODO Why should we have "shutdown", should be crash only?
- def deploy(deployment: Seq[Deploy]) {
- deployment foreach (deploy(_))
- }
-
- /**
- * Undeploy is idemponent. E.g. safe to invoke multiple times.
- */
- def undeploy(deployment: Deploy) {
- instance.undeploy(deployment)
- }
-
- def undeployAll() {
- instance.undeployAll()
- }
+ def deploy(deployment: Deploy): Unit = instance.deploy(deployment)
def isLocal(deployment: Deploy): Boolean = deployment match {
- case Deploy(_, _, Local) ⇒ true
- case _ ⇒ false
+ case Deploy(_, _, _, Local) | Deploy(_, _, _, _: Local) ⇒ true
+ case _ ⇒ false
}
- def isClustered(deployment: Deploy): Boolean = isLocal(deployment)
+ def isClustered(deployment: Deploy): Boolean = !isLocal(deployment)
- def isLocal(address: String): Boolean = isLocal(deploymentFor(address))
+ def isLocal(address: String): Boolean = isLocal(deploymentFor(address)) //TODO Should this throw exception if address not found?
- def isClustered(address: String): Boolean = !isLocal(address)
+ def isClustered(address: String): Boolean = !isLocal(address) //TODO Should this throw exception if address not found?
/**
* Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound.
@@ -87,15 +73,13 @@ object Deployer {
if (deployment_?.isDefined && (deployment_?.get ne null)) deployment_?
else {
-
- val newDeployment =
- try {
- lookupInConfig(address)
- } catch {
- case e: ConfigurationException ⇒
- EventHandler.error(e, this, e.getMessage)
- throw e
- }
+ val newDeployment = try {
+ lookupInConfig(address)
+ } catch {
+ case e: ConfigurationException ⇒
+ EventHandler.error(e, this, e.getMessage)
+ throw e
+ }
newDeployment foreach { d ⇒
if (d eq null) {
@@ -131,20 +115,21 @@ object Deployer {
/**
* Lookup deployment in 'akka.conf' configuration file.
*/
- private[akka] def lookupInConfig(address: String): Option[Deploy] = {
+ private[akka] def lookupInConfig(address: String, configuration: Configuration = Config.config): Option[Deploy] = {
+ import akka.util.ReflectiveAccess.{ createInstance, emptyArguments, emptyParams, getClassFor }
// --------------------------------
// akka.actor.deployment.
// --------------------------------
val addressPath = "akka.actor.deployment." + address
- Config.config.getSection(addressPath) match {
- case None ⇒ Some(Deploy(address, Direct, Local))
+ configuration.getSection(addressPath) match {
+ case None ⇒ Some(Deploy(address, None, Direct, Local))
case Some(addressConfig) ⇒
// --------------------------------
// akka.actor.deployment..router
// --------------------------------
- val router = addressConfig.getString("router", "direct") match {
+ val router: Routing = addressConfig.getString("router", "direct") match {
case "direct" ⇒ Direct
case "round-robin" ⇒ RoundRobin
case "random" ⇒ Random
@@ -152,14 +137,21 @@ object Deployer {
case "least-ram" ⇒ LeastRAM
case "least-messages" ⇒ LeastMessages
case customRouterClassName ⇒
- val customRouter = try {
- Class.forName(customRouterClassName).newInstance.asInstanceOf[AnyRef]
- } catch {
- case e ⇒ throw new ConfigurationException(
+ createInstance[AnyRef](customRouterClassName, emptyParams, emptyArguments).fold(
+ e ⇒ throw new ConfigurationException(
"Config option [" + addressPath + ".router] needs to be one of " +
- "[\"direct\", \"round-robin\", \"random\", \"least-cpu\", \"least-ram\", \"least-messages\" or FQN of router class]")
- }
- CustomRouter(customRouter)
+ "[\"direct\", \"round-robin\", \"random\", \"least-cpu\", \"least-ram\", \"least-messages\" or FQN of router class]", e),
+ CustomRouter(_))
+ }
+
+ val recipe: Option[ActorRecipe] = addressConfig.getSection("create-as") map { section ⇒
+ val implementationClass = section.getString("implementation-class") match {
+ case Some(impl) ⇒
+ getClassFor[Actor](impl).fold(e ⇒ throw new ConfigurationException("Config option [" + addressPath + ".create-as.implementation-class] load failed", e), identity)
+ case None ⇒ throw new ConfigurationException("Config option [" + addressPath + ".create-as.implementation-class] is missing")
+ }
+
+ ActorRecipe(implementationClass)
}
// --------------------------------
@@ -167,7 +159,7 @@ object Deployer {
// --------------------------------
addressConfig.getSection("clustered") match {
case None ⇒
- Some(Deploy(address, router, Local)) // deploy locally
+ Some(Deploy(address, recipe, router, Local)) // deploy locally
case Some(clusteredConfig) ⇒
@@ -227,7 +219,7 @@ object Deployer {
// --------------------------------
clusteredConfig.getSection("replication") match {
case None ⇒
- Some(Deploy(address, router, Clustered(preferredNodes, replicationFactor, Transient)))
+ Some(Deploy(address, recipe, router, Clustered(preferredNodes, replicationFactor, Transient)))
case Some(replicationConfig) ⇒
val storage = replicationConfig.getString("storage", "transaction-log") match {
@@ -246,17 +238,14 @@ object Deployer {
".clustered.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
unknown + "]")
}
- Some(Deploy(address, router, Clustered(preferredNodes, replicationFactor, Replication(storage, strategy))))
+ Some(Deploy(address, recipe, router, Clustered(preferredNodes, replicationFactor, Replication(storage, strategy))))
}
}
}
}
private[akka] def throwDeploymentBoundException(deployment: Deploy): Nothing = {
- val e = new DeploymentAlreadyBoundException(
- "Address [" + deployment.address +
- "] already bound to [" + deployment +
- "]. You have to invoke 'undeploy(deployment) first.")
+ val e = new DeploymentAlreadyBoundException("Address [" + deployment.address + "] already bound to [" + deployment + "]")
EventHandler.error(e, this, e.getMessage)
throw e
}
@@ -273,29 +262,30 @@ object Deployer {
*
* @author Jonas Bonér
*/
-object LocalDeployer {
+object LocalDeployer extends ActorDeployer {
private val deployments = new ConcurrentHashMap[String, Deploy]
- private[akka] def init(deployments: List[Deploy]) {
+ private[akka] def init(deployments: Seq[Deploy]) {
EventHandler.info(this, "Deploying actors locally [\n\t%s\n]" format deployments.mkString("\n\t"))
deployments foreach (deploy(_)) // deploy
}
private[akka] def shutdown() {
- undeployAll()
- deployments.clear()
+ deployments.clear() //TODO do something else/more?
}
private[akka] def deploy(deployment: Deploy) {
- if (deployments.putIfAbsent(deployment.address, deployment) != deployment) {
- //Deployer.throwDeploymentBoundException(deployment) // FIXME uncomment this and fix the issue with multiple deployments
- }
+ deployments.putIfAbsent(deployment.address, deployment) /* match {
+ case null ⇒
+ deployment match {
+ case Deploy(address, Some(recipe), routing, _) ⇒ Actor.actorOf(recipe.implementationClass, address).start() //FIXME use routing?
+ case _ ⇒
+ }
+ case `deployment` ⇒ //Already deployed TODO should it be like this?
+ case preexists ⇒ Deployer.throwDeploymentBoundException(deployment)
+ }*/
}
- private[akka] def undeploy(deployment: Deploy): Unit = deployments.remove(deployment.address)
-
- private[akka] def undeployAll(): Unit = deployments.clear()
-
private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = Option(deployments.get(address))
}
@@ -308,10 +298,8 @@ object Address {
private val validAddressPattern = java.util.regex.Pattern.compile("[0-9a-zA-Z\\-\\_\\$\\.]+")
def validate(address: String) {
- if (validAddressPattern.matcher(address).matches) true
- else {
- val e = new IllegalArgumentException(
- "Address [" + address + "] is not valid, need to follow pattern [0-9a-zA-Z\\-\\_\\$]+")
+ if (!validAddressPattern.matcher(address).matches) {
+ val e = new IllegalArgumentException("Address [" + address + "] is not valid, need to follow pattern: " + validAddressPattern.pattern)
EventHandler.error(e, this, e.getMessage)
throw e
}
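Note (illustration, not part of the patch): the new ActorDeployer trait above is the contract now shared by Deployer, LocalDeployer and the cluster deployer. A minimal sketch of an alternative implementation against that contract — the name InMemoryDeployer is hypothetical, and the object would have to live in the akka.actor package to implement the private[akka] members:

    import java.util.concurrent.ConcurrentHashMap
    import akka.actor.DeploymentConfig.Deploy

    object InMemoryDeployer extends ActorDeployer {
      private val bound = new ConcurrentHashMap[String, Deploy]

      private[akka] def init(ds: Seq[Deploy]): Unit = deploy(ds) // bulk deploy via the trait's default method
      private[akka] def shutdown(): Unit = bound.clear()
      private[akka] def deploy(deployment: Deploy): Unit = bound.putIfAbsent(deployment.address, deployment)
      private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = Option(bound.get(address))
    }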
diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
index f5206b7668..c7dd94bbdb 100644
--- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
+++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
@@ -21,8 +21,16 @@ object DeploymentConfig {
// --------------------------------
case class Deploy(
address: String,
+ recipe: Option[ActorRecipe],
routing: Routing = Direct,
- scope: Scope = Local)
+ scope: Scope = Local) {
+ Address.validate(address)
+ }
+
+ // --------------------------------
+ // --- Actor Recipe
+ // --------------------------------
+ case class ActorRecipe(implementationClass: Class[_ <: Actor]) //TODO Add ActorConfiguration here
// --------------------------------
// --- Routing
@@ -133,8 +141,10 @@ object DeploymentConfig {
case Host("localhost") ⇒ Config.nodename
case IP("0.0.0.0") ⇒ Config.nodename
case IP("127.0.0.1") ⇒ Config.nodename
- case Host(hostname) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
- case IP(address) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
+ case Host(hostname) ⇒ throw new UnsupportedOperationException(
+ "Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
+ case IP(address) ⇒ throw new UnsupportedOperationException(
+ "Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
}
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == Config.nodename)
@@ -156,7 +166,7 @@ object DeploymentConfig {
}
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
- case Deploy(_, _, Clustered(_, _, replicationScheme)) ⇒ Some(replicationScheme)
+ case Deploy(_, _, _, Clustered(_, _, replicationScheme)) ⇒ Some(replicationScheme)
case _ ⇒ None
}
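Note (illustration, not part of the patch): Deploy now carries an optional ActorRecipe and validates its address eagerly, since Address.validate runs in the constructor body. A sketch of the new shape — MyActor is a hypothetical actor class:

    import akka.actor.Actor
    import akka.actor.DeploymentConfig._

    class MyActor extends Actor { def receive = { case _ ⇒ } } // hypothetical

    val plain      = Deploy("service-ping", None) // Direct routing, Local scope via the defaults
    val withRecipe = Deploy("service-pong", Some(ActorRecipe(classOf[MyActor])), RoundRobin, Local)
    // Deploy("not a valid address!", None) now fails fast with an IllegalArgumentException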
diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala
index b911af4c1a..b3e9c96e9e 100644
--- a/akka-actor/src/main/scala/akka/actor/FSM.scala
+++ b/akka-actor/src/main/scala/akka/actor/FSM.scala
@@ -454,7 +454,7 @@ trait FSM[S, D] extends ListenerManagement {
if (generation == gen) {
processMsg(StateTimeout, "state timeout")
}
- case t@Timer(name, msg, repeat, generation) ⇒
+ case t @ Timer(name, msg, repeat, generation) ⇒
if ((timers contains name) && (timers(name).generation == generation)) {
processMsg(msg, t)
if (!repeat) {
diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala
index 95532e8fb7..1463c1b6cb 100644
--- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala
@@ -13,7 +13,7 @@ import java.util.concurrent.{ CopyOnWriteArrayList, ConcurrentHashMap }
import akka.config.Supervision._
class SupervisorException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
- def this(msg:String) = this(msg, null);
+ def this(msg: String) = this(msg, null);
}
/**
@@ -153,7 +153,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act
supervisor.link(actorRef)
if (ClusterModule.isEnabled && registerAsRemoteService)
Actor.remote.register(actorRef)
- case supervisorConfig@SupervisorConfig(_, _, _) ⇒ // recursive supervisor configuration
+ case supervisorConfig @ SupervisorConfig(_, _, _) ⇒ // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig)
supervisor.link(childSupervisor.supervisor)
_childSupervisors.add(childSupervisor)
@@ -179,7 +179,7 @@ final class SupervisorActor private[akka] (handler: FaultHandlingStrategy, maxRe
}
def receive = {
- case max@MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) ⇒ maxRestartsHandler(self, max)
+ case max @ MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) ⇒ maxRestartsHandler(self, max)
case unknown ⇒ throw new SupervisorException(
"SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]")
}
diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala
index cf15c7d520..41eba5cd41 100644
--- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala
+++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala
@@ -11,7 +11,7 @@ import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler,
import akka.util.{ Duration }
import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar }
import com.sun.xml.internal.ws.developer.MemberSubmissionAddressing.Validation
-import akka.serialization.{Serializer, Serialization}
+import akka.serialization.{ Serializer, Serialization }
//TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala
/**
@@ -90,12 +90,12 @@ object TypedActor {
} catch { case i: InvocationTargetException ⇒ throw i.getTargetException }
private def writeReplace(): AnyRef = parameters match {
- case null => SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null, null)
- case ps if ps.length == 0 => SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array[Serializer.Identifier](), Array[Array[Byte]]())
- case ps =>
+ case null ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null, null)
+ case ps if ps.length == 0 ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array[Serializer.Identifier](), Array[Array[Byte]]())
+ case ps ⇒
val serializers: Array[Serializer] = ps map Serialization.findSerializerFor
val serializedParameters: Array[Array[Byte]] = Array.ofDim[Array[Byte]](serializers.length)
- for(i <- 0 until serializers.length)
+ for (i ← 0 until serializers.length)
serializedParameters(i) = serializers(i) toBinary parameters(i) //Mutable for the sake of sanity
SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, serializers.map(_.identifier), serializedParameters)
@@ -110,11 +110,11 @@ object TypedActor {
//TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = {
MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
- case null => null
- case a if a.length == 0 => Array[AnyRef]()
- case a =>
+ case null ⇒ null
+ case a if a.length == 0 ⇒ Array[AnyRef]()
+ case a ⇒
val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
- for(i <- 0 until a.length)
+ for (i ← 0 until a.length)
deserializedParameters(i) = Serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i))
deserializedParameters
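Note (illustration, not part of the patch): MethodCall above relies on the standard JDK serialization hooks — writeReplace swaps in a wire-friendly stand-in when an instance is serialized, and readResolve on the stand-in rebuilds the real object on deserialization. The general shape of that pattern, with hypothetical names:

    import java.io.Serializable

    class Handle(val resource: String) extends Serializable {
      private def writeReplace(): AnyRef = SerializedHandle(resource) // invoked during serialization
    }

    case class SerializedHandle(resource: String) extends Serializable {
      private def readResolve(): AnyRef = new Handle(resource) // invoked during deserialization
    }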
diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
index f59d2ff4ab..e36492879c 100644
--- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
+++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
@@ -16,8 +16,7 @@ import akka.AkkaException
import com.eaio.uuid.UUID
import java.net.InetSocketAddress
-import java.util.concurrent.{ ConcurrentSkipListSet}
-
+import java.util.concurrent.{ ConcurrentSkipListSet }
class ClusterException(message: String) extends AkkaException(message)
diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala
index 40f84a30d2..5a38f3cf2b 100644
--- a/akka-actor/src/main/scala/akka/config/Config.scala
+++ b/akka-actor/src/main/scala/akka/config/Config.scala
@@ -11,11 +11,11 @@ import java.net.InetAddress
import com.eaio.uuid.UUID
class ConfigurationException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
- def this(msg:String) = this(msg, null);
+ def this(msg: String) = this(msg, null);
}
-class ModuleNotAvailableException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
- def this(msg:String) = this(msg, null);
+class ModuleNotAvailableException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
+ def this(msg: String) = this(msg, null);
}
/**
diff --git a/akka-actor/src/main/scala/akka/config/ConfigParser.scala b/akka-actor/src/main/scala/akka/config/ConfigParser.scala
index 91f40df096..39b961a24d 100644
--- a/akka-actor/src/main/scala/akka/config/ConfigParser.scala
+++ b/akka-actor/src/main/scala/akka/config/ConfigParser.scala
@@ -66,9 +66,9 @@ class ConfigParser(var prefix: String = "", map: mutable.Map[String, Any] = muta
def parse(in: String): Map[String, Any] = {
parseAll(root, in) match {
- case Success(result, _) ⇒ map.toMap
- case x@Failure(msg, _) ⇒ throw new ConfigurationException(x.toString)
- case x@Error(msg, _) ⇒ throw new ConfigurationException(x.toString)
+ case Success(result, _) ⇒ map.toMap
+ case x @ Failure(msg, _) ⇒ throw new ConfigurationException(x.toString)
+ case x @ Error(msg, _) ⇒ throw new ConfigurationException(x.toString)
}
}
}
diff --git a/akka-actor/src/main/scala/akka/config/Configuration.scala b/akka-actor/src/main/scala/akka/config/Configuration.scala
index b32c4eeb72..32d7d79418 100644
--- a/akka-actor/src/main/scala/akka/config/Configuration.scala
+++ b/akka-actor/src/main/scala/akka/config/Configuration.scala
@@ -148,11 +148,10 @@ class Configuration(val map: Map[String, Any]) {
getDouble(key).getOrElse(outputIfDesiredAndReturnInput(key, defaultValue))
def getBoolean(key: String): Option[Boolean] = {
- getString(key) flatMap {
- s ⇒
- val isTrue = trueValues.contains(s)
- if (!isTrue && !falseValues.contains(s)) None
- else Some(isTrue)
+ getString(key) flatMap { s ⇒
+ val isTrue = trueValues.contains(s)
+ if (!isTrue && !falseValues.contains(s)) None
+ else Some(isTrue)
}
}
@@ -165,7 +164,7 @@ class Configuration(val map: Map[String, Any]) {
getBoolean(key, defaultValue)
def apply(key: String): String = getString(key) match {
- case None ⇒ throw new ConfigurationException("undefined config: " + key)
+ case None ⇒ throw new ConfigurationException("undefined config: " + key)
case Some(v) ⇒ v
}
@@ -179,7 +178,7 @@ class Configuration(val map: Map[String, Any]) {
def getSection(name: String): Option[Configuration] = {
val l = name.length + 1
- val pattern = name+"."
+ val pattern = name + "."
val m = map.collect {
case (k, v) if k.startsWith(pattern) ⇒ (k.substring(l), v)
}
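Note (illustration, not part of the patch): getSection above re-roots every key that starts with the section prefix, which is what lets Deployer.lookupInConfig walk "akka.actor.deployment.<address>" piecewise. A sketch using the public constructor visible in this file:

    import akka.config.Configuration

    val config = new Configuration(Map(
      "akka.actor.deployment.service-ping.router" -> "round-robin",
      "akka.actor.deployment.service-ping.clustered.replication-factor" -> 3))

    val section = config.getSection("akka.actor.deployment.service-ping")
    // section.get("router") == "round-robin" — the prefix has been stripped from the keys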
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
index 5097f69aa0..fbd32c580b 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
@@ -194,7 +194,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
final def run = {
try { processMailbox() } catch {
- case ie: InterruptedException => Thread.currentThread().interrupt() //Restore interrupt
+ case ie: InterruptedException ⇒ Thread.currentThread().interrupt() //Restore interrupt
} finally {
dispatcherLock.unlock()
if (!self.isEmpty)
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala
index 64c0c5afb2..8384fdf949 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala
@@ -200,7 +200,7 @@ object Dispatchers {
case Right(clazz) ⇒
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match {
case Right(configurator) ⇒ configurator
- case Left(exception)⇒
+ case Left(exception) ⇒
throw new IllegalArgumentException(
"Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, exception)
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index 8cc6d2fb89..547034fd0b 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -116,8 +116,7 @@ object Futures {
case e: Exception ⇒
EventHandler.error(e, this, e.getMessage)
result completeWithException e
- }
- finally {
+ } finally {
results.clear
}
}
@@ -296,7 +295,7 @@ object Future {
*/
def flow[A](body: ⇒ A @cps[Future[Any]])(implicit timeout: Timeout): Future[A] = {
val future = Promise[A](timeout)
- (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onException { case e => future completeWithException e }
+ (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onException { case e ⇒ future completeWithException e }
future
}
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
index 1477e986d8..aafd0988ee 100644
--- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
@@ -34,8 +34,7 @@ final case class FutureInvocation[T](future: Promise[T], function: () ⇒ T, cle
case e ⇒
EventHandler.error(e, this, e.getMessage)
Left(e)
- }
- finally {
+ } finally {
cleanup()
})
}
diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala
index 4b1fc2e1aa..c6ea69dcc0 100644
--- a/akka-actor/src/main/scala/akka/event/EventHandler.scala
+++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala
@@ -8,7 +8,7 @@ import akka.actor._
import akka.dispatch.Dispatchers
import akka.config.Config._
import akka.config.ConfigurationException
-import akka.util.{ListenerManagement, ReflectiveAccess}
+import akka.util.{ ListenerManagement, ReflectiveAccess }
import akka.serialization._
import akka.AkkaException
@@ -95,10 +95,10 @@ object EventHandler extends ListenerManagement {
@volatile
var level: Int = config.getString("akka.event-handler-level", "INFO") match {
- case "ERROR" ⇒ ErrorLevel
+ case "ERROR" ⇒ ErrorLevel
case "WARNING" ⇒ WarningLevel
- case "INFO" ⇒ InfoLevel
- case "DEBUG" ⇒ DebugLevel
+ case "INFO" ⇒ InfoLevel
+ case "DEBUG" ⇒ DebugLevel
case unknown ⇒ throw new ConfigurationException(
"Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]")
}
@@ -106,22 +106,21 @@ object EventHandler extends ListenerManagement {
def start() {
try {
val defaultListeners = config.getList("akka.event-handlers") match {
- case Nil ⇒ "akka.event.EventHandler$DefaultListener" :: Nil
+ case Nil ⇒ "akka.event.EventHandler$DefaultListener" :: Nil
case listeners ⇒ listeners
}
- defaultListeners foreach {
- listenerName ⇒
- try {
- ReflectiveAccess.getClassFor[Actor](listenerName) match {
- case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start())
- case Left(exception) ⇒ throw exception
- }
- } catch {
- case e: Exception ⇒
- throw new ConfigurationException(
- "Event Handler specified in config can't be loaded [" + listenerName +
- "] due to [" + e.toString + "]", e)
+ defaultListeners foreach { listenerName ⇒
+ try {
+ ReflectiveAccess.getClassFor[Actor](listenerName) match {
+ case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start())
+ case Left(exception) ⇒ throw exception
}
+ } catch {
+ case e: Exception ⇒
+ throw new ConfigurationException(
+ "Event Handler specified in config can't be loaded [" + listenerName +
+ "] due to [" + e.toString + "]", e)
+ }
}
info(this, "Starting up EventHandler")
} catch {
@@ -146,7 +145,7 @@ object EventHandler extends ListenerManagement {
notifyListeners(event)
}
- def notify[T <: Event : ClassManifest](event: ⇒ T) {
+ def notify[T <: Event: ClassManifest](event: ⇒ T) {
if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event)
}
@@ -182,7 +181,6 @@ object EventHandler extends ListenerManagement {
if (level >= InfoLevel) notifyListeners(Info(instance, message))
}
-
def debug(instance: AnyRef, message: ⇒ String) {
if (level >= DebugLevel) notifyListeners(Debug(instance, message))
}
@@ -196,7 +194,7 @@ object EventHandler extends ListenerManagement {
def isDebugEnabled = level >= DebugLevel
def stackTraceFor(e: Throwable) = {
- import java.io.{StringWriter, PrintWriter}
+ import java.io.{ StringWriter, PrintWriter }
val sw = new StringWriter
val pw = new PrintWriter(sw)
e.printStackTrace(pw)
@@ -223,7 +221,7 @@ object EventHandler extends ListenerManagement {
def timestamp = dateFormat.format(new Date)
def receive = {
- case event@Error(cause, instance, message) ⇒
+ case event @ Error(cause, instance, message) ⇒
println(error.format(
timestamp,
event.thread.getName,
@@ -231,21 +229,21 @@ object EventHandler extends ListenerManagement {
message,
stackTraceFor(cause)))
- case event@Warning(instance, message) ⇒
+ case event @ Warning(instance, message) ⇒
println(warning.format(
timestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message))
- case event@Info(instance, message) ⇒
+ case event @ Info(instance, message) ⇒
println(info.format(
timestamp,
event.thread.getName,
instance.getClass.getSimpleName,
message))
- case event@Debug(instance, message) ⇒
+ case event @ Debug(instance, message) ⇒
println(debug.format(
timestamp,
event.thread.getName,
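Note (illustration, not part of the patch): the EventHandler API reformatted above takes the log message by name, so the message is only constructed when the configured akka.event-handler-level is high enough. Typical calls, matching the signatures in this file:

    import akka.event.EventHandler

    EventHandler.info(this, "starting up") // skipped entirely below InfoLevel
    EventHandler.error(new RuntimeException("boom"), this, "request failed")
    if (EventHandler.isDebugEnabled)
      EventHandler.debug(this, "expensive diagnostics: " + System.nanoTime)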
diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala
index 6bfd4c18b3..2e930e912e 100644
--- a/akka-actor/src/main/scala/akka/routing/Pool.scala
+++ b/akka-actor/src/main/scala/akka/routing/Pool.scala
@@ -38,7 +38,7 @@ object ActorPool {
trait ActorPool {
/**
* Adds a new actor to the pool. The DefaultActorPool implementation will start and link (supervise) this actor.
- * This method is invoked whenever the pool determines it must boost capacity.
+ * This method is invoked whenever the pool determines it must boost capacity.
* @return A new actor for the pool
*/
def instance(): ActorRef
@@ -48,7 +48,7 @@ trait ActorPool {
* @param _delegates The current sequence of pooled actors
* @return the number of delegates by which the pool should be adjusted (positive, negative or zero)
*/
- def capacity(delegates: Seq[ActorRef]): Int
+ def capacity(delegates: Seq[ActorRef]): Int
/**
* Provides the results of the selector, one or more actors, to which an incoming message is forwarded.
* This method returns an iterator since a selector might return more than one actor to handle the message.
@@ -71,7 +71,7 @@ trait ActorPoolSupervisionConfig {
/**
* Provides a default implementation of the supervision configuration by
- * defining a One-for-One fault handling strategy, trapping exceptions,
+ * defining a One-for-One fault handling strategy, trapping exceptions,
* limited to 5 retries within 1 second.
*
* This is just a basic strategy and implementors are encouraged to define
@@ -89,7 +89,7 @@ trait DefaultActorPoolSupervisionConfig extends ActorPoolSupervisionConfig {
* are added or existing ones are removed. Removed actors are sent the PoisonPill message.
* New actors are automatically started and linked. The pool supervises the actors and will
* use the fault handling strategy specified by the mixed-in ActorPoolSupervisionConfig.
- * Pooled actors may be any lifecycle. If you're testing pool sizes during runtime, take a
+ * Pooled actors may be any lifecycle. If you're testing pool sizes during runtime, take a
* look at the unit tests... Any delegate with a Permanent lifecycle will be
* restarted and the pool size will be level with what it was prior to the fault. In just
* about every other case, e.g. the delegates are Temporary or the delegate cannot be
@@ -99,9 +99,9 @@ trait DefaultActorPoolSupervisionConfig extends ActorPoolSupervisionConfig {
*
* Second, invokes the pool's selector that returns a list of delegates that are to receive
* the incoming message. Selectors may return more than one actor. If partialFill
- * is true then it might also the case that fewer than number of desired actors will be
+ * is true then it might also the case that fewer than number of desired actors will be
* returned.
- *
+ *
* Lastly, routes by forwarding, the incoming message to each delegate in the selected set.
*/
trait DefaultActorPool extends ActorPool { this: Actor with ActorPoolSupervisionConfig ⇒
@@ -126,12 +126,12 @@ trait DefaultActorPool extends ActorPool { this: Actor with ActorPoolSupervision
self tryReply Stats(_delegates length)
case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _) ⇒
_delegates = _delegates filterNot { _.uuid == victim.uuid }
- case Death(victim, _) =>
+ case Death(victim, _) ⇒
_delegates = _delegates filterNot { _.uuid == victim.uuid }
case msg ⇒
resizeIfAppropriate()
- select(_delegates) foreach { _ forward msg }
+ select(_delegates) foreach { _ forward msg }
}
private def resizeIfAppropriate() {
@@ -160,7 +160,7 @@ trait DefaultActorPool extends ActorPool { this: Actor with ActorPoolSupervision
/**
* Selectors
- *
+ *
* These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool.
* Note that it's acceptable to return more than one actor to handle a given message.
*/
@@ -211,7 +211,7 @@ trait RoundRobinSelector {
/**
* Capacitors
- *
+ *
* These traits define how to alter the size of the pool according to some desired behavior.
* Capacitors are required (minimally) by the pool to establish bounds on the number of delegates
* that may exist in the pool.
@@ -269,7 +269,7 @@ trait ActiveFuturesPressureCapacitor {
}
/**
- *
+ *
*/
trait CapacityStrategy {
import ActorPool._
@@ -283,7 +283,7 @@ trait CapacityStrategy {
def pressure(delegates: Seq[ActorRef]): Int
/**
* This method can be used to smooth the response of the capacitor by considering
- * the current pressure and current capacity.
+ * the current pressure and current capacity.
*/
def filter(pressure: Int, capacity: Int): Int
@@ -299,7 +299,7 @@ trait FixedCapacityStrategy extends FixedSizeCapacitor
* Use this trait to setup a pool that may have a variable number of
* delegates but always within an established upper and lower limit.
*
- * If mix this into your pool implementation, you must also provide a
+ * If mix this into your pool implementation, you must also provide a
* PressureCapacitor and a Filter.
*/
trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor
diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
index e065bbc24c..7d3375aff9 100644
--- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala
+++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
@@ -11,7 +11,6 @@ import akka.actor.{ ActorRef, Actor }
import akka.AkkaException
import akka.util.ReflectiveAccess
-
case class NoSerializerFoundException(m: String) extends AkkaException(m)
/**
@@ -22,18 +21,18 @@ object Serialization {
//TODO document me
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
- try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception => Left(e) }
+ try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception ⇒ Left(e) }
//TODO document me
def deserialize(
bytes: Array[Byte],
clazz: Class[_],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
- try { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) } catch { case e: Exception => Left(e) }
+ try { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) } catch { case e: Exception ⇒ Left(e) }
def findSerializerFor(o: AnyRef): Serializer = o match {
- case null => NullSerializer
- case other => serializerFor(other.getClass)
+ case null ⇒ NullSerializer
+ case other ⇒ serializerFor(other.getClass)
}
//TODO document me
@@ -50,12 +49,12 @@ object Serialization {
if (bindings.isEmpty)
Left(NoSerializerFoundException("No mapping serializer found for " + cl))
else {
- bindings find {
- case (clazzName, _) ⇒
- getClassFor(clazzName) match {
- case Right(clazz) ⇒ clazz.isAssignableFrom(cl)
- case _ ⇒ false
- }
+ bindings find {
+ case (clazzName, _) ⇒
+ getClassFor(clazzName) match {
+ case Right(clazz) ⇒ clazz.isAssignableFrom(cl)
+ case _ ⇒ false
+ }
} map {
case (_, ser) ⇒ serializerOf(ser)
} getOrElse Left(NoSerializerFoundException("No mapping serializer found for " + cl))
@@ -69,20 +68,20 @@ object Serialization {
*/
val serializers: Map[String, Serializer] =
config.getSection("akka.actor.serializers")
- .map(_.map)
- .getOrElse(Map())
- .foldLeft(Map[String, Serializer]("default" -> akka.serialization.JavaSerializer)) {
- case (result, (k: String, v: String)) => result + (k -> serializerOf(v).fold(throw _, identity))
- case (result, _) => result
- }
+ .map(_.map)
+ .getOrElse(Map())
+ .foldLeft(Map[String, Serializer]("default" -> akka.serialization.JavaSerializer)) {
+ case (result, (k: String, v: String)) ⇒ result + (k -> serializerOf(v).fold(throw _, identity))
+ case (result, _) ⇒ result
+ }
/**
* bindings is a Map whose keys are the FQNs of the serializable classes and whose values are the aliases of the serializers to be used
*/
val bindings: Map[String, String] = config.getSection("akka.actor.serialization-bindings") map {
- _.map.foldLeft(Map[String,String]()) {
- case (result, (k: String, vs: List[_])) => result ++ (vs collect { case v: String => (v, k) }) //All keys which are lists, take the Strings from them and Map them
- case (result, _) => result //For any other values, just skip them, TODO: print out warnings?
+ _.map.foldLeft(Map[String, String]()) {
+ case (result, (k: String, vs: List[_])) ⇒ result ++ (vs collect { case v: String ⇒ (v, k) }) // for every key whose value is a list, map each String in the list back to that key
+ case (result, _) ⇒ result // skip any other values, TODO: print out warnings?
}
} getOrElse Map()
@@ -95,5 +94,5 @@ object Serialization {
* Maps from a Serializer.Identifier (Byte) to a Serializer instance (optimization)
*/
val serializerByIdentity: Map[Serializer.Identifier, Serializer] =
- Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) => (v.identifier,v) }
+ Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) ⇒ (v.identifier, v) }
}
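The Either-based API above is consumed by matching on the result. A round-trip sketch, with User as a placeholder class and assuming the default Java serializer applies when no binding matches:

case class User(name: String) extends Serializable

val user = User("jonas")

Serialization.serialize(user) match {
  case Left(error) ⇒ throw error
  case Right(bytes) ⇒
    Serialization.deserialize(bytes, classOf[User], None) match {
      case Left(error)     ⇒ throw error
      case Right(instance) ⇒ assert(instance == user) // round-trip preserves the value
    }
}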
diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala
index aa57c4b47d..893e974859 100644
--- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala
+++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala
@@ -38,7 +38,7 @@ object NullSerializer extends NullSerializer
class JavaSerializer extends Serializer {
- def identifier = 1:Byte
+ def identifier = 1: Byte
def toBinary(o: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
@@ -63,7 +63,7 @@ class NullSerializer extends Serializer {
val nullAsBytes = Array[Byte]()
- def identifier = 0:Byte
+ def identifier = 0: Byte
def toBinary(o: AnyRef) = nullAsBytes
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef = null
}
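A user-defined Serializer follows the same shape as JavaSerializer and NullSerializer. A minimal sketch; the class and its identifier value are illustrative, and the identifier must not collide with any registered serializer:

import akka.serialization.Serializer

// Hypothetical serializer that encodes objects via their UTF-8 string form.
class Utf8StringSerializer extends Serializer {
  def identifier = 10: Byte

  def toBinary(o: AnyRef): Array[Byte] =
    o.toString.getBytes("UTF-8")

  def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None,
                 classLoader: Option[ClassLoader] = None): AnyRef =
    new String(bytes, "UTF-8")
}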
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 3f0f33f01c..9f1311a349 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -50,14 +50,7 @@ object ReflectiveAccess {
None
}
- lazy val clusterDeployerInstance: Option[ClusterDeployer] = getObjectFor("akka.cluster.ClusterDeployer$") match {
- case Right(value) ⇒ Some(value)
- case Left(exception) ⇒
- EventHandler.debug(this, exception.toString)
- None
- }
-
- lazy val serializerClass: Option[Class[_]] = getClassFor("akka.serialization.Serializer") match {
+ lazy val clusterDeployerInstance: Option[ActorDeployer] = getObjectFor("akka.cluster.ClusterDeployer$") match {
case Right(value) ⇒ Some(value)
case Left(exception) ⇒
EventHandler.debug(this, exception.toString)
@@ -76,7 +69,7 @@ object ReflectiveAccess {
clusterInstance.get.node
}
- lazy val clusterDeployer: ClusterDeployer = {
+ lazy val clusterDeployer: ActorDeployer = {
ensureEnabled()
clusterDeployerInstance.get
}
@@ -86,15 +79,6 @@ object ReflectiveAccess {
transactionLogInstance.get
}
- type ClusterDeployer = {
- def init(deployments: List[Deploy])
- def shutdown()
- def deploy(deployment: Deploy)
- def undeploy(deployment: Deploy)
- def undeployAll()
- def lookupDeploymentFor(address: String): Option[Deploy]
- }
-
type Cluster = {
def node: ClusterNode
}
@@ -104,12 +88,6 @@ object ReflectiveAccess {
def dequeue: MessageInvocation
}
- // FIXME: remove?
- type Serializer = {
- def toBinary(obj: AnyRef): Array[Byte]
- def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
- }
-
type TransactionLogObject = {
def newLogFor(
id: String,
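The reflective-lookup pattern used for clusterDeployerInstance generalizes to any optional module. A sketch with a hypothetical module object:

// Resolve a singleton object by its mangled class name: Right(instance) when the
// optional module is on the classpath, Left(exception) when it is not.
lazy val myModuleInstance: Option[AnyRef] =
  getObjectFor("akka.example.MyModule$") match {
    case Right(value) ⇒ Some(value)
    case Left(exception) ⇒
      EventHandler.debug(this, exception.toString)
      None
  }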
diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala
index fc730b0e2d..c63ca8a289 100644
--- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala
@@ -16,7 +16,7 @@ import akka.actor._
import akka.camel.{ Ack, Failure, Message }
import akka.camel.CamelMessageConversion.toExchangeAdapter
import scala.reflect.BeanProperty
-import akka.dispatch.{FutureTimeoutException, Promise, MessageInvocation, MessageDispatcher}
+import akka.dispatch.{ FutureTimeoutException, Promise, MessageInvocation, MessageDispatcher }
/**
* @author Martin Krasser
@@ -170,7 +170,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn
}
}
- private def sendSync(exchange: Exchange) = {
+ private def sendSync(exchange: Exchange) = {
val actor = target(exchange)
val result: Any = try { (actor ? requestFor(exchange)).as[Any] } catch { case e ⇒ Some(Failure(e)) }
@@ -181,7 +181,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn
case None ⇒ throw new TimeoutException("timeout (%d ms) while waiting response from %s"
format (actor.timeout, ep.getEndpointUri))
}
- }
+ }
private def sendAsync(exchange: Exchange, sender: Option[ActorRef] = None) =
target(exchange).!(requestFor(exchange))(sender)
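In isolation, the blocking request-reply used by sendSync looks like this. A sketch against the same Akka 1.x-style API; actor and request are placeholders, and TimeoutException is java.util.concurrent.TimeoutException as above:

val result: Option[Any] =
  try { (actor ? request).as[Any] } catch { case e ⇒ Some(Failure(e)) }

result match {
  case Some(Failure(cause)) ⇒ throw cause // processing failed remotely
  case Some(response)       ⇒ println("got " + response)
  case None                 ⇒ throw new TimeoutException(
    "timeout (%d ms) while waiting for a reply" format actor.timeout)
}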
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 39c57c0778..0e7157fa5e 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -6,14 +6,14 @@ package akka.cluster
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event._
import org.apache.zookeeper.data.Stat
-import org.apache.zookeeper.recipes.lock.{WriteLock, LockListener}
+import org.apache.zookeeper.recipes.lock.{ WriteLock, LockListener }
import org.I0Itec.zkclient._
import org.I0Itec.zkclient.serialize._
import org.I0Itec.zkclient.exception._
-import java.util.{List ⇒ JList}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import java.util.{ List ⇒ JList }
+import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import java.net.InetSocketAddress
import javax.management.StandardMBean
@@ -29,15 +29,15 @@ import Status._
import DeploymentConfig._
import akka.event.EventHandler
-import akka.dispatch.{Dispatchers, Future}
+import akka.dispatch.{ Dispatchers, Future }
import akka.remoteinterface._
import akka.routing.RouterType
-import akka.config.{Config, Supervision}
+import akka.config.{ Config, Supervision }
import Supervision._
import Config._
-import akka.serialization.{Serialization, Serializer, ActorSerialization}
+import akka.serialization.{ Serialization, Serializer, ActorSerialization }
import ActorSerialization._
import akka.serialization.Compression.LZF
@@ -49,7 +49,7 @@ import RemoteDaemonMessageType._
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
-import java.util.concurrent.{CopyOnWriteArrayList, Callable, ConcurrentHashMap}
+import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap }
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
@@ -181,17 +181,17 @@ object Cluster {
private def nodename: String = properties.get("akka.cluster.nodename") match {
case Some(uberride) ⇒ uberride
- case None ⇒ Config.nodename
+ case None ⇒ Config.nodename
}
private def hostname: String = properties.get("akka.cluster.hostname") match {
case Some(uberride) ⇒ uberride
- case None ⇒ Config.hostname
+ case None ⇒ Config.hostname
}
private def port: Int = properties.get("akka.cluster.port") match {
case Some(uberride) ⇒ uberride.toInt
- case None ⇒ Config.remoteServerPort
+ case None ⇒ Config.remoteServerPort
}
val defaultZooKeeperSerializer = new SerializableSerializer
@@ -329,12 +329,12 @@ object Cluster {
*
* @author Jonas Bonér
*/
-class DefaultClusterNode private[akka](
- val nodeAddress: NodeAddress,
- val hostname: String = Config.hostname,
- val port: Int = Config.remoteServerPort,
- val zkServerAddresses: String,
- val serializer: ZkSerializer) extends ErrorHandler with ClusterNode {
+class DefaultClusterNode private[akka] (
+ val nodeAddress: NodeAddress,
+ val hostname: String = Config.hostname,
+ val port: Int = Config.remoteServerPort,
+ val zkServerAddresses: String,
+ val serializer: ZkSerializer) extends ErrorHandler with ClusterNode {
self ⇒
if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string")
@@ -349,7 +349,7 @@ class DefaultClusterNode private[akka](
def receive = {
case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule()
case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule()
- case _ ⇒ //ignore other
+ case _ ⇒ //ignore other
}
}, "akka.cluster.RemoteClientLifeCycleListener").start()
@@ -475,7 +475,6 @@ class DefaultClusterNode private[akka](
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
-
def shutdown() {
def shutdownNode() {
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))
@@ -696,12 +695,12 @@ class DefaultClusterNode private[akka](
* available durable store.
*/
def store(
- actorAddress: String,
- actorFactory: () ⇒ ActorRef,
- replicationFactor: Int,
- replicationScheme: ReplicationScheme,
- serializeMailbox: Boolean,
- serializer: Serializer): ClusterNode = if (isConnected.isOn) {
+ actorAddress: String,
+ actorFactory: () ⇒ ActorRef,
+ replicationFactor: Int,
+ replicationScheme: ReplicationScheme,
+ serializeMailbox: Boolean,
+ serializer: Serializer): ClusterNode = if (isConnected.isOn) {
EventHandler.debug(this,
"Storing actor with address [%s] in cluster".format(actorAddress))
@@ -730,7 +729,7 @@ class DefaultClusterNode private[akka](
}
}
}) match {
- case Left(path) ⇒ path
+ case Left(path) ⇒ path
case Right(exception) ⇒ actorAddressRegistryPath
}
}
@@ -817,7 +816,7 @@ class DefaultClusterNode private[akka](
val actorFactory =
Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ LocalActorRef], None) match {
- case Left(error) ⇒ throw error
+ case Left(error) ⇒ throw error
case Right(instance) ⇒ instance.asInstanceOf[() ⇒ LocalActorRef]
}
@@ -897,12 +896,11 @@ class DefaultClusterNode private[akka](
val command = builder.build
- nodes foreach {
- node ⇒
- nodeConnections.get(node) foreach {
- case (_, connection) ⇒
- sendCommandToNode(connection, command, async = false)
- }
+ nodes foreach { node ⇒
+ nodeConnections.get(node) foreach {
+ case (_, connection) ⇒
+ sendCommandToNode(connection, command, async = false)
+ }
}
}
}
@@ -933,18 +931,18 @@ class DefaultClusterNode private[akka](
*/
def release(actorAddress: String) {
- // FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method?
+ // FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no
+ // longer available. Then what to do? Should we even remove this method?
if (isConnected.isOn) {
ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))
- uuidsForActorAddress(actorAddress) foreach {
- uuid ⇒
- EventHandler.debug(this,
- "Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid))
+ uuidsForActorAddress(actorAddress) foreach { uuid ⇒
+ EventHandler.debug(this,
+ "Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid))
- ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid)))
- ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid)))
+ ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid)))
+ ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid)))
}
}
}
@@ -962,11 +960,10 @@ class DefaultClusterNode private[akka](
.setActorAddress(actorAddress)
.build
- nodesForActorsInUseWithAddress(actorAddress) foreach {
- node ⇒
- nodeConnections.get(node) foreach {
- case (_, connection) ⇒ sendCommandToNode(connection, command, async = true)
- }
+ nodesForActorsInUseWithAddress(actorAddress) foreach { node ⇒
+ nodeConnections.get(node) foreach {
+ case (_, connection) ⇒ sendCommandToNode(connection, command, async = true)
+ }
}
}
}
@@ -1204,7 +1201,7 @@ class DefaultClusterNode private[akka](
}
}
}) match {
- case Left(_) ⇒ /* do nothing */
+ case Left(_) ⇒ /* do nothing */
case Right(exception) ⇒ throw exception
}
}
@@ -1292,7 +1289,6 @@ class DefaultClusterNode private[akka](
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String, uuid: UUID): String = "%s/%s".format(actorAddressToUuidsPathFor(actorAddress), uuid)
-
/**
* Returns a random set with node names of size 'replicationFactor'.
* Default replicationFactor is 0, which returns the empty Set.
@@ -1310,7 +1306,7 @@ class DefaultClusterNode private[akka](
if (actorAddress.isDefined) {
// use 'preferred-nodes' in deployment config for the actor
Deployer.deploymentFor(actorAddress.get) match {
- case Deploy(_, _, Clustered(nodes, _, _)) ⇒
+ case Deploy(_, _, _, Clustered(nodes, _, _)) ⇒
nodes map (node ⇒ DeploymentConfig.nodeNameFor(node)) take replicationFactor
case _ ⇒
throw new ClusterException("Actor [" + actorAddress.get + "] is not configured as clustered")
@@ -1363,16 +1359,15 @@ class DefaultClusterNode private[akka](
* @return a Map with the remote socket addresses of the disconnected node connections
*/
private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
- newlyConnectedMembershipNodes: Traversable[String],
- newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = {
+ newlyConnectedMembershipNodes: Traversable[String],
+ newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = {
// cache the disconnected connections in a map, needed for fail-over of these connections later
var disconnectedConnections = Map.empty[String, InetSocketAddress]
- newlyDisconnectedMembershipNodes foreach {
- node ⇒
- disconnectedConnections += (node -> (nodeConnections(node) match {
- case (address, _) ⇒ address
- }))
+ newlyDisconnectedMembershipNodes foreach { node ⇒
+ disconnectedConnections += (node -> (nodeConnections(node) match {
+ case (address, _) ⇒ address
+ }))
}
if (connectToAllNewlyArrivedMembershipNodesInClusterLock.compareAndSet(false, true)) {
@@ -1381,20 +1376,18 @@ class DefaultClusterNode private[akka](
newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_))
// add connections for newly arrived nodes
- newlyConnectedMembershipNodes foreach {
- node ⇒
- if (!nodeConnections.contains(node)) {
- // only connect to each replica once
+ newlyConnectedMembershipNodes foreach { node ⇒
+ if (!nodeConnections.contains(node)) {
+ // only connect to each replica once
- remoteSocketAddressForNode(node) foreach {
- address ⇒
- EventHandler.debug(this,
- "Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
+ remoteSocketAddressForNode(node) foreach { address ⇒
+ EventHandler.debug(this,
+ "Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
- val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
- nodeConnections.put(node, (address, clusterDaemon))
- }
+ val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
+ nodeConnections.put(node, (address, clusterDaemon))
}
+ }
}
} finally {
connectToAllNewlyArrivedMembershipNodesInClusterLock.set(false)
@@ -1441,87 +1434,85 @@ class DefaultClusterNode private[akka](
}
private[cluster] def migrateActorsOnFailedNodes(
- failedNodes: List[String],
- currentClusterNodes: List[String],
- oldClusterNodes: List[String],
- disconnectedConnections: Map[String, InetSocketAddress]) {
+ failedNodes: List[String],
+ currentClusterNodes: List[String],
+ oldClusterNodes: List[String],
+ disconnectedConnections: Map[String, InetSocketAddress]) {
- failedNodes.foreach {
- failedNodeName ⇒
+ failedNodes.foreach { failedNodeName ⇒
- val failedNodeAddress = NodeAddress(nodeAddress.clusterName, failedNodeName)
+ val failedNodeAddress = NodeAddress(nodeAddress.clusterName, failedNodeName)
- val myIndex = oldClusterNodes.indexWhere(_.endsWith(nodeAddress.nodeName))
- val failedNodeIndex = oldClusterNodes.indexWhere(_ == failedNodeName)
+ val myIndex = oldClusterNodes.indexWhere(_.endsWith(nodeAddress.nodeName))
+ val failedNodeIndex = oldClusterNodes.indexWhere(_ == failedNodeName)
- // Migrate to the successor of the failed node (using a sorted circular list of the node names)
- if ((failedNodeIndex == 0 && myIndex == oldClusterNodes.size - 1) || // No leftmost successor exists, check the tail
- (failedNodeIndex == myIndex + 1)) {
- // Am I the leftmost successor?
+ // Migrate to the successor of the failed node (using a sorted circular list of the node names)
+ if ((failedNodeIndex == 0 && myIndex == oldClusterNodes.size - 1) || // No leftmost successor exists, check the tail
+ (failedNodeIndex == myIndex + 1)) {
+ // Am I the leftmost successor?
- // Takes the lead of migrating the actors. Not all to this node.
- // All to this node except if the actor already resides here, then pick another node it is not already on.
+ // Takes the lead of migrating the actors. Not all to this node.
+ // All to this node except if the actor already resides here, then pick another node it is not already on.
- // Yes I am the node to migrate the actor to (can only be one in the cluster)
- val actorUuidsForFailedNode = zkClient.getChildren(nodeToUuidsPathFor(failedNodeName)).toList
+ // Yes I am the node to migrate the actor to (can only be one in the cluster)
+ val actorUuidsForFailedNode = zkClient.getChildren(nodeToUuidsPathFor(failedNodeName)).toList
- actorUuidsForFailedNode.foreach {
- uuidAsString ⇒
- EventHandler.debug(this,
- "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
- .format(failedNodeName, uuidAsString, nodeAddress.nodeName))
+ actorUuidsForFailedNode.foreach { uuidAsString ⇒
+ EventHandler.debug(this,
+ "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
+ .format(failedNodeName, uuidAsString, nodeAddress.nodeName))
- val uuid = uuidFrom(uuidAsString)
- val actorAddress = actorAddressForUuid(uuid).getOrElse(
- throw new IllegalStateException("No actor address found for UUID [" + uuidAsString + "]"))
+ val uuid = uuidFrom(uuidAsString)
+ val actorAddress = actorAddressForUuid(uuid).getOrElse(
+ throw new IllegalStateException("No actor address found for UUID [" + uuidAsString + "]"))
- val migrateToNodeAddress =
- if (isInUseOnNode(actorAddress)) {
- // already in use on this node, pick another node to instantiate the actor on
- val replicaNodesForActor = nodesForActorsInUseWithAddress(actorAddress)
- val nodesAvailableForMigration = (currentClusterNodes.toSet diff failedNodes.toSet) diff replicaNodesForActor.toSet
+ val migrateToNodeAddress =
+ if (isInUseOnNode(actorAddress)) {
+ // already in use on this node, pick another node to instantiate the actor on
+ val replicaNodesForActor = nodesForActorsInUseWithAddress(actorAddress)
+ val nodesAvailableForMigration = (currentClusterNodes.toSet diff failedNodes.toSet) diff replicaNodesForActor.toSet
- if (nodesAvailableForMigration.isEmpty) throw new ClusterException(
- "Can not migrate actor to new node since there are not any available nodes left. " +
- "(However, the actor already has >1 replica in cluster, so we are ok)")
+ if (nodesAvailableForMigration.isEmpty) throw new ClusterException(
+ "Can not migrate actor to new node since there are not any available nodes left. " +
+ "(However, the actor already has >1 replica in cluster, so we are ok)")
- NodeAddress(nodeAddress.clusterName, nodesAvailableForMigration.head)
- } else {
- // actor is not in use on this node, migrate it here
- nodeAddress
- }
+ NodeAddress(nodeAddress.clusterName, nodesAvailableForMigration.head)
+ } else {
+ // actor is not in use on this node, migrate it here
+ nodeAddress
+ }
- // if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.)
- val replicateFromUuid =
- if (isReplicated(actorAddress)) Some(uuid)
- else None
+ // if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.)
+ val replicateFromUuid =
+ if (isReplicated(actorAddress)) Some(uuid)
+ else None
- migrateWithoutCheckingThatActorResidesOnItsHomeNode(
- failedNodeAddress,
- migrateToNodeAddress,
- actorAddress,
- replicateFromUuid)
- }
-
- // notify all available nodes that they should fail-over all connections from 'from' to 'to'
- val from = disconnectedConnections(failedNodeName)
- val to = remoteServerAddress
-
- Serialization.serialize((from, to)) match {
- case Left(error) ⇒ throw error
- case Right(bytes) ⇒
-
- val command = RemoteDaemonMessageProtocol.newBuilder
- .setMessageType(FAIL_OVER_CONNECTIONS)
- .setPayload(ByteString.copyFrom(bytes))
- .build
-
- // FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed?
- nodeConnections.values foreach {
- case (_, connection) ⇒ sendCommandToNode(connection, command, async = true)
- }
- }
+ migrateWithoutCheckingThatActorResidesOnItsHomeNode(
+ failedNodeAddress,
+ migrateToNodeAddress,
+ actorAddress,
+ replicateFromUuid)
}
+
+ // notify all available nodes that they should fail-over all connections from 'from' to 'to'
+ val from = disconnectedConnections(failedNodeName)
+ val to = remoteServerAddress
+
+ Serialization.serialize((from, to)) match {
+ case Left(error) ⇒ throw error
+ case Right(bytes) ⇒
+
+ val command = RemoteDaemonMessageProtocol.newBuilder
+ .setMessageType(FAIL_OVER_CONNECTIONS)
+ .setPayload(ByteString.copyFrom(bytes))
+ .build
+
+ // FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed?
+ nodeConnections.values foreach {
+ case (_, connection) ⇒ sendCommandToNode(connection, command, async = true)
+ }
+ }
+ }
}
}
@@ -1529,7 +1520,7 @@ class DefaultClusterNode private[akka](
* Used when the ephemeral "home" node is already gone, so we can't check if it is available.
*/
private def migrateWithoutCheckingThatActorResidesOnItsHomeNode(
- from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) {
+ from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) {
EventHandler.debug(this, "Migrating actor [%s] from node [%s] to node [%s]".format(actorAddress, from, to))
if (!isInUseOnNode(actorAddress, to)) {
@@ -1555,17 +1546,16 @@ class DefaultClusterNode private[akka](
EventHandler.info(this, "Created node [%s]".format(CLUSTER_PATH))
}
- basePaths.foreach {
- path ⇒
- try {
- ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
- EventHandler.debug(this, "Created node [%s]".format(path))
- } catch {
- case e ⇒
- val error = new ClusterException(e.toString)
- EventHandler.error(error, this)
- throw error
- }
+ basePaths.foreach { path ⇒
+ try {
+ ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
+ EventHandler.debug(this, "Created node [%s]".format(path))
+ } catch {
+ case e ⇒
+ val error = new ClusterException(e.toString)
+ EventHandler.error(error, this)
+ throw error
+ }
}
}
@@ -1793,85 +1783,81 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
try {
if (message.hasActorAddress) {
val actorAddress = message.getActorAddress
- cluster.serializerForActor(actorAddress) foreach {
- serializer ⇒
- cluster.use(actorAddress, serializer) foreach {
- newActorRef ⇒
- cluster.remoteService.register(actorAddress, newActorRef)
+ cluster.serializerForActor(actorAddress) foreach { serializer ⇒
+ cluster.use(actorAddress, serializer) foreach { newActorRef ⇒
+ cluster.remoteService.register(actorAddress, newActorRef)
- if (message.hasReplicateActorFromUuid) {
- // replication is used - fetch the messages and replay them
- import akka.remote.protocol.RemoteProtocol._
- import akka.remote.MessageSerializer
+ if (message.hasReplicateActorFromUuid) {
+ // replication is used - fetch the messages and replay them
+ import akka.remote.protocol.RemoteProtocol._
+ import akka.remote.MessageSerializer
- val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
- val deployment = Deployer.deploymentFor(actorAddress)
- val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
- throw new IllegalStateException(
- "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
- val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
+ val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
+ val deployment = Deployer.deploymentFor(actorAddress)
+ val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
+ throw new IllegalStateException(
+ "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
+ val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
- try {
- // get the transaction log for the actor UUID
- val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
+ try {
+ // get the transaction log for the actor UUID
+ val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
- // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
- val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries
+ // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
+ val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries
- // deserialize and restore actor snapshot
- val actorRefToUseForReplay =
- snapshotAsBytes match {
+ // deserialize and restore actor snapshot
+ val actorRefToUseForReplay =
+ snapshotAsBytes match {
- // we have a new actor ref - the snapshot
- case Some(bytes) ⇒
- // stop the new actor ref and use the snapshot instead
- cluster.remoteService.unregister(actorAddress)
+ // we have a new actor ref - the snapshot
+ case Some(bytes) ⇒
+ // stop the new actor ref and use the snapshot instead
+ cluster.remoteService.unregister(actorAddress)
- // deserialize the snapshot actor ref and register it as remote actor
- val uncompressedBytes =
- if (Cluster.shouldCompressData) LZF.uncompress(bytes)
- else bytes
+ // deserialize the snapshot actor ref and register it as remote actor
+ val uncompressedBytes =
+ if (Cluster.shouldCompressData) LZF.uncompress(bytes)
+ else bytes
- val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
- cluster.remoteService.register(actorAddress, snapshotActorRef)
+ val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
+ cluster.remoteService.register(actorAddress, snapshotActorRef)
- // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should)
- //newActorRef.stop()
+ // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should)
+ //newActorRef.stop()
- snapshotActorRef
+ snapshotActorRef
- // we have no snapshot - use the new actor ref
- case None ⇒
- newActorRef
- }
-
- // deserialize the messages
- val messages: Vector[AnyRef] = entriesAsBytes map {
- bytes ⇒
- val messageBytes =
- if (Cluster.shouldCompressData) LZF.uncompress(bytes)
- else bytes
- MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
- }
-
- EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
-
- // replay all messages
- messages foreach {
- message ⇒
- EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
-
- // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other?
- actorRefToUseForReplay ! message
- }
-
- } catch {
- case e: Throwable ⇒
- EventHandler.error(e, this, e.toString)
- throw e
+ // we have no snapshot - use the new actor ref
+ case None ⇒
+ newActorRef
}
+
+ // deserialize the messages
+ val messages: Vector[AnyRef] = entriesAsBytes map { bytes ⇒
+ val messageBytes =
+ if (Cluster.shouldCompressData) LZF.uncompress(bytes)
+ else bytes
+ MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
}
+
+ EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
+
+ // replay all messages
+ messages foreach { message ⇒
+ EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
+
+ // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other?
+ actorRefToUseForReplay ! message
+ }
+
+ } catch {
+ case e: Throwable ⇒
+ EventHandler.error(e, this, e.toString)
+ throw e
+ }
}
+ }
}
} else {
EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
@@ -1886,9 +1872,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case RELEASE ⇒
if (message.hasActorUuid) {
- cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach {
- address ⇒
- cluster.release(address)
+ cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒
+ cluster.release(address)
}
} else if (message.hasActorAddress) {
cluster release message.getActorAddress
@@ -1898,15 +1883,15 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
.format(message))
}
- case START ⇒ cluster.start()
+ case START ⇒ cluster.start()
- case STOP ⇒ cluster.shutdown()
+ case STOP ⇒ cluster.shutdown()
case DISCONNECT ⇒ cluster.disconnect()
- case RECONNECT ⇒ cluster.reconnect()
+ case RECONNECT ⇒ cluster.reconnect()
- case RESIGN ⇒ cluster.resign()
+ case RESIGN ⇒ cluster.resign()
case FAIL_OVER_CONNECTIONS ⇒
val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
@@ -1944,7 +1929,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
def receive = {
case (fun: Function[_, _], param: Any) ⇒ try {
- fun.asInstanceOf[Any => Unit].apply(param)
+ fun.asInstanceOf[Any ⇒ Unit].apply(param)
} finally {
self.stop()
}
@@ -1957,7 +1942,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
def receive = {
case (fun: Function[_, _], param: Any) ⇒ try {
- self.reply(fun.asInstanceOf[Any => Any](param))
+ self.reply(fun.asInstanceOf[Any ⇒ Any](param))
} finally {
self.stop()
}
@@ -1970,7 +1955,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
- case Left(error) ⇒ throw error
+ case Left(error) ⇒ throw error
case Right(instance) ⇒ instance.asInstanceOf[T]
}
}
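The migration trigger inside migrateActorsOnFailedNodes reduces to a circular-successor test over the sorted node list. Extracted as a pure function for clarity (a sketch, not part of the patch):

// True when this node is the leftmost successor of the failed node in the sorted,
// circular list of cluster node names, and therefore leads the migration.
def isLeftmostSuccessor(myIndex: Int, failedNodeIndex: Int, nodeCount: Int): Boolean =
  (failedNodeIndex == 0 && myIndex == nodeCount - 1) || // wrap-around: the last node succeeds the head
  (failedNodeIndex == myIndex + 1)                      // otherwise: the failed node is my immediate neighbour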
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index b71b6a909a..a5727e5381 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -3,15 +3,10 @@
*/
package akka.cluster
-import Cluster._
-
import akka.actor._
-import Actor._
import akka.dispatch._
import akka.util._
import ReflectiveAccess._
-import ClusterModule._
-import akka.event.EventHandler
import akka.dispatch.Future
import java.net.InetSocketAddress
@@ -19,6 +14,8 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.{ Map ⇒ JMap }
import com.eaio.uuid.UUID
+import collection.immutable.Map
+import annotation.tailrec
/**
* ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor
@@ -26,11 +23,11 @@ import com.eaio.uuid.UUID
*
* @author Jonas Bonér
*/
-class ClusterActorRef private[akka] (
- inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
- val address: String,
- _timeout: Long)
- extends ActorRef with ScalaActorRef { this: Router.Router ⇒
+class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
+ val address: String,
+ _timeout: Long)
+ extends ActorRef with ScalaActorRef {
+ this: Router.Router ⇒
timeout = _timeout
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](
@@ -50,10 +47,9 @@ class ClusterActorRef private[akka] (
route(message)(sender)
}
- override def postMessageToMailboxAndCreateFutureResultWithTimeout(
- message: Any,
- timeout: Timeout,
- channel: UntypedChannel): Future[Any] = {
+ override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any,
+ timeout: Timeout,
+ channel: UntypedChannel): Future[Any] = {
val sender = channel match {
case ref: ActorRef ⇒ Some(ref)
case _ ⇒ None
@@ -61,13 +57,56 @@ class ClusterActorRef private[akka] (
route[Any](message, timeout.duration.toMillis)(sender)
}
- private[akka] def failOver(fromInetSocketAddress: InetSocketAddress, toInetSocketAddress: InetSocketAddress) {
- inetSocketAddressToActorRefMap set (inetSocketAddressToActorRefMap.get map {
- case (`fromInetSocketAddress`, actorRef) ⇒
- actorRef.stop()
- (toInetSocketAddress, createRemoteActorRef(actorRef.address, toInetSocketAddress))
- case other ⇒ other
- })
+ private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = {
+ @tailrec
+ def doFailover(from: InetSocketAddress, to: InetSocketAddress): Unit = {
+ val oldValue = inetSocketAddressToActorRefMap.get
+
+ val newValue = oldValue map {
+ case (`from`, actorRef) ⇒
+ actorRef.stop()
+ (to, createRemoteActorRef(actorRef.address, to))
+ case other ⇒ other
+ }
+
+ if (!inetSocketAddressToActorRefMap.compareAndSet(oldValue, newValue))
+ doFailover(from, to)
+ }
+
+ doFailover(from, to)
+ }
+
+ /**
+ * Removes the given address (and the corresponding ActorRef) from this ClusterActorRef.
+ *
+ * The call can safely be made even when the address is missing.
+ *
+ * The call is thread-safe.
+ */
+ @tailrec
+ private def remove(address: InetSocketAddress): Unit = {
+ val oldValue = inetSocketAddressToActorRefMap.get()
+
+ val newValue = oldValue - address
+
+ if (!inetSocketAddressToActorRefMap.compareAndSet(oldValue, newValue))
+ remove(address)
+ }
+
+  def signalDeadActor(ref: ActorRef): Unit = {
+    // since the number of remote actor refs for a clustered actor ref is quite low, we can deal with the O(N)
+    // complexity of the following removal; work on a single snapshot of the connections map so that a
+    // concurrent update cannot invalidate the lookup mid-iteration
+    val snapshot = connections
+    val it = snapshot.keySet.iterator
+
+    while (it.hasNext) {
+      val address = it.next()
+      val foundRef: ActorRef = snapshot(address)
+
+      if (foundRef == ref) {
+        remove(address)
+        return
+      }
+    }
}
private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
@@ -75,18 +114,21 @@ class ClusterActorRef private[akka] (
}
def start(): this.type = synchronized[this.type] {
- _status = ActorRefInternals.RUNNING
+ if (_status == ActorRefInternals.UNSTARTED) {
+ _status = ActorRefInternals.RUNNING
+ //TODO add this? Actor.registry.register(this)
+ }
this
}
def stop() {
synchronized {
if (_status == ActorRefInternals.RUNNING) {
+ //TODO add this? Actor.registry.unregister(this)
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
// FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket)
-
inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections
}
}
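failOver and remove above share one lock-free update shape. Generalized as a sketch:

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Retry the read-transform-compareAndSet cycle until no concurrent writer interferes.
@tailrec
def updateAtomically[T](ref: AtomicReference[T])(transform: T ⇒ T): Unit = {
  val oldValue = ref.get
  val newValue = transform(oldValue)
  if (!ref.compareAndSet(oldValue, newValue)) updateAtomically(ref)(transform)
}

With this helper, remove(address) is just updateAtomically(inetSocketAddressToActorRefMap)(_ - address).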
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
index a0a5fa40f2..63229b4770 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
@@ -4,8 +4,8 @@
package akka.cluster
-import akka.actor.{ DeploymentConfig, Deployer, LocalDeployer, DeploymentException }
-import DeploymentConfig._
+import akka.actor.DeploymentConfig._
+import akka.actor._
import akka.event.EventHandler
import akka.config.Config
import akka.util.Switch
@@ -17,12 +17,10 @@ import org.apache.zookeeper.recipes.lock.{ WriteLock, LockListener }
import org.I0Itec.zkclient.exception.{ ZkNoNodeException, ZkNodeExistsException }
+import scala.collection.immutable.Seq
import scala.collection.JavaConversions.collectionAsScalaIterable
-import com.eaio.uuid.UUID
-
import java.util.concurrent.{ CountDownLatch, TimeUnit }
-import java.util.concurrent.atomic.AtomicReference
/**
* A ClusterDeployer is responsible for deploying a Deploy.
@@ -31,7 +29,7 @@ import java.util.concurrent.atomic.AtomicReference
*
* @author Jonas Bonér
*/
-object ClusterDeployer {
+object ClusterDeployer extends ActorDeployer {
val clusterName = Cluster.name
val nodeName = Config.nodename
val clusterPath = "/%s" format clusterName
@@ -127,7 +125,7 @@ object ClusterDeployer {
deployments
}
- private[akka] def init(deployments: List[Deploy]) {
+ private[akka] def init(deployments: Seq[Deploy]) {
isConnected switchOn {
EventHandler.info(this, "Initializing cluster deployer")
@@ -143,7 +141,7 @@ object ClusterDeployer {
}
}
- val allDeployments = deployments ::: systemDeployments
+ val allDeployments = deployments ++ systemDeployments
if (!isDeploymentCompletedInCluster) {
if (deploymentInProgressLock.lock()) {
@@ -167,21 +165,20 @@ object ClusterDeployer {
ensureRunning {
LocalDeployer.deploy(deployment)
deployment match {
- case Deploy(_, _, Local) ⇒ {} // local deployment, do nothing here
- case _ ⇒ // cluster deployment
- val path = deploymentAddressPath.format(deployment.address)
+ case Deploy(_, _, _, Local) | Deploy(_, _, _, _: Local) ⇒ //TODO LocalDeployer.deploy(deployment)??
+ case Deploy(address, recipe, routing, _) ⇒ // cluster deployment
+ /*TODO recipe foreach { r ⇒
+ Deployer.newClusterActorRef(() ⇒ Actor.actorOf(r.implementationClass), address, deployment).start()
+ }*/
+ val path = deploymentAddressPath.format(address)
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
zkClient.writeData(path, deployment)
} catch {
case e: NullPointerException ⇒
- handleError(new DeploymentException(
- "Could not store deployment data [" + deployment +
- "] in ZooKeeper since client session is closed"))
+ handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed"))
case e: Exception ⇒
- handleError(new DeploymentException(
- "Could not store deployment data [" +
- deployment + "] in ZooKeeper due to: " + e))
+ handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e))
}
}
}
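deploy relies on ignore[E](body) to make the znode creation idempotent. Its likely shape, reconstructed here as an assumption (the real helper lives elsewhere in akka.util and may differ):

// Run body, swallowing only exceptions of the given type E.
def ignore[E <: Throwable: Manifest](body: ⇒ Unit): Unit =
  try body catch {
    case e if manifest[E].erasure.isInstance(e) ⇒ () // intentionally ignored
  }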
diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
index c165e699fe..43942b3e8d 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
@@ -3,10 +3,7 @@
*/
package akka.cluster
-import Cluster._
-
import akka.actor._
-import Actor._
import akka.dispatch.Future
import akka.event.EventHandler
import akka.routing.{ RouterType, RoutingException }
@@ -42,22 +39,49 @@ object Router {
* @author Jonas Bonér
*/
trait Router {
+
def connections: Map[InetSocketAddress, ActorRef]
+ def signalDeadActor(ref: ActorRef): Unit
+
def route(message: Any)(implicit sender: Option[ActorRef]): Unit
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T]
}
+ /**
+   * An abstract Router implementation that provides the basic infrastructure, so that a concrete
+   * Router only needs to implement the next method.
+   *
+   * This is also the place where failover is handled: when an ActorRef fails, a different
+   * one needs to be selected.
+ */
trait BasicRouter extends Router {
+
def route(message: Any)(implicit sender: Option[ActorRef]): Unit = next match {
- case Some(actor) ⇒ actor.!(message)(sender)
- case _ ⇒ throwNoConnectionsError()
+ case Some(actor) ⇒ {
+ try {
+ actor.!(message)(sender)
+ } catch {
+ case e: Exception ⇒
+ signalDeadActor(actor)
+ throw e
+ }
+ }
+ case _ ⇒ throwNoConnectionsError()
}
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match {
- case Some(actor) ⇒ actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
- case _ ⇒ throwNoConnectionsError()
+ case Some(actor) ⇒ {
+ try {
+ actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
+ } catch {
+ case e: Throwable ⇒
+ signalDeadActor(actor)
+ throw e
+ }
+ }
+ case _ ⇒ throwNoConnectionsError()
}
protected def next: Option[ActorRef]
@@ -73,6 +97,7 @@ object Router {
* @author Jonas Bonér
*/
trait Direct extends BasicRouter {
+
lazy val next: Option[ActorRef] = {
val connection = connections.values.headOption
if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection")
@@ -90,7 +115,9 @@ object Router {
if (connections.isEmpty) {
EventHandler.warning(this, "Router has no replica connections")
None
- } else Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next())
+ } else {
+ Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next())
+ }
}
/**
@@ -124,4 +151,5 @@ object Router {
findNext
}
}
+
}
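As the scaladoc above says, a concrete router only has to supply next. A hypothetical selection policy, sketched as if defined alongside Direct, Random and RoundRobin inside object Router:

// Always prefer the connection with the lexicographically smallest address.
trait SmallestAddressFirst extends BasicRouter {
  protected def next: Option[ActorRef] =
    if (connections.isEmpty) None
    else Some(connections.minBy(_._1.toString)._2)
}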
diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
index 2261829934..d12820c130 100644
--- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
@@ -14,7 +14,7 @@ import akka.config._
import Config._
import akka.util._
import akka.actor._
-import DeploymentConfig.{ ReplicationScheme}
+import DeploymentConfig.{ ReplicationScheme }
import akka.event.EventHandler
import akka.dispatch.{ DefaultPromise, Promise, MessageInvocation }
import akka.remote.MessageSerializer
diff --git a/akka-cluster/src/main/scala/akka/cluster/storage/Storage.scala b/akka-cluster/src/main/scala/akka/cluster/storage/Storage.scala
index 0129c39eb0..bb1fb39fc8 100755
--- a/akka-cluster/src/main/scala/akka/cluster/storage/Storage.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/storage/Storage.scala
@@ -118,8 +118,8 @@ class VersionedData(val data: Array[Byte], val version: Long) {}
/**
* An AkkaException thrown by the Storage module.
*/
-class StorageException(msg: String = null, cause: java.lang.Throwable = null) extends AkkaException(msg, cause){
- def this(msg:String) = this(msg, null);
+class StorageException(msg: String = null, cause: java.lang.Throwable = null) extends AkkaException(msg, cause) {
+ def this(msg: String) = this(msg, null)
}
/**
@@ -127,21 +127,21 @@ class StorageException(msg: String = null, cause: java.lang.Throwable = null) ex
* A StorageException thrown when an operation is done on a non-existing node.
*/
class MissingDataException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) {
- def this(msg:String) = this(msg, null);
+ def this(msg: String) = this(msg, null)
}
/**
* A StorageException thrown when an operation is done on an existing node, but no node was expected.
*/
-class DataExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause){
- def this(msg:String) = this(msg, null);
+class DataExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) {
+ def this(msg: String) = this(msg, null)
}
/**
* A StorageException thrown when an operation causes an optimistic locking failure.
*/
class BadVersionException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) {
- def this(msg:String) = this(msg, null);
+ def this(msg: String) = this(msg, null)
}
/**
diff --git a/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala b/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala
index c405240bfd..fd27d894bf 100644
--- a/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala
@@ -27,8 +27,7 @@ class AkkaZkClient(zkServers: String,
_connection.connect(this)
} catch {
case e: InterruptedException ⇒ throw new ZkInterruptedException(e)
- }
- finally {
+ } finally {
zkLock.unlock()
}
}
diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index b676e9f254..015ccde474 100644
--- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -4,22 +4,22 @@
package akka.remote.netty
-import akka.dispatch.{ActorPromise, DefaultPromise, Promise}
-import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
+import akka.dispatch.{ ActorPromise, DefaultPromise, Promise }
+import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings }
import akka.remote.protocol.RemoteProtocol._
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
import akka.remoteinterface._
import akka.actor.{
-PoisonPill,
-Actor,
-RemoteActorRef,
-ActorRef,
-IllegalActorStateException,
-RemoteActorSystemMessage,
-uuidFrom,
-Uuid,
-LifeCycleMessage
+ PoisonPill,
+ Actor,
+ RemoteActorRef,
+ ActorRef,
+ IllegalActorStateException,
+ RemoteActorSystemMessage,
+ uuidFrom,
+ Uuid,
+ LifeCycleMessage
}
import akka.actor.Actor._
import akka.config.Config
@@ -28,27 +28,27 @@ import akka.util._
import akka.event.EventHandler
import org.jboss.netty.channel._
-import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup, ChannelGroupFuture}
+import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture }
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap}
-import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
-import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
-import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
-import org.jboss.netty.handler.timeout.{ReadTimeoutHandler, ReadTimeoutException}
-import org.jboss.netty.handler.execution.{OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler}
-import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer}
+import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap }
+import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
+import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
+import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
+import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
+import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
+import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import java.net.InetSocketAddress
-import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
+import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent._
import akka.AkkaException
-class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause){
- def this(msg:String) = this(msg, null);
+class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
+ def this(msg: String) = this(msg, null)
}
object RemoteEncoder {
@@ -82,7 +82,7 @@ trait NettyRemoteClientModule extends RemoteClientModule {
withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef))
private[akka] def withClientFor[T](
- address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient ⇒ T): T = {
+ address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient ⇒ T): T = {
// loader.foreach(MessageSerializer.setClassLoader(_))
val key = Address(address)
lock.readLock.lock
@@ -119,14 +119,14 @@ trait NettyRemoteClientModule extends RemoteClientModule {
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
remoteClients.remove(Address(address)) match {
case Some(client) ⇒ client.shutdown()
- case None ⇒ false
+ case None ⇒ false
}
}
def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard {
remoteClients.get(Address(address)) match {
case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true)
- case None ⇒ false
+ case None ⇒ false
}
}
@@ -152,9 +152,9 @@ trait NettyRemoteClientModule extends RemoteClientModule {
* ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that
* reuses an already established connection.
*/
-abstract class RemoteClient private[akka](
- val module: NettyRemoteClientModule,
- val remoteAddress: InetSocketAddress) {
+abstract class RemoteClient private[akka] (
+ val module: NettyRemoteClientModule,
+ val remoteAddress: InetSocketAddress) {
val useTransactionLog = config.getBool("akka.cluster.client.buffering.retry-message-send-on-failure", true)
val transactionLogCapacity = config.getInt("akka.cluster.client.buffering.capacity", -1)
@@ -198,13 +198,13 @@ abstract class RemoteClient private[akka](
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send[T](
- message: Any,
- senderOption: Option[ActorRef],
- senderFuture: Option[Promise[T]],
- remoteAddress: InetSocketAddress,
- timeout: Long,
- isOneWay: Boolean,
- actorRef: ActorRef): Option[Promise[T]] =
+ message: Any,
+ senderOption: Option[ActorRef],
+ senderFuture: Option[Promise[T]],
+ remoteAddress: InetSocketAddress,
+ timeout: Long,
+ isOneWay: Boolean,
+ actorRef: ActorRef): Option[Promise[T]] =
send(createRemoteMessageProtocolBuilder(
Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build,
senderFuture)
@@ -213,8 +213,8 @@ abstract class RemoteClient private[akka](
* Sends the message across the wire
*/
def send[T](
- request: RemoteMessageProtocol,
- senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
+ request: RemoteMessageProtocol,
+ senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
if (isRunning) {
EventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request))
@@ -312,9 +312,9 @@ abstract class RemoteClient private[akka](
*
* @author Jonas Bonér
*/
-class ActiveRemoteClient private[akka](
- module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
- val loader: Option[ClassLoader] = None, notifyListenersFun: (⇒ Any) ⇒ Unit) extends RemoteClient(module, remoteAddress) {
+class ActiveRemoteClient private[akka] (
+ module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
+ val loader: Option[ClassLoader] = None, notifyListenersFun: (⇒ Any) ⇒ Unit) extends RemoteClient(module, remoteAddress) {
import RemoteClientSettings._
@@ -346,7 +346,6 @@ class ActiveRemoteClient private[akka](
EventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress))
-
// Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress)
openChannels.add(connection.awaitUninterruptibly.getChannel)
@@ -390,7 +389,7 @@ class ActiveRemoteClient private[akka](
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
- EventHandler.error(connection.getCause, "Reconnection to [%s] has failed".format(remoteAddress),this)
+ EventHandler.error(connection.getCause, "Reconnection to [%s] has failed".format(remoteAddress), this)
false
} else {
@@ -443,12 +442,12 @@ class ActiveRemoteClient private[akka](
* @author Jonas Bonér
*/
class ActiveRemoteClientPipelineFactory(
- name: String,
- futures: ConcurrentMap[Uuid, Promise[_]],
- bootstrap: ClientBootstrap,
- remoteAddress: InetSocketAddress,
- timer: HashedWheelTimer,
- client: ActiveRemoteClient) extends ChannelPipelineFactory {
+ name: String,
+ futures: ConcurrentMap[Uuid, Promise[_]],
+ bootstrap: ClientBootstrap,
+ remoteAddress: InetSocketAddress,
+ timer: HashedWheelTimer,
+ client: ActiveRemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit)
@@ -458,7 +457,7 @@ class ActiveRemoteClientPipelineFactory(
val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
case "zlib" ⇒ (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
- case _ ⇒ (Nil, Nil)
+ case _ ⇒ (Nil, Nil)
}
val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client)
@@ -472,12 +471,12 @@ class ActiveRemoteClientPipelineFactory(
*/
@ChannelHandler.Sharable
class ActiveRemoteClientHandler(
- val name: String,
- val futures: ConcurrentMap[Uuid, Promise[_]],
- val bootstrap: ClientBootstrap,
- val remoteAddress: InetSocketAddress,
- val timer: HashedWheelTimer,
- val client: ActiveRemoteClient)
+ val name: String,
+ val futures: ConcurrentMap[Uuid, Promise[_]],
+ val bootstrap: ClientBootstrap,
+ val remoteAddress: InetSocketAddress,
+ val timer: HashedWheelTimer,
+ val client: ActiveRemoteClient)
extends SimpleChannelUpstreamHandler {
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
@@ -493,20 +492,20 @@ class ActiveRemoteClientHandler(
case arp: AkkaRemoteProtocol if arp.hasMessage ⇒
val reply = arp.getMessage
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
- EventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]".format(reply))
+ EventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]".format(reply))
EventHandler.debug(this, "Trying to map back to future: %s".format(replyUuid))
futures.remove(replyUuid).asInstanceOf[Promise[Any]] match {
- case null =>
+ case null ⇒
client.notifyListeners(RemoteClientError(new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist"), client.module, client.remoteAddress))
- case future =>
- if (reply.hasMessage) {
- val message = MessageSerializer.deserialize(reply.getMessage)
- future.completeWithResult(message)
- } else {
- future.completeWithException(parseException(reply, client.loader))
- }
+ case future ⇒
+ if (reply.hasMessage) {
+ val message = MessageSerializer.deserialize(reply.getMessage)
+ future.completeWithResult(message)
+ } else {
+ future.completeWithException(parseException(reply, client.loader))
+ }
}
case other ⇒
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
@@ -598,11 +597,11 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
def optimizeLocalScoped_?() = optimizeLocal.get
protected[akka] def actorFor(
- actorAddress: String,
- timeout: Long,
- host: String,
- port: Int,
- loader: Option[ClassLoader]): ActorRef = {
+ actorAddress: String,
+ timeout: Long,
+ host: String,
+ port: Int,
+ loader: Option[ClassLoader]): ActorRef = {
val homeInetSocketAddress = this.address
if (optimizeLocalScoped_?) {
@@ -684,7 +683,7 @@ trait NettyRemoteServerModule extends RemoteServerModule {
def address = currentServer.get match {
case Some(server) ⇒ server.address
- case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress
+ case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress
}
def name = currentServer.get match {
@@ -715,11 +714,10 @@ trait NettyRemoteServerModule extends RemoteServerModule {
def shutdownServerModule() = guard withGuard {
_isRunning switchOff {
- currentServer.getAndSet(None) foreach {
- instance ⇒
- EventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port))
- instance.shutdown()
+ currentServer.getAndSet(None) foreach { instance ⇒
+ EventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port))
+ instance.shutdown()
}
}
}
@@ -810,11 +808,11 @@ trait NettyRemoteServerModule extends RemoteServerModule {
* @author Jonas Bonér
*/
class RemoteServerPipelineFactory(
- val name: String,
- val openChannels: ChannelGroup,
- val executor: ExecutionHandler,
- val loader: Option[ClassLoader],
- val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
+ val name: String,
+ val openChannels: ChannelGroup,
+ val executor: ExecutionHandler,
+ val loader: Option[ClassLoader],
+ val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
import RemoteServerSettings._
@@ -825,7 +823,7 @@ class RemoteServerPipelineFactory(
val protobufEnc = new ProtobufEncoder
val (enc, dec) = COMPRESSION_SCHEME match {
case "zlib" ⇒ (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
- case _ ⇒ (Nil, Nil)
+ case _ ⇒ (Nil, Nil)
}
val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
@@ -865,10 +863,10 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si
*/
@ChannelHandler.Sharable
class RemoteServerHandler(
- val name: String,
- val openChannels: ChannelGroup,
- val applicationLoader: Option[ClassLoader],
- val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
+ val name: String,
+ val openChannels: ChannelGroup,
+ val applicationLoader: Option[ClassLoader],
+ val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
import RemoteServerSettings._
@@ -886,7 +884,7 @@ class RemoteServerHandler(
} else if (!future.isSuccess) {
val socketAddress = future.getChannel.getRemoteAddress match {
case i: InetSocketAddress ⇒ Some(i)
- case _ ⇒ None
+ case _ ⇒ None
}
server.notifyListeners(RemoteServerWriteFailed(payload, future.getCause, server, socketAddress))
}
@@ -902,7 +900,7 @@ class RemoteServerHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
- EventHandler.debug(this,"Remote client [%s] connected to [%s]".format(clientAddress, server.name))
+ EventHandler.debug(this, "Remote client [%s] connected to [%s]".format(clientAddress, server.name))
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
@@ -921,7 +919,7 @@ class RemoteServerHandler(
try {
actor ! PoisonPill
} catch {
- case e: Exception ⇒ EventHandler.error(e, "Couldn't stop %s".format(actor),this)
+ case e: Exception ⇒ EventHandler.error(e, "Couldn't stop %s".format(actor), this)
}
}
@@ -930,7 +928,7 @@ class RemoteServerHandler(
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
- EventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name),this)
+ EventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name), this)
server.notifyListeners(RemoteServerClientClosed(server, clientAddress))
}
@@ -956,7 +954,7 @@ class RemoteServerHandler(
private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] =
ctx.getChannel.getRemoteAddress match {
case inet: InetSocketAddress ⇒ Some(inet)
- case _ ⇒ None
+ case _ ⇒ None
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try {
@@ -1005,22 +1003,22 @@ class RemoteServerHandler(
request.getActorInfo.getTimeout,
new ActorPromise(request.getActorInfo.getTimeout).
onComplete(_.value.get match {
- case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request))
- case r: Right[_, _] ⇒
- val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
- Some(actorRef),
- Right(request.getUuid),
- actorInfo.getAddress,
- actorInfo.getTimeout,
- r.asInstanceOf[Either[Throwable, Any]],
- isOneWay = true,
- Some(actorRef))
- // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
- if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
- write(channel, RemoteEncoder.encode(messageBuilder.build))
- }))
+ case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request))
+ case r: Right[_, _] ⇒
+ val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
+ Some(actorRef),
+ Right(request.getUuid),
+ actorInfo.getAddress,
+ actorInfo.getTimeout,
+ r.asInstanceOf[Either[Throwable, Any]],
+ isOneWay = true,
+ Some(actorRef))
+ // FIXME lift the supervisor uuid management into the createRemoteMessageProtocolBuilder method
+ if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
+ write(channel, RemoteEncoder.encode(messageBuilder.build))
+ }))
}
}
@@ -1070,7 +1068,7 @@ class RemoteServerHandler(
private def findSessionActor(id: String, channel: Channel): ActorRef =
sessionActors.get(channel) match {
case null ⇒ null
- case map ⇒ map get id
+ case map ⇒ map get id
}
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = {
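Aside: the ActiveRemoteClientHandler hunk above is the request/reply correlation at the heart of the remote client. Every outbound call parks a Promise in a ConcurrentMap keyed by the message UUID, and messageReceived removes and completes it with either the deserialized result or the parsed exception. A minimal standalone sketch of that pattern follows; PendingReplies and Reply are invented names for illustration, not Akka's API.

import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{ Future, Promise }

final case class Reply(uuid: String, result: Either[Throwable, Any])

final class PendingReplies {
  private val futures = new ConcurrentHashMap[String, Promise[Any]]()

  // Register a promise under the request uuid before the request is written.
  def register(uuid: String): Future[Any] = {
    val promise = Promise[Any]()
    futures.put(uuid, promise)
    promise.future
  }

  // On reply: atomically remove and complete the matching promise,
  // mirroring the null / future match in messageReceived above.
  def complete(reply: Reply): Unit =
    futures.remove(reply.uuid) match {
      case null ⇒ sys.error("No future mapped to UUID " + reply.uuid)
      case p    ⇒ reply.result.fold(p.failure, p.success)
    }
}

Removing the promise before completing it is what makes the pattern safe against duplicate replies: a second reply for the same UUID hits the null branch and is reported as an error instead of completing a future twice.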
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
index aebfd8b651..bdc430ee6d 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
@@ -13,6 +13,7 @@ import akka.util.Duration
import System.{ currentTimeMillis ⇒ now }
import java.io.File
+import akka.actor.Deployer
trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
def testNodes: Int
@@ -36,6 +37,7 @@ trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAft
}
trait ClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
+
override def beforeAll() = {
ClusterTestNode.waitForReady(getClass.getName)
}
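The beforeAll hook above blocks each slave JVM on ClusterTestNode.waitForReady until the master signals that the cluster is up. The diff does not show waitForReady's body; given the java.io.File import, a marker-file barrier is one plausible shape, sketched here purely as an assumption.

import java.io.File

object ReadyBarrier {
  private def marker(testName: String) = new File("target/" + testName + ".ready")

  // Master node signals readiness by touching the marker file.
  def signalReady(testName: String): Unit = {
    val f = marker(testName)
    f.getParentFile.mkdirs()
    f.createNewFile()
  }

  // Other nodes poll until the marker appears or the timeout expires.
  def waitForReady(testName: String, timeoutMs: Long = 60000): Unit = {
    val deadline = System.currentTimeMillis + timeoutMs
    while (!marker(testName).exists()) {
      if (System.currentTimeMillis > deadline)
        sys.error("Timed out waiting for " + testName)
      Thread.sleep(100)
    }
  }
}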
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf
new file mode 100644
index 0000000000..518aed1cd0
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf
@@ -0,0 +1,6 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "ERROR"
+akka.actor.deployment.service-test.router = "round-robin"
+akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
+akka.actor.deployment.service-test.clustered.replication-factor = 2
+akka.cluster.client.buffering.retry-message-send-on-failure = false
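The node configs for this spec are deliberately identical: every JVM must agree on the service-test deployment, a round-robin router over the preferred nodes node2 and node3 with replication factor 2. As a generic illustration of what round-robin selection means here (a sketch of the policy, not Akka's router implementation):

import java.util.concurrent.atomic.AtomicLong

final class RoundRobin[T](targets: IndexedSeq[T]) {
  require(targets.nonEmpty, "need at least one target")
  private val counter = new AtomicLong(0)

  // Each call picks the next target in a fixed cycle.
  def next(): T = targets((counter.getAndIncrement % targets.size).toInt)
}

object RoundRobinUsage {
  // With the config above: replication factor 2 over the two preferred nodes.
  val router = new RoundRobin(Vector("node:node2", "node:node3"))
  // router.next() yields node:node2, node:node3, node:node2, ...
}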
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf
new file mode 100644
index 0000000000..518aed1cd0
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf
@@ -0,0 +1,6 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "ERROR"
+akka.actor.deployment.service-test.router = "round-robin"
+akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
+akka.actor.deployment.service-test.clustered.replication-factor = 2
+akka.cluster.client.buffering.retry-message-send-on-failure = false
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.opts
new file mode 100644
index 0000000000..f1e01f253d
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf
new file mode 100644
index 0000000000..882d9cb7db
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf
@@ -0,0 +1,6 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "ERROR"
+akka.actor.deployment.service-test.router = "round-robin"
+akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
+akka.actor.deployment.service-test.clustered.replication-factor = 2
+akka.cluster.client.buffering.retry-message-send-on-failure = false
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.opts
new file mode 100644
index 0000000000..202496ad31
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993
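The .opts files carry each forked test JVM's identity as plain system properties. A hedged sketch of how such options surface at runtime; the property names come straight from the files above, while the fallback values are assumptions for illustration:

object NodeIdentity {
  // Each multi-jvm node reads its name and port from -D flags set in its .opts file.
  val nodename: String = System.getProperty("akka.cluster.nodename", "node1")
  val port: Int = Integer.parseInt(System.getProperty("akka.cluster.port", "9991"))
}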
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala
new file mode 100644
index 0000000000..7c59a9fb93
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2009-2011 Typesafe Inc.