Merge pull request #21406 from akka/wip-21371-prio-patriknw

No ack delivery for prio messages, #21371
This commit is contained in:
Patrik Nordwall 2016-09-09 15:41:54 +02:00 committed by GitHub
commit d8bb0ef476
13 changed files with 232 additions and 67 deletions

View file

@ -82,7 +82,7 @@ abstract class AeronStreamConsistencySpec
}
val streamId = 1
val giveUpSendAfter = 30.seconds
val giveUpMessageAfter = 30.seconds
override def afterAll(): Unit = {
taskRunner.stop()
@ -98,7 +98,7 @@ abstract class AeronStreamConsistencySpec
runOn(second) {
// just echo back
Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
}
enterBarrier("echo-started")
}
@ -139,7 +139,7 @@ abstract class AeronStreamConsistencySpec
envelope
}
.throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping)
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
started.expectMsg(Done)
}
@ -151,7 +151,7 @@ abstract class AeronStreamConsistencySpec
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
Await.ready(done, 20.seconds)
killSwitch.shutdown()

View file

@ -116,7 +116,7 @@ abstract class AeronStreamLatencySpec
}
val streamId = 1
val giveUpSendAfter = 30.seconds
val giveUpMessageAfter = 30.seconds
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): TestRateReporter = {
@ -247,7 +247,7 @@ abstract class AeronStreamLatencySpec
envelope
}
.throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping)
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
started.expectMsg(Done)
}
@ -266,7 +266,7 @@ abstract class AeronStreamLatencySpec
val queueValue = Source.fromGraph(new SendQueue[Unit])
.via(sendFlow)
.to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
.run()
val queue = new ManyToOneConcurrentArrayQueue[Unit](1024)
@ -316,7 +316,7 @@ abstract class AeronStreamLatencySpec
runOn(second) {
// just echo back
Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
}
enterBarrier("echo-started")
}

View file

@ -116,7 +116,7 @@ abstract class AeronStreamMaxThroughputSpec
}
val streamId = 1
val giveUpSendAfter = 30.seconds
val giveUpMessageAfter = 30.seconds
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): TestRateReporter = {
@ -215,7 +215,7 @@ abstract class AeronStreamMaxThroughputSpec
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
printStats("sender")
enterBarrier(testName + "-done")

View file

@ -0,0 +1,113 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor._
import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.remote.RARP
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.remote.QuarantinedEvent
object SurviveNetworkPartitionSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("""
akka.loglevel = INFO
akka.remote.artery.enabled = on
akka.remote.artery.advanced.give-up-system-message-after = 4s
""")))
testTransport(on = true)
}
class SurviveNetworkPartitionSpecMultiJvmNode1 extends SurviveNetworkPartitionSpec
class SurviveNetworkPartitionSpecMultiJvmNode2 extends SurviveNetworkPartitionSpec
abstract class SurviveNetworkPartitionSpec
extends MultiNodeSpec(SurviveNetworkPartitionSpec)
with STMultiNodeSpec with ImplicitSender {
import SurviveNetworkPartitionSpec._
override def initialParticipants = roles.size
"Network partition" must {
"not quarantine system when it heals within 'give-up-system-message-after'" taggedAs LongRunningTest in {
runOn(second) {
system.actorOf(TestActors.echoActorProps, "echo1")
}
enterBarrier("echo-started")
runOn(first) {
system.actorSelection(node(second) / "user" / "echo1") ! Identify(None)
val ref = expectMsgType[ActorIdentity].ref.get
ref ! "ping1"
expectMsg("ping1")
// network partition
testConductor.blackhole(first, second, Direction.Both).await
// send system message during network partition
watch(ref)
// keep the network partition for a while, but shorter than give-up-system-message-after
expectNoMsg(RARP(system).provider.remoteSettings.Artery.Advanced.GiveUpSystemMessageAfter - 2.second)
// heal the network partition
testConductor.passThrough(first, second, Direction.Both).await
// not quarantined
ref ! "ping2"
expectMsg("ping2")
ref ! PoisonPill
expectTerminated(ref)
}
enterBarrier("done")
}
"quarantine system when it doesn't heal within 'give-up-system-message-after'" taggedAs LongRunningTest in {
runOn(second) {
system.actorOf(TestActors.echoActorProps, "echo2")
}
enterBarrier("echo-started")
runOn(first) {
val qProbe = TestProbe()
system.eventStream.subscribe(qProbe.ref, classOf[QuarantinedEvent])
system.actorSelection(node(second) / "user" / "echo2") ! Identify(None)
val ref = expectMsgType[ActorIdentity].ref.get
ref ! "ping1"
expectMsg("ping1")
// network partition
testConductor.blackhole(first, second, Direction.Both).await
// send system message during network partition
watch(ref)
// keep the network partition for a while, longer than give-up-system-message-after
expectNoMsg(RARP(system).provider.remoteSettings.Artery.Advanced.GiveUpSystemMessageAfter - 1.second)
qProbe.expectMsgType[QuarantinedEvent](5.seconds).address should ===(node(second).address)
expectTerminated(ref)
}
enterBarrier("done")
}
}
}

View file

@ -246,7 +246,12 @@ akka {
inject-handshake-interval = 1 second
# messages that are not accepted by Aeron are dropped after retrying for this period
give-up-send-after = 60 seconds
give-up-message-after = 60 seconds
# System messages that are not acknowledged after re-sending for this period are
# dropped and will trigger quarantine. The value should be longer than the length
# of a network partition that you need to survive.
give-up-system-message-after = 6 hours
# during ActorSystem termination the remoting will wait this long for
# an acknowledgment by the destination system that flushing of outstanding

View file

@ -20,6 +20,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.remote.artery.ArteryTransport
import akka.util.OptionVal
import akka.remote.artery.OutboundEnvelope
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
/**
* INTERNAL API
@ -98,14 +99,19 @@ private[akka] object RemoteActorRefProvider {
// the dead letter status
if (seqOpt.isEmpty) super.!(DeadLetter(m, senderOption.getOrElse(_provider.deadLetters), recipient))
case env: OutboundEnvelope
super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters),
super.!(DeadLetter(unwrapSystemMessageEnvelope(env.message), env.sender.getOrElse(_provider.deadLetters),
env.recipient.getOrElse(_provider.deadLetters)))
case DeadLetter(env: OutboundEnvelope, _, _)
super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters),
super.!(DeadLetter(unwrapSystemMessageEnvelope(env.message), env.sender.getOrElse(_provider.deadLetters),
env.recipient.getOrElse(_provider.deadLetters)))
case _ super.!(message)(sender)
}
private def unwrapSystemMessageEnvelope(msg: AnyRef): AnyRef = msg match {
case SystemMessageEnvelope(m, _, _) m
case _ msg
}
@throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}

View file

@ -3,6 +3,7 @@
*/
package akka.remote.artery
import akka.util.PrettyDuration.PrettyPrintableDuration
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
@ -28,7 +29,7 @@ import org.agrona.hints.ThreadHints
object AeronSink {
final class GaveUpSendingException(msg: String) extends RuntimeException(msg) with NoStackTrace
final class GaveUpMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace
final class PublicationClosedException(msg: String) extends RuntimeException(msg) with NoStackTrace
@ -75,13 +76,13 @@ object AeronSink {
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
*/
class AeronSink(
channel: String,
streamId: Int,
aeron: Aeron,
taskRunner: TaskRunner,
pool: EnvelopeBufferPool,
giveUpSendAfter: Duration,
flightRecorder: EventSink)
channel: String,
streamId: Int,
aeron: Aeron,
taskRunner: TaskRunner,
pool: EnvelopeBufferPool,
giveUpAfter: Duration,
flightRecorder: EventSink)
extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] {
import AeronSink._
import TaskRunner._
@ -104,7 +105,7 @@ class AeronSink(
private var backoffCount = spinning
private var lastMsgSize = 0
private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ taskOnOfferSuccess()),
giveUpSendAfter, getAsyncCallback(_ onGiveUp()), getAsyncCallback(_ onPublicationClosed()))
giveUpAfter, getAsyncCallback(_ onGiveUp()), getAsyncCallback(_ onPublicationClosed()))
private val addOfferTask: Add = Add(offerTask)
private var offerTaskInProgress = false
@ -191,7 +192,7 @@ class AeronSink(
private def onGiveUp(): Unit = {
offerTaskInProgress = false
val cause = new GaveUpSendingException(s"Gave up sending message to $channel after $giveUpSendAfter.")
val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.")
flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.getMessage.getBytes("US-ASCII"))
completedValue = Failure(cause)
failStage(cause)

View file

@ -89,8 +89,10 @@ private[akka] final class ArterySettings private (config: Config) {
interval > Duration.Zero, "handshake-retry-interval must be more than zero")
val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval
interval > Duration.Zero, "inject-handshake-interval must be more than zero")
val GiveUpSendAfter = config.getMillisDuration("give-up-send-after").requiring(interval
interval > Duration.Zero, "give-up-send-after must be more than zero")
val GiveUpMessageAfter = config.getMillisDuration("give-up-message-after").requiring(interval
interval > Duration.Zero, "give-up-message-after must be more than zero")
val GiveUpSystemMessageAfter = config.getMillisDuration("give-up-system-message-after").requiring(interval
interval > Duration.Zero, "give-up-system-message-after must be more than zero")
val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout").requiring(interval
interval > Duration.Zero, "shutdown-flush-timeout must be more than zero")
val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout").requiring(interval

View file

@ -878,8 +878,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
aeronSink(outboundContext, ordinaryStreamId)
private def aeronSink(outboundContext: OutboundContext, streamId: Int): Sink[EnvelopeBuffer, Future[Done]] = {
val giveUpAfter =
if (streamId == controlStreamId) settings.Advanced.GiveUpSystemMessageAfter
else settings.Advanced.GiveUpMessageAfter
Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner,
envelopeBufferPool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink()))
envelopeBufferPool, giveUpAfter, createFlightRecorderEventSink()))
}
def outboundLane(outboundContext: OutboundContext): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] =

View file

@ -24,7 +24,7 @@ import akka.event.Logging
import akka.remote._
import akka.remote.DaemonMsgCreate
import akka.remote.QuarantinedEvent
import akka.remote.artery.AeronSink.GaveUpSendingException
import akka.remote.artery.AeronSink.GaveUpMessageException
import akka.remote.artery.Encoder.ChangeOutboundCompression
import akka.remote.artery.Encoder.ChangeOutboundCompressionFailed
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
@ -287,12 +287,18 @@ private[remote] class Association(
// allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) {
message match {
case _: SystemMessage | ClearSystemMessageDelivery | _: ControlMessage
case _: SystemMessage
val outboundEnvelope = createOutboundEnvelope()
if (!controlQueue.offer(createOutboundEnvelope())) {
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | ClearSystemMessageDelivery
// ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating
val outboundEnvelope = createOutboundEnvelope()
if (!controlQueue.offer(createOutboundEnvelope())) {
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
case _: DaemonMsgCreate
// DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because
// remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages.
@ -583,7 +589,7 @@ private[remote] class Association(
// don't restart after shutdown, but log some details so we notice
log.error(cause, s"{} to {} failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
case _: AbruptTerminationException // ActorSystem shutdown
case cause: GaveUpSendingException
case cause: GaveUpMessageException
log.debug("{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
// restart unconditionally, without counting restarts
lazyRestart()

View file

@ -3,6 +3,7 @@
*/
package akka.remote.artery
import akka.util.PrettyDuration.PrettyPrintableDuration
import java.util.ArrayDeque
import scala.annotation.tailrec
import scala.concurrent.duration._
@ -23,6 +24,10 @@ import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
import akka.remote.artery.OutboundHandshake.HandshakeReq
import akka.actor.ActorRef
import akka.remote.PriorityMessage
import akka.actor.ActorSelectionMessage
import akka.dispatch.sysmsg.SystemMessage
import scala.util.control.NoStackTrace
/**
* INTERNAL API
@ -35,7 +40,14 @@ private[akka] object SystemMessageDelivery {
final case object ClearSystemMessageDelivery
final class GaveUpSystemMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace
private case object ResendTick
// If other message types than SystemMesage need acked delivery they can extend this trait.
// Used in tests since real SystemMessage are somewhat cumbersome to create.
trait AckedDeliveryMessage
}
/**
@ -64,6 +76,9 @@ private[akka] class SystemMessageDelivery(
private var resendingFromSeqNo = -1L
private var stopping = false
private val giveUpAfterNanos = outboundContext.settings.Advanced.GiveUpSystemMessageAfter.toNanos
private var ackTimestamp = System.nanoTime()
private def localAddress = outboundContext.localAddress
private def remoteAddress = outboundContext.remoteAddress
@ -102,13 +117,13 @@ private[akka] class SystemMessageDelivery(
override protected def onTimer(timerKey: Any): Unit =
timerKey match {
case ResendTick
checkGiveUp()
if (resending.isEmpty && !unacknowledged.isEmpty) {
resending = unacknowledged.clone()
tryResend()
}
if (!unacknowledged.isEmpty)
scheduleOnce(ResendTick, resendInterval)
// FIXME give up resending after a long while, i.e. config property quarantine-after-silence
}
// ControlMessageObserver, external call
@ -134,6 +149,7 @@ private[akka] class SystemMessageDelivery(
}
private def ack(n: Long): Unit = {
ackTimestamp = System.nanoTime()
if (n <= seqNo)
clearUnacknowledged(n)
}
@ -166,24 +182,13 @@ private[akka] class SystemMessageDelivery(
override def onPush(): Unit = {
val outboundEnvelope = grab(in)
outboundEnvelope.message match {
case _: HandshakeReq
// pass on HandshakeReq
if (isAvailable(out))
pushCopy(outboundEnvelope)
case ClearSystemMessageDelivery
clear()
pull(in)
case _: ControlMessage
// e.g. ActorSystemTerminating, no need for acked delivery
if (resending.isEmpty && isAvailable(out))
pushCopy(outboundEnvelope)
else {
resending.offer(outboundEnvelope)
tryResend()
}
case msg
case msg @ (_: SystemMessage | _: AckedDeliveryMessage)
if (unacknowledged.size < maxBufferSize) {
seqNo += 1
if (unacknowledged.isEmpty)
ackTimestamp = System.nanoTime()
else
checkGiveUp()
val sendEnvelope = outboundEnvelope.withMessage(SystemMessageEnvelope(msg, seqNo, localAddress))
unacknowledged.offer(sendEnvelope)
scheduleOnce(ResendTick, resendInterval)
@ -199,9 +204,31 @@ private[akka] class SystemMessageDelivery(
deadLetters ! outboundEnvelope
pull(in)
}
case _: HandshakeReq
// pass on HandshakeReq
if (isAvailable(out))
pushCopy(outboundEnvelope)
case ClearSystemMessageDelivery
clear()
pull(in)
case _
// e.g. ActorSystemTerminating or ActorSelectionMessage with PriorityMessage, no need for acked delivery
if (resending.isEmpty && isAvailable(out))
push(out, outboundEnvelope)
else {
resending.offer(outboundEnvelope)
tryResend()
}
}
}
private def checkGiveUp(): Unit = {
if (!unacknowledged.isEmpty && (System.nanoTime() - ackTimestamp > giveUpAfterNanos))
throw new GaveUpSystemMessageException(
s"Gave up sending system message to [${outboundContext.remoteAddress}] after " +
s"${outboundContext.settings.Advanced.GiveUpSystemMessageAfter.pretty}.")
}
private def clear(): Unit = {
sendUnacknowledgedToDeadLetters()
seqNo = 0L // sequence number for the first message will be 1

View file

@ -10,7 +10,7 @@ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.ExtendedActorSystem
import akka.remote.artery.AeronSink.GaveUpSendingException
import akka.remote.artery.AeronSink.GaveUpMessageException
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Sink
@ -75,7 +75,7 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender {
.runWith(new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis, IgnoreEventSink))
// without the give up timeout the stream would not complete/fail
intercept[GaveUpSendingException] {
intercept[GaveUpMessageException] {
Await.result(done, 5.seconds)
}
}

View file

@ -45,6 +45,8 @@ object SystemMessageDeliverySpec {
akka.actor.serialize-messages = off
""")
case class TestSysMsg(s: String) extends SystemMessageDelivery.AckedDeliveryMessage
}
class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.config) with ImplicitSender {
@ -68,7 +70,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[OutboundEnvelope, NotUsed] = {
val deadLetters = TestProbe().ref
Source(1 to sendCount)
.map(n outboundEnvelopePool.acquire().init(OptionVal.None, "msg-" + n, OptionVal.None))
.map(n outboundEnvelopePool.acquire().init(OptionVal.None, TestSysMsg("msg-" + n), OptionVal.None))
.via(new SystemMessageDelivery(outboundContext, deadLetters, resendInterval, maxBufferSize = 1000))
}
@ -159,12 +161,12 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
val sink = send(sendCount = 5, resendInterval = 60.seconds, outboundContextA)
.via(drop(dropSeqNumbers = Vector(3L, 4L)))
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.map(_.message.asInstanceOf[TestSysMsg])
.runWith(TestSink.probe)
sink.request(100)
sink.expectNext("msg-1")
sink.expectNext("msg-2")
sink.expectNext(TestSysMsg("msg-1"))
sink.expectNext(TestSysMsg("msg-2"))
replyProbe.expectMsg(Ack(1L, addressB))
replyProbe.expectMsg(Ack(2L, addressB))
// 3 and 4 was dropped
@ -172,11 +174,11 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
sink.expectNoMsg(100.millis) // 3 was dropped
inboundContextB.deliverLastReply()
// resending 3, 4, 5
sink.expectNext("msg-3")
sink.expectNext(TestSysMsg("msg-3"))
replyProbe.expectMsg(Ack(3L, addressB))
sink.expectNext("msg-4")
sink.expectNext(TestSysMsg("msg-4"))
replyProbe.expectMsg(Ack(4L, addressB))
sink.expectNext("msg-5")
sink.expectNext(TestSysMsg("msg-5"))
replyProbe.expectMsg(Ack(5L, addressB))
replyProbe.expectNoMsg(100.millis)
inboundContextB.deliverLastReply()
@ -193,7 +195,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
val sink = send(sendCount = 3, resendInterval = 60.seconds, outboundContextA)
.via(drop(dropSeqNumbers = Vector(1L)))
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.map(_.message.asInstanceOf[TestSysMsg])
.runWith(TestSink.probe)
sink.request(100)
@ -202,11 +204,11 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
sink.expectNoMsg(100.millis) // 1 was dropped
inboundContextB.deliverLastReply() // it's ok to not delivery all nacks
// resending 1, 2, 3
sink.expectNext("msg-1")
sink.expectNext(TestSysMsg("msg-1"))
replyProbe.expectMsg(Ack(1L, addressB))
sink.expectNext("msg-2")
sink.expectNext(TestSysMsg("msg-2"))
replyProbe.expectMsg(Ack(2L, addressB))
sink.expectNext("msg-3")
sink.expectNext(TestSysMsg("msg-3"))
replyProbe.expectMsg(Ack(3L, addressB))
inboundContextB.deliverLastReply()
sink.expectComplete()
@ -222,19 +224,19 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
val sink = send(sendCount = 3, resendInterval = 2.seconds, outboundContextA)
.via(drop(dropSeqNumbers = Vector(3L)))
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.map(_.message.asInstanceOf[TestSysMsg])
.runWith(TestSink.probe)
sink.request(100)
sink.expectNext("msg-1")
sink.expectNext(TestSysMsg("msg-1"))
replyProbe.expectMsg(Ack(1L, addressB))
inboundContextB.deliverLastReply()
sink.expectNext("msg-2")
sink.expectNext(TestSysMsg("msg-2"))
replyProbe.expectMsg(Ack(2L, addressB))
inboundContextB.deliverLastReply()
sink.expectNoMsg(200.millis) // 3 was dropped
// resending 3 due to timeout
sink.expectNext("msg-3")
sink.expectNext(TestSysMsg("msg-3"))
replyProbe.expectMsg(4.seconds, Ack(3L, addressB))
// continue resending
replyProbe.expectMsg(4.seconds, Ack(3L, addressB))
@ -255,10 +257,10 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
send(N, 1.second, outboundContextA)
.via(randomDrop(dropRate))
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.map(_.message.asInstanceOf[TestSysMsg])
.runWith(Sink.seq)
Await.result(output, 20.seconds) should ===((1 to N).map("msg-" + _).toVector)
Await.result(output, 20.seconds) should ===((1 to N).map(n TestSysMsg("msg-" + n)).toVector)
}
"deliver all during throttling and random dropping" in {
@ -274,10 +276,10 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
.throttle(200, 1.second, 10, ThrottleMode.shaping)
.via(randomDrop(dropRate))
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.map(_.message.asInstanceOf[TestSysMsg])
.runWith(Sink.seq)
Await.result(output, 20.seconds) should ===((1 to N).map("msg-" + _).toVector)
Await.result(output, 20.seconds) should ===((1 to N).map(n TestSysMsg("msg-" + n)).toVector)
}
}