Merge pull request #23562 from akka/wip-23561-harden-hub-restart-patriknw

Harden restart of Artery stream with inbound-lanes > 1, #23561
Patrik Nordwall 2017-11-11 10:21:02 +01:00 committed by GitHub
commit be8e4b0276
8 changed files with 57 additions and 24 deletions


@@ -32,7 +32,8 @@ object SurviveInboundStreamRestartWithCompressionInFlightSpec extends MultiNodeC
   akka.loglevel = INFO
   akka.remote.artery {
     enabled = on
     advanced {
+      inbound-lanes = 4
       give-up-system-message-after = 4s
       compression.actor-refs.advertisement-interval = 300ms
       compression.manifests.advertisement-interval = 1 minute
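
Aside: `inbound-lanes` sits under `akka.remote.artery.advanced` and controls how many parallel lanes the inbound message stream is split into. A minimal sketch of setting it programmatically (system name illustrative):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// run the inbound Artery stream across 4 parallel lanes,
// the configuration this test now exercises
val config = ConfigFactory.parseString(
  """
  akka.remote.artery {
    enabled = on
    advanced.inbound-lanes = 4
  }
  """).withFallback(ConfigFactory.load()) // keep all other defaults

val system = ActorSystem("Demo", config)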
@@ -118,15 +119,20 @@ abstract class SurviveInboundStreamRestartWithCompressionInFlightSpec extends Re
       }
       enterBarrier("inbound-failure-restart-first")

-      // we poke the remote system, awaiting its inbound stream recovery, when it should reply
-      awaitAssert(
-        {
-          sendToB ! "alive-again"
-          expectMsg(300.millis, s"${sendToB.path.name}-alive-again")
-        },
-        max = 5.seconds, interval = 500.millis)
-
       runOn(second) {
+        sendToB.tell("trigger", ActorRef.noSender)
+        // when using inbound-lanes > 1 we can't be sure when it's done, another message (e.g. HandshakeReq)
+        // might have triggered the restart
+        Thread.sleep(2000)
+
+        // we poke the remote system, awaiting its inbound stream recovery, then it should reply
+        awaitAssert(
+          {
+            sendToB ! "alive-again"
+            expectMsg(300.millis, s"${sendToB.path.name}-alive-again")
+          },
+          max = 5.seconds, interval = 500.millis)
+
         // we continue sending messages using the "old table".
         // if a new table was being built, it would cause the b to be compressed as 1 causing a wrong reply to come back
         1 to 100 foreach { i ⇒ pingPong(sendToB, s"b$i") }


@@ -824,13 +824,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
     }(collection.breakOut)

     import system.dispatcher
-    val completed = Future.sequence(completedValues).map(_ ⇒ Done)

     // tear down the upstream hub part if downstream lane fails
     // lanes are not completed with success by themselves so we don't have to care about onSuccess
-    completed.failed.foreach { reason ⇒ hubKillSwitch.abort(reason) }
+    Future.firstCompletedOf(completedValues).failed.foreach { reason ⇒ hubKillSwitch.abort(reason) }

-    (resourceLife, compressionAccess, completed)
+    val allCompleted = Future.sequence(completedValues).map(_ ⇒ Done)
+
+    (resourceLife, compressionAccess, allCompleted)
   }

 _inboundCompressionAccess = OptionVal(inboundCompressionAccesses)
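
Aside: the swap from `Future.sequence` to `Future.firstCompletedOf` for the failure hook is the heart of this fix. In the Scala 2.11/2.12 standard library, `Future.sequence` folds the futures left to right with `flatMap`, so a failure in a later lane is not observed while an earlier lane is still pending; with `inbound-lanes > 1` a single failed lane could therefore never trigger the hub tear-down. A minimal, self-contained sketch of the difference (names illustrative):

import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

val lane1 = Promise[Unit]().future                            // healthy lane: never completes on its own
val lane2 = Future.failed[Unit](new RuntimeException("boom")) // failed lane

// stuck on the pending lane1, so lane2's failure is never propagated
val seq = Future.sequence(List(lane1, lane2))
// completes as soon as any input future completes (here: fails immediately)
val first = Future.firstCompletedOf(List(lane1, lane2))

println(Try(Await.result(first, 1.second))) // Failure(java.lang.RuntimeException: boom)
println(Try(Await.result(seq, 1.second)))   // Failure(java.util.concurrent.TimeoutException)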


@@ -613,14 +613,15 @@ private[remote] class Association(
     val (queueValues, compressionAccessValues, laneCompletedValues) = values.unzip3

     import transport.system.dispatcher
-    val completed = Future.sequence(laneCompletedValues).flatMap(_ ⇒ aeronSinkCompleted)

     // tear down all parts if one part fails or completes
-    completed.failed.foreach {
-      reason ⇒ streamKillSwitch.abort(reason)
+    Future.firstCompletedOf(laneCompletedValues).failed.foreach { reason ⇒
+      streamKillSwitch.abort(reason)
     }
     (laneCompletedValues :+ aeronSinkCompleted).foreach(_.foreach { _ ⇒ streamKillSwitch.shutdown() })

+    val allCompleted = Future.sequence(laneCompletedValues).flatMap(_ ⇒ aeronSinkCompleted)
+
     queueValues.zip(wrappers).zipWithIndex.foreach {
       case ((q, w), i) ⇒
         q.inject(w.queue)
@@ -631,7 +632,7 @@ private[remote] class Association(
     outboundCompressionAccess = compressionAccessValues

     attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
-      completed, () ⇒ runOutboundOrdinaryMessagesStream())
+      allCompleted, () ⇒ runOutboundOrdinaryMessagesStream())
   }
 }


@@ -15,7 +15,11 @@ import org.agrona.concurrent.ManyToManyConcurrentArrayQueue
 @InternalApi private[akka] class FixedSizePartitionHub[T](
   partitioner: T ⇒ Int,
   lanes:       Int,
-  bufferSize:  Int) extends PartitionHub[T](() ⇒ (info, elem) ⇒ info.consumerIdByIdx(partitioner(elem)), lanes, bufferSize - 1) {
+  bufferSize:  Int) extends PartitionHub[T](
+  // during tear down or restart it's possible that some streams have been removed
+  // and then we must drop elements (return -1)
+  () ⇒ (info, elem) ⇒ if (info.size < lanes) -1 else info.consumerIdByIdx(partitioner(elem)),
+  lanes, bufferSize - 1) {

   // -1 because of the Completed token
   override def createQueue(): PartitionHub.Internal.PartitionQueue =
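
Aside: the equivalent guard can be expressed with the public `PartitionHub.statefulSink` API. A sketch, with `lanes` and the modulo routing chosen for illustration (not Artery's actual partitioner):

import akka.NotUsed
import akka.stream.scaladsl.{ PartitionHub, Sink, Source }

val lanes = 4
val laneSink: Sink[Int, Source[Int, NotUsed]] =
  PartitionHub.statefulSink[Int](
    () ⇒ (info, elem) ⇒
      // during tear down/restart some consumers may already be gone;
      // a negative id drops the element instead of indexing a missing lane
      if (info.size < lanes) -1L
      else info.consumerIdByIdx(elem % lanes),
    startAfterNrOfConsumers = lanes)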


@@ -161,7 +161,9 @@ private[remote] class InboundTestStage(inboundContext: InboundContext, state: Sh
       // InHandler
       override def onPush(): Unit = {
         state.getInboundFailureOnce match {
-          case Some(shouldFailEx) ⇒ failStage(shouldFailEx)
+          case Some(shouldFailEx) ⇒
+            log.info("Fail inbound stream from [{}]: {}", classOf[InboundTestStage].getName, shouldFailEx.getMessage)
+            failStage(shouldFailEx)
           case _ ⇒
             val env = grab(in)
             env.association match {
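
Aside: in Akka Streams a `GraphStageLogic` typically obtains `log` by mixing in `akka.stream.stage.StageLogging`; whether this stage does exactly that is an assumption. A minimal stand-alone sketch of the pattern (stage name and element handling illustrative):

import akka.stream._
import akka.stream.stage._

class PassThrough[A] extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("PassThrough.in")
  val out = Outlet[A]("PassThrough.out")
  override val shape = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging with InHandler with OutHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        log.debug("forwarding [{}]", elem) // log is provided by StageLogging
        push(out, elem)
      }
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
}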


@@ -602,6 +602,15 @@ class HubSpec extends StreamSpec {
       }
     }

+    "drop elements with negative index" in assertAllStagesStopped {
+      val source = Source(0 until 10).runWith(PartitionHub.sink(
+        (size, elem) ⇒ if (elem == 3 || elem == 4) -1 else elem % size, startAfterNrOfConsumers = 2, bufferSize = 8))
+      val result1 = source.runWith(Sink.seq)
+      val result2 = source.runWith(Sink.seq)
+      result1.futureValue should ===((0 to 8 by 2).filterNot(_ == 4))
+      result2.futureValue should ===((1 to 9 by 2).filterNot(_ == 3))
+    }
   }
 }
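
For orientation: with two consumers, `elem % size` routes even elements to index 0 (the first materialized consumer, `result1`) and odd elements to index 1, while 3 and 4 return `-1` and are dropped; hence the expected `0, 2, 6, 8` and `1, 5, 7, 9`.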


@@ -173,7 +173,8 @@ object PartitionHub {
    * @param partitioner Function that decides where to route an element. The function takes two parameters;
    *   the first is the number of active consumers and the second is the stream element. The function should
    *   return the index of the selected consumer for the given element, i.e. int greater than or equal to 0
-   *   and less than number of consumers. E.g. `(size, elem) -> Math.abs(elem.hashCode()) % size`.
+   *   and less than number of consumers. E.g. `(size, elem) -> Math.abs(elem.hashCode()) % size`. It's also
+   *   possible to use `-1` to drop the element.
    * @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
    *   This is only used initially when the stage is starting up, i.e. it is not honored when consumers have
    *   been removed (canceled).


@@ -776,7 +776,8 @@ object PartitionHub {
    * @param partitioner Function that decides where to route an element. The function takes two parameters;
    *   the first is the number of active consumers and the second is the stream element. The function should
    *   return the index of the selected consumer for the given element, i.e. int greater than or equal to 0
-   *   and less than number of consumers. E.g. `(size, elem) => math.abs(elem.hashCode) % size`.
+   *   and less than number of consumers. E.g. `(size, elem) => math.abs(elem.hashCode) % size`. It's also
+   *   possible to use `-1` to drop the element.
    * @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
    *   This is only used initially when the stage is starting up, i.e. it is not honored when consumers have
    *   been removed (canceled).
@@ -785,8 +786,14 @@ object PartitionHub {
    */
   @ApiMayChange
   def sink[T](partitioner: (Int, T) ⇒ Int, startAfterNrOfConsumers: Int,
-              bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] =
-    statefulSink(() ⇒ (info, elem) ⇒ info.consumerIdByIdx(partitioner(info.size, elem)), startAfterNrOfConsumers, bufferSize)
+              bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] = {
+    val fun: (ConsumerInfo, T) ⇒ Long = { (info, elem) ⇒
+      val idx = partitioner(info.size, elem)
+      if (idx < 0) -1L
+      else info.consumerIdByIdx(idx)
+    }
+    statefulSink(() ⇒ fun, startAfterNrOfConsumers, bufferSize)
+  }

   @DoNotInherit @ApiMayChange trait ConsumerInfo extends akka.stream.javadsl.PartitionHub.ConsumerInfo {

@@ -1051,8 +1058,10 @@ object PartitionHub {
           pending :+= elem
         } else {
           val id = materializedPartitioner(consumerInfo, elem)
-          queue.offer(id, elem)
-          wakeup(id)
+          if (id >= 0) { // negative id is a way to drop the element
+            queue.offer(id, elem)
+            wakeup(id)
+          }
         }
       }