fail fast in mapAsync for completed failed future, #21318 (#21322)

* fail fast in mapAsync for completed failed future, #21318

* the callback was not run until todo == parallelism or the sync
  event limit was reached, and that can be a problem if upstream
  stages are slow (noticed this for a blocking Kafka producer stage)

* skip callback for completed futures

* fix callback allocation
This commit is contained in:
Patrik Nordwall 2016-08-30 19:30:08 +02:00 committed by GitHub
parent e20d6f5f36
commit a95b2d6746
4 changed files with 100 additions and 43 deletions

View file

@ -3,7 +3,7 @@
*/ */
package akka.stream.io package akka.stream.io
import java.nio.file.{FileSystems, Files} import java.nio.file.{ FileSystems, Files }
import java.nio.charset.StandardCharsets.UTF_8 import java.nio.charset.StandardCharsets.UTF_8
import java.util.Random import java.util.Random
@ -16,14 +16,14 @@ import akka.stream.impl.ActorMaterializerImpl
import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor
import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.StreamSupervisor.Children
import akka.stream.io.FileSourceSpec.Settings import akka.stream.io.FileSourceSpec.Settings
import akka.stream.scaladsl.{FileIO, Keep, Sink} import akka.stream.scaladsl.{ FileIO, Keep, Sink }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestDuration import akka.testkit.TestDuration
import akka.util.ByteString import akka.util.ByteString
import akka.util.Timeout import akka.util.Timeout
import com.google.common.jimfs.{Configuration, Jimfs} import com.google.common.jimfs.{ Configuration, Jimfs }
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -48,10 +48,12 @@ class FlowMapAsyncSpec extends StreamSpec {
"produce future elements in order" in { "produce future elements in order" in {
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 50).mapAsync(4)(n Future { val p = Source(1 to 50).mapAsync(4)(n
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) if (n % 3 == 0) Future.successful(n)
n else Future {
}).to(Sink.fromSubscriber(c)).run() Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10))
n
}).to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(1000) sub.request(1000)
for (n 1 to 50) c.expectNext(n) for (n 1 to 50) c.expectNext(n)
@ -99,6 +101,27 @@ class FlowMapAsyncSpec extends StreamSpec {
latch.countDown() latch.countDown()
} }
"signal future failure asap" in assertAllStagesStopped {
val latch = TestLatch(1)
val done = Source(1 to 5)
.map { n
if (n == 1) n
else {
// slow upstream should not block the error
Await.ready(latch, 10.seconds)
n
}
}
.mapAsync(4) { n
if (n == 1) Future.failed(new RuntimeException("err1") with NoStackTrace)
else Future.successful(n)
}.runWith(Sink.ignore)
intercept[RuntimeException] {
Await.result(done, remainingOrDefault)
}.getMessage should be("err1")
latch.countDown()
}
"signal error from mapAsync" in assertAllStagesStopped { "signal error from mapAsync" in assertAllStagesStopped {
val latch = TestLatch(1) val latch = TestLatch(1)
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()

View file

@ -55,10 +55,15 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec {
val probe = TestProbe() val probe = TestProbe()
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 20).mapAsyncUnordered(4)(n Future { val p = Source(1 to 20).mapAsyncUnordered(4)(n
probe.ref ! n if (n % 3 == 0) {
n probe.ref ! n
}).to(Sink.fromSubscriber(c)).run() Future.successful(n)
} else
Future {
probe.ref ! n
n
}).to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
probe.expectNoMsg(Duration.Zero) probe.expectNoMsg(Duration.Zero)
@ -93,6 +98,27 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec {
latch.countDown() latch.countDown()
} }
"signal future failure asap" in assertAllStagesStopped {
val latch = TestLatch(1)
val done = Source(1 to 5)
.map { n
if (n == 1) n
else {
// slow upstream should not block the error
Await.ready(latch, 10.seconds)
n
}
}
.mapAsyncUnordered(4) { n
if (n == 1) Future.failed(new RuntimeException("err1") with NoStackTrace)
else Future.successful(n)
}.runWith(Sink.ignore)
intercept[RuntimeException] {
Await.result(done, remainingOrDefault)
}.getMessage should be("err1")
latch.countDown()
}
"signal error from mapAsyncUnordered" in assertAllStagesStopped { "signal error from mapAsyncUnordered" in assertAllStagesStopped {
val latch = TestLatch(1) val latch = TestLatch(1)
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()

View file

@ -942,11 +942,14 @@ final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends Graph
*/ */
private[akka] object MapAsync { private[akka] object MapAsync {
final class Holder[T](var elem: Try[T], val cb: AsyncCallback[Holder[T]]) extends (Try[T] Unit) { final class Holder[T](var elem: Try[T], val cb: AsyncCallback[Holder[T]]) extends (Try[T] Unit) {
override def apply(t: Try[T]): Unit = { def setElem(t: Try[T]): Unit =
elem = t match { elem = t match {
case Success(null) Failure[T](ReactiveStreamsCompliance.elementMustNotBeNullException) case Success(null) Failure[T](ReactiveStreamsCompliance.elementMustNotBeNullException)
case other other case other other
} }
override def apply(t: Try[T]): Unit = {
setElem(t)
cb.invoke(this) cb.invoke(this)
} }
} }
@ -974,12 +977,14 @@ final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out])
//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync? //FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
var buffer: BufferImpl[Holder[Out]] = _ var buffer: BufferImpl[Holder[Out]] = _
val futureCB =
getAsyncCallback[Holder[Out]]( def holderCompleted(h: Holder[Out]): Unit = {
_.elem match { h.elem match {
case Failure(e) if decider(e) == Supervision.Stop failStage(e) case Failure(e) if decider(e) == Supervision.Stop failStage(e)
case _ if (isAvailable(out)) pushOne() case _ if (isAvailable(out)) pushOne()
}) }
}
val futureCB = getAsyncCallback[Holder[Out]](holderCompleted)
private[this] def todo = buffer.used private[this] def todo = buffer.used
@ -1007,14 +1012,16 @@ final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out])
// #20217 We dispatch the future if it's ready to optimize away // #20217 We dispatch the future if it's ready to optimize away
// scheduling it to an execution context // scheduling it to an execution context
future.value match { future.value match {
case None future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) case None future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
case Some(f) holder.apply(f) case Some(v)
holder.setElem(v)
holderCompleted(holder)
} }
} catch { } catch {
case NonFatal(ex) if (decider(ex) == Supervision.Stop) failStage(ex) case NonFatal(ex) if (decider(ex) == Supervision.Stop) failStage(ex)
} }
if (todo < parallelism) tryPull(in) if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
} }
override def onUpstreamFinish(): Unit = if (todo == 0) completeStage() override def onUpstreamFinish(): Unit = if (todo == 0) completeStage()
@ -1049,38 +1056,39 @@ final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[O
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer) override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
private val futureCB = def futureCompleted(result: Try[Out]): Unit = {
getAsyncCallback((result: Try[Out]) { inFlight -= 1
inFlight -= 1 result match {
result match { case Success(elem) if elem != null
case Success(elem) if elem != null if (isAvailable(out)) {
if (isAvailable(out)) { if (!hasBeenPulled(in)) tryPull(in)
if (!hasBeenPulled(in)) tryPull(in) push(out, elem)
push(out, elem) } else buffer.enqueue(elem)
} else buffer.enqueue(elem) case other
case other val ex = other match {
val ex = other match { case Failure(t) t
case Failure(t) t case Success(s) if s == null ReactiveStreamsCompliance.elementMustNotBeNullException
case Success(s) if s == null ReactiveStreamsCompliance.elementMustNotBeNullException }
} if (decider(ex) == Supervision.Stop) failStage(ex)
if (decider(ex) == Supervision.Stop) failStage(ex) else if (isClosed(in) && todo == 0) completeStage()
else if (isClosed(in) && todo == 0) completeStage() else if (!hasBeenPulled(in)) tryPull(in)
else if (!hasBeenPulled(in)) tryPull(in) }
} }
}).invoke _ private val futureCB = getAsyncCallback(futureCompleted)
private val invokeFutureCB: Try[Out] Unit = futureCB.invoke
override def onPush(): Unit = { override def onPush(): Unit = {
try { try {
val future = f(grab(in)) val future = f(grab(in))
inFlight += 1 inFlight += 1
future.value match { future.value match {
case None future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) case None future.onComplete(invokeFutureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
case Some(f) futureCB.apply(f) case Some(v) futureCompleted(v)
} }
} catch { } catch {
case NonFatal(ex) if (decider(ex) == Supervision.Stop) failStage(ex) case NonFatal(ex) if (decider(ex) == Supervision.Stop) failStage(ex)
} }
if (todo < parallelism) tryPull(in) if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
} }
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = {