From f7444d698c43be32a722dfec0171f16fd2560ffc Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sat, 16 Jan 2016 19:37:17 +0100 Subject: [PATCH] #19014 make Akka Typed spawnAdapter more efficient --- .../scala/akka/actor/dungeon/Children.scala | 16 ++++++++---- .../src/main/scala/akka/typed/Impl.scala | 26 ++++++++++++++----- .../scala/akka/typed/ActorContextSpec.scala | 26 +++++++++++++++++++ 3 files changed, 56 insertions(+), 12 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 547f7fb34b..8698c88d78 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -71,14 +71,20 @@ private[akka] trait Children { this: ActorCell ⇒ ref } - private[akka] def removeFunctionRef(ref: FunctionRef): Unit = { + private[akka] def removeFunctionRef(ref: FunctionRef): Boolean = { require(ref.path.parent eq self.path, "trying to remove FunctionRef from wrong ActorCell") - ref.stop() val name = ref.path.name - @tailrec def rec(): Unit = { + @tailrec def rec(): Boolean = { val old = functionRefs - val removed = old - name - if (!Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.functionRefsOffset, old, removed)) rec() + if (!old.contains(name)) false + else { + val removed = old - name + if (!Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.functionRefsOffset, old, removed)) rec() + else { + ref.stop() + true + } + } } rec() } diff --git a/akka-typed/src/main/scala/akka/typed/Impl.scala b/akka-typed/src/main/scala/akka/typed/Impl.scala index f35dadfa38..173fe51a95 100644 --- a/akka-typed/src/main/scala/akka/typed/Impl.scala +++ b/akka-typed/src/main/scala/akka/typed/Impl.scala @@ -82,12 +82,20 @@ private[typed] class ActorContextAdapter[T](ctx: akka.actor.ActorContext) extend def spawn[U](props: Props[U], name: String) = ctx.spawn(props, name) def actorOf(props: a.Props) = ctx.actorOf(props) def actorOf(props: a.Props, name: String) = ctx.actorOf(props, name) - def stop(child: ActorRef[Nothing]) = ctx.child(child.path.name) match { - case Some(ref) if ref == child.untypedRef ⇒ - ctx.stop(child.untypedRef) - true - case _ ⇒ false // none of our business - } + def stop(child: ActorRef[Nothing]) = + child.untypedRef match { + case f: akka.actor.FunctionRef ⇒ + val cell = ctx.asInstanceOf[akka.actor.ActorCell] + cell.removeFunctionRef(f) + case _ ⇒ + ctx.child(child.path.name) match { + case Some(ref) if ref == child.untypedRef ⇒ + ctx.stop(child.untypedRef) + true + case _ ⇒ + false // none of our business + } + } def watch[U](other: ActorRef[U]) = { ctx.watch(other.untypedRef); other } def watch(other: a.ActorRef) = { ctx.watch(other); other } def unwatch[U](other: ActorRef[U]) = { ctx.unwatch(other.untypedRef); other } @@ -98,7 +106,11 @@ private[typed] class ActorContextAdapter[T](ctx: akka.actor.ActorContext) extend import ctx.dispatcher ctx.system.scheduler.scheduleOnce(delay, target.untypedRef, msg) } - def spawnAdapter[U](f: U ⇒ T) = ActorRef[U](ctx.actorOf(akka.actor.Props(classOf[MessageWrapper], f))) + def spawnAdapter[U](f: U ⇒ T) = { + val cell = ctx.asInstanceOf[akka.actor.ActorCell] + val ref = cell.addFunctionRef((_, msg) ⇒ ctx.self ! f(msg.asInstanceOf[U])) + ActorRef[U](ref) + } } /** diff --git a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala index 756a6dc2b1..708cfdc175 100644 --- a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala @@ -66,6 +66,9 @@ object ActorContextSpec { final case class BecomeCareless(replyTo: ActorRef[BecameCareless.type]) extends Command case object BecameCareless extends Event + final case class GetAdapter(replyTo: ActorRef[Adapter]) extends Command + final case class Adapter(a: ActorRef[Command]) extends Event + def subject(monitor: ActorRef[GotSignal]): Behavior[Command] = FullTotal { case Sig(ctx, signal) ⇒ @@ -142,6 +145,9 @@ object ActorContextSpec { monitor ! GotSignal(sig) Same } + case GetAdapter(replyTo) ⇒ + replyTo ! Adapter(ctx.spawnAdapter(identity)) + Same } } } @@ -503,6 +509,26 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( msgs should ===(Scheduled :: Pong2 :: Nil) } }) + + def `40 must create a working adapter`(): Unit = sync(setup("ctx40") { (ctx, startWith) ⇒ + startWith.keep { subj ⇒ + subj ! GetAdapter(ctx.self) + }.expectMessage(500.millis) { (msg, subj) ⇒ + val Adapter(adapter) = msg + ctx.watch(adapter) + adapter ! Ping(ctx.self) + (subj, adapter) + }.expectMessage(500.millis) { + case (msg, (subj, adapter)) ⇒ + msg should ===(Pong1) + ctx.stop(subj) + adapter + }.expectMessageKeep(500.millis) { (msg, _) ⇒ + msg should ===(GotSignal(PostStop)) + }.expectTermination(500.millis) { (t, adapter) ⇒ + t.ref should ===(adapter) + } + }) } object `An ActorContext` extends Tests {