diff --git a/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala b/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala index 1217acbc27..cfc8817016 100644 --- a/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala @@ -15,6 +15,7 @@ import akka.pattern.ask import akka.testkit.{ AkkaSpec, ImplicitSender } import akka.util.{ ByteString, Timeout } import akka.actor.Deploy +import scala.util.control.NonFatal object BackpressureSpec { @@ -24,6 +25,8 @@ object BackpressureSpec { case class Done(hash: ByteString) case object Failed case object Close + case object GetStatus + case class SenderStatus(restarted: Throwable, sent: Int, buffering: Boolean) class Sender(receiver: InetSocketAddress) extends Actor with Stash with ActorLogging { val digest = MessageDigest.getInstance("SHA-1") @@ -32,6 +35,15 @@ object BackpressureSpec { import context.system IO(Tcp) ! Tcp.Connect(receiver) + var restarted: Throwable = _ + var sent = 0 + var buffering = false + + override def postRestart(thr: Throwable): Unit = { + restarted = thr + context.stop(self) + } + def receive = { case _: Tcp.Connected ⇒ val init = TcpPipelineHandler.withLogger(log, @@ -57,28 +69,33 @@ object BackpressureSpec { digest.update(data) connection ! init.Command(ByteString(data)) self forward StartSending(n - 1) + sent += 1 case BackpressureBuffer.HighWatermarkReached ⇒ context.setReceiveTimeout(5.seconds) + buffering = true context.become({ case BackpressureBuffer.LowWatermarkReached ⇒ unstashAll() context.setReceiveTimeout(Duration.Undefined) + buffering = false context.unbecome() case ReceiveTimeout ⇒ log.error("receive timeout while throttled") context.stop(self) - case _ ⇒ stash() + case _: StartSending ⇒ stash() }, discardOld = false) case ReceiveTimeout ⇒ // that old cancellation race case Close ⇒ connection ! Management(Tcp.Close) case Tcp.Closed ⇒ context.stop(self) } + override def unhandled(msg: Any): Unit = msg match { + case GetStatus ⇒ sender ! SenderStatus(restarted, sent, buffering) + } + val failed: Receive = { case _ ⇒ sender ! Failed } - - override def postRestart(thr: Throwable): Unit = context.stop(self) } case object GetPort @@ -87,6 +104,7 @@ object BackpressureSpec { case class Progress(n: Int) case object GetHash case class Hash(hash: ByteString) + case class ReceiverStatus(received: Long, hiccupStarted: Int, hiccupEnded: Int) class Receiver(hiccups: Boolean) extends Actor with Stash with ActorLogging { val digest = MessageDigest.getInstance("SHA-1") @@ -96,6 +114,13 @@ object BackpressureSpec { IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("localhost", 0)) var listener: ActorRef = _ + var received = 0L + var hiccupStarted = 0 + var hiccupEnded = 0 + + override def postRestart(thr: Throwable): Unit = { + context.stop(self) + } def receive = { case Tcp.Bound(local) ⇒ @@ -122,34 +147,37 @@ object BackpressureSpec { } def connected(init: Init[WithinActorContext, ByteString, ByteString], connection: ActorRef): Receive = { - var received = 0L + case init.Event(data) ⇒ + digest.update(data.toArray) + received += data.length + if (hiccups && hiccupStarted == hiccupEnded && ThreadLocalRandom.current.nextInt(1000) == 0) { + connection ! Management(Tcp.SuspendReading) + import context.dispatcher + system.scheduler.scheduleOnce(100.millis, self, Management(Tcp.ResumeReading)) + hiccupStarted += 1 + } + case m: Management ⇒ + hiccupEnded += 1 + connection ! m + case GetProgress ⇒ + sender ! Progress((received / ChunkSize).toInt) + case GetHash ⇒ + sender ! Hash(ByteString(digest.digest())) + case Tcp.PeerClosed ⇒ + listener ! Tcp.Unbind + context.become { + case Tcp.Unbound ⇒ context.stop(self) + case _: Management ⇒ + } + } - { - case init.Event(data) ⇒ - digest.update(data.toArray) - received += data.length - if (hiccups && ThreadLocalRandom.current.nextInt(1000) == 0) { - connection ! Management(Tcp.SuspendReading) - import context.dispatcher - system.scheduler.scheduleOnce(100.millis, connection, Management(Tcp.ResumeReading)) - } - case GetProgress ⇒ - sender ! Progress((received / ChunkSize).toInt) - case GetHash ⇒ - sender ! Hash(ByteString(digest.digest())) - case Tcp.PeerClosed ⇒ - listener ! Tcp.Unbind - context.become { - case Tcp.Unbound ⇒ context.stop(self) - } - } + override def unhandled(msg: Any): Unit = msg match { + case GetStatus ⇒ sender ! ReceiverStatus(received, hiccupStarted, hiccupEnded) } val failed: Receive = { case _ ⇒ sender ! Failed } - - override def postRestart(thr: Throwable): Unit = context.stop(self) } } @@ -165,13 +193,23 @@ class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with recv ! GetPort val port = expectMsgType[Port].p val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender1")) - within(20.seconds) { - send ! StartSending(N) - val hash = expectMsgType[Done].hash - implicit val t = Timeout(100.millis) - awaitAssert(Await.result(recv ? GetProgress, t.duration) must be === Progress(N)) - recv ! GetHash - expectMsgType[Hash].hash must be === hash + try { + within(20.seconds) { + send ! StartSending(N) + val hash = expectMsgType[Done].hash + implicit val t = Timeout(100.millis) + awaitAssert(Await.result(recv ? GetProgress, t.duration) must be === Progress(N)) + recv ! GetHash + expectMsgType[Hash].hash must be === hash + } + } catch { + case NonFatal(e) ⇒ + system.log.error(e, "timeout") + send ! GetStatus + println(expectMsgType[SenderStatus]) + recv ! GetStatus + println(expectMsgType[ReceiverStatus]) + throw e } send ! Close val terminated = receiveWhile(1.second, messages = 2) { @@ -186,13 +224,23 @@ class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with recv ! GetPort val port = expectMsgType[Port].p val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender2")) - within(20.seconds) { - send ! StartSending(N) - val hash = expectMsgType[Done].hash - implicit val t = Timeout(100.millis) - awaitAssert(Await.result(recv ? GetProgress, t.duration) must be === Progress(N)) - recv ! GetHash - expectMsgType[Hash].hash must be === hash + try { + within(20.seconds) { + send ! StartSending(N) + val hash = expectMsgType[Done].hash + implicit val t = Timeout(100.millis) + awaitAssert(Await.result(recv ? GetProgress, t.duration) must be === Progress(N)) + recv ! GetHash + expectMsgType[Hash].hash must be === hash + } + } catch { + case NonFatal(e) ⇒ + system.log.error(e, "timeout") + send ! GetStatus + println(expectMsgType[SenderStatus]) + recv ! GetStatus + println(expectMsgType[ReceiverStatus]) + throw e } send ! Close val terminated = receiveWhile(1.second, messages = 2) { @@ -203,4 +251,4 @@ class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with } -} \ No newline at end of file +}