Merge pull request #1511 from akka/wip-3414-backpressureSpec-∂π
improve BackpressureSpec, see #3414
This commit is contained in:
commit
92eb05bdbf
1 changed files with 88 additions and 40 deletions
|
|
@ -15,6 +15,7 @@ import akka.pattern.ask
|
|||
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||
import akka.util.{ ByteString, Timeout }
|
||||
import akka.actor.Deploy
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
object BackpressureSpec {
|
||||
|
||||
|
|
@ -24,6 +25,8 @@ object BackpressureSpec {
|
|||
case class Done(hash: ByteString)
|
||||
case object Failed
|
||||
case object Close
|
||||
case object GetStatus
|
||||
case class SenderStatus(restarted: Throwable, sent: Int, buffering: Boolean)
|
||||
|
||||
class Sender(receiver: InetSocketAddress) extends Actor with Stash with ActorLogging {
|
||||
val digest = MessageDigest.getInstance("SHA-1")
|
||||
|
|
@ -32,6 +35,15 @@ object BackpressureSpec {
|
|||
import context.system
|
||||
IO(Tcp) ! Tcp.Connect(receiver)
|
||||
|
||||
var restarted: Throwable = _
|
||||
var sent = 0
|
||||
var buffering = false
|
||||
|
||||
override def postRestart(thr: Throwable): Unit = {
|
||||
restarted = thr
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case _: Tcp.Connected ⇒
|
||||
val init = TcpPipelineHandler.withLogger(log,
|
||||
|
|
@ -57,28 +69,33 @@ object BackpressureSpec {
|
|||
digest.update(data)
|
||||
connection ! init.Command(ByteString(data))
|
||||
self forward StartSending(n - 1)
|
||||
sent += 1
|
||||
case BackpressureBuffer.HighWatermarkReached ⇒
|
||||
context.setReceiveTimeout(5.seconds)
|
||||
buffering = true
|
||||
context.become({
|
||||
case BackpressureBuffer.LowWatermarkReached ⇒
|
||||
unstashAll()
|
||||
context.setReceiveTimeout(Duration.Undefined)
|
||||
buffering = false
|
||||
context.unbecome()
|
||||
case ReceiveTimeout ⇒
|
||||
log.error("receive timeout while throttled")
|
||||
context.stop(self)
|
||||
case _ ⇒ stash()
|
||||
case _: StartSending ⇒ stash()
|
||||
}, discardOld = false)
|
||||
case ReceiveTimeout ⇒ // that old cancellation race
|
||||
case Close ⇒ connection ! Management(Tcp.Close)
|
||||
case Tcp.Closed ⇒ context.stop(self)
|
||||
}
|
||||
|
||||
override def unhandled(msg: Any): Unit = msg match {
|
||||
case GetStatus ⇒ sender ! SenderStatus(restarted, sent, buffering)
|
||||
}
|
||||
|
||||
val failed: Receive = {
|
||||
case _ ⇒ sender ! Failed
|
||||
}
|
||||
|
||||
override def postRestart(thr: Throwable): Unit = context.stop(self)
|
||||
}
|
||||
|
||||
case object GetPort
|
||||
|
|
@ -87,6 +104,7 @@ object BackpressureSpec {
|
|||
case class Progress(n: Int)
|
||||
case object GetHash
|
||||
case class Hash(hash: ByteString)
|
||||
case class ReceiverStatus(received: Long, hiccupStarted: Int, hiccupEnded: Int)
|
||||
|
||||
class Receiver(hiccups: Boolean) extends Actor with Stash with ActorLogging {
|
||||
val digest = MessageDigest.getInstance("SHA-1")
|
||||
|
|
@ -96,6 +114,13 @@ object BackpressureSpec {
|
|||
IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("localhost", 0))
|
||||
|
||||
var listener: ActorRef = _
|
||||
var received = 0L
|
||||
var hiccupStarted = 0
|
||||
var hiccupEnded = 0
|
||||
|
||||
override def postRestart(thr: Throwable): Unit = {
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case Tcp.Bound(local) ⇒
|
||||
|
|
@ -122,34 +147,37 @@ object BackpressureSpec {
|
|||
}
|
||||
|
||||
def connected(init: Init[WithinActorContext, ByteString, ByteString], connection: ActorRef): Receive = {
|
||||
var received = 0L
|
||||
case init.Event(data) ⇒
|
||||
digest.update(data.toArray)
|
||||
received += data.length
|
||||
if (hiccups && hiccupStarted == hiccupEnded && ThreadLocalRandom.current.nextInt(1000) == 0) {
|
||||
connection ! Management(Tcp.SuspendReading)
|
||||
import context.dispatcher
|
||||
system.scheduler.scheduleOnce(100.millis, self, Management(Tcp.ResumeReading))
|
||||
hiccupStarted += 1
|
||||
}
|
||||
case m: Management ⇒
|
||||
hiccupEnded += 1
|
||||
connection ! m
|
||||
case GetProgress ⇒
|
||||
sender ! Progress((received / ChunkSize).toInt)
|
||||
case GetHash ⇒
|
||||
sender ! Hash(ByteString(digest.digest()))
|
||||
case Tcp.PeerClosed ⇒
|
||||
listener ! Tcp.Unbind
|
||||
context.become {
|
||||
case Tcp.Unbound ⇒ context.stop(self)
|
||||
case _: Management ⇒
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
case init.Event(data) ⇒
|
||||
digest.update(data.toArray)
|
||||
received += data.length
|
||||
if (hiccups && ThreadLocalRandom.current.nextInt(1000) == 0) {
|
||||
connection ! Management(Tcp.SuspendReading)
|
||||
import context.dispatcher
|
||||
system.scheduler.scheduleOnce(100.millis, connection, Management(Tcp.ResumeReading))
|
||||
}
|
||||
case GetProgress ⇒
|
||||
sender ! Progress((received / ChunkSize).toInt)
|
||||
case GetHash ⇒
|
||||
sender ! Hash(ByteString(digest.digest()))
|
||||
case Tcp.PeerClosed ⇒
|
||||
listener ! Tcp.Unbind
|
||||
context.become {
|
||||
case Tcp.Unbound ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
override def unhandled(msg: Any): Unit = msg match {
|
||||
case GetStatus ⇒ sender ! ReceiverStatus(received, hiccupStarted, hiccupEnded)
|
||||
}
|
||||
|
||||
val failed: Receive = {
|
||||
case _ ⇒ sender ! Failed
|
||||
}
|
||||
|
||||
override def postRestart(thr: Throwable): Unit = context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -165,13 +193,23 @@ class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with
|
|||
recv ! GetPort
|
||||
val port = expectMsgType[Port].p
|
||||
val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender1"))
|
||||
within(20.seconds) {
|
||||
send ! StartSending(N)
|
||||
val hash = expectMsgType[Done].hash
|
||||
implicit val t = Timeout(100.millis)
|
||||
awaitAssert(Await.result(recv ? GetProgress, t.duration) must be === Progress(N))
|
||||
recv ! GetHash
|
||||
expectMsgType[Hash].hash must be === hash
|
||||
try {
|
||||
within(20.seconds) {
|
||||
send ! StartSending(N)
|
||||
val hash = expectMsgType[Done].hash
|
||||
implicit val t = Timeout(100.millis)
|
||||
awaitAssert(Await.result(recv ? GetProgress, t.duration) must be === Progress(N))
|
||||
recv ! GetHash
|
||||
expectMsgType[Hash].hash must be === hash
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
system.log.error(e, "timeout")
|
||||
send ! GetStatus
|
||||
println(expectMsgType[SenderStatus])
|
||||
recv ! GetStatus
|
||||
println(expectMsgType[ReceiverStatus])
|
||||
throw e
|
||||
}
|
||||
send ! Close
|
||||
val terminated = receiveWhile(1.second, messages = 2) {
|
||||
|
|
@ -186,13 +224,23 @@ class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with
|
|||
recv ! GetPort
|
||||
val port = expectMsgType[Port].p
|
||||
val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender2"))
|
||||
within(20.seconds) {
|
||||
send ! StartSending(N)
|
||||
val hash = expectMsgType[Done].hash
|
||||
implicit val t = Timeout(100.millis)
|
||||
awaitAssert(Await.result(recv ? GetProgress, t.duration) must be === Progress(N))
|
||||
recv ! GetHash
|
||||
expectMsgType[Hash].hash must be === hash
|
||||
try {
|
||||
within(20.seconds) {
|
||||
send ! StartSending(N)
|
||||
val hash = expectMsgType[Done].hash
|
||||
implicit val t = Timeout(100.millis)
|
||||
awaitAssert(Await.result(recv ? GetProgress, t.duration) must be === Progress(N))
|
||||
recv ! GetHash
|
||||
expectMsgType[Hash].hash must be === hash
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
system.log.error(e, "timeout")
|
||||
send ! GetStatus
|
||||
println(expectMsgType[SenderStatus])
|
||||
recv ! GetStatus
|
||||
println(expectMsgType[ReceiverStatus])
|
||||
throw e
|
||||
}
|
||||
send ! Close
|
||||
val terminated = receiveWhile(1.second, messages = 2) {
|
||||
|
|
@ -203,4 +251,4 @@ class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with
|
|||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue