diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala index 95f5f29937..ea5b620057 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -334,7 +334,7 @@ class ActorRefSpec extends WordSpec with MustMatchers { val ref = Actor.actorOf( new Actor { def receive = { case _ ⇒ } - override def preRestart(reason: Throwable) = latch.countDown() + override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown() override def postRestart(reason: Throwable) = latch.countDown() }).start() diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala new file mode 100644 index 0000000000..d2b9a42ee5 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -0,0 +1,161 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.actor + +import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } +import org.scalatest.matchers.MustMatchers + +import Actor.actorOf +import akka.testkit._ +import akka.util.duration._ +import akka.config.Supervision.OneForOneStrategy + +import java.util.concurrent.atomic._ + +object ActorRestartSpec { + + private var _gen = new AtomicInteger(0) + def generation = _gen.incrementAndGet + def generation_=(x: Int) { _gen.set(x) } + + sealed trait RestartType + case object Normal extends RestartType + case object Nested extends RestartType + case object Handover extends RestartType + case object Fail extends RestartType + + class Restarter(val testActor: ActorRef) extends Actor { + val gen = generation + var xx = 0 + var restart: RestartType = Normal + def receive = { + case x: Int ⇒ xx = x + case t: RestartType ⇒ restart = t + case "get" ⇒ self reply xx + } + override def preStart { testActor ! (("preStart", gen)) } + override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! (("preRestart", msg, gen)) } + override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) } + override def freshInstance() = { + restart match { + case Normal ⇒ None + case Nested ⇒ + val ref = TestActorRef(new Actor { + def receive = { case _ ⇒ } + override def preStart { testActor ! ((this, self)) } + }).start() + testActor ! ((ref.underlyingActor, ref)) + None + case Handover ⇒ + val fresh = new Restarter(testActor) + fresh.xx = xx + Some(fresh) + case Fail ⇒ + throw new IllegalActorStateException("expected") + } + } + } + + class Supervisor extends Actor { + self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) + def receive = { + case _ ⇒ + } + } +} + +class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach { + import ActorRestartSpec._ + + override def beforeEach { generation = 0 } + override def afterEach { + val it = toStop.iterator + while (it.hasNext) { + it.next.stop() + it.remove + } + } + + private var toStop = new java.util.concurrent.ConcurrentSkipListSet[ActorRef] + private def newActor(f: ⇒ Actor): ActorRef = { + val ref = actorOf(f) + toStop add ref + ref.start() + } + + "An Actor restart" must { + + "invoke preRestart, preStart, postRestart" in { + val actor = newActor(new Restarter(testActor)) + expectMsg(1 second, ("preStart", 1)) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill), 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + } + + "support creation of nested actors in freshInstance()" in { + val actor = newActor(new Restarter(testActor)) + expectMsg(1 second, ("preStart", 1)) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! Nested + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill), 1)) + val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])] + tRef.underlyingActor must be(tActor) + expectMsg((tActor, tRef)) + tRef.stop() + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + } + + "use freshInstance() if available" in { + val actor = newActor(new Restarter(testActor)) + expectMsg(1 second, ("preStart", 1)) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! 42 + actor ! Handover + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill), 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + actor ! "get" + expectMsg(1 second, 42) + } + + "fall back to default factory if freshInstance() fails" in { + val actor = newActor(new Restarter(testActor)) + expectMsg(1 second, ("preStart", 1)) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! 42 + actor ! Fail + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill), 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + actor ! "get" + expectMsg(1 second, 0) + } + + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala index 131dd5cb96..c552345192 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala @@ -32,7 +32,7 @@ object FSMTransitionSpec { case Ev("reply") ⇒ stay replying "reply" } initialize - override def preRestart(reason: Throwable) { target ! "restarted" } + override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" } } class Forwarder(target: ActorRef) extends Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala index d6e15d40c5..4c5deb8b51 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala @@ -25,7 +25,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers { case Die ⇒ throw new Exception(self.address + " is dying...") } - override def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable, msg: Option[Any]) { log += self.address } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala index 0976063b3b..5863305ff4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -62,7 +62,7 @@ object Ticket669Spec { case msg ⇒ throw new Exception("test") } - override def preRestart(reason: scala.Throwable) { + override def preRestart(reason: scala.Throwable, msg: Option[Any]) { self.tryReply("failure1") } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 84c7c3ea4f..f4bd4f1204 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -10,6 +10,7 @@ import akka.util.duration._ import akka.actor._ import akka.actor.Actor._ import akka.routing._ +import akka.event.EventHandler import java.util.concurrent.atomic.AtomicInteger import akka.dispatch.{ KeptPromise, Future } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d5b83b0960..b981c954ce 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -5,6 +5,7 @@ package akka.actor import DeploymentConfig._ +import akka.experimental import akka.dispatch._ import akka.config._ import Config._ @@ -662,9 +663,23 @@ trait Actor { /** * User overridable callback. *

- * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated. + * Is called on a crashed Actor right BEFORE it is restarted to allow clean + * up of resources before Actor is terminated. */ - def preRestart(reason: Throwable) {} + def preRestart(reason: Throwable, message: Option[Any]) {} + + /** + * User overridable callback. + *

+ * Is called on the crashed Actor to give it the option of producing the + * Actor's reincarnation. If it returns None, which is the default, the + * initially provided actor factory is used. + *

+ * Warning: Propagating state from a crashed actor carries the risk + * of proliferating the cause of the error. Consider let-it-crash first. + */ + @experimental("1.2") + def freshInstance(): Option[Actor] = None /** * User overridable callback. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index efdf9dc269..3eeecb98f6 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -776,7 +776,8 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, def performRestart() { val failedActor = actorInstance.get if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") - failedActor.preRestart(reason) + val message = if (currentMessage ne null) Some(currentMessage.message) else None + failedActor.preRestart(reason, message) val freshActor = newActor setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call @@ -857,7 +858,20 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor, val stackBefore = refStack.get refStack.set(stackBefore.push(this)) try { - actorFactory() + if (_status == ActorRefInternals.BEING_RESTARTED) { + val a = actor + val fresh = try a.freshInstance catch { + case e ⇒ + EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory") + None + } + fresh match { + case Some(ref) ⇒ ref + case None ⇒ actorFactory() + } + } else { + actorFactory() + } } finally { val stackAfter = refStack.get if (stackAfter.nonEmpty) @@ -1011,7 +1025,7 @@ private[akka] case class RemoteActorRef private[akka]( case _ ⇒ None } val chFuture = channel match { - case f: Promise[Any] ⇒ Some(f) + case f: Promise[_] ⇒ Some(f.asInstanceOf[Promise[Any]]) case _ ⇒ None } val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout, false, this, loader) diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index d23002c5bc..f84cfbf20a 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -102,7 +102,7 @@ abstract class UntypedActor extends Actor { *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated. */ - override def preRestart(reason: Throwable) {} + override def preRestart(reason: Throwable, lastMessage: Option[Any]) {} /** * User overridable callback. diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 9af29eed98..64c0c5afb2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -197,14 +197,15 @@ object Dispatchers { case "GlobalDispatcher" ⇒ GlobalDispatcherConfigurator case fqn ⇒ ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { - case r: Right[_, Class[MessageDispatcherConfigurator]] ⇒ - ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match { - case r: Right[Exception, MessageDispatcherConfigurator] ⇒ r.b - case l: Left[Exception, MessageDispatcherConfigurator] ⇒ - throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a) + case Right(clazz) ⇒ + ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match { + case Right(configurator) ⇒ configurator + case Left(exception)⇒ + throw new IllegalArgumentException( + "Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, exception) } - case l: Left[Exception, _] ⇒ - throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a) + case Left(exception) ⇒ + throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, exception) } } map { _ configure cfg diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 8705d3da4e..5c858a0905 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -90,8 +90,8 @@ object Futures { val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature? f.value.get match { - case r: Right[Throwable, T] ⇒ - val added = results add r.b + case Right(value) ⇒ + val added = results add value if (added && results.size == allDone) { //Only one thread can get here if (done.switchOn) { try { @@ -109,9 +109,9 @@ object Futures { } } } - case l: Left[Throwable, T] ⇒ + case Left(exception) ⇒ if (done.switchOn) { - result completeWithException l.a + result completeWithException exception results.clear } } @@ -148,10 +148,8 @@ object Futures { val seedFold: Future[T] ⇒ Unit = f ⇒ { if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold f.value.get match { - case r: Right[Throwable, T] ⇒ - result.completeWith(fold(r.b, timeout)(futures.filterNot(_ eq f))(op)) - case l: Left[Throwable, T] ⇒ - result.completeWithException(l.a) + case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op)) + case Left(exception) ⇒ result.completeWithException(exception) } } } diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 134543e284..0a0e00e2cc 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -112,8 +112,8 @@ object EventHandler extends ListenerManagement { defaultListeners foreach { listenerName ⇒ try { ReflectiveAccess.getClassFor[Actor](listenerName) match { - case r: Right[_, Class[Actor]] ⇒ addListener(Actor.localActorOf(r.b).start()) - case l: Left[Exception, _] ⇒ throw l.a + case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start()) + case Left(exception) ⇒ throw exception } } catch { case e: Exception ⇒ diff --git a/akka-actor/src/main/scala/akka/experimental.scala b/akka-actor/src/main/scala/akka/experimental.scala new file mode 100644 index 0000000000..cfc976551a --- /dev/null +++ b/akka-actor/src/main/scala/akka/experimental.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka + +import annotation.target._ + +/** + * This annotation marks a feature which is not yet considered stable and may + * change or be removed in a future release. + * + * @author Roland Kuhn + * @since 1.2 + */ +@getter +@setter +@beanGetter +@beanSetter +final class experimental(since: String) extends annotation.StaticAnnotation diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 6398ef2241..6bfd4c18b3 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -265,7 +265,7 @@ trait MailboxPressureCapacitor { */ trait ActiveFuturesPressureCapacitor { def pressure(delegates: Seq[ActorRef]): Int = - delegates count { _.channel.isInstanceOf[Promise[Any]] } + delegates count { _.channel.isInstanceOf[Promise[_]] } } /** diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 835cf9fa7a..aa57c4b47d 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -8,17 +8,28 @@ import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, B import akka.util.ClassLoaderObjectInputStream object Serializer { - val defaultSerializerName = JavaSerializer.getClass.getName + val defaultSerializerName = classOf[JavaSerializer].getName type Identifier = Byte } +/** + * A Serializer represents a bimap between an object and an array of bytes representing that object + */ trait Serializer extends scala.Serializable { /** * Completely unique Byte value to identify this implementation of Serializer, used to optimize network traffic * Values from 0 to 16 is reserved for Akka internal usage */ def identifier: Serializer.Identifier + + /** + * Serializes the given object into an Array of Byte + */ def toBinary(o: AnyRef): Array[Byte] + + /** + * Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into + */ def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef } diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 3281feff83..b868498cba 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -58,7 +58,7 @@ trait ProducerSupport { this: Actor ⇒ * Default implementation of Actor.preRestart for freeing resources needed * to actually send messages to endpointUri. */ - override def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable, msg: Option[Any]) { try { preRestartProducer(reason) } finally { processor.stop } } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala index a6a6971d98..52a1e91246 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala @@ -251,7 +251,7 @@ object ConsumerScalaTest { case "succeed" ⇒ self.reply("ok") } - override def preRestart(reason: scala.Throwable) { + override def preRestart(reason: scala.Throwable, msg: Option[Any]) { self.tryReply("pr") } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 9a26ad985b..39c57c0778 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -703,8 +703,6 @@ class DefaultClusterNode private[akka]( serializeMailbox: Boolean, serializer: Serializer): ClusterNode = if (isConnected.isOn) { - val serializerClassName = serializer.getClass.getName - EventHandler.debug(this, "Storing actor with address [%s] in cluster".format(actorAddress)) @@ -739,9 +737,9 @@ class DefaultClusterNode private[akka]( // create ADDRESS -> SERIALIZER CLASS NAME mapping try { - zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName) + zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializer.identifier.toString) } catch { - case e: ZkNodeExistsException ⇒ zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName) + case e: ZkNodeExistsException ⇒ zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializer.identifier.toString) } // create ADDRESS -> NODE mapping @@ -1084,21 +1082,10 @@ class DefaultClusterNode private[akka]( /** * Returns Serializer for actor with specific address. */ - def serializerForActor(actorAddress: String): Serializer = { - val serializerClassName = - try { - zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String] - } catch { - case e: ZkNoNodeException ⇒ throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress)) - } - - ReflectiveAccess.getClassFor(serializerClassName) match { - // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess. - case Right(clazz) ⇒ clazz.newInstance.asInstanceOf[Serializer] - case Left(error) ⇒ - EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString)) - throw error - } + def serializerForActor(actorAddress: String): Serializer = try { + Serialization.serializerByIdentity(zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String].toByte) + } catch { + case e: ZkNoNodeException ⇒ throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress)) } /** @@ -1790,7 +1777,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = Dispatchers.newPinnedDispatcher(self) - override def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable, msg: Option[Any]) { EventHandler.debug(this, "RemoteClusterDaemon failed due to [%s] restarting...".format(reason)) } @@ -1930,7 +1917,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = computeGridDispatcher def receive = { - case f: Function0[Unit] ⇒ try { + case f: Function0[_] ⇒ try { f() } finally { self.stop() @@ -1943,7 +1930,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = computeGridDispatcher def receive = { - case f: Function0[Any] ⇒ try { + case f: Function0[_] ⇒ try { self.reply(f()) } finally { self.stop() @@ -1956,8 +1943,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = computeGridDispatcher def receive = { - case (fun: Function[Any, Unit], param: Any) ⇒ try { - fun(param) + case (fun: Function[_, _], param: Any) ⇒ try { + fun.asInstanceOf[Any => Unit].apply(param) } finally { self.stop() } @@ -1969,8 +1956,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = computeGridDispatcher def receive = { - case (fun: Function[Any, Unit], param: Any) ⇒ try { - self.reply(fun(param)) + case (fun: Function[_, _], param: Any) ⇒ try { + self.reply(fun.asInstanceOf[Any => Any](param)) } finally { self.stop() } diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index e79e426d94..a339a1b8b6 100644 --- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -88,14 +88,14 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem lock.readLock.lock try { val c = remoteClients.get(key) match { - case s: Some[RemoteClient] ⇒ s.get + case Some(client) ⇒ client case None ⇒ lock.readLock.unlock lock.writeLock.lock //Lock upgrade, not supported natively try { try { remoteClients.get(key) match { //Recheck for addition, race between upgrades - case s: Some[RemoteClient] ⇒ s.get //If already populated by other writer + case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _) client.connect() @@ -111,15 +111,15 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard { remoteClients.remove(Address(address)) match { - case s: Some[RemoteClient] ⇒ s.get.shutdown() - case None ⇒ false + case Some(client) ⇒ client.shutdown() + case None ⇒ false } } def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard { remoteClients.get(Address(address)) match { - case s: Some[RemoteClient] ⇒ s.get.connect(reconnectIfAlreadyConnected = true) - case None ⇒ false + case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true) + case None ⇒ false } } @@ -632,12 +632,12 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) def address = currentServer.get match { - case s: Some[NettyRemoteServer] ⇒ s.get.address - case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress + case Some(server) ⇒ server.address + case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress } def name = currentServer.get match { - case s: Some[NettyRemoteServer] ⇒ s.get.name + case Some(server) ⇒ server.name case None ⇒ val a = ReflectiveAccess.RemoteModule.configDefaultAddress "NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort @@ -920,15 +920,15 @@ class RemoteServerHandler( request.getActorInfo.getTimeout, new ActorPromise(request.getActorInfo.getTimeout). onComplete(_.value.get match { - case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request)) - case r: Right[Throwable, Any] ⇒ + case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request)) + case r: Right[_,_] ⇒ val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( Some(actorRef), Right(request.getUuid), actorInfo.getAddress, actorInfo.getTimeout, - r, - true, + r.asInstanceOf[Either[Throwable,Any]], + isOneWay = true, Some(actorRef)) // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method diff --git a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala index e86665295b..d8b1293bc6 100644 --- a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -95,10 +95,10 @@ object ActorSerialization { if (actorRef.mailbox eq null) throw new IllegalActorStateException("Can't serialize an actor that has not been started.") val messages = actorRef.mailbox match { - case q: java.util.Queue[MessageInvocation] ⇒ + case q: java.util.Queue[_] ⇒ val l = new scala.collection.mutable.ListBuffer[MessageInvocation] val it = q.iterator - while (it.hasNext == true) l += it.next + while (it.hasNext) l += it.next.asInstanceOf[MessageInvocation] l } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala index ae0eadd97f..349626445c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala @@ -57,16 +57,14 @@ class RegistryStoreMultiJvmNode1 extends MasterClusterTestNode { } barrier("store-1-in-node-1", NrOfNodes) { - val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x ⇒ fail("No serializer found"), s ⇒ s) - node.store("hello-world-1", classOf[HelloWorld1], serializer) + node.store("hello-world-1", classOf[HelloWorld1], Serialization.serializerFor(classOf[HelloWorld1])) } barrier("use-1-in-node-2", NrOfNodes) { } barrier("store-2-in-node-1", NrOfNodes) { - val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x ⇒ fail("No serializer found"), s ⇒ s) - node.store("hello-world-2", classOf[HelloWorld1], false, serializer) + node.store("hello-world-2", classOf[HelloWorld1], false, Serialization.serializerFor(classOf[HelloWorld1])) } barrier("use-2-in-node-2", NrOfNodes) { diff --git a/akka-docs/project/migration-guide-1.1.x-1.2.x.rst b/akka-docs/project/migration-guide-1.1.x-1.2.x.rst new file mode 100644 index 0000000000..e4988b9460 --- /dev/null +++ b/akka-docs/project/migration-guide-1.1.x-1.2.x.rst @@ -0,0 +1,6 @@ +.. _migration-1.2: + +################################ + Migration Guide 1.1.x to 1.2.x +################################ + diff --git a/akka-docs/project/migration-guide-1.2.x-2.0.x.rst b/akka-docs/project/migration-guide-1.2.x-2.0.x.rst new file mode 100644 index 0000000000..7eabcf2f10 --- /dev/null +++ b/akka-docs/project/migration-guide-1.2.x-2.0.x.rst @@ -0,0 +1,20 @@ +.. _migration-2.0: + +################################ + Migration Guide 1.2.x to 2.0.x +################################ + +Actors +====== + +The 2.0 release contains several new features which require source-level +changes in client code. This API cleanup is planned to be the last one for a +significant amount of time. + +Lifecycle Callbacks +------------------- + +The :meth:`preRestart(cause: Throwable)` method has been replaced by +:meth:`preRestart(cause: Throwable, lastMessage: Any)`, hence you must insert +the second argument in all overriding methods. The good news is that any missed +actor will not compile without error. diff --git a/akka-docs/project/migration-guides.rst b/akka-docs/project/migration-guides.rst index 7eb063811c..7af815f241 100644 --- a/akka-docs/project/migration-guides.rst +++ b/akka-docs/project/migration-guides.rst @@ -6,6 +6,8 @@ Migration Guides .. toctree:: :maxdepth: 1 + migration-guide-1.2.x-2.0.x + migration-guide-1.1.x-1.2.x migration-guide-1.0.x-1.1.x migration-guide-0.10.x-1.0.x migration-guide-0.9.x-0.10.x diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 96db03ac11..16bdc06079 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -92,6 +92,90 @@ Here we create a light-weight actor-based thread, that can be used to spawn off ... // do stuff } +Actor Internal API +------------------ + +The :class:`Actor` trait defines only one abstract method, the abovementioned +:meth:`receive`. In addition, it offers two convenience methods +:meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as +described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actor’s +:class:`ActorRef` object. If the current actor behavior does not match a +received message, :meth:`unhandled` is called, which by default throws an +:class:`UnhandledMessageException`. + +The remaining visible methods are user-overridable life-cycle hooks which are +described in the following:: + + def preStart() {} + def preRestart(cause: Throwable, message: Option[Any]) {} + def freshInstance(): Option[Actor] = None + def postRestart(cause: Throwable) {} + def postStop() {} + +The implementations shown above are the defaults provided by the :class:`Actor` +trait. + +Start Hook +^^^^^^^^^^ + +Right after starting the actor, its :meth:`preStart` method is invoked. This is +guaranteed to happen before the first message from external sources is queued +to the actor’s mailbox. + +:: + + override def preStart { + // e.g. send initial message to self + self ! GetMeStarted + // or do any other stuff, e.g. registering with other actors + someService ! Register(self) + } + +Restart Hooks +^^^^^^^^^^^^^ + +A supervised actor, i.e. one which is linked to another actor with a fault +handling strategy, will be restarted in case an exception is thrown while +processing a message. This restart involves four of the hooks mentioned above: + +1. The old actor is informed by calling :meth:`preRestart` with the exception + which caused the restart and the message which triggered that exception; the + latter may be ``None`` if the restart was not caused by processing a + message, e.g. when a supervisor does not trap the exception and is restarted + in turn by its supervisor. This method is the best place for cleaning up, + preparing hand-over to the fresh actor instance, etc. +2. The old actor’s :meth:`freshInstance` factory method is invoked, which may + optionally produce the new actor instance which will replace this actor. If + this method returns :obj:`None` or throws an exception, the initial factory + from the ``Actor.actorOf`` call is used to produce the fresh instance. +3. The new actor’s :meth:`preStart` method is invoked, just as in the normal + start-up case. +4. The new actor’s :meth:`postRestart` method is called with the exception + which caused the restart. + +.. warning:: + + The :meth:`freshInstance` hook may be used to propagate (part of) the failed + actor’s state to the fresh instance. This carries the risk of proliferating + the cause for the crash which triggered the restart. If you are tempted to + take this route, it is strongly advised to step back and consider other + possible approaches, e.g. distributing the state in question using other + means or spawning short-lived worker actors for carrying out “risky” tasks. + +An actor restart replaces only the actual actor object; the contents of the +mailbox and the hotswap stack are unaffected by the restart, so processing of +messages will resume after the :meth:`postRestart` hook returns. Any message +sent to an actor while it is being restarted will be queued to its mailbox as +usual. + +Stop Hook +^^^^^^^^^ + +After stopping an actor, its :meth:`postStop` hook is called, which may be used +e.g. for deregistering this actor from other services. This hook is guaranteed +to run after message queuing has been disabled for this actor, i.e. sending +messages would fail with an :class:`IllegalActorStateException`. + Identifying Actors ------------------ @@ -252,43 +336,6 @@ This method should return a ``PartialFunction``, e.g. a ‘match/case’ clause } } -Actor internal API ------------------- - -The Actor trait contains almost no member fields or methods to invoke, you just use the Actor trait to implement the: - -#. ``receive`` message handler -#. life-cycle callbacks: - - #. preStart - #. postStop - #. preRestart - #. postRestart - -The ``Actor`` trait has one single member field: - -.. code-block:: scala - - val self: ActorRef - -This ``self`` field holds a reference to its ``ActorRef`` and it is this reference you want to access the Actor's API. Here, for example, you find methods to reply to messages, send yourself messages, define timeouts, fault tolerance etc., start and stop etc. - -However, for convenience you can import these functions and fields like below, which will allow you do drop the ``self`` prefix: - -.. code-block:: scala - - class MyActor extends Actor { - import self._ - id = ... - dispatcher = ... - start - ... - } - -But in this documentation we will always prefix the calls with ``self`` for clarity. - -Let's start by looking how we can reply to messages in a convenient way using this ``ActorRef`` API. - Reply to messages ----------------- @@ -441,6 +488,8 @@ You can also send an actor the ``akka.actor.PoisonPill`` message, which will sto If the sender is a ``Future`` (e.g. the message is sent with ``?``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``. +.. _Actor.HotSwap: + HotSwap ------- diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index 4f385364a2..abcfd6d9bf 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -93,7 +93,7 @@ You can also set the rejection policy that should be used, e.g. what should be d * java.util.concurrent.ThreadPoolExecutor.DiscardPolicy - discards the message (throws it away) * java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy - discards the oldest message in the mailbox (throws it away) -You cane read more about these policies `here `_. +You can read more about these policies `here `_. Here is an example: @@ -104,7 +104,7 @@ Here is an example: import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy class MyActor extends Actor { - self.dispatcher = Dispatchers.newDispatcher(name) + self.dispatcher = Dispatchers.newDispatcher(name, throughput = 15) .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) .setCorePoolSize(16) .setMaxPoolSize(128) @@ -114,8 +114,14 @@ Here is an example: ... } -This 'Dispatcher' allows you to define the 'throughput' it should have. This defines the number of messages for a specific Actor the dispatcher should process in one single sweep. -Setting this to a higher number will increase throughput but lower fairness, and vice versa. If you don't specify it explicitly then it uses the default value defined in the 'akka.conf' configuration file: +The standard :class:`Dispatcher` allows you to define the ``throughput`` it +should have, as shown above. This defines the number of messages for a specific +Actor the dispatcher should process in one single sweep; in other words, the +dispatcher will bunch up to ``throughput`` message invocations together when +having elected an actor to run. Setting this to a higher number will increase +throughput but lower fairness, and vice versa. If you don't specify it +explicitly then it uses the default value defined in the 'akka.conf' +configuration file: .. code-block:: ruby diff --git a/akka-docs/scala/testing.rst b/akka-docs/scala/testing.rst index 8b7b270bff..65066f3d7f 100644 --- a/akka-docs/scala/testing.rst +++ b/akka-docs/scala/testing.rst @@ -288,6 +288,13 @@ assertions concerning received messages. Here is the full list: does a conformance check; if you need the class to be equal, have a look at :meth:`expectMsgAllClassOf` with a single given class argument. + * :meth:`expectMsgType[T: Manifest](d: Duration)` + + An object which is an instance of the given type (after erasure) must be + received within the allotted time frame; the object will be returned. This + method is approximately equivalent to + ``expectMsgClass(manifest[T].erasure)``. + * :meth:`expectMsgAnyOf[T](d: Duration, obj: T*): T` An object must be received within the given time, and it must be equal ( @@ -411,21 +418,25 @@ is between :obj:`min` and :obj:`max`, where the former defaults to zero. The deadline calculated by adding the :obj:`max` parameter to the block's start time is implicitly available within the block to all examination methods, if you do not specify it, is is inherited from the innermost enclosing -:meth:`within` block. It should be noted that using :meth:`expectNoMsg` will -terminate upon reception of a message or at the deadline, whichever occurs -first; it follows that this examination usually is the last statement in a :meth:`within` block. +It should be noted that if the last message-receiving assertion of the block is +:meth:`expectNoMsg` or :meth:`receiveWhile`, the final check of the +:meth:`within` is skipped in order to avoid false positives due to wake-up +latencies. This means that while individual contained assertions still use the +maximum time bound, the overall block may take arbitrarily longer in this case. + .. code-block:: scala class SomeSpec extends WordSpec with MustMatchers with TestKit { "A Worker" must { "send timely replies" in { val worker = actorOf(...) - within (50 millis) { + within (500 millis) { worker ! "some work" expectMsg("some result") - expectNoMsg + expectNoMsg // will block for the rest of the 500ms + Thread.sleep(1000) // will NOT make this block fail } } } diff --git a/akka-sbt-plugin/build.sbt b/akka-sbt-plugin/build.sbt index e01a2e9809..b7fa421544 100644 --- a/akka-sbt-plugin/build.sbt +++ b/akka-sbt-plugin/build.sbt @@ -9,6 +9,3 @@ version := "2.0-SNAPSHOT" publishMavenStyle := true -publishTo := Some("Typesafe Publish Repo" at "http://repo.typesafe.com/typesafe/maven-releases/") - -credentials += Credentials(Path.userHome / ".ivy2" / "typesafe-credentials") diff --git a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala index 65f24ae9d8..2f00de19dc 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala @@ -4,25 +4,37 @@ import sbt._ import sbt.Keys._ +import sbt.Load.BuildStructure import sbt.classpath.ClasspathUtilities import sbt.Project.Initialize +import sbt.CommandSupport._ import java.io.File +import scala.collection.mutable.{ Set => MutableSet } object AkkaMicrokernelPlugin extends Plugin { + case class DistConfig( + outputDirectory: File, + configSourceDirs: Seq[File], + distJvmOptions: String, + distMainClass: String, + libFilter: File ⇒ Boolean, + additionalLibs: Seq[File]) + val Dist = config("dist") extend (Runtime) val dist = TaskKey[File]("dist", "Builds an Akka microkernel directory") - // TODO how to reuse keyword "clean" here instead (dist:clean) - val distClean = TaskKey[File]("clean-dist", "Removes Akka microkernel directory") + val distClean = TaskKey[Unit]("clean", "Removes Akka microkernel directory") + val outputDirectory = SettingKey[File]("output-directory") val configSourceDirs = TaskKey[Seq[File]]("config-source-directories", "Configuration files are copied from these directories") - val distJvmOptions = SettingKey[String]("jvm-options", "JVM parameters to use in start script") + val distJvmOptions = SettingKey[String]("kernel-jvm-options", "JVM parameters to use in start script") val distMainClass = SettingKey[String]("kernel-main-class", "Kernel main class to use in start script") val libFilter = SettingKey[File ⇒ Boolean]("lib-filter", "Filter of dependency jar files") val additionalLibs = TaskKey[Seq[File]]("additional-libs", "Additional dependency jar files") + val distConfig = TaskKey[DistConfig]("dist-config") override lazy val settings = inConfig(Dist)(Seq( @@ -36,46 +48,59 @@ object AkkaMicrokernelPlugin extends Plugin { distJvmOptions := "-Xms1024M -Xmx1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC", distMainClass := "akka.kernel.Main", libFilter := { f ⇒ true }, - additionalLibs <<= defaultAdditionalLibs)) ++ + additionalLibs <<= defaultAdditionalLibs, + distConfig <<= (outputDirectory, configSourceDirs, distJvmOptions, distMainClass, libFilter, additionalLibs) map DistConfig)) ++ Seq( dist <<= (dist in Dist).identity) private def distTask: Initialize[Task[File]] = - (outputDirectory, sourceDirectory, crossTarget, dependencyClasspath, - configSourceDirs, distJvmOptions, distMainClass, libFilter, streams) map { - (outDir, src, tgt, cp, configSrc, jvmOptions, mainClass, libFilt, s) ⇒ + (distConfig, sourceDirectory, crossTarget, dependencyClasspath, projectDependencies, allDependencies, buildStructure, state, streams) map { + (conf, src, tgt, cp, projDeps, allDeps, buildStruct, st, s) ⇒ + + if (isKernelProject(allDeps)) { val log = s.log - val distBinPath = outDir / "bin" - val distConfigPath = outDir / "config" - val distDeployPath = outDir / "deploy" - val distLibPath = outDir / "lib" - // TODO how do I grab the additionalLibs setting? Can't add it in input tuple, limitation of number of elements in map of tuple. - val addLibs = Seq.empty[File] - - log.info("Creating distribution %s ..." format outDir) - IO.createDirectory(outDir) - Scripts(jvmOptions, mainClass).writeScripts(distBinPath) - copyDirectories(configSrc, distConfigPath) + val distBinPath = conf.outputDirectory / "bin" + val distConfigPath = conf.outputDirectory / "config" + val distDeployPath = conf.outputDirectory / "deploy" + val distLibPath = conf.outputDirectory / "lib" + + val subProjectDependencies: Set[SubProjectInfo] = allSubProjectDependencies(projDeps, buildStruct, st) + + log.info("Creating distribution %s ..." format conf.outputDirectory) + IO.createDirectory(conf.outputDirectory) + Scripts(conf.distJvmOptions, conf.distMainClass).writeScripts(distBinPath) + copyDirectories(conf.configSourceDirs, distConfigPath) copyJars(tgt, distDeployPath) - copyFiles(libFiles(cp, libFilt), distLibPath) - copyFiles(addLibs, distLibPath) + + copyFiles(libFiles(cp, conf.libFilter), distLibPath) + copyFiles(conf.additionalLibs, distLibPath) + for (subTarget <- subProjectDependencies.map(_.target)) { + copyJars(subTarget, distLibPath) + } log.info("Distribution created.") - outDir } - - private def distCleanTask: Initialize[Task[File]] = - (outputDirectory, streams) map { (outDir, s) ⇒ - val log = s.log - log.info("Cleaning " + outDir) - IO.delete(outDir) - outDir + conf.outputDirectory } - def defaultConfigSourceDirs = (sourceDirectory, unmanagedResourceDirectories) map { (src, resources) ⇒ - Seq(src / "main" / "config") ++ resources + private def distCleanTask: Initialize[Task[Unit]] = + (outputDirectory, allDependencies, streams) map { (outDir, deps, s) ⇒ + + if (isKernelProject(deps)) { + val log = s.log + log.info("Cleaning " + outDir) + IO.delete(outDir) + } + } + + def isKernelProject(dependencies: Seq[ModuleID]): Boolean = { + dependencies.exists(moduleId => moduleId.organization == "se.scalablesolutions.akka" && moduleId.name == "akka-kernel") + } + + private def defaultConfigSourceDirs = (sourceDirectory, unmanagedResourceDirectories) map { (src, resources) ⇒ + Seq(src / "config", src / "main" / "config") ++ resources } - def defaultAdditionalLibs = (libraryDependencies) map { (libs) ⇒ + private def defaultAdditionalLibs = (libraryDependencies) map { (libs) ⇒ Seq.empty[File] } @@ -146,6 +171,67 @@ object AkkaMicrokernelPlugin extends Plugin { val (libs, directories) = classpath.map(_.data).partition(ClasspathUtilities.isArchive) libs.map(_.asFile).filter(libFilter) } + + private def allSubProjectDependencies(projDeps: Seq[ModuleID], buildStruct: BuildStructure, state: State): Set[SubProjectInfo] = { + val buildUnit = buildStruct.units(buildStruct.root) + val uri = buildStruct.root + val allProjects = buildUnit.defined.map { + case (id, proj) => (ProjectRef(uri, id) -> proj) + } + + val projDepsNames = projDeps.map(_.name) + def include(project: ResolvedProject): Boolean = projDepsNames.exists(_ == project.id) + val subProjects: Seq[SubProjectInfo] = allProjects.collect { + case (projRef, project) if include(project) => projectInfo(projRef, project, buildStruct, state, allProjects) + }.toList + + val allSubProjects = subProjects.map(_.recursiveSubProjects).flatten.toSet + allSubProjects +} + + private def projectInfo(projectRef: ProjectRef, project: ResolvedProject, buildStruct: BuildStructure, state: State, + allProjects: Map[ProjectRef, ResolvedProject]): SubProjectInfo = { + + def optionalSetting[A](key: ScopedSetting[A]) = key in projectRef get buildStruct.data + + def setting[A](key: ScopedSetting[A], errorMessage: => String) = { + optionalSetting(key) getOrElse { + logger(state).error(errorMessage); + throw new IllegalArgumentException() + } + } + + def evaluateTask[T](taskKey: sbt.Project.ScopedKey[sbt.Task[T]]) = { + EvaluateTask.evaluateTask(buildStruct, taskKey, state, projectRef, false, EvaluateTask.SystemProcessors) + } + + val projDeps: Seq[ModuleID] = evaluateTask(Keys.projectDependencies) match { + case Some(Value(moduleIds)) => moduleIds + case _ => Seq.empty + } + + val projDepsNames = projDeps.map(_.name) + def include(project: ResolvedProject): Boolean = projDepsNames.exists(_ == project.id) + val subProjects = allProjects.collect { + case (projRef, proj) if include(proj) => projectInfo(projRef, proj, buildStruct, state, allProjects) + }.toList + + val target = setting(Keys.crossTarget, "Missing crossTarget directory") + SubProjectInfo(project.id, target, subProjects) + } + + private case class SubProjectInfo(id: String, target: File, subProjects: Seq[SubProjectInfo]) { + + def recursiveSubProjects: Set[SubProjectInfo] = { + val flatSubProjects = for { + x <- subProjects + y <- x.recursiveSubProjects + } yield y + + flatSubProjects.toSet + this + } + + } } diff --git a/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java b/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java index c7ad0f3de6..f6ab0e56e7 100644 --- a/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java +++ b/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java @@ -2,6 +2,8 @@ package akka.spring; import akka.actor.*; +import scala.Option; + import java.util.concurrent.CountDownLatch; public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedActorOne { @@ -22,7 +24,7 @@ public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedAc } @Override - public void preRestart(Throwable e) { + public void preRestart(Throwable e, Option msg) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} latch.countDown(); } diff --git a/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java b/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java index 1479db0d8c..2ee07195e2 100644 --- a/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java +++ b/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java @@ -2,6 +2,8 @@ package akka.spring; import akka.actor.*; +import scala.Option; + import java.util.concurrent.CountDownLatch; public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedActorTwo { @@ -22,7 +24,7 @@ public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedAc } @Override - public void preRestart(Throwable e) { + public void preRestart(Throwable e, Option msg) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} latch.countDown(); } diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index 3a97263369..dca421c941 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -282,8 +282,8 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor { val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false) def receive = { - case update: Update[T] ⇒ - self.tryReply(atomic(txFactory) { agent.ref alter update.function }) + case update: Update[_] ⇒ + self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T => T] }) case Get ⇒ self reply agent.get case _ ⇒ () } @@ -298,8 +298,8 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false) def receive = { - case update: Update[T] ⇒ try { - self.tryReply(atomic(txFactory) { agent.ref alter update.function }) + case update: Update[_] ⇒ try { + self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T => T] }) } finally { agent.resume self.stop() diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 6920faea64..116450665a 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -70,12 +70,12 @@ object TestActorRef { import ReflectiveAccess.{ createInstance, noParams, noArgs } createInstance[T](manifest[T].erasure, noParams, noArgs) match { - case r: Right[_, T] ⇒ r.b - case l: Left[Exception, _] ⇒ throw new ActorInitializationException( + case Right(value) ⇒ value + case Left(exception) ⇒ throw new ActorInitializationException( "Could not instantiate Actor" + "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", l.a) + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", exception) } }, address) } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index f5d0a72046..1dd1498ad2 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -110,13 +110,12 @@ trait TestKitLight { val senderOption = Some(testActor) private var end: Duration = Duration.Inf - /* - * THIS IS A HACK: expectNoMsg and receiveWhile are bounded by `end`, but - * running them should not trigger an AssertionError, so mark their end - * time here and do not fail at the end of `within` if that time is not - * long gone. + + /** + * if last assertion was expectNoMsg, disable timing failure upon within() + * block end. */ - private var lastSoftTimeout: Duration = now - 5.millis + private var lastWasNoMsg = false /** * Stop test actor. Should be done at the end of the test unless relying on @@ -211,6 +210,8 @@ trait TestKitLight { val rem = end - start assert(rem >= min, "required min time " + min + " not possible, only " + format(min.unit, rem) + " left") + lastWasNoMsg = false + val max_diff = _max min rem val prev_end = end end = start + max_diff @@ -219,13 +220,8 @@ trait TestKitLight { val diff = now - start assert(min <= diff, "block took " + format(min.unit, diff) + ", should at least have been " + min) - /* - * caution: HACK AHEAD - */ - if (now - lastSoftTimeout > 5.millis) { + if (!lastWasNoMsg) { assert(diff <= max_diff, "block took " + format(_max.unit, diff) + ", exceeding " + format(_max.unit, max_diff)) - } else { - lastSoftTimeout -= 5.millis } ret @@ -302,6 +298,20 @@ trait TestKitLight { f(o) } + /** + * Same as `expectMsgType[T](remaining)`, but correctly treating the timeFactor. + */ + def expectMsgType[T](implicit m: Manifest[T]): T = expectMsgClass_internal(remaining, m.erasure.asInstanceOf[Class[T]]) + + /** + * Receive one message from the test actor and assert that it conforms to the + * given type (after erasure). Wait time is bounded by the given duration, + * with an AssertionFailure being thrown in case of timeout. + * + * @return the received object + */ + def expectMsgType[T](max: Duration)(implicit m: Manifest[T]): T = expectMsgClass_internal(max.dilated, m.erasure.asInstanceOf[Class[T]]) + /** * Same as `expectMsgClass(remaining, c)`, but correctly treating the timeFactor. */ @@ -452,7 +462,7 @@ trait TestKitLight { private def expectNoMsg_internal(max: Duration) { val o = receiveOne(max) assert(o eq null, "received unexpected message " + o) - lastSoftTimeout = now + lastWasNoMsg = true } /** @@ -501,7 +511,7 @@ trait TestKitLight { } val ret = doit(Nil) - lastSoftTimeout = now + lastWasNoMsg = true ret } @@ -541,6 +551,7 @@ trait TestKitLight { } else { queue.takeFirst } + lastWasNoMsg = false message match { case null ⇒ lastMessage = NullMessage diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 93264524c2..64c729f5dc 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -174,7 +174,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), Some(2), Some(1000)) val ref = TestActorRef(new TActor { def receiveT = { case _ ⇒ } - override def preRestart(reason: Throwable) { counter -= 1 } + override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 } override def postRestart(reason: Throwable) { counter -= 1 } }).start() self.dispatcher = CallingThreadDispatcher.global