From 1a63ff7dbd57f47aa145dce958be7ac8f747b375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Tue, 14 May 2013 22:07:56 +0200 Subject: [PATCH] Remove some unnecessary object creation and switch to HashSet for better performance. See #3308 --- .../scala/akka/actor/ActorSystemSpec.scala | 2 +- .../akka/actor/dispatch/ActorModelSpec.scala | 2 +- .../src/main/scala/akka/actor/ActorCell.scala | 3 +-- .../src/main/scala/akka/actor/ActorRef.scala | 2 +- .../src/main/scala/akka/actor/Props.scala | 13 ++++++++---- .../scala/akka/actor/dungeon/Children.scala | 2 +- .../scala/akka/actor/dungeon/DeathWatch.scala | 21 +++++++------------ .../src/main/scala/akka/util/Reflect.scala | 9 +++++++- 8 files changed, 29 insertions(+), 25 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 12314471ec..f83e599063 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -215,7 +215,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend "reliably create waves of actors" in { import system.dispatcher - implicit val timeout = Timeout(30 seconds) + implicit val timeout = Timeout((20 seconds).dilated) val waves = for (i ← 1 to 3) yield system.actorOf(Props[ActorSystemSpec.Waves]) ? 50000 Await.result(Future.sequence(waves), timeout.duration + 5.seconds) must be === Seq("done", "done", "done") } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index ee52045257..471c702e3c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -365,7 +365,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) val stopLatch = new CountDownLatch(num) - val waitTime = (30 seconds).dilated.toMillis + val waitTime = (20 seconds).dilated.toMillis val boss = system.actorOf(Props(new Actor { def receive = { case "run" ⇒ for (_ ← 1 to num) (context.watch(context.actorOf(props))) ! cachedMessage diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 660c91c106..b36a7389ec 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -301,8 +301,7 @@ private[akka] object ActorCell { final val emptyBehaviorStack: List[Actor.Receive] = Nil - final val emptyActorRefSet: Set[ActorRef] = immutable.TreeSet.empty - final val emptyActorRefMap: Map[ActorPath, ActorRef] = immutable.TreeMap.empty + final val emptyActorRefSet: Set[ActorRef] = immutable.HashSet.empty final val terminatedProps: Props = Props(() ⇒ throw new IllegalActorStateException("This Actor has been terminated")) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 9bc923251c..b93b35aa76 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -96,7 +96,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ final def compareTo(other: ActorRef) = { val x = this.path compareTo other.path - if (x == 0) this.path.uid compareTo other.path.uid + if (x == 0) if (this.path.uid < other.path.uid) -1 else if (this.path.uid == other.path.uid) 0 else 1 else x } diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index cda2e5eed0..3542e10818 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -64,7 +64,7 @@ object Props { * * Scala API. */ - def apply[T <: Actor: ClassTag](): Props = apply(implicitly[ClassTag[T]].runtimeClass) + def apply[T <: Actor: ClassTag](): Props = apply(defaultDeploy, implicitly[ClassTag[T]].runtimeClass, Vector.empty) /** * Returns a Props that has default values except for "creator" which will be a function that creates an instance @@ -136,8 +136,11 @@ object Props { @SerialVersionUID(2L) case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]) { + // the constructor can't be serialized, but needs to be a lazy val to be initialized after deserialization + @transient + private lazy val constructor = Reflect.findConstructor(clazz, args) // validate constructor signature; throws IllegalArgumentException if invalid - Reflect.findConstructor(clazz, args) + constructor /** * No-args constructor that sets all the default values. @@ -250,16 +253,18 @@ case class Props(deploy: Deploy, clazz: Class[_], args: immutable.Seq[Any]) { * used within the implementation of [[IndirectActorProducer#produce]]. */ def newActor(): Actor = { - Reflect.instantiate(clazz, args) match { + Reflect.instantiate(constructor, args) match { case a: Actor ⇒ a case i: IndirectActorProducer ⇒ i.produce() case _ ⇒ throw new IllegalArgumentException(s"unknown actor creator [$clazz]") } } + // don't serialize the class but reinitialize it after deserialization + @transient private lazy val cachedActorClass: Class[_ <: Actor] = { if (classOf[IndirectActorProducer].isAssignableFrom(clazz)) { - Reflect.instantiate(clazz, args).asInstanceOf[IndirectActorProducer].actorClass + Reflect.instantiate(constructor, args).asInstanceOf[IndirectActorProducer].actorClass } else if (classOf[Actor].isAssignableFrom(clazz)) { clazz.asInstanceOf[Class[_ <: Actor]] } else { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 2281243b92..b8a69f98cf 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -184,7 +184,7 @@ private[akka] trait Children { this: ActorCell ⇒ // this name will either be unreserved or overwritten with a real child below val actor = try { - val childPath = (cell.self.path / name).withUid(ActorCell.newUid()) + val childPath = new ChildActorPath(cell.self.path, name, ActorCell.newUid()) cell.provider.actorOf(cell.systemImpl, props, cell.self, childPath, systemService = systemService, deploy = None, lookupDeploy = true, async = async) } catch { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index 9c96d890dd..992a70129e 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -39,12 +39,11 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ a } - protected def receivedTerminated(t: Terminated): Unit = { + protected def receivedTerminated(t: Terminated): Unit = if (terminatedQueued(t.actor)) { terminatedQueued -= t.actor // here we know that it is the SAME ref which was put in receiveMessage(t) } - } /** * When this actor is watching the subject of [[akka.actor.Terminated]] message @@ -65,21 +64,17 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ // TODO this should be removed and be replaced with `watching.contains(subject)` // when all actor references have uid, i.e. actorFor is removed - private def watchingContains(subject: ActorRef): Boolean = { + private def watchingContains(subject: ActorRef): Boolean = watching.contains(subject) || (subject.path.uid != ActorCell.undefinedUid && watching.contains(new UndefinedUidActorRef(subject))) - } // TODO this should be removed and be replaced with `set - subject` // when all actor references have uid, i.e. actorFor is removed - private def removeFromSet(subject: ActorRef, set: Set[ActorRef]): Set[ActorRef] = { - val removed = if (set(subject)) set - subject else set - if (subject.path.uid != ActorCell.undefinedUid) - removed - new UndefinedUidActorRef(subject) - else removed filterNot (_.path == subject.path) - } + private def removeFromSet(subject: ActorRef, set: Set[ActorRef]): Set[ActorRef] = + if (subject.path.uid != ActorCell.undefinedUid) (set - subject) - new UndefinedUidActorRef(subject) + else set filterNot (_.path == subject.path) - protected def tellWatchersWeDied(actor: Actor): Unit = { + protected def tellWatchersWeDied(actor: Actor): Unit = if (!watchedBy.isEmpty) { try { def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit = @@ -103,9 +98,8 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ watchedBy foreach sendTerminated(ifLocal = true) } finally watchedBy = ActorCell.emptyActorRefSet } - } - protected def unwatchWatchedActors(actor: Actor): Unit = { + protected def unwatchWatchedActors(actor: Actor): Unit = if (!watching.isEmpty) { try { watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ @@ -117,7 +111,6 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ unsubscribeAddressTerminated() } } - } protected def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { val watcheeSelf = watchee == self diff --git a/akka-actor/src/main/scala/akka/util/Reflect.scala b/akka-actor/src/main/scala/akka/util/Reflect.scala index b3f67381ab..31d4b57bfd 100644 --- a/akka-actor/src/main/scala/akka/util/Reflect.scala +++ b/akka-actor/src/main/scala/akka/util/Reflect.scala @@ -50,7 +50,14 @@ private[akka] object Reflect { * Calls findConstructor and invokes it with the given arguments. */ private[akka] def instantiate[T](clazz: Class[T], args: immutable.Seq[Any]): T = { - val constructor = findConstructor(clazz, args) + instantiate(findConstructor(clazz, args), args) + } + + /** + * INTERNAL API + * Invokes the constructor with the the given arguments. + */ + private[akka] def instantiate[T](constructor: Constructor[T], args: immutable.Seq[Any]): T = { constructor.setAccessible(true) try constructor.newInstance(args.asInstanceOf[Seq[AnyRef]]: _*) catch {