give up sending after a while, #20317

This commit is contained in:
Patrik Nordwall 2016-05-19 08:24:27 +02:00
parent b17c4c1d9b
commit c90121485f
11 changed files with 203 additions and 51 deletions

View file

@ -81,6 +81,7 @@ abstract class AeronStreamConsistencySpec
}
val streamId = 1
val giveUpSendAfter = 30.seconds
override def afterAll(): Unit = {
taskRunner.stop()
@ -96,7 +97,7 @@ abstract class AeronStreamConsistencySpec
runOn(second) {
// just echo back
Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter))
}
enterBarrier("echo-started")
}
@ -137,7 +138,7 @@ abstract class AeronStreamConsistencySpec
envelope
}
.throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping)
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter))
started.expectMsg(Done)
}
@ -149,7 +150,7 @@ abstract class AeronStreamConsistencySpec
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter))
Await.ready(done, 20.seconds)
killSwitch.shutdown()

View file

@ -108,6 +108,7 @@ abstract class AeronStreamLatencySpec
}
val streamId = 1
val giveUpSendAfter = 30.seconds
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): TestRateReporter = {
@ -227,7 +228,7 @@ abstract class AeronStreamLatencySpec
envelope
}
.throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping)
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter))
started.expectMsg(Done)
}
@ -245,7 +246,7 @@ abstract class AeronStreamLatencySpec
sendTimes.set(n - 1, System.nanoTime())
envelope
}
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter))
barrier.await((totalMessages / messageRate) + 10, SECONDS)
}
@ -264,7 +265,7 @@ abstract class AeronStreamLatencySpec
runOn(second) {
// just echo back
Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter))
}
enterBarrier("echo-started")
}

View file

@ -114,6 +114,7 @@ abstract class AeronStreamMaxThroughputSpec
}
val streamId = 1
val giveUpSendAfter = 30.seconds
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): TestRateReporter = {
@ -211,7 +212,7 @@ abstract class AeronStreamMaxThroughputSpec
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter))
printStats("sender")
enterBarrier(testName + "-done")

View file

@ -13,6 +13,7 @@ import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NoStackTrace
import akka.Done
import akka.stream.Attributes
@ -28,26 +29,38 @@ import org.agrona.concurrent.UnsafeBuffer
object AeronSink {
class OfferTask(pub: Publication, var buffer: UnsafeBuffer, msgSize: AtomicInteger, onOfferSuccess: AsyncCallback[Unit])
extends (() Boolean) {
/**
 * Failure used by the sink when a message could not be offered to the
 * publication within the configured give-up duration. It mixes in
 * `NoStackTrace`, so no stack trace is captured for it.
 */
final class GaveUpSendingException(msg: String) extends RuntimeException(msg) with NoStackTrace
// The offer task only consults System.nanoTime() on every TimerCheckPeriod-th
// invocation; the period is a power of two so the check can be done with a bit mask.
private val TimerCheckPeriod = 1 << 13 // 8192
// Bit mask used as `(n & TimerCheckMask) == 0` to detect every TimerCheckPeriod-th invocation.
private val TimerCheckMask = TimerCheckPeriod - 1
private final class OfferTask(pub: Publication, var buffer: UnsafeBuffer, var msgSize: Int, onOfferSuccess: AsyncCallback[Unit],
giveUpAfter: Duration, onGiveUp: AsyncCallback[Unit])
extends (() Boolean) {
val giveUpAfterNanos = giveUpAfter match {
case f: FiniteDuration f.toNanos
case _ -1L
}
var n = 0L
var localMsgSize = -1
var startTime = 0L
override def apply(): Boolean = {
if (n == 0L) {
// first invocation for this message
startTime = if (giveUpAfterNanos >= 0) System.nanoTime() else 0L
}
n += 1
if (localMsgSize == -1)
localMsgSize = msgSize.get
val result = pub.offer(buffer, 0, localMsgSize)
val result = pub.offer(buffer, 0, msgSize)
if (result >= 0) {
n = 0
localMsgSize = -1
n = 0L
onOfferSuccess.invoke(())
true
} else if (giveUpAfterNanos >= 0 && (n & TimerCheckMask) == 0 && (System.nanoTime() - startTime) > giveUpAfterNanos) {
// the task is invoked by the spinning thread, only check nanoTime each 8192th invocation
n = 0L
onGiveUp.invoke(())
true
} else {
// FIXME drop after too many attempts?
if (n > 1000000 && n % 100000 == 0)
println(s"# offer not accepted after $n") // FIXME
false
}
}
@ -57,7 +70,7 @@ object AeronSink {
/**
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
*/
class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRunner, pool: EnvelopeBufferPool)
class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRunner, pool: EnvelopeBufferPool, giveUpSendAfter: Duration)
extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] {
import AeronSink._
import TaskRunner._
@ -77,8 +90,8 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu
private val spinning = 1000
private var backoffCount = spinning
private var lastMsgSize = 0
private val lastMsgSizeRef = new AtomicInteger // used in the external backoff task
private val offerTask = new OfferTask(pub, null, lastMsgSizeRef, getAsyncCallback(_ onOfferSuccess()))
private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ onOfferSuccess()),
giveUpSendAfter, getAsyncCallback(_ onGiveUp()))
private val addOfferTask: Add = Add(offerTask)
private var offerTaskInProgress = false
@ -112,9 +125,10 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu
publish() // recursive
} else {
// delegate backoff to shared TaskRunner
lastMsgSizeRef.set(lastMsgSize)
offerTaskInProgress = true
// visibility of these assignments are ensured by adding the task to the command queue
offerTask.buffer = envelopeInFlight.aeronBuffer
offerTask.msgSize = lastMsgSize
taskRunner.command(addOfferTask)
}
} else {
@ -134,6 +148,13 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu
pull(in)
}
/**
 * Async-callback target that the offer task invokes when it gives up on the
 * message in flight: clears the in-progress flag, records the failure as the
 * completed value and fails the stage with the same exception.
 */
private def onGiveUp(): Unit = {
  val gaveUp = new GaveUpSendingException(s"Gave up sending message to $channel after $giveUpSendAfter.")
  // the offer task has finished with this message, nothing is pending any more
  offerTaskInProgress = false
  completedValue = Failure(gaveUp)
  failStage(gaveUp)
}
override def onUpstreamFinish(): Unit = {
// flush outstanding offer before completing stage
if (!offerTaskInProgress)

View file

@ -236,6 +236,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val handshakeTimeout: FiniteDuration =
system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero,
"handshake-timeout must be > 0")
private val giveUpSendAfter: FiniteDuration = 60.seconds
private val largeMessageDestinations =
system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardTree[NotUsed]()) { (tree, entry)
@ -416,15 +417,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () Unit): Unit = {
implicit val ec = materializer.executionContext
streamCompleted.onFailure {
case _ if isShutdown // don't restart after shutdown
case _: AbruptTerminationException // ActorSystem shutdown
case cause
if (!isShutdown)
if (restartCounter.restart()) {
log.error(cause, "{} failed. Restarting it.", streamName)
log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage)
restart()
} else {
log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system.",
streamName, maxRestarts, restartTimeout.toSeconds)
log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system. {}",
streamName, maxRestarts, restartTimeout.toSeconds, cause.getMessage)
system.terminate()
}
}
@ -485,7 +486,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
.via(encoder)
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, envelopePool))(Keep.right)
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner,
envelopePool, giveUpSendAfter))(Keep.right)
}
def outboundLarge(outboundContext: OutboundContext): Sink[Send, Future[Done]] = {
@ -494,7 +496,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
.via(createEncoder(pool))
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, envelopePool))(Keep.right)
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner,
envelopePool, giveUpSendAfter))(Keep.right)
case None throw new IllegalArgumentException("Trying to create outbound stream but outbound stream not configured")
}
}
@ -505,7 +508,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
.via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval, remoteSettings.SysMsgBufferSize))
.viaMat(new OutboundControlJunction(outboundContext))(Keep.right)
.via(encoder)
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, envelopePool))(Keep.both)
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner,
envelopePool, Duration.Inf))(Keep.both)
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
}

View file

@ -21,6 +21,7 @@ import akka.dispatch.sysmsg.SystemMessage
import akka.event.Logging
import akka.remote.EndpointManager.Send
import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef, UniqueAddress }
import akka.remote.artery.AeronSink.GaveUpSendingException
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException
@ -261,15 +262,19 @@ private[akka] class Association(
private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: Throwable Unit): Unit = {
implicit val ec = materializer.executionContext
streamCompleted.onFailure {
case _ if transport.isShutdown // don't restart after shutdown
case _: AbruptTerminationException // ActorSystem shutdown
case cause: GaveUpSendingException
log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage)
// restart unconditionally, without counting restarts
restart(cause)
case cause
if (!transport.isShutdown)
if (restartCounter.restart()) {
log.error(cause, "{} failed. Restarting it.", streamName)
log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage)
restart(cause)
} else {
log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system.",
streamName, maxRestarts, restartTimeout.toSeconds)
log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system. {}",
streamName, maxRestarts, restartTimeout.toSeconds, cause.getMessage)
transport.system.terminate()
}
}

View file

@ -141,7 +141,7 @@ private[akka] class InboundControlJunction
* INTERNAL API
*/
private[akka] object OutboundControlJunction {
trait OutboundControlIngress {
private[akka] trait OutboundControlIngress {
def sendControlMessage(message: ControlMessage): Unit
}
}
@ -158,7 +158,7 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
// FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler {
val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler with StageLogging {
import OutboundControlJunction._
private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage)
@ -192,8 +192,7 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext)
buffer.offer(wrap(message))
else {
// it's alright to drop control messages
// FIXME we need that stage logging support
println(s"dropping control message ${message.getClass.getName} due to full buffer")
log.debug("Dropping control message [{}] due to full buffer.", message.getClass.getName)
}
}

View file

@ -0,0 +1,34 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.stream.stage.GraphStageLogic
import akka.event.LoggingAdapter
import akka.stream.ActorMaterializer
import akka.event.NoLogging
// TODO this can be removed when https://github.com/akka/akka/issues/18793 has been implemented
/**
 * INTERNAL API
 *
 * Mix-in for a `GraphStageLogic` that exposes a `LoggingAdapter` through `log`.
 * The adapter is created on first access: when the stage's materializer is an
 * `ActorMaterializer` it is obtained from that materializer's actor system with
 * `logSource` as the log source, for any other materializer `NoLogging` is used.
 */
private[akka] trait StageLogging { self: GraphStageLogic =>

  // stays null until `log` is accessed for the first time
  private var _log: LoggingAdapter = _

  /** Class passed as log source when the adapter is created; defaults to the runtime class. */
  protected def logSource: Class[_] = this.getClass

  /** The logging adapter of this stage logic, created lazily and then reused. */
  def log: LoggingAdapter = {
    // only used in StageLogic, i.e. thread safe
    if (_log eq null)
      _log = materializer match {
        case actorMat: ActorMaterializer => akka.event.Logging(actorMat.system, logSource)
        case _                           => NoLogging
      }
    _log
  }
}

View file

@ -107,6 +107,7 @@ private[akka] class SystemMessageDelivery(
}
if (!unacknowledged.isEmpty)
scheduleOnce(ResendTick, resendInterval)
// FIXME give up resending after a long while, i.e. config property quarantine-after-silence
}
// ControlMessageObserver, external call

View file

@ -0,0 +1,84 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.io.File
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.ExtendedActorSystem
import akka.remote.artery.AeronSink.GaveUpSendingException
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.SocketUtil
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
import org.agrona.IoUtil
/**
 * Checks that an `AeronSink` created with a finite give-up duration fails its
 * materialized future with `GaveUpSendingException` when the messages cannot
 * be delivered, instead of never completing.
 */
class AeronSinkSpec extends AkkaSpec with ImplicitSender {

  // embedded media driver; its directory is deleted again in afterTermination
  val driver = MediaDriver.launchEmbedded()

  // Aeron client connected to the embedded driver's directory
  val aeron = {
    val context = new Aeron.Context
    context.aeronDirectoryName(driver.aeronDirectoryName)
    Aeron.connect(context)
  }

  // task runner handed to the Aeron source and sink, started right away
  val taskRunner = {
    val runner = new TaskRunner(system.asInstanceOf[ExtendedActorSystem])
    runner.start()
    runner
  }

  val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)

  val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
  implicit val mat = ActorMaterializer(matSettings)(system)

  // stop the task runner, close client and driver, then remove the driver directory
  override def afterTermination(): Unit = {
    taskRunner.stop()
    aeron.close()
    driver.close()
    IoUtil.delete(new File(driver.aeronDirectoryName), true)
    super.afterTermination()
  }

  "AeronSink" must {

    "give up sending after given duration" in {
      val port = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort
      val channel = s"aeron:udp?endpoint=localhost:$port"

      // receiving side: the stream is failed on the first message
      val failingReceiver = Source.fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool))
        .map(_ => throw new RuntimeException("stop") with NoStackTrace)
      failingReceiver.runWith(Sink.ignore)

      // sending side: messages large enough to fill up the buffers
      val payload = Array.ofDim[Byte](100000)
      val givingUpSink = new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis)
      val done = Source(1 to 1000)
        .map(_ => payload)
        .map { bytes =>
          val envelope = pool.acquire()
          envelope.byteBuffer.put(bytes)
          envelope.byteBuffer.flip()
          envelope
        }
        .runWith(givingUpSink)

      // without the give up timeout the stream would not complete/fail
      intercept[GaveUpSendingException] {
        Await.result(done, 5.seconds)
      }
    }

  }

}

View file

@ -38,6 +38,7 @@ object AeronStreamsApp {
val latencyRate = 10000 // per second
val latencyN = 10 * latencyRate
val payload = ("0" * 100).getBytes("utf-8")
val giveUpSendAfter = 60.seconds
lazy val sendTimes = new AtomicLongArray(latencyN)
lazy val driver = {
@ -201,7 +202,7 @@ object AeronStreamsApp {
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter))
}
def runEchoReceiver(): Unit = {
@ -213,7 +214,7 @@ object AeronStreamsApp {
r.onMessage(1, envelope.byteBuffer.limit)
envelope
}
.runWith(new AeronSink(channel2, streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel2, streamId, aeron, taskRunner, pool, giveUpSendAfter))
}
def runEchoSender(): Unit = {
@ -264,7 +265,7 @@ object AeronStreamsApp {
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter))
barrier.await()
}
@ -303,7 +304,7 @@ object AeronStreamsApp {
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool))
.runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter))
}
def runStats(): Unit = {