diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index bbffc93320..213bd29b61 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -47,11 +47,11 @@ case class Link(child: ActorRef) extends LifeCycleMessage case class Unlink(child: ActorRef) extends LifeCycleMessage case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage case object Kill extends LifeCycleMessage - -case object ReceiveTimeout +case object ReceiveTimeout extends LifeCycleMessage // Exceptions for Actors class ActorStartException private[akka](message: String) extends RuntimeException(message) +class IllegalActorStateException private[akka](message: String) extends RuntimeException(message) class ActorKilledException private[akka](message: String) extends RuntimeException(message) class ActorInitializationException private[akka](message: String) extends RuntimeException(message) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 0f758c01f9..f09d3065ab 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -103,7 +103,7 @@ trait ActorRef extends TransactionManagement { * Used if the receive (or HotSwap) contains a case handling ReceiveTimeout. */ @volatile var receiveTimeout: Long = Actor.RECEIVE_TIMEOUT - + /** * User overridable callback/setting. * @@ -585,7 +585,7 @@ sealed class LocalActorRef private[akka]( __format: Format[_ <: Actor]) = { this(() => { val actorClass = __loader.loadClass(__actorClassName) - if (__format.isInstanceOf[SerializerBasedActorFormat[_]]) + if (__format.isInstanceOf[SerializerBasedActorFormat[_]]) __format.asInstanceOf[SerializerBasedActorFormat[_]] .serializer .fromBinary(__actorBytes, Some(actorClass)).asInstanceOf[Actor] @@ -1082,7 +1082,7 @@ sealed class LocalActorRef private[akka]( } } } - + protected[akka] def restartLinkedActors(reason: Throwable) = guard.withGuard { linkedActorsAsList.foreach { actorRef => if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index efe1f49b7b..766f69af7f 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -37,10 +37,7 @@ object ActorRegistry extends ListenerManagement { * Returns all actors in the system. */ def actors: List[ActorRef] = { - val all = new ListBuffer[ActorRef] - val elements = actorsByUUID.elements - while (elements.hasMoreElements) all += elements.nextElement - all.toList + filter(_=> true) } /** @@ -52,20 +49,32 @@ object ActorRegistry extends ListenerManagement { } /** - * Finds all actors that are subtypes of the class passed in as the Manifest argument. + * Finds all actors that are subtypes of the class passed in as the Manifest argument and supproting passed message. */ - def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] = { - val all = new ListBuffer[ActorRef] + def actorsFor[T <: Actor](message: Any)(implicit manifest: Manifest[T] ): List[ActorRef] = + filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message)) + + /** + * Finds all actors that satisfy a predicate. + */ + def filter(p: ActorRef => Boolean): List[ActorRef] = { + val all = new ListBuffer[ActorRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) { val actorId = elements.nextElement - if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) { + if (p(actorId)) { all += actorId } } all.toList } + /** + * Finds all actors that are subtypes of the class passed in as the Manifest argument. + */ + def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] = + filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass)) + /** * Finds any actor that matches T. */ diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index b6d8b8a99c..6a7187afdc 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -95,4 +95,4 @@ private object SchedulerThreadFactory extends ThreadFactory { thread.setDaemon(true) thread } -} \ No newline at end of file +} diff --git a/akka-core/src/main/scala/actor/SerializationProtocol.scala b/akka-core/src/main/scala/actor/SerializationProtocol.scala index 7737988d23..4478f7df1c 100644 --- a/akka-core/src/main/scala/actor/SerializationProtocol.scala +++ b/akka-core/src/main/scala/actor/SerializationProtocol.scala @@ -70,10 +70,10 @@ trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { */ object ActorSerialization { - def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef = + def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef = fromBinaryToLocalActorRef(bytes, format) - def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] = { + def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] = { toSerializedActorRefProtocol(a, format).toByteArray } @@ -85,7 +85,7 @@ object ActorSerialization { } val builder = LifeCycleProtocol.newBuilder a.lifeCycle match { - case Some(LifeCycle(scope, None)) => + case Some(LifeCycle(scope, None)) => setScope(builder, scope) Some(builder.build) case Some(LifeCycle(scope, Some(callbacks))) => @@ -118,14 +118,14 @@ object ActorSerialization { builder.build } - private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef = + private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef = fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None) private def fromProtobufToLocalActorRef[T <: Actor](protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = { Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol) - - val serializer = - if (format.isInstanceOf[SerializerBasedActorFormat[_]]) + + val serializer = + if (format.isInstanceOf[SerializerBasedActorFormat[_]]) Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer) else None @@ -133,7 +133,7 @@ object ActorSerialization { if (protocol.hasLifeCycle) { val lifeCycleProtocol = protocol.getLifeCycle val restartCallbacks = - if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart) + if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart) Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart)) else None Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks) @@ -145,7 +145,7 @@ object ActorSerialization { if (protocol.hasSupervisor) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) else None - + val hotswap = if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]])) @@ -214,7 +214,7 @@ object RemoteActorSerialization { RemoteServer.registerActor(homeAddress, uuid, ar) registeredInRemoteNodeDuringSerialization = true } - + RemoteActorRefProtocol.newBuilder .setUuid(uuid) .setActorClassname(actorClass.getName) diff --git a/akka-core/src/test/scala/ActorRegistrySpec.scala b/akka-core/src/test/scala/ActorRegistrySpec.scala index 0a9d23c6c3..6914472e2c 100644 --- a/akka-core/src/test/scala/ActorRegistrySpec.scala +++ b/akka-core/src/test/scala/ActorRegistrySpec.scala @@ -14,6 +14,19 @@ object ActorRegistrySpec { self.reply("got ping") } } + + class TestActor2 extends Actor { + self.id = "MyID2" + def receive = { + case "ping" => + record = "pong" + record + self.reply("got ping") + case "ping2" => + record = "pong" + record + self.reply("got ping") + } + } + } class ActorRegistrySpec extends JUnitSuite { @@ -111,6 +124,34 @@ class ActorRegistrySpec extends JUnitSuite { actor2.stop } + @Test def shouldGetActorsByMessageFromActorRegistry { + + ActorRegistry.shutdownAll + val actor1 = actorOf[TestActor] + actor1.start + val actor2 = actorOf[TestActor2] + actor2.start + + val actorsForAcotrTestActor = ActorRegistry.actorsFor[TestActor] + assert(actorsForAcotrTestActor.size === 1) + + val actorsForAcotrTestActor2 = ActorRegistry.actorsFor[TestActor2] + assert(actorsForAcotrTestActor2.size === 1) + + val actorsForAcotr = ActorRegistry.actorsFor[Actor] + assert(actorsForAcotr.size === 2) + + + val actorsForMessagePing2 = ActorRegistry.actorsFor[Actor]("ping2") + assert(actorsForMessagePing2.size === 1) + + val actorsForMessagePing = ActorRegistry.actorsFor[Actor]("ping") + assert(actorsForMessagePing.size === 2) + + actor1.stop + actor2.stop + } + @Test def shouldGetAllActorsFromActorRegistry { ActorRegistry.shutdownAll val actor1 = actorOf[TestActor] diff --git a/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala index 7d0bfd902e..0a47d1c9d6 100644 --- a/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala @@ -25,9 +25,9 @@ class NestedTransactionalActiveObjectSpec extends ShouldMatchers with BeforeAndAfterAll { ======= - Spec with - ShouldMatchers with - BeforeAndAfterAll { + Spec with + ShouldMatchers with + BeforeAndAfterAll { >>>>>>> 38e8bea3fe6a7e9fcc9c5f353124144739bdc234:akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala private val conf = new ActiveObjectConfigurator diff --git a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala index 6aeb8e09de..aa800c8f3d 100644 --- a/akka-core/src/test/scala/ReceiveTimeoutSpec.scala +++ b/akka-core/src/test/scala/ReceiveTimeoutSpec.scala @@ -8,11 +8,11 @@ import java.util.concurrent.TimeUnit import org.multiverse.api.latches.StandardLatch class ReceiveTimeoutSpec extends JUnitSuite { - + @Test def receiveShouldGetTimeout= { val timeoutLatch = new StandardLatch - + val timeoutActor = actorOf(new Actor { self.receiveTimeout = 500 @@ -65,4 +65,4 @@ class ReceiveTimeoutSpec extends JUnitSuite { // timeout already after 500 ms, so 1 second wait should be enough assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS) == false) } -} \ No newline at end of file +} diff --git a/akka-core/src/test/scala/RemoteTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/RemoteTransactionalActiveObjectSpec.scala index ee9971e83e..897318ce7d 100644 --- a/akka-core/src/test/scala/RemoteTransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/RemoteTransactionalActiveObjectSpec.scala @@ -24,9 +24,9 @@ object RemoteTransactionalActiveObjectSpec { @RunWith(classOf[JUnitRunner]) class RemoteTransactionalActiveObjectSpec extends - Spec with - ShouldMatchers with - BeforeAndAfterAll { + Spec with + ShouldMatchers with + BeforeAndAfterAll { import RemoteTransactionalActiveObjectSpec._ Config.config diff --git a/akka-core/src/test/scala/SchedulerSpec.scala b/akka-core/src/test/scala/SchedulerSpec.scala index 9ec325c42c..0fe7c45ea5 100644 --- a/akka-core/src/test/scala/SchedulerSpec.scala +++ b/akka-core/src/test/scala/SchedulerSpec.scala @@ -6,7 +6,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import org.junit.{After, Test} class SchedulerSpec extends JUnitSuite { - + @Test def schedulerShouldScheduleMoreThanOnce = { case object Tick diff --git a/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala index e44ae76322..99a2b99dc9 100644 --- a/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala +++ b/akka-core/src/test/scala/SerializableTypeClassActorSpec.scala @@ -27,7 +27,7 @@ class SerializableTypeClassActorSpec extends act.count = p.getCount act } - def toBinary(ac: MyActor) = + def toBinary(ac: MyActor) = ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray } } @@ -40,7 +40,7 @@ class SerializableTypeClassActorSpec extends act.count2 = p.getCount2 act } - def toBinary(ac: MyActorWithDualCounter) = + def toBinary(ac: MyActorWithDualCounter) = ProtobufProtocol.DualCounter.newBuilder.setCount1(ac.count1).setCount2(ac.count2).build.toByteArray } } diff --git a/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala index 93c9c1dabf..b42c137e33 100644 --- a/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala @@ -24,9 +24,9 @@ class TransactionalActiveObjectSpec extends ShouldMatchers with BeforeAndAfterAll { ======= - Spec with - ShouldMatchers with - BeforeAndAfterAll { + Spec with + ShouldMatchers with + BeforeAndAfterAll { >>>>>>> 38e8bea3fe6a7e9fcc9c5f353124144739bdc234:akka-core/src/test/scala/TransactionalActiveObjectSpec.scala private val conf = new ActiveObjectConfigurator diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index fb9dac8b73..4d99b7e491 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -325,9 +325,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val junit = "junit" % "junit" % "4.5" % "test" } - class AkkaSampleRestJavaProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin + class AkkaSampleRestJavaProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin - class AkkaSampleRemoteProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin + class AkkaSampleRemoteProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin class AkkaSampleRestScalaProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) with CodeFellowPlugin { val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1.1" % "compile"