parent
a9d1a7f8b9
commit
aa2493c475
2 changed files with 54 additions and 1 deletions
|
|
@ -6,17 +6,55 @@ package akka.actor
|
|||
|
||||
import language.postfixOps
|
||||
import akka.testkit._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import scala.concurrent.Await
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
object ReceiveTimeoutSpec {
|
||||
case object Tick
|
||||
case object TransparentTick extends NotInfluenceReceiveTimeout
|
||||
|
||||
class RestartingParent(probe: ActorRef) extends Actor {
|
||||
val restarting = new AtomicBoolean(false)
|
||||
val child = context.actorOf(Props(new RestartingChild(probe, restarting)))
|
||||
def receive = {
|
||||
case msg =>
|
||||
child.forward(msg)
|
||||
}
|
||||
}
|
||||
class RestartingChild(probe: ActorRef, restarting: AtomicBoolean) extends Actor {
|
||||
|
||||
override def preStart(): Unit = {
|
||||
if (restarting.get) {
|
||||
probe ! "restarting"
|
||||
context.setReceiveTimeout(500.millis)
|
||||
} else {
|
||||
probe ! "starting"
|
||||
}
|
||||
}
|
||||
|
||||
class ReceiveTimeoutSpec extends AkkaSpec {
|
||||
override def postStop(): Unit = {
|
||||
probe ! "stopping"
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case "crash" =>
|
||||
restarting.set(true)
|
||||
probe ! "crashing"
|
||||
throw TestException("boom bang")
|
||||
case ReceiveTimeout =>
|
||||
probe ! ReceiveTimeout
|
||||
case other =>
|
||||
probe ! other
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ReceiveTimeoutSpec extends AkkaSpec() {
|
||||
import ReceiveTimeoutSpec._
|
||||
|
||||
"An actor with receive timeout" must {
|
||||
|
|
@ -237,5 +275,19 @@ class ReceiveTimeoutSpec extends AkkaSpec {
|
|||
probe.expectMsgType[ReceiveTimeout]
|
||||
system.stop(timeoutActor)
|
||||
}
|
||||
|
||||
// #28266 reproducer
|
||||
"get the timeout when scheduled immedately on restart" in {
|
||||
val probe = TestProbe()
|
||||
val ref = system.actorOf(Props(new RestartingParent(probe.ref)))
|
||||
probe.expectMsg("starting")
|
||||
EventFilter.error("boom bang", occurrences = 1).intercept {
|
||||
ref ! "crash"
|
||||
}
|
||||
probe.expectMsg("crashing")
|
||||
probe.expectMsg("stopping")
|
||||
probe.expectMsg("restarting")
|
||||
probe.expectMsg(ReceiveTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -244,6 +244,7 @@ private[akka] trait FaultHandling { this: ActorCell =>
|
|||
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
|
||||
|
||||
freshActor.aroundPostRestart(cause)
|
||||
checkReceiveTimeout() // user may have set a receive timeout in preStart which is called from postRestart
|
||||
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
|
||||
|
||||
// only after parent is up and running again do restart the children which were not stopped
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue