Issue 24490: Refactoring of ActorContextSpec (#24644)
* Issue 24490: Removed unused method * Issue 24490: Introducing NormalActorContextSpec * Issue 24490: Implemented 'restart and stop a child actor' case * Issue 24490: Simplified watch in 'restart and stop a child actor' case * Issue 24490: Implemented 'stop a child actor' case * Issue 24490: Implemented 'reset behavior upon restart' case * Issue 24490: Implemented 'reset behavior upon resume' case * Issue 24490: Implemented 'stop upon Stop' case * Issue 24490: Implemented 'not stop non-child actor' case * Issue 24490: Implemented 'watch a child actor before its termination' case * Issue 24490: Implemented 'watch a child actor after its termination' case * Issue 24490: Implemented 'unwatch a child actor before its termination' case * Issue 24490: Implemented 'terminate upon not handling Terminated' case * Issue 24490: Implemented 'return the right context info' case * Issue 24490: Implemented 'return right info about children' case * Issue 24490: Implemented 'set small receive timeout' case * Issue 24490: Implemented 'set large receive timeout' case * Issue 24490: Implemented 'schedule a message' case * Issue 24490: Implemented 'not allow null messages' case * Issue 24490: Implemented 'create a named adapter' case * Issue 24490: Implemented 'not have problems stopping already stopped child' case * Issue 24490: Introducing decoration. StepWise removed. * Issue 24490: Isolated messages of this spec to avoid accidental usage in other specs * Issue 24594: Fixed all review comments * Issue 24594: Renamed message to create for readability * Issue 24594: Fixed remaining review comments, removed unused import
This commit is contained in:
parent
a113ca7379
commit
2285454956
2 changed files with 483 additions and 964 deletions
File diff suppressed because it is too large
Load diff
|
|
@ -1,216 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
import akka.actor.typed.scaladsl.Behaviors._
|
||||
|
||||
import scala.concurrent.duration.Deadline
|
||||
|
||||
/**
|
||||
* This object contains tools for building step-wise behaviors for formulating
|
||||
* a linearly progressing protocol.
|
||||
*
|
||||
* Example:
|
||||
* {{{
|
||||
* import scala.concurrent.duration._
|
||||
*
|
||||
* StepWise[Command] { (ctx, startWith) =>
|
||||
* startWith {
|
||||
* val child = ctx.spawn(...)
|
||||
* child ! msg
|
||||
* child
|
||||
* }.expectMessage(100.millis) { (reply, child) =>
|
||||
* target ! GotReply(reply)
|
||||
* }
|
||||
* }
|
||||
* }}}
|
||||
*
|
||||
* State can be passed from one step to the next by returning it as is
|
||||
* demonstrated with the `child` ActorRef in the example.
|
||||
*
|
||||
* This way of writing Actors can be very useful when writing Actor-based
|
||||
* test procedures for actor systems, hence also the possibility to expect
|
||||
* failures (see [[StepWise.Steps#expectFailure]]).
|
||||
*/
|
||||
@deprecated("to be replaced by process DSL", "2.4-M2")
|
||||
object StepWise {
|
||||
|
||||
sealed trait AST
|
||||
private final case class Thunk(f: () ⇒ Any) extends AST
|
||||
private final case class ThunkV(f: Any ⇒ Any) extends AST
|
||||
private final case class Message(timeout: FiniteDuration, f: (Any, Any) ⇒ Any, trace: Trace) extends AST
|
||||
private final case class MultiMessage(timeout: FiniteDuration, count: Int, f: (Seq[Any], Any) ⇒ Any, trace: Trace) extends AST
|
||||
private final case class Termination(timeout: FiniteDuration, f: (Terminated, Any) ⇒ Any, trace: Trace) extends AST
|
||||
private final case class Multi(timeout: FiniteDuration, count: Int, f: (Seq[Either[Signal, Any]], Any) ⇒ Any, trace: Trace) extends AST
|
||||
|
||||
private case object ReceiveTimeout
|
||||
|
||||
private sealed trait Trace {
|
||||
def getStackTrace: Array[StackTraceElement]
|
||||
protected def getFrames: Array[StackTraceElement] =
|
||||
Thread.currentThread.getStackTrace.dropWhile { elem ⇒
|
||||
val name = elem.getClassName
|
||||
name.startsWith("java.lang.Thread") || name.startsWith("akka.actor.typed.StepWise")
|
||||
}
|
||||
}
|
||||
private class WithTrace extends Trace {
|
||||
private val trace = getFrames
|
||||
def getStackTrace = trace
|
||||
}
|
||||
private object WithoutTrace extends Trace {
|
||||
def getStackTrace = getFrames
|
||||
}
|
||||
|
||||
final case class Steps[T, U](ops: List[AST], keepTraces: Boolean) {
|
||||
private def getTrace(): Trace =
|
||||
if (keepTraces) new WithTrace
|
||||
else WithoutTrace
|
||||
|
||||
def apply[V](thunk: U ⇒ V): Steps[T, V] =
|
||||
copy(ops = ThunkV(thunk.asInstanceOf[Any ⇒ Any]) :: ops)
|
||||
|
||||
def keep(thunk: U ⇒ Unit): Steps[T, U] =
|
||||
copy(ops = ThunkV(value ⇒ { thunk.asInstanceOf[Any ⇒ Any](value); value }) :: ops)
|
||||
|
||||
def expectMessage[V](timeout: FiniteDuration)(f: (T, U) ⇒ V): Steps[T, V] =
|
||||
copy(ops = Message(timeout, f.asInstanceOf[(Any, Any) ⇒ Any], getTrace()) :: ops)
|
||||
|
||||
def expectMultipleMessages[V](timeout: FiniteDuration, count: Int)(f: (Seq[T], U) ⇒ V): Steps[T, V] =
|
||||
copy(ops = MultiMessage(timeout, count, f.asInstanceOf[(Seq[Any], Any) ⇒ Any], getTrace()) :: ops)
|
||||
|
||||
def expectTermination[V](timeout: FiniteDuration)(f: (Terminated, U) ⇒ V): Steps[T, V] =
|
||||
copy(ops = Termination(timeout, f.asInstanceOf[(Terminated, Any) ⇒ Any], getTrace()) :: ops)
|
||||
|
||||
def expectMulti[V](timeout: FiniteDuration, count: Int)(f: (Seq[Either[Signal, T]], U) ⇒ V): Steps[T, V] =
|
||||
copy(ops = Multi(timeout, count, f.asInstanceOf[(Seq[Either[Signal, Any]], Any) ⇒ Any], getTrace()) :: ops)
|
||||
|
||||
def expectMessageKeep(timeout: FiniteDuration)(f: (T, U) ⇒ Unit): Steps[T, U] =
|
||||
copy(ops = Message(timeout, (msg, value) ⇒ { f.asInstanceOf[(Any, Any) ⇒ Any](msg, value); value }, getTrace()) :: ops)
|
||||
|
||||
def expectMultipleMessagesKeep(timeout: FiniteDuration, count: Int)(f: (Seq[T], U) ⇒ Unit): Steps[T, U] =
|
||||
copy(ops = MultiMessage(timeout, count, (msgs, value) ⇒ { f.asInstanceOf[(Seq[Any], Any) ⇒ Any](msgs, value); value }, getTrace()) :: ops)
|
||||
|
||||
def expectTerminationKeep(timeout: FiniteDuration)(f: (Terminated, U) ⇒ Unit): Steps[T, U] =
|
||||
copy(ops = Termination(timeout, (t, value) ⇒ { f.asInstanceOf[(Terminated, Any) ⇒ Any](t, value); value }, getTrace()) :: ops)
|
||||
|
||||
def expectMultiKeep(timeout: FiniteDuration, count: Int)(f: (Seq[Either[Signal, T]], U) ⇒ Unit): Steps[T, U] =
|
||||
copy(ops = Multi(timeout, count, (msgs, value) ⇒ { f.asInstanceOf[(Seq[Either[Signal, Any]], Any) ⇒ Any](msgs, value); value }, getTrace()) :: ops)
|
||||
|
||||
def withKeepTraces(b: Boolean): Steps[T, U] = copy(keepTraces = b)
|
||||
}
|
||||
|
||||
class StartWith[T](keepTraces: Boolean) {
|
||||
def apply[U](thunk: ⇒ U): Steps[T, U] = Steps(Thunk(() ⇒ thunk) :: Nil, keepTraces)
|
||||
def withKeepTraces(b: Boolean): StartWith[T] = new StartWith(b)
|
||||
}
|
||||
|
||||
def apply[T](f: (scaladsl.ActorContext[T], StartWith[T]) ⇒ Steps[T, _]): Behavior[T] =
|
||||
setup[Any] { ctx ⇒
|
||||
run(ctx, f(ctx.asInstanceOf[scaladsl.ActorContext[T]], new StartWith(keepTraces = false)).ops.reverse, ())
|
||||
}.narrow
|
||||
|
||||
private def throwTimeout(trace: Trace, message: String): Nothing =
|
||||
throw new TimeoutException(message) {
|
||||
override def fillInStackTrace(): Throwable = {
|
||||
setStackTrace(trace.getStackTrace)
|
||||
this
|
||||
}
|
||||
}
|
||||
|
||||
private def throwIllegalState(trace: Trace, message: String): Nothing =
|
||||
throw new IllegalStateException(message) {
|
||||
override def fillInStackTrace(): Throwable = {
|
||||
setStackTrace(trace.getStackTrace)
|
||||
this
|
||||
}
|
||||
}
|
||||
|
||||
private def run[T](ctx: scaladsl.ActorContext[Any], ops: List[AST], value: Any): Behavior[Any] =
|
||||
ops match {
|
||||
case Thunk(f) :: tail ⇒ run(ctx, tail, f())
|
||||
case ThunkV(f) :: tail ⇒ run(ctx, tail, f(value))
|
||||
case Message(t, f, trace) :: tail ⇒
|
||||
ctx.setReceiveTimeout(t, ReceiveTimeout)
|
||||
immutable[Any] {
|
||||
case (_, ReceiveTimeout) ⇒ throwTimeout(trace, s"timeout of $t expired while waiting for a message")
|
||||
case (_, msg) ⇒
|
||||
ctx.cancelReceiveTimeout()
|
||||
run(ctx, tail, f(msg, value))
|
||||
} onSignal {
|
||||
case (_, PostStop) ⇒
|
||||
// ignore PostStop here
|
||||
run(ctx, ops, value)
|
||||
case (_, other) ⇒ throwIllegalState(trace, s"unexpected $other while waiting for a message")
|
||||
}
|
||||
case MultiMessage(t, c, f, trace) :: tail ⇒
|
||||
val deadline = Deadline.now + t
|
||||
def behavior(count: Int, acc: List[Any]): Behavior[Any] = {
|
||||
ctx.setReceiveTimeout(deadline.timeLeft, ReceiveTimeout)
|
||||
immutable[Any] {
|
||||
case (_, ReceiveTimeout) ⇒
|
||||
throwTimeout(trace, s"timeout of $t expired while waiting for $c messages (got only $count)")
|
||||
case (_, msg) ⇒
|
||||
val nextCount = count + 1
|
||||
if (nextCount == c) {
|
||||
ctx.cancelReceiveTimeout()
|
||||
run(ctx, tail, f((msg :: acc).reverse, value))
|
||||
} else behavior(nextCount, msg :: acc)
|
||||
} onSignal {
|
||||
case (_, PostStop) ⇒
|
||||
// ignore PostStop here
|
||||
run(ctx, ops, value)
|
||||
case (_, other) ⇒ throwIllegalState(trace, s"unexpected $other while waiting for $c messages (got $count valid ones)")
|
||||
}
|
||||
}
|
||||
behavior(0, Nil)
|
||||
case Multi(t, c, f, trace) :: tail ⇒
|
||||
val deadline = Deadline.now + t
|
||||
def behavior(count: Int, acc: List[Either[Signal, Any]]): Behavior[Any] = {
|
||||
ctx.setReceiveTimeout(deadline.timeLeft, ReceiveTimeout)
|
||||
immutable[Any] {
|
||||
case (_, ReceiveTimeout) ⇒
|
||||
throwTimeout(trace, s"timeout of $t expired while waiting for $c messages (got only $count)")
|
||||
case (_, msg) ⇒
|
||||
val nextCount = count + 1
|
||||
if (nextCount == c) {
|
||||
ctx.cancelReceiveTimeout()
|
||||
run(ctx, tail, f((Right(msg) :: acc).reverse, value))
|
||||
} else behavior(nextCount, Right(msg) :: acc)
|
||||
} onSignal {
|
||||
case (_, PostStop) ⇒
|
||||
// ignore PostStop here
|
||||
run(ctx, ops, value)
|
||||
case (_, other) ⇒
|
||||
val nextCount = count + 1
|
||||
if (nextCount == c) {
|
||||
ctx.cancelReceiveTimeout()
|
||||
run(ctx, tail, f((Left(other) :: acc).reverse, value))
|
||||
} else behavior(nextCount, Left(other) :: acc)
|
||||
}
|
||||
}
|
||||
behavior(0, Nil)
|
||||
case Termination(t, f, trace) :: tail ⇒
|
||||
ctx.setReceiveTimeout(t, ReceiveTimeout)
|
||||
immutable[Any] {
|
||||
case (_, ReceiveTimeout) ⇒ throwTimeout(trace, s"timeout of $t expired while waiting for termination")
|
||||
case other ⇒ throwIllegalState(trace, s"unexpected $other while waiting for termination")
|
||||
} onSignal {
|
||||
case (_, PostStop) ⇒
|
||||
// ignore PostStop here
|
||||
run(ctx, ops, value)
|
||||
case (_, t: Terminated) ⇒
|
||||
ctx.cancelReceiveTimeout()
|
||||
run(ctx, tail, f(t, value))
|
||||
case other ⇒ throwIllegalState(trace, s"unexpected $other while waiting for termination")
|
||||
}
|
||||
case Nil ⇒
|
||||
stopped
|
||||
}
|
||||
}
|
||||
|
||||
abstract class StepWise
|
||||
Loading…
Add table
Add a link
Reference in a new issue