add more tests + docs
This commit is contained in:
parent
5fd48386e3
commit
05e7463b0d
4 changed files with 165 additions and 46 deletions
|
|
@ -3,9 +3,11 @@
|
|||
*/
|
||||
package akka.actor.typed
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.testkit.EventFilter
|
||||
import akka.testkit.typed.scaladsl.TestProbe
|
||||
|
||||
import scala.concurrent._
|
||||
import akka.testkit.typed.TestKit
|
||||
|
|
@ -36,83 +38,188 @@ class WatchSpec extends TestKit("WordSpec", WatchSpec.config)
|
|||
import WatchSpec._
|
||||
|
||||
"Actor monitoring" must {
|
||||
"get notified of actor termination" in {
|
||||
class WatchSetup {
|
||||
val terminator = systemActor(terminatorBehavior)
|
||||
val receivedTerminationSignal: Promise[ActorRef[Nothing]] = Promise()
|
||||
val watchProbe = TestProbe[Done]()
|
||||
|
||||
val watcher = systemActor(Behaviors.immutable[StartWatching] {
|
||||
case (ctx, StartWatching(watchee)) ⇒
|
||||
ctx.watch(watchee)
|
||||
Behaviors.same
|
||||
}.onSignal {
|
||||
case (_, Terminated(stopped)) ⇒
|
||||
receivedTerminationSignal.success(stopped)
|
||||
Behaviors.stopped
|
||||
})
|
||||
|
||||
val watcher = systemActor(
|
||||
Behaviors.supervise(
|
||||
Behaviors.immutable[StartWatching] {
|
||||
case (ctx, StartWatching(watchee)) ⇒
|
||||
ctx.watch(watchee)
|
||||
watchProbe.ref ! Done
|
||||
Behaviors.same
|
||||
}.onSignal {
|
||||
case (_, Terminated(stopped)) ⇒
|
||||
receivedTerminationSignal.success(stopped)
|
||||
Behaviors.stopped
|
||||
}
|
||||
).onFailure[Throwable](SupervisorStrategy.stop))
|
||||
}
|
||||
"get notified of actor termination" in new WatchSetup {
|
||||
watcher ! StartWatching(terminator)
|
||||
watchProbe.expectMsg(Done)
|
||||
terminator ! Stop
|
||||
|
||||
receivedTerminationSignal.future.futureValue shouldEqual terminator
|
||||
}
|
||||
"allow idempotent invocations of watch" in new WatchSetup {
|
||||
watcher ! StartWatching(terminator)
|
||||
watchProbe.expectMsg(Done)
|
||||
// shouldn't fail when watched twice
|
||||
watcher ! StartWatching(terminator)
|
||||
watchProbe.expectMsg(Done)
|
||||
terminator ! Stop
|
||||
|
||||
receivedTerminationSignal.future.futureValue shouldEqual terminator
|
||||
}
|
||||
|
||||
"get notified of actor termination with a custom message" in {
|
||||
class WatchWithSetup {
|
||||
val terminator = systemActor(terminatorBehavior)
|
||||
val receivedTerminationSignal: Promise[Message] = Promise()
|
||||
val watchProbe = TestProbe[Done]()
|
||||
|
||||
val watcher = systemActor(Behaviors.immutable[Message] {
|
||||
case (ctx, StartWatchingWith(watchee, msg)) ⇒
|
||||
ctx.watchWith(watchee, msg)
|
||||
Behaviors.same
|
||||
case (_, msg) ⇒
|
||||
receivedTerminationSignal.success(msg)
|
||||
Behaviors.stopped
|
||||
})
|
||||
|
||||
val watcher = systemActor(
|
||||
Behaviors.supervise(
|
||||
Behaviors.immutable[Message] {
|
||||
case (ctx, StartWatchingWith(watchee, msg)) ⇒
|
||||
ctx.watchWith(watchee, msg)
|
||||
watchProbe.ref ! Done
|
||||
Behaviors.same
|
||||
case (_, msg) ⇒
|
||||
receivedTerminationSignal.success(msg)
|
||||
Behaviors.stopped
|
||||
}).onFailure[Throwable](SupervisorStrategy.stop)
|
||||
)
|
||||
}
|
||||
"get notified of actor termination with a custom message" in new WatchWithSetup {
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
|
||||
watchProbe.expectMsg(Done)
|
||||
terminator ! Stop
|
||||
|
||||
receivedTerminationSignal.future.futureValue shouldEqual CustomTerminationMessage
|
||||
}
|
||||
"allow idempotent invocations of watchWith with matching msgs" in new WatchWithSetup {
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
|
||||
watchProbe.expectMsg(Done)
|
||||
// shouldn't fail when watchWith'd twice
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
|
||||
watchProbe.expectMsg(Done)
|
||||
terminator ! Stop
|
||||
|
||||
receivedTerminationSignal.future.futureValue shouldEqual CustomTerminationMessage
|
||||
}
|
||||
|
||||
"allow watch message definition after watch using unwatch" in {
|
||||
val terminator = systemActor(terminatorBehavior)
|
||||
val receivedTerminationSignal: Promise[Message] = Promise()
|
||||
val watchProbe = TestProbe[Done]()
|
||||
|
||||
val watcher = systemActor(
|
||||
Behaviors.supervise(
|
||||
Behaviors.immutable[Message] {
|
||||
case (ctx, StartWatching(watchee)) ⇒
|
||||
ctx.watch(watchee)
|
||||
Behaviors.same
|
||||
case (ctx, StartWatchingWith(watchee, msg)) ⇒
|
||||
ctx.unwatch(watchee)
|
||||
ctx.watchWith(watchee, msg)
|
||||
watchProbe.ref ! Done
|
||||
Behaviors.same
|
||||
case (_, msg) ⇒
|
||||
receivedTerminationSignal.success(msg)
|
||||
Behaviors.stopped
|
||||
}).onFailure[Throwable](SupervisorStrategy.stop)
|
||||
)
|
||||
|
||||
watcher ! StartWatching(terminator)
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
|
||||
watchProbe.expectMsg(Done)
|
||||
terminator ! Stop
|
||||
|
||||
receivedTerminationSignal.future.futureValue shouldEqual CustomTerminationMessage
|
||||
}
|
||||
|
||||
"allow watch message redefinition using unwatch" in {
|
||||
val terminator = systemActor(terminatorBehavior)
|
||||
val receivedTerminationSignal: Promise[Message] = Promise()
|
||||
val watchProbe = TestProbe[Done]()
|
||||
|
||||
val watcher = systemActor(
|
||||
Behaviors.supervise(
|
||||
Behaviors.immutable[Message] {
|
||||
case (ctx, StartWatchingWith(watchee, msg)) ⇒
|
||||
ctx.unwatch(watchee)
|
||||
ctx.watchWith(watchee, msg)
|
||||
watchProbe.ref ! Done
|
||||
Behaviors.same
|
||||
case (_, msg) ⇒
|
||||
receivedTerminationSignal.success(msg)
|
||||
Behaviors.stopped
|
||||
}).onFailure[Throwable](SupervisorStrategy.stop)
|
||||
)
|
||||
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage2)
|
||||
watchProbe.expectMsg(Done)
|
||||
terminator ! Stop
|
||||
|
||||
receivedTerminationSignal.future.futureValue shouldEqual CustomTerminationMessage2
|
||||
}
|
||||
|
||||
class ErrorTestSetup {
|
||||
val terminator = systemActor(terminatorBehavior)
|
||||
private val stopProbe = TestProbe[Done]()
|
||||
def expectStopped(): Unit = stopProbe.expectMsg(Done)
|
||||
|
||||
val watcher = systemActor(Behaviors.immutable[Message] {
|
||||
case (ctx, StartWatchingWith(watchee, msg)) ⇒
|
||||
ctx.watchWith(watchee, msg)
|
||||
Behaviors.same
|
||||
case (ctx, StartWatching(watchee)) ⇒
|
||||
ctx.watch(watchee)
|
||||
Behaviors.same
|
||||
case (_, msg) ⇒
|
||||
Behaviors.stopped
|
||||
})
|
||||
val watcher = systemActor(
|
||||
Behaviors.supervise(
|
||||
Behaviors.immutable[Message] {
|
||||
case (ctx, StartWatchingWith(watchee, msg)) ⇒
|
||||
ctx.watchWith(watchee, msg)
|
||||
Behaviors.same
|
||||
case (ctx, StartWatching(watchee)) ⇒
|
||||
ctx.watch(watchee)
|
||||
Behaviors.same
|
||||
case (_, msg) ⇒
|
||||
Behaviors.stopped
|
||||
}.onSignal {
|
||||
case (_, PostStop) ⇒
|
||||
stopProbe.ref ! Done
|
||||
Behaviors.stopped
|
||||
}
|
||||
).onFailure[Throwable](SupervisorStrategy.stop)
|
||||
)
|
||||
}
|
||||
"warn when watch is used after watchWith on same subject" in new ErrorTestSetup {
|
||||
|
||||
"fail when watch is used after watchWith on same subject" in new ErrorTestSetup {
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
|
||||
|
||||
EventFilter[IllegalStateException](pattern = ".*termination message was not overwritten.*", occurrences = 1) intercept {
|
||||
watcher ! StartWatching(terminator)
|
||||
}
|
||||
terminator ! Stop
|
||||
// supervisor should have stopped the actor
|
||||
expectStopped()
|
||||
}
|
||||
|
||||
"warn when watchWitch is used after watchWith with different termination message" in new ErrorTestSetup {
|
||||
"fail when watchWitch is used after watchWith with different termination message" in new ErrorTestSetup {
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
|
||||
|
||||
EventFilter[IllegalStateException](pattern = ".*termination message was not overwritten.*", occurrences = 1) intercept {
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage2)
|
||||
}
|
||||
terminator ! Stop
|
||||
// supervisor should have stopped the actor
|
||||
expectStopped()
|
||||
}
|
||||
"warn when watchWith is used after watch on same subject" in new ErrorTestSetup {
|
||||
"fail when watchWith is used after watch on same subject" in new ErrorTestSetup {
|
||||
watcher ! StartWatching(terminator)
|
||||
|
||||
EventFilter[IllegalStateException](pattern = ".*termination message was not overwritten.*", occurrences = 1) intercept {
|
||||
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
|
||||
}
|
||||
terminator ! Stop
|
||||
// supervisor should have stopped the actor
|
||||
expectStopped()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -108,8 +108,10 @@ trait ActorContext[T] {
|
|||
* is on a node that has been removed from the cluster when using akka-cluster
|
||||
* or has been marked unreachable when using akka-remote directly.
|
||||
*
|
||||
* Will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
|
||||
* To change the termination message, unwatch first.
|
||||
* `watch` is idempotent if it is not mixed with `watchWith`.
|
||||
*
|
||||
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
|
||||
* To clear the termination message, unwatch first.
|
||||
*/
|
||||
def watch[U](other: ActorRef[U]): Unit
|
||||
|
||||
|
|
@ -119,7 +121,9 @@ trait ActorContext[T] {
|
|||
* is on a node that has been removed from the cluster when using akka-cluster
|
||||
* or has been marked unreachable when using akka-remote directly.
|
||||
*
|
||||
* Will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
|
||||
* `watchWith` is idempotent if it is called with the same `msg` and not mixed with `watch`.
|
||||
*
|
||||
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
|
||||
* another termination message. To change the termination message, unwatch first.
|
||||
*/
|
||||
def watchWith[U](other: ActorRef[U], msg: T): Unit
|
||||
|
|
|
|||
|
|
@ -94,8 +94,10 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
|
|||
* is on a node that has been removed from the cluster when using akka-cluster
|
||||
* or has been marked unreachable when using akka-remote directly
|
||||
*
|
||||
* Will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
|
||||
* To change the termination message, unwatch first.
|
||||
* `watch` is idempotent if it is not mixed with `watchWith`.
|
||||
*
|
||||
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
|
||||
* To clear the termination message, unwatch first.
|
||||
*/
|
||||
def watch[U](other: ActorRef[U]): Unit
|
||||
|
||||
|
|
@ -105,7 +107,9 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
|
|||
* is on a node that has been removed from the cluster when using akka-cluster
|
||||
* or has been marked unreachable when using akka-remote directly.
|
||||
*
|
||||
* Will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
|
||||
* `watchWith` is idempotent if it is called with the same `msg` and not mixed with `watch`.
|
||||
*
|
||||
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
|
||||
* another termination message. To change the termination message, unwatch first.
|
||||
*/
|
||||
def watchWith[U](other: ActorRef[U], msg: T): Unit
|
||||
|
|
|
|||
|
|
@ -149,8 +149,10 @@ trait ActorContext extends ActorRefFactory {
|
|||
* This actor will receive a Terminated(subject) message when watched
|
||||
* actor is terminated.
|
||||
*
|
||||
* Will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
|
||||
* To change the termination message, unwatch first.
|
||||
* `watch` is idempotent if it is not mixed with `watchWith`.
|
||||
*
|
||||
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
|
||||
* To clear the termination message, unwatch first.
|
||||
*
|
||||
* @return the provided ActorRef
|
||||
*/
|
||||
|
|
@ -161,7 +163,9 @@ trait ActorContext extends ActorRefFactory {
|
|||
* This actor will receive the specified message when watched
|
||||
* actor is terminated.
|
||||
*
|
||||
* Will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
|
||||
* `watchWith` is idempotent if it is called with the same `msg` and not mixed with `watch`.
|
||||
*
|
||||
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
|
||||
* another termination message. To change the termination message, unwatch first.
|
||||
*
|
||||
* @return the provided ActorRef
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue