#19014 make Akka Typed spawnAdapter more efficient
This commit is contained in:
parent
ea34103f98
commit
f7444d698c
3 changed files with 56 additions and 12 deletions
|
|
@ -71,14 +71,20 @@ private[akka] trait Children { this: ActorCell ⇒
|
|||
ref
|
||||
}
|
||||
|
||||
private[akka] def removeFunctionRef(ref: FunctionRef): Unit = {
|
||||
private[akka] def removeFunctionRef(ref: FunctionRef): Boolean = {
|
||||
require(ref.path.parent eq self.path, "trying to remove FunctionRef from wrong ActorCell")
|
||||
ref.stop()
|
||||
val name = ref.path.name
|
||||
@tailrec def rec(): Unit = {
|
||||
@tailrec def rec(): Boolean = {
|
||||
val old = functionRefs
|
||||
val removed = old - name
|
||||
if (!Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.functionRefsOffset, old, removed)) rec()
|
||||
if (!old.contains(name)) false
|
||||
else {
|
||||
val removed = old - name
|
||||
if (!Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.functionRefsOffset, old, removed)) rec()
|
||||
else {
|
||||
ref.stop()
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
rec()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -82,12 +82,20 @@ private[typed] class ActorContextAdapter[T](ctx: akka.actor.ActorContext) extend
|
|||
def spawn[U](props: Props[U], name: String) = ctx.spawn(props, name)
|
||||
def actorOf(props: a.Props) = ctx.actorOf(props)
|
||||
def actorOf(props: a.Props, name: String) = ctx.actorOf(props, name)
|
||||
def stop(child: ActorRef[Nothing]) = ctx.child(child.path.name) match {
|
||||
case Some(ref) if ref == child.untypedRef ⇒
|
||||
ctx.stop(child.untypedRef)
|
||||
true
|
||||
case _ ⇒ false // none of our business
|
||||
}
|
||||
def stop(child: ActorRef[Nothing]) =
|
||||
child.untypedRef match {
|
||||
case f: akka.actor.FunctionRef ⇒
|
||||
val cell = ctx.asInstanceOf[akka.actor.ActorCell]
|
||||
cell.removeFunctionRef(f)
|
||||
case _ ⇒
|
||||
ctx.child(child.path.name) match {
|
||||
case Some(ref) if ref == child.untypedRef ⇒
|
||||
ctx.stop(child.untypedRef)
|
||||
true
|
||||
case _ ⇒
|
||||
false // none of our business
|
||||
}
|
||||
}
|
||||
def watch[U](other: ActorRef[U]) = { ctx.watch(other.untypedRef); other }
|
||||
def watch(other: a.ActorRef) = { ctx.watch(other); other }
|
||||
def unwatch[U](other: ActorRef[U]) = { ctx.unwatch(other.untypedRef); other }
|
||||
|
|
@ -98,7 +106,11 @@ private[typed] class ActorContextAdapter[T](ctx: akka.actor.ActorContext) extend
|
|||
import ctx.dispatcher
|
||||
ctx.system.scheduler.scheduleOnce(delay, target.untypedRef, msg)
|
||||
}
|
||||
def spawnAdapter[U](f: U ⇒ T) = ActorRef[U](ctx.actorOf(akka.actor.Props(classOf[MessageWrapper], f)))
|
||||
def spawnAdapter[U](f: U ⇒ T) = {
|
||||
val cell = ctx.asInstanceOf[akka.actor.ActorCell]
|
||||
val ref = cell.addFunctionRef((_, msg) ⇒ ctx.self ! f(msg.asInstanceOf[U]))
|
||||
ActorRef[U](ref)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -66,6 +66,9 @@ object ActorContextSpec {
|
|||
final case class BecomeCareless(replyTo: ActorRef[BecameCareless.type]) extends Command
|
||||
case object BecameCareless extends Event
|
||||
|
||||
final case class GetAdapter(replyTo: ActorRef[Adapter]) extends Command
|
||||
final case class Adapter(a: ActorRef[Command]) extends Event
|
||||
|
||||
def subject(monitor: ActorRef[GotSignal]): Behavior[Command] =
|
||||
FullTotal {
|
||||
case Sig(ctx, signal) ⇒
|
||||
|
|
@ -142,6 +145,9 @@ object ActorContextSpec {
|
|||
monitor ! GotSignal(sig)
|
||||
Same
|
||||
}
|
||||
case GetAdapter(replyTo) ⇒
|
||||
replyTo ! Adapter(ctx.spawnAdapter(identity))
|
||||
Same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -503,6 +509,26 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
|
|||
msgs should ===(Scheduled :: Pong2 :: Nil)
|
||||
}
|
||||
})
|
||||
|
||||
def `40 must create a working adapter`(): Unit = sync(setup("ctx40") { (ctx, startWith) ⇒
|
||||
startWith.keep { subj ⇒
|
||||
subj ! GetAdapter(ctx.self)
|
||||
}.expectMessage(500.millis) { (msg, subj) ⇒
|
||||
val Adapter(adapter) = msg
|
||||
ctx.watch(adapter)
|
||||
adapter ! Ping(ctx.self)
|
||||
(subj, adapter)
|
||||
}.expectMessage(500.millis) {
|
||||
case (msg, (subj, adapter)) ⇒
|
||||
msg should ===(Pong1)
|
||||
ctx.stop(subj)
|
||||
adapter
|
||||
}.expectMessageKeep(500.millis) { (msg, _) ⇒
|
||||
msg should ===(GotSignal(PostStop))
|
||||
}.expectTermination(500.millis) { (t, adapter) ⇒
|
||||
t.ref should ===(adapter)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
object `An ActorContext` extends Tests {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue