Make 'watch' and 'unwatch' return void (#22664)
This indeed makes both the call sites and the implementations a little more elegant. The only non-trivial change was in EventStreamSpec, which while a bit more verbose, seems more readable like this, too.
This commit is contained in:
parent
ae467b1528
commit
2b09868f01
9 changed files with 23 additions and 23 deletions
|
|
@ -104,13 +104,13 @@ public interface ActorContext<T> {
|
|||
* {@link akka.typed.ActorSystem} to which the referenced Actor belongs is declared as failed
|
||||
* (e.g. in reaction to being unreachable).
|
||||
*/
|
||||
public <U> ActorRef<U> watch(ActorRef<U> other);
|
||||
public void watch(ActorRef<?> other);
|
||||
|
||||
/**
|
||||
* Revoke the registration established by {@link #watch}. A {@link akka.typed.Terminated}
|
||||
* notification will not subsequently be received for the referenced Actor.
|
||||
*/
|
||||
public <U> ActorRef<U> unwatch(ActorRef<U> other);
|
||||
public void unwatch(ActorRef<?> other);
|
||||
|
||||
/**
|
||||
* Schedule the sending of a notification in case no other message is received
|
||||
|
|
|
|||
|
|
@ -105,8 +105,8 @@ class StubbedActorContext[T](
|
|||
case Some(inbox) ⇒ inbox.ref == child
|
||||
}
|
||||
}
|
||||
override def watch[U](other: ActorRef[U]): ActorRef[U] = other
|
||||
override def unwatch[U](other: ActorRef[U]): ActorRef[U] = other
|
||||
override def watch(other: ActorRef[_]): Unit = ()
|
||||
override def unwatch(other: ActorRef[_]): Unit = ()
|
||||
override def setReceiveTimeout(d: FiniteDuration, msg: T): Unit = ()
|
||||
override def cancelReceiveTimeout(): Unit = ()
|
||||
|
||||
|
|
|
|||
|
|
@ -80,11 +80,11 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma
|
|||
effectQueue.offer(Stopped(child.path.name))
|
||||
super.stop(child)
|
||||
}
|
||||
override def watch[U](other: ActorRef[U]): ActorRef[U] = {
|
||||
override def watch(other: ActorRef[_]): Unit = {
|
||||
effectQueue.offer(Watched(other))
|
||||
super.watch(other)
|
||||
}
|
||||
override def unwatch[U](other: ActorRef[U]): ActorRef[U] = {
|
||||
override def unwatch(other: ActorRef[_]): Unit = {
|
||||
effectQueue.offer(Unwatched(other))
|
||||
super.unwatch(other)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,8 +36,8 @@ private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorCo
|
|||
false // none of our business
|
||||
}
|
||||
}
|
||||
override def watch[U](other: ActorRef[U]) = { ctx.watch(toUntyped(other)); other }
|
||||
override def unwatch[U](other: ActorRef[U]) = { ctx.unwatch(toUntyped(other)); other }
|
||||
override def watch(other: ActorRef[_]) = ctx.watch(toUntyped(other))
|
||||
override def unwatch(other: ActorRef[_]) = ctx.unwatch(toUntyped(other))
|
||||
var receiveTimeoutMsg: T = null.asInstanceOf[T]
|
||||
override def setReceiveTimeout(d: FiniteDuration, msg: T) = {
|
||||
receiveTimeoutMsg = msg
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ private[typed] trait DeathWatch[T] {
|
|||
private var watching = Set.empty[ARImpl]
|
||||
private var watchedBy = Set.empty[ARImpl]
|
||||
|
||||
final def watch[U](_a: ActorRef[U]): ActorRef[U] = {
|
||||
final def watch(_a: ActorRef[_]): Unit = {
|
||||
val a = _a.sorry
|
||||
if (a != self && !watching.contains(a)) {
|
||||
maintainAddressTerminatedSubscription(a) {
|
||||
|
|
@ -52,10 +52,9 @@ private[typed] trait DeathWatch[T] {
|
|||
watching += a
|
||||
}
|
||||
}
|
||||
a
|
||||
}
|
||||
|
||||
final def unwatch[U](_a: ActorRef[U]): ActorRef[U] = {
|
||||
final def unwatch(_a: ActorRef[_]): Unit = {
|
||||
val a = _a.sorry
|
||||
if (a != self && watching.contains(a)) {
|
||||
a.sendSystem(Unwatch(a, self))
|
||||
|
|
@ -63,7 +62,6 @@ private[typed] trait DeathWatch[T] {
|
|||
watching -= a
|
||||
}
|
||||
}
|
||||
a
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -137,7 +135,7 @@ private[typed] trait DeathWatch[T] {
|
|||
if (system.settings.untyped.DebugLifecycle) publish(Debug(self.path.toString, clazz(behavior), s"now watched by $watcher"))
|
||||
}
|
||||
} else if (!watcheeSelf && watcherSelf) {
|
||||
watch[Nothing](watchee)
|
||||
watch(watchee)
|
||||
} else {
|
||||
publish(Warning(self.path.toString, clazz(behavior), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
|
||||
}
|
||||
|
|
@ -153,7 +151,7 @@ private[typed] trait DeathWatch[T] {
|
|||
if (system.settings.untyped.DebugLifecycle) publish(Debug(self.path.toString, clazz(behavior), s"no longer watched by $watcher"))
|
||||
}
|
||||
} else if (!watcheeSelf && watcherSelf) {
|
||||
unwatch[Nothing](watchee)
|
||||
unwatch(watchee)
|
||||
} else {
|
||||
publish(Warning(self.path.toString, clazz(behavior), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self)))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ private[typed] class EventStreamImpl(private val debug: Boolean)(implicit privat
|
|||
Actor.Stateful[Command]({
|
||||
case (ctx, Register(actor)) ⇒
|
||||
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates"))
|
||||
ctx.watch[Nothing](actor)
|
||||
ctx.watch(actor)
|
||||
Actor.Same
|
||||
|
||||
case (ctx, UnregisterIfNoMoreSubscribedChannels(actor)) if hasSubscriptions(actor) ⇒ Actor.Same
|
||||
|
|
@ -55,7 +55,7 @@ private[typed] class EventStreamImpl(private val debug: Boolean)(implicit privat
|
|||
|
||||
case (ctx, UnregisterIfNoMoreSubscribedChannels(actor)) ⇒
|
||||
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions"))
|
||||
ctx.unwatch[Nothing](actor)
|
||||
ctx.unwatch(actor)
|
||||
Actor.Same
|
||||
}, {
|
||||
case (_, Terminated(actor)) ⇒
|
||||
|
|
|
|||
|
|
@ -89,13 +89,13 @@ trait ActorContext[T] { this: akka.typed.javadsl.ActorContext[T] ⇒
|
|||
* [[ActorSystem]] to which the referenced Actor belongs is declared as
|
||||
* failed (e.g. in reaction to being unreachable).
|
||||
*/
|
||||
def watch[U](other: ActorRef[U]): ActorRef[U]
|
||||
def watch(other: ActorRef[_]): Unit
|
||||
|
||||
/**
|
||||
* Revoke the registration established by `watch`. A [[Terminated]]
|
||||
* notification will not subsequently be received for the referenced Actor.
|
||||
*/
|
||||
def unwatch[U](other: ActorRef[U]): ActorRef[U]
|
||||
def unwatch(other: ActorRef[_]): Unit
|
||||
|
||||
/**
|
||||
* Schedule the sending of a notification in case no other
|
||||
|
|
|
|||
|
|
@ -113,11 +113,11 @@ object ActorContextSpec {
|
|||
else replyTo ! NotKilled
|
||||
Actor.Same
|
||||
case Watch(ref, replyTo) ⇒
|
||||
ctx.watch[Nothing](ref)
|
||||
ctx.watch(ref)
|
||||
replyTo ! Watched
|
||||
Actor.Same
|
||||
case Unwatch(ref, replyTo) ⇒
|
||||
ctx.unwatch[Nothing](ref)
|
||||
ctx.unwatch(ref)
|
||||
replyTo ! Unwatched
|
||||
Actor.Same
|
||||
case GetInfo(replyTo) ⇒
|
||||
|
|
@ -196,11 +196,11 @@ object ActorContextSpec {
|
|||
else replyTo ! NotKilled
|
||||
Actor.Same
|
||||
case Watch(ref, replyTo) ⇒
|
||||
ctx.watch[Nothing](ref)
|
||||
ctx.watch(ref)
|
||||
replyTo ! Watched
|
||||
Actor.Same
|
||||
case Unwatch(ref, replyTo) ⇒
|
||||
ctx.unwatch[Nothing](ref)
|
||||
ctx.unwatch(ref)
|
||||
replyTo ! Unwatched
|
||||
Actor.Same
|
||||
case GetInfo(replyTo) ⇒
|
||||
|
|
|
|||
|
|
@ -21,7 +21,9 @@ object EventStreamSpec {
|
|||
def initialBehavior: Behavior[Logger.Command] =
|
||||
Stateless {
|
||||
case (ctx, Logger.Initialize(es, replyTo)) ⇒
|
||||
replyTo ! ctx.watch(ctx.spawn(Stateless[LogEvent] { (_, ev: LogEvent) ⇒ logged :+= ev }, "logger"))
|
||||
val logger = ctx.spawn(Stateless[LogEvent] { (_, ev: LogEvent) ⇒ logged :+= ev }, "logger")
|
||||
ctx.watch(logger)
|
||||
replyTo ! logger
|
||||
Empty
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue