From 3861bea1774b5ae2fa3ea79202afdaa20be84caa Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 11 Aug 2010 15:34:36 +0200 Subject: [PATCH 01/11] Changing akka-init-script.sh to use logback --- scripts/akka-init-script.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/akka-init-script.sh b/scripts/akka-init-script.sh index 5a64ea5039..752d675b55 100644 --- a/scripts/akka-init-script.sh +++ b/scripts/akka-init-script.sh @@ -10,9 +10,9 @@ NAME="cool" DAEMON=/usr/bin/java export AKKA_HOME=/var/.../servers/akka AKKA_JAR=$AKKA_HOME/akka.jar -LOG4J=$AKKA_HOME/config/log4j.properties +LOGBACK=$AKKA_HOME/config/logback.xml JVMFLAGS="-Xms512M -Xmx3072M -XX:+UseConcMarkSweepGC - -Dlog4j.configuration=file://"$LOG4J +Dlogback.configurationFile="$LOGBACK DAEMON_ARGS=$JVMFLAGS" -jar "$AKKA_JAR PIDFILE=/var/run/$NAME.pid SCRIPTNAME=/etc/init.d/$NAME From 3d6500f58081322e680473205db54d7152c08506 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 11 Aug 2010 15:46:17 +0200 Subject: [PATCH 02/11] Ported TransactorSpec to UntypedActor --- .../untyped-actor/UntypedTransactorSpec.scala | 239 ++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala diff --git a/akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala b/akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala new file mode 100644 index 0000000000..3ec707119c --- /dev/null +++ b/akka-core/src/test/scala/actor/untyped-actor/UntypedTransactorSpec.scala @@ -0,0 +1,239 @@ +package se.scalablesolutions.akka.actor + +import java.util.concurrent.{TimeUnit, CountDownLatch} +import org.scalatest.junit.JUnitSuite +import org.junit.Test + +import se.scalablesolutions.akka.stm.{Ref, TransactionalMap, TransactionalVector} +import UntypedActor._ + +object UntypedTransactorSpec { + case class GetMapState(key: String) + case object GetVectorState + case object GetVectorSize + case object GetRefState + + case class SetMapState(key: String, value: String) + case class SetVectorState(key: String) + case class SetRefState(key: String) + case class Success(key: String, value: String) + case class Failure(key: String, value: String, failer: UntypedActorRef) + + case class SetMapStateOneWay(key: String, value: String) + case class SetVectorStateOneWay(key: String) + case class SetRefStateOneWay(key: String) + case class SuccessOneWay(key: String, value: String) + case class FailureOneWay(key: String, value: String, failer: UntypedActorRef) + + case object GetNotifier +} +import UntypedTransactorSpec._ + +class StatefulUntypedTransactor(expectedInvocationCount: Int) extends UntypedTransactor { + def this() = this(0) + getContext.setTimeout(5000) + + val notifier = new CountDownLatch(expectedInvocationCount) + + private val mapState = TransactionalMap[String, String]() + private val vectorState = TransactionalVector[String]() + private val refState = Ref[String]() + + def onReceive(message: Any) = message match { + case GetNotifier => + getContext.replyUnsafe(notifier) + case GetMapState(key) => + getContext.replyUnsafe(mapState.get(key).get) + notifier.countDown + case GetVectorSize => + getContext.replyUnsafe(vectorState.length.asInstanceOf[AnyRef]) + notifier.countDown + case GetRefState => + getContext.replyUnsafe(refState.get) + notifier.countDown + case SetMapState(key, msg) => + mapState.put(key, msg) + getContext.replyUnsafe(msg) + notifier.countDown + case SetVectorState(msg) => + vectorState.add(msg) + getContext.replyUnsafe(msg) + notifier.countDown + case SetRefState(msg) => + refState.swap(msg) + getContext.replyUnsafe(msg) + notifier.countDown + case Success(key, msg) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + getContext.replyUnsafe(msg) + notifier.countDown + case Failure(key, msg, failer) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + failer.sendRequestReply("Failure") + getContext.replyUnsafe(msg) + notifier.countDown + case SetMapStateOneWay(key, msg) => + mapState.put(key, msg) + notifier.countDown + case SetVectorStateOneWay(msg) => + vectorState.add(msg) + notifier.countDown + case SetRefStateOneWay(msg) => + refState.swap(msg) + notifier.countDown + case SuccessOneWay(key, msg) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + notifier.countDown + case FailureOneWay(key, msg, failer) => + mapState.put(key, msg) + vectorState.add(msg) + refState.swap(msg) + notifier.countDown + failer.sendOneWay("Failure",getContext) + } +} + +class StatefulUntypedTransactorExpectingTwoInvocations extends StatefulUntypedTransactor(2) + +@serializable +class FailerUntypedTransactor extends UntypedTransactor { + + def onReceive(message: Any) = message match { + case "Failure" => + throw new RuntimeException("Expected exception; to test fault-tolerance") + } +} + +class UntypedTransactorSpec extends JUnitSuite { + + @Test + def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state + stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert("new state" === (stateful sendRequestReply GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"))) + } + + @Test + def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state + stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + assert("new state" === (stateful sendRequestReply GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"))) + } + + @Test + def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + val failer = actorOf(classOf[FailerUntypedTransactor]).start + stateful sendOneWay SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state + stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(5, TimeUnit.SECONDS)) + assert("init" === (stateful sendRequestReply GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure"))) // check that state is == init state + } + + @Test + def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state + val failer = actorOf(classOf[FailerUntypedTransactor]).start + try { + stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch {case e: RuntimeException => {}} + assert("init" === (stateful sendRequestReply GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure"))) // check that state is == init state + } + + @Test + def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetVectorStateOneWay("init") // set init state + stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert(2 === (stateful sendRequestReply GetVectorSize)) + } + + @Test + def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetVectorState("init") // set init state + stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + assert(2 === (stateful sendRequestReply GetVectorSize)) + } + + @Test + def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetVectorStateOneWay("init") // set init state + Thread.sleep(1000) + val failer = actorOf(classOf[FailerUntypedTransactor]).start + stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert(1 === (stateful sendRequestReply GetVectorSize)) + } + + @Test + def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetVectorState("init") // set init state + val failer = actorOf(classOf[FailerUntypedTransactor]).start + try { + stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch {case e: RuntimeException => {}} + assert(1 === (stateful sendRequestReply GetVectorSize)) + } + + @Test + def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetRefStateOneWay("init") // set init state + stateful sendOneWay SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert("new state" === (stateful sendRequestReply GetRefState)) + } + + @Test + def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetRefState("init") // set init state + stateful sendRequestReply Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + assert("new state" === (stateful sendRequestReply GetRefState)) + } + + @Test + def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactorExpectingTwoInvocations]).start + stateful sendOneWay SetRefStateOneWay("init") // set init state + Thread.sleep(1000) + val failer = actorOf(classOf[FailerUntypedTransactor]).start + stateful sendOneWay FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + val notifier = (stateful sendRequestReply GetNotifier).asInstanceOf[CountDownLatch] + assert(notifier.await(1, TimeUnit.SECONDS)) + assert("init" === (stateful sendRequestReply (GetRefState, 1000000))) // check that state is == init state + } + + @Test + def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf(classOf[StatefulUntypedTransactor]).start + stateful sendRequestReply SetRefState("init") // set init state + val failer = actorOf(classOf[FailerUntypedTransactor]).start + try { + stateful sendRequestReply Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch {case e: RuntimeException => {}} + assert("init" === (stateful sendRequestReply GetRefState)) // check that state is == init state + } +} \ No newline at end of file From 79df750b1da19e5cbd56ecb8e5f73cd1ed6a6295 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 11 Aug 2010 17:01:16 +0200 Subject: [PATCH 03/11] Minor perf improvement in Ref --- akka-core/src/main/scala/stm/Ref.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/akka-core/src/main/scala/stm/Ref.scala b/akka-core/src/main/scala/stm/Ref.scala index 2ca6802b15..7d99c673a6 100644 --- a/akka-core/src/main/scala/stm/Ref.scala +++ b/akka-core/src/main/scala/stm/Ref.scala @@ -43,8 +43,9 @@ class Ref[T](initialOpt: Option[T] = None) def alter(f: T => T): T = { ensureNotNull - set(f(this.get)) - this.get + val value = f(this.get) + set(value) + value } def getOption: Option[T] = Option(this.get) From 8744ee5caec1a5cee4235830aeac836deffcd2c6 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 11 Aug 2010 17:02:44 +0200 Subject: [PATCH 04/11] Extra robustness for Logback --- project/build/AkkaProject.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 78512ac699..7140c40895 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -93,6 +93,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val MULTIVERSE_VERSION = "0.6-SNAPSHOT" lazy val SCALATEST_VERSION = "1.2-for-scala-2.8.0.final-SNAPSHOT" lazy val LOGBACK_VERSION = "0.9.24" + lazy val SLF4J_VERSION = "1.6.0" lazy val SPRING_VERSION = "3.0.3.RELEASE" lazy val WerkzVersion = "2.2.1" @@ -188,6 +189,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val sjson = "sjson.json" % "sjson" % "0.7-2.8.0" % "compile" + lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile" + lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile" lazy val logback_core = "ch.qos.logback" % "logback-core" % LOGBACK_VERSION % "compile" @@ -266,10 +269,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { " dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version) ) - //FIXME STILL NEEDED? => Exclude slf4j1.5.11 from the classpath, it's conflicting... override def runClasspath = super.runClasspath +++ - descendents(info.projectPath / "config", "*") --- - (super.runClasspath ** "slf4j*1.5.11.jar") + descendents(info.projectPath / "config", "*") + + //Exclude slf4j1.5.11 from the classpath, it's conflicting... + override def fullClasspath(config: Configuration): PathFinder = { + super.fullClasspath(config) --- (super.fullClasspath(config) ** "slf4j*1.5.11.jar") + } override def mainResources = super.mainResources +++ descendents(info.projectPath / "config", "*") @@ -349,6 +355,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val sjson = Dependencies.sjson val werkz = Dependencies.werkz val werkz_core = Dependencies.werkz_core + val slf4j = Dependencies.slf4j val logback = Dependencies.logback val logback_core = Dependencies.logback_core From b0b294aa740d36bd7ccb69adb59ee39d1455d37f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 12 Aug 2010 09:46:42 +0200 Subject: [PATCH 05/11] Refactored Future API to make it more Java friendly --- akka-core/src/main/scala/actor/ActorRef.scala | 4 ++-- akka-core/src/main/scala/actor/ActorRegistry.scala | 13 ++++--------- akka-core/src/main/scala/actor/TypedActor.scala | 6 ++---- akka-core/src/main/scala/dispatch/Future.scala | 14 +++++++------- akka-core/src/main/scala/remote/RemoteClient.scala | 2 +- akka-core/src/test/scala/dispatch/FutureSpec.scala | 2 +- 6 files changed, 17 insertions(+), 24 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 0d0256927b..6c06404601 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -306,7 +306,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef] if (isTypedActor) throw e else None } - if (future.exception.isDefined) throw future.exception.get._2 + if (future.exception.isDefined) throw future.exception.get else future.result } else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start' before using it") @@ -1179,7 +1179,7 @@ class LocalActorRef private[akka]( } } - senderFuture.foreach(_.completeWithException(this, reason)) + senderFuture.foreach(_.completeWithException(reason)) clearTransaction if (topLevelTransaction) clearTransactionSet diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 1e097f9034..84a7f487df 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -127,17 +127,13 @@ object ActorRegistry extends ListenerManagement { if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor) val set = actorsById get id - if(set ne null) - set add actor + if (set ne null) set add actor else { val newSet = new ConcurrentSkipListSet[ActorRef] newSet add actor - val oldSet = actorsById.putIfAbsent(id,newSet) - - //Parry for two simultaneous putIfAbsent(id,newSet) - if(oldSet ne null) - oldSet add actor + // Parry for two simultaneous putIfAbsent(id,newSet) + if (oldSet ne null) oldSet add actor } // UUID @@ -154,8 +150,7 @@ object ActorRegistry extends ListenerManagement { actorsByUUID remove actor.uuid val set = actorsById get actor.id - if (set ne null) - set remove actor + if (set ne null) set remove actor //FIXME: safely remove set if empty, leaks memory diff --git a/akka-core/src/main/scala/actor/TypedActor.scala b/akka-core/src/main/scala/actor/TypedActor.scala index 9c68a7dc01..c171a75211 100644 --- a/akka-core/src/main/scala/actor/TypedActor.scala +++ b/akka-core/src/main/scala/actor/TypedActor.scala @@ -629,10 +629,8 @@ private[akka] sealed class TypedActorAspect { } private def getResultOrThrowException[T](future: Future[T]): Option[T] = - if (future.exception.isDefined) { - val (_, cause) = future.exception.get - throw cause - } else future.result + if (future.exception.isDefined) throw future.exception.get + else future.result private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE diff --git a/akka-core/src/main/scala/dispatch/Future.scala b/akka-core/src/main/scala/dispatch/Future.scala index d1b4f9572b..c365df3c3c 100644 --- a/akka-core/src/main/scala/dispatch/Future.scala +++ b/akka-core/src/main/scala/dispatch/Future.scala @@ -25,7 +25,7 @@ object Futures { try { promise completeWithResult body } catch { - case e => promise completeWithException (None, e) + case e => promise completeWithException e } promise } @@ -77,12 +77,12 @@ sealed trait Future[T] { def isExpired: Boolean def timeoutInNanos: Long def result: Option[T] - def exception: Option[Tuple2[AnyRef, Throwable]] + def exception: Option[Throwable] } trait CompletableFuture[T] extends Future[T] { def completeWithResult(result: T) - def completeWithException(toBlame: AnyRef, exception: Throwable) + def completeWithException(exception: Throwable) } // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. @@ -96,7 +96,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { private val _signal = _lock.newCondition private var _completed: Boolean = _ private var _result: Option[T] = None - private var _exception: Option[Tuple2[AnyRef, Throwable]] = None + private var _exception: Option[Throwable] = None def await = try { _lock.lock @@ -147,7 +147,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def exception: Option[Tuple2[AnyRef, Throwable]] = try { + def exception: Option[Throwable] = try { _lock.lock _exception } finally { @@ -165,11 +165,11 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def completeWithException(toBlame: AnyRef, exception: Throwable) = try { + def completeWithException(exception: Throwable) = try { _lock.lock if (!_completed) { _completed = true - _exception = Some((toBlame, exception)) + _exception = Some(exception) } } finally { _signal.signalAll diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index e17c7f57ef..c1bd574c3e 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -326,7 +326,7 @@ class RemoteClientHandler( "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) } - future.completeWithException(null, parseException(reply, client.loader)) + future.completeWithException(parseException(reply, client.loader)) } futures.remove(reply.getId) } else { diff --git a/akka-core/src/test/scala/dispatch/FutureSpec.scala b/akka-core/src/test/scala/dispatch/FutureSpec.scala index 39fc563007..f740763fdf 100644 --- a/akka-core/src/test/scala/dispatch/FutureSpec.scala +++ b/akka-core/src/test/scala/dispatch/FutureSpec.scala @@ -36,7 +36,7 @@ class FutureSpec extends JUnitSuite { val future = actor !!! "Failure" future.await assert(future.exception.isDefined) - assert("Expected exception; to test fault-tolerance" === future.exception.get._2.getMessage) + assert("Expected exception; to test fault-tolerance" === future.exception.get.getMessage) actor.stop } From a6a02de9db566d1487d7b679273c37b30e23753d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 12 Aug 2010 12:00:24 +0200 Subject: [PATCH 06/11] Add actorOf with call-by-name for Java TypedActor --- .../src/main/scala/actor/UntypedActor.scala | 30 ++++++++++++------- ...typedActorFireForgetRequestReplySpec.scala | 16 ++++++++++ 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/akka-core/src/main/scala/actor/UntypedActor.scala b/akka-core/src/main/scala/actor/UntypedActor.scala index 474c75e6de..2228db2de4 100644 --- a/akka-core/src/main/scala/actor/UntypedActor.scala +++ b/akka-core/src/main/scala/actor/UntypedActor.scala @@ -83,6 +83,15 @@ abstract class UntypedTransactor extends UntypedActor { self.makeTransactionRequired } +/** + * Factory closure for an UntypedActor, to be used with 'UntypedActor.actorOf(factory)'. + * + * @author Jonas Bonér + */ +trait UntypedActorFactory { + def create: UntypedActor +} + /** * Extend this abstract class to create a remote UntypedActor. * @@ -119,7 +128,7 @@ object UntypedActor { *
    *   ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
    *   actor.start();
-   *   actor.sendOneWay(message, context)
+   *   actor.sendOneWay(message, context);
    *   actor.stop();
    * 
* You can create and start the actor in one statement like this: @@ -137,22 +146,23 @@ object UntypedActor { * NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the * UntypedActor instance directly, but only through its 'UntypedActorRef' wrapper reference. *

- * Creates an ActorRef out of the Actor. Allows you to pass in the instance for the Actor. Only - * use this method when you need to pass in constructor arguments into the 'UntypedActor'. + * Creates an UntypedActorRef out of the Actor. Allows you to pass in the instance for the UntypedActor. + * Only use this method when you need to pass in constructor arguments into the 'UntypedActor'. *

+ * You use it by implementing the UntypedActorFactory interface. * Example in Java: *

-   *   ActorRef actor = UntypedActor.actorOf(new MyUntypedActor("service:name", 5));
+   *   UntypedActorRef actor = UntypedActor.actorOf(new UntypedActorFactory() {
+   *     public UntypedActor create() { 
+   *       return new MyUntypedActor("service:name", 5); 
+   *     }
+   *   });
    *   actor.start();
-   *   actor.sendOneWay(message, context)
+   *   actor.sendOneWay(message, context);
    *   actor.stop();
    * 
- * You can create and start the actor in one statement like this: - *
-   *   ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class).start();
-   * 
*/ - def actorOf(actorInstance: UntypedActor): UntypedActorRef = UntypedActorRef.wrap(new LocalActorRef(() => actorInstance)) + def actorOf(factory: UntypedActorFactory) = UntypedActorRef.wrap(new LocalActorRef(() => factory.create)) } /** diff --git a/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala index 89a05eca9c..bb883142d6 100644 --- a/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala +++ b/akka-core/src/test/scala/actor/untyped-actor/UntypedActorFireForgetRequestReplySpec.scala @@ -11,6 +11,22 @@ import Actor._ class UntypedActorFireForgetRequestReplySpec extends WordSpec with MustMatchers { "An UntypedActor" should { + "be able to be created using the UntypedActor.actorOf(UntypedActorFactory) factory method" in { + UntypedActorTestState.finished = new CyclicBarrier(2); + UntypedActorTestState.log = "NIL"; + val replyActor = UntypedActor.actorOf(new UntypedActorFactory() { + def create: UntypedActor = new ReplyUntypedActor + }).start + val senderActor = UntypedActor.actorOf(new UntypedActorFactory() { + def create: UntypedActor = new SenderUntypedActor + }).start + senderActor.sendOneWay(replyActor) + senderActor.sendOneWay("ReplyToSendOneWayUsingReply") + try { UntypedActorTestState.finished.await(1L, TimeUnit.SECONDS) } + catch { case e: TimeoutException => fail("Never got the message") } + UntypedActorTestState.log must be ("Reply") + } + "reply to message sent with 'sendOneWay' using 'reply'" in { UntypedActorTestState.finished = new CyclicBarrier(2); UntypedActorTestState.log = "NIL"; From cfa68f54b3e0a133c06ec5df7d6dea968695b7e7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 12 Aug 2010 13:38:13 +0200 Subject: [PATCH 07/11] Moving logback-test.xml to /config --- .../src/test/resources => config}/logback-test.xml | 0 project/build/AkkaProject.scala | 11 ++++++----- 2 files changed, 6 insertions(+), 5 deletions(-) rename {akka-core/src/test/resources => config}/logback-test.xml (100%) diff --git a/akka-core/src/test/resources/logback-test.xml b/config/logback-test.xml similarity index 100% rename from akka-core/src/test/resources/logback-test.xml rename to config/logback-test.xml diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 7140c40895..e0874b9e9c 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -269,16 +269,17 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { " dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version) ) - override def runClasspath = super.runClasspath +++ - descendents(info.projectPath / "config", "*") - //Exclude slf4j1.5.11 from the classpath, it's conflicting... override def fullClasspath(config: Configuration): PathFinder = { - super.fullClasspath(config) --- (super.fullClasspath(config) ** "slf4j*1.5.11.jar") + super.fullClasspath(config) --- + (super.fullClasspath(config) ** "slf4j*1.5.11.jar") } override def mainResources = super.mainResources +++ - descendents(info.projectPath / "config", "*") + descendents(info.projectPath / "config", "*") --- + (info.projectPath / "config" / "logback-test.xml") + + override def testResources = super.testResources +++ (info.projectPath / "config" / "logback-test.xml") // ------------------------------------------------------------ // publishing From b5b6574912a47a1d23a5b36aa5a38322e073a692 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 12 Aug 2010 13:39:20 +0200 Subject: [PATCH 08/11] Allow core threads to time out in dispatchers --- akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index 55755250e2..6f8da64073 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -34,6 +34,7 @@ trait ThreadPoolBuilder { def buildThreadPool(): Unit = synchronized { ensureNotActive inProcessOfBuilding = false + threadPoolBuilder.allowCoreThreadTimeOut(true) if (boundedExecutorBound > 0) { val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound) boundedExecutorBound = -1 From 79b3b6dc7d10c612fb06cfd6a0fb04526b1d4774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 12 Aug 2010 14:28:47 +0200 Subject: [PATCH 09/11] Added tests for remotely supervised TypedActor --- .../akka/actor/RemoteTypedActorOne.java | 6 + .../akka/actor/RemoteTypedActorOneImpl.java | 29 ++++ .../akka/actor/RemoteTypedActorTwo.java | 6 + .../akka/actor/RemoteTypedActorTwoImpl.java | 29 ++++ .../scala/remote/RemoteTypedActorSpec.scala | 128 ++++++++++++++++++ 5 files changed, 198 insertions(+) create mode 100644 akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOne.java create mode 100644 akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java create mode 100644 akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwo.java create mode 100644 akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwoImpl.java create mode 100644 akka-core/src/test/scala/remote/RemoteTypedActorSpec.scala diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOne.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOne.java new file mode 100644 index 0000000000..03df632582 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOne.java @@ -0,0 +1,6 @@ +package se.scalablesolutions.akka.actor; + +public interface RemoteTypedActorOne { + public String requestReply(String s) throws Exception; + public void oneWay() throws Exception; +} \ No newline at end of file diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java new file mode 100644 index 0000000000..0744652181 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorOneImpl.java @@ -0,0 +1,29 @@ +package se.scalablesolutions.akka.actor.remote; + +import se.scalablesolutions.akka.actor.*; + +import java.util.concurrent.CountDownLatch; + +public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedActorOne { + + public static CountDownLatch latch = new CountDownLatch(1); + + public String requestReply(String s) throws Exception { + if (s.equals("ping")) { + RemoteTypedActorLog.messageLog().put("ping"); + return "pong"; + } else if (s.equals("die")) { + throw new RuntimeException("Expected exception; to test fault-tolerance"); + } else return null; + } + + public void oneWay() throws Exception { + RemoteTypedActorLog.oneWayLog().put("oneway"); + } + + @Override + public void preRestart(Throwable e) { + try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} + latch.countDown(); + } +} \ No newline at end of file diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwo.java new file mode 100644 index 0000000000..58f294c6cd --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwo.java @@ -0,0 +1,6 @@ +package se.scalablesolutions.akka.actor; + +public interface RemoteTypedActorTwo { + public String requestReply(String s) throws Exception; + public void oneWay() throws Exception; +} \ No newline at end of file diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwoImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwoImpl.java new file mode 100644 index 0000000000..36bb055ef8 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/RemoteTypedActorTwoImpl.java @@ -0,0 +1,29 @@ +package se.scalablesolutions.akka.actor.remote; + +import se.scalablesolutions.akka.actor.*; + +import java.util.concurrent.CountDownLatch; + +public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedActorTwo { + + public static CountDownLatch latch = new CountDownLatch(1); + + public String requestReply(String s) throws Exception { + if (s.equals("ping")) { + RemoteTypedActorLog.messageLog().put("ping"); + return "pong"; + } else if (s.equals("die")) { + throw new RuntimeException("Expected exception; to test fault-tolerance"); + } else return null; + } + + public void oneWay() throws Exception { + RemoteTypedActorLog.oneWayLog().put("oneway"); + } + + @Override + public void preRestart(Throwable e) { + try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} + latch.countDown(); + } +} \ No newline at end of file diff --git a/akka-core/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-core/src/test/scala/remote/RemoteTypedActorSpec.scala new file mode 100644 index 0000000000..c49882f61d --- /dev/null +++ b/akka-core/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor.remote + +import org.scalatest.Spec +import org.scalatest.Assertions +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.config.Config +import se.scalablesolutions.akka.config._ +import se.scalablesolutions.akka.config.TypedActorConfigurator +import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.actor._ +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} + +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} + +object RemoteTypedActorSpec { + val HOSTNAME = "localhost" + val PORT = 9988 + var server: RemoteServer = null +} + +object RemoteTypedActorLog { + val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] + val oneWayLog = new LinkedBlockingQueue[String] + + def clearMessageLogs { + messageLog.clear + oneWayLog.clear + } +} + +@RunWith(classOf[JUnitRunner]) +class RemoteTypedActorSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + import RemoteTypedActorLog._ + import RemoteTypedActorSpec._ + + private val conf = new TypedActorConfigurator + + override def beforeAll = { + server = new RemoteServer() + server.start("localhost", 9995) + Config.config + conf.configure( + new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), + List( + new Component( + classOf[RemoteTypedActorOne], + classOf[RemoteTypedActorOneImpl], + new LifeCycle(new Permanent), + 10000, + new RemoteAddress("localhost", 9995)), + new Component( + classOf[RemoteTypedActorTwo], + classOf[RemoteTypedActorTwoImpl], + new LifeCycle(new Permanent), + 10000, + new RemoteAddress("localhost", 9995)) + ).toArray).supervise + Thread.sleep(1000) + } + + override def afterAll = { + conf.stop + try { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + ActorRegistry.shutdownAll + } + + describe("Remote Typed Actor ") { + + it("should receive one-way message") { + clearMessageLogs + val ta = conf.getInstance(classOf[RemoteTypedActorOne]) + + expect("oneway") { + ta.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } + + it("should respond to request-reply message") { + clearMessageLogs + val ta = conf.getInstance(classOf[RemoteTypedActorOne]) + + expect("pong") { + ta.requestReply("ping") + } + } + + it("should be restarted on failure") { + clearMessageLogs + val ta = conf.getInstance(classOf[RemoteTypedActorOne]) + + intercept[RuntimeException] { + ta.requestReply("die") + } + messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") + } + + it("should restart linked friends on failure") { + clearMessageLogs + val ta1 = conf.getInstance(classOf[RemoteTypedActorOne]) + val ta2 = conf.getInstance(classOf[RemoteTypedActorTwo]) + + intercept[RuntimeException] { + ta1.requestReply("die") + } + messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") + messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") + } + } +} From 1ec811b21c9967f119bad54fc11baf5d28337b53 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 12 Aug 2010 15:05:45 +0200 Subject: [PATCH 10/11] Add default DEBUG to test output --- config/logback-test.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/logback-test.xml b/config/logback-test.xml index 1e168e65db..66130159fd 100755 --- a/config/logback-test.xml +++ b/config/logback-test.xml @@ -15,8 +15,8 @@ [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n - - + + From 82ec8b31201c6def4fdbd6140333e2e3cc9953c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 12 Aug 2010 15:36:05 +0200 Subject: [PATCH 11/11] Fixed #305. Invoking 'stop' on client-managed remote actors does not shut down remote instance (but only local) --- akka-core/src/main/scala/actor/ActorRef.scala | 13 +++++- .../main/scala/config/SupervisionConfig.scala | 9 +++- .../main/scala/remote/MessageSerializer.scala | 1 - .../src/main/scala/remote/RemoteClient.scala | 25 +++++++---- .../src/main/scala/remote/RemoteServer.scala | 42 ++++++++++--------- 5 files changed, 59 insertions(+), 31 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index d4700b9ba0..ce954e41de 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -13,7 +13,7 @@ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, MessageSerializer, RemoteRequestProtocolIdFactory} -import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.serialization.{Serializer, BinaryString} import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard} import RemoteActorSerialization._ @@ -1253,6 +1253,15 @@ class LocalActorRef private[akka]( } else message } +/** + * System messages for RemoteActorRef. + * + * @author Jonas Bonér + */ +object RemoteActorSystemMessage { + val Stop = BinaryString("RemoteActorRef:stop") +} + /** * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. * This reference is network-aware (remembers its origin) and immutable. @@ -1263,6 +1272,7 @@ private[akka] case class RemoteActorRef private[akka] ( uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long, loader: Option[ClassLoader]) // uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef { extends ActorRef { + _uuid = uuuid timeout = _timeout @@ -1291,6 +1301,7 @@ private[akka] case class RemoteActorRef private[akka] ( def stop: Unit = { _isRunning = false _isShutDown = true + postMessageToMailbox(RemoteActorSystemMessage.Stop, None) } /** diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index cb0829704d..071fdf0c12 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.config -import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.actor.{ActorRef, UntypedActorRef} import se.scalablesolutions.akka.dispatch.MessageDispatcher sealed abstract class FaultHandlingStrategy @@ -25,11 +25,15 @@ object ScalaConfig { case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { + def this(actorRef: UntypedActorRef, lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) = + this(actorRef.actorRef, lifeCycle, _remoteAddress) val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) } object Supervise { def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) + def apply(actorRef: UntypedActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) + def apply(actorRef: UntypedActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress)) } @@ -215,6 +219,9 @@ object JavaConfig { def newSupervised(actorRef: ActorRef) = se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) + + def newSupervised(actorRef: UntypedActorRef) = + se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) } } diff --git a/akka-core/src/main/scala/remote/MessageSerializer.scala b/akka-core/src/main/scala/remote/MessageSerializer.scala index 24269c7f8e..8ef6f5d590 100644 --- a/akka-core/src/main/scala/remote/MessageSerializer.scala +++ b/akka-core/src/main/scala/remote/MessageSerializer.scala @@ -25,7 +25,6 @@ object MessageSerializer extends Logging { } def deserialize(messageProtocol: MessageProtocol): Any = { - log.debug("scheme = " + messageProtocol.getSerializationScheme) messageProtocol.getSerializationScheme match { case SerializationSchemeType.JAVA => unbox(SERIALIZER_JAVA.fromBinary(messageProtocol.getMessage.toByteArray, None)) diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index c1bd574c3e..fefb1521ed 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -44,9 +44,16 @@ object RemoteRequestProtocolIdFactory { * Life-cycle events for RemoteClient. */ sealed trait RemoteClientLifeCycleEvent -case class RemoteClientError(@BeanProperty val cause: Throwable, @BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent -case class RemoteClientDisconnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent -case class RemoteClientConnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientError( + @BeanProperty val cause: Throwable, + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientDisconnected( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientConnected( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent class RemoteClientException private[akka](message: String) extends RuntimeException(message) @@ -259,23 +266,23 @@ class RemoteClientPipelineFactory( remoteAddress: SocketAddress, timer: HashedWheelTimer, client: RemoteClient) extends ChannelPipelineFactory { - def getPipeline: ChannelPipeline = { - def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) + def getPipeline: ChannelPipeline = { + def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*) val engine = RemoteServerSslContext.client.createSSLEngine() engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible? engine.setUseClientMode(true) - val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() + val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join() val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder)) - case _ => (join(),join()) + val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) + case _ => (join(), join()) } val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 89c0a6e437..ec33bd8600 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -309,10 +309,10 @@ object RemoteServerSslContext { //val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509") //val store = KeyStore.getInstance("JKS") val s = SSLContext.getInstance(protocol) - s.init(null,null,null) + s.init(null, null, null) val c = SSLContext.getInstance(protocol) - c.init(null,null,null) - (c,s) + c.init(null, null, null) + (c, s) } } @@ -429,25 +429,29 @@ class RemoteServerHandler( if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) else None - if (request.getIsOneWay) actorRef.!(message)(sender) - else { - try { - val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef] - val result = if (resultOrNone.isDefined) resultOrNone.get else null + message match { // first match on system messages + case RemoteActorSystemMessage.Stop => actorRef.stop + case _ => // then match on user defined messages + if (request.getIsOneWay) actorRef.!(message)(sender) + else { + try { + val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef] + val result = if (resultOrNone.isDefined) resultOrNone.get else null - log.debug("Returning result from actor invocation [%s]", result) - val replyBuilder = RemoteReplyProtocol.newBuilder - .setId(request.getId) - .setMessage(MessageSerializer.serialize(result)) - .setIsSuccessful(true) - .setIsActor(true) + log.debug("Returning result from actor invocation [%s]", result) + val replyBuilder = RemoteReplyProtocol.newBuilder + .setId(request.getId) + .setMessage(MessageSerializer.serialize(result)) + .setIsSuccessful(true) + .setIsActor(true) - if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(replyBuilder.build) + if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) + channel.write(replyBuilder.build) - } catch { - case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) - } + } catch { + case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) + } + } } }