=per #3836 Set lower idleTimeout for persistent channels
- idleTimeout lower than expectation timeout - removed remaining shared testActor usage
This commit is contained in:
parent
48f3804270
commit
1d65a4a4f8
2 changed files with 10 additions and 15 deletions
|
|
@ -13,12 +13,6 @@ import akka.actor._
|
|||
import akka.testkit._
|
||||
|
||||
object ChannelSpec {
|
||||
class TestDestination extends Actor {
|
||||
def receive = {
|
||||
case m: ConfirmablePersistent ⇒ sender ! m
|
||||
}
|
||||
}
|
||||
|
||||
class TestDestinationProcessor(name: String) extends NamedProcessor(name) {
|
||||
def receive = {
|
||||
case cp @ ConfirmablePersistent("a", _, _) ⇒ cp.confirm()
|
||||
|
|
@ -84,15 +78,16 @@ abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with Persist
|
|||
"A channel" must {
|
||||
"must resolve destination references and preserve message order" in {
|
||||
val empty = actorRefFor("testDestination") // will be an EmptyLocalActorRef
|
||||
val destination = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination")
|
||||
val probe = TestProbe()
|
||||
val destination = system.actorOf(Props(classOf[TestReceiver], probe.ref), "testDestination")
|
||||
|
||||
defaultTestChannel ! Deliver(PersistentRepr("a"), empty.path)
|
||||
defaultTestChannel ! Deliver(Persistent("b"), destination.path)
|
||||
defaultTestChannel ! Deliver(Persistent("c"), destination.path)
|
||||
|
||||
expectMsg("a")
|
||||
expectMsg("b")
|
||||
expectMsg("c")
|
||||
probe.expectMsg("a")
|
||||
probe.expectMsg("b")
|
||||
probe.expectMsg("c")
|
||||
}
|
||||
"support processors as destination" in {
|
||||
val destination = system.actorOf(Props(classOf[TestDestinationProcessor], name))
|
||||
|
|
@ -118,14 +113,14 @@ abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with Persist
|
|||
awaitConfirmation(confirmProbe)
|
||||
}
|
||||
"accept confirmable persistent messages for delivery" in {
|
||||
val destination = system.actorOf(Props[TestDestination])
|
||||
val confirmProbe = TestProbe()
|
||||
val destinationProbe = TestProbe()
|
||||
|
||||
subscribeToConfirmation(confirmProbe)
|
||||
|
||||
defaultTestChannel ! Deliver(PersistentRepr("a", confirmable = true), destination.path)
|
||||
defaultTestChannel ! Deliver(PersistentRepr("a", confirmable = true), destinationProbe.ref.path)
|
||||
|
||||
expectMsgPF() { case m @ ConfirmablePersistent("a", _, _) ⇒ m.confirm() }
|
||||
destinationProbe.expectMsgPF() { case m @ ConfirmablePersistent("a", _, _) ⇒ m.confirm() }
|
||||
awaitConfirmation(confirmProbe)
|
||||
}
|
||||
"redeliver on missing confirmation" in {
|
||||
|
|
|
|||
|
|
@ -37,10 +37,10 @@ abstract class PersistentChannelSpec(config: Config) extends ChannelSpec(config)
|
|||
import PersistentChannelSpec._
|
||||
|
||||
private def redeliverChannelSettings(listener: Option[ActorRef]): PersistentChannelSettings =
|
||||
PersistentChannelSettings(redeliverMax = 2, redeliverInterval = 100 milliseconds, redeliverFailureListener = listener)
|
||||
PersistentChannelSettings(redeliverMax = 2, redeliverInterval = 100 milliseconds, redeliverFailureListener = listener, idleTimeout = 5.seconds)
|
||||
|
||||
private def createDefaultTestChannel(name: String): ActorRef =
|
||||
system.actorOf(PersistentChannel.props(s"${name}-default", PersistentChannelSettings()))
|
||||
system.actorOf(PersistentChannel.props(s"${name}-default", PersistentChannelSettings(idleTimeout = 5.seconds)))
|
||||
|
||||
override def createDefaultTestChannel(): ActorRef =
|
||||
createDefaultTestChannel(name)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue