diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index a45d19dbec..74897ebb14 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -3,7 +3,7 @@ */ package akka.stream.io -import java.nio.file.{FileSystems, Files} +import java.nio.file.{ FileSystems, Files } import java.nio.charset.StandardCharsets.UTF_8 import java.util.Random @@ -16,14 +16,14 @@ import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children import akka.stream.io.FileSourceSpec.Settings -import akka.stream.scaladsl.{FileIO, Keep, Sink} +import akka.stream.scaladsl.{ FileIO, Keep, Sink } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink import akka.testkit.TestDuration import akka.util.ByteString import akka.util.Timeout -import com.google.common.jimfs.{Configuration, Jimfs} +import com.google.common.jimfs.{ Configuration, Jimfs } import scala.concurrent.Await import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index ac16f2d693..1d0c9e2d61 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -48,10 +48,12 @@ class FlowMapAsyncSpec extends StreamSpec { "produce future elements in order" in { val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 50).mapAsync(4)(n ⇒ Future { - Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) - n - }).to(Sink.fromSubscriber(c)).run() + val p = Source(1 to 50).mapAsync(4)(n ⇒ + if (n % 3 == 0) Future.successful(n) + else Future { + Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) + n + }).to(Sink.fromSubscriber(c)).run() val sub = c.expectSubscription() sub.request(1000) for (n ← 1 to 50) c.expectNext(n) @@ -99,6 +101,27 @@ class FlowMapAsyncSpec extends StreamSpec { latch.countDown() } + "signal future failure asap" in assertAllStagesStopped { + val latch = TestLatch(1) + val done = Source(1 to 5) + .map { n ⇒ + if (n == 1) n + else { + // slow upstream should not block the error + Await.ready(latch, 10.seconds) + n + } + } + .mapAsync(4) { n ⇒ + if (n == 1) Future.failed(new RuntimeException("err1") with NoStackTrace) + else Future.successful(n) + }.runWith(Sink.ignore) + intercept[RuntimeException] { + Await.result(done, remainingOrDefault) + }.getMessage should be("err1") + latch.countDown() + } + "signal error from mapAsync" in assertAllStagesStopped { val latch = TestLatch(1) val c = TestSubscriber.manualProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index 558ec85495..9f60f57abd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -55,10 +55,15 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec { val probe = TestProbe() val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 20).mapAsyncUnordered(4)(n ⇒ Future { - probe.ref ! n - n - }).to(Sink.fromSubscriber(c)).run() + val p = Source(1 to 20).mapAsyncUnordered(4)(n ⇒ + if (n % 3 == 0) { + probe.ref ! n + Future.successful(n) + } else + Future { + probe.ref ! n + n + }).to(Sink.fromSubscriber(c)).run() val sub = c.expectSubscription() c.expectNoMsg(200.millis) probe.expectNoMsg(Duration.Zero) @@ -93,6 +98,27 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec { latch.countDown() } + "signal future failure asap" in assertAllStagesStopped { + val latch = TestLatch(1) + val done = Source(1 to 5) + .map { n ⇒ + if (n == 1) n + else { + // slow upstream should not block the error + Await.ready(latch, 10.seconds) + n + } + } + .mapAsyncUnordered(4) { n ⇒ + if (n == 1) Future.failed(new RuntimeException("err1") with NoStackTrace) + else Future.successful(n) + }.runWith(Sink.ignore) + intercept[RuntimeException] { + Await.result(done, remainingOrDefault) + }.getMessage should be("err1") + latch.countDown() + } + "signal error from mapAsyncUnordered" in assertAllStagesStopped { val latch = TestLatch(1) val c = TestSubscriber.manualProbe[Int]() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 562dfa70c8..d1dbdb544a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -942,11 +942,14 @@ final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends Graph */ private[akka] object MapAsync { final class Holder[T](var elem: Try[T], val cb: AsyncCallback[Holder[T]]) extends (Try[T] ⇒ Unit) { - override def apply(t: Try[T]): Unit = { + def setElem(t: Try[T]): Unit = elem = t match { case Success(null) ⇒ Failure[T](ReactiveStreamsCompliance.elementMustNotBeNullException) case other ⇒ other } + + override def apply(t: Try[T]): Unit = { + setElem(t) cb.invoke(this) } } @@ -974,12 +977,14 @@ final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out]) //FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync? lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) var buffer: BufferImpl[Holder[Out]] = _ - val futureCB = - getAsyncCallback[Holder[Out]]( - _.elem match { - case Failure(e) if decider(e) == Supervision.Stop ⇒ failStage(e) - case _ ⇒ if (isAvailable(out)) pushOne() - }) + + def holderCompleted(h: Holder[Out]): Unit = { + h.elem match { + case Failure(e) if decider(e) == Supervision.Stop ⇒ failStage(e) + case _ ⇒ if (isAvailable(out)) pushOne() + } + } + val futureCB = getAsyncCallback[Holder[Out]](holderCompleted) private[this] def todo = buffer.used @@ -1007,14 +1012,16 @@ final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out]) // #20217 We dispatch the future if it's ready to optimize away // scheduling it to an execution context future.value match { - case None ⇒ future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) - case Some(f) ⇒ holder.apply(f) + case None ⇒ future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + case Some(v) ⇒ + holder.setElem(v) + holderCompleted(holder) } } catch { case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex) } - if (todo < parallelism) tryPull(in) + if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) } override def onUpstreamFinish(): Unit = if (todo == 0) completeStage() @@ -1049,38 +1056,39 @@ final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[O override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer) - private val futureCB = - getAsyncCallback((result: Try[Out]) ⇒ { - inFlight -= 1 - result match { - case Success(elem) if elem != null ⇒ - if (isAvailable(out)) { - if (!hasBeenPulled(in)) tryPull(in) - push(out, elem) - } else buffer.enqueue(elem) - case other ⇒ - val ex = other match { - case Failure(t) ⇒ t - case Success(s) if s == null ⇒ ReactiveStreamsCompliance.elementMustNotBeNullException - } - if (decider(ex) == Supervision.Stop) failStage(ex) - else if (isClosed(in) && todo == 0) completeStage() - else if (!hasBeenPulled(in)) tryPull(in) - } - }).invoke _ + def futureCompleted(result: Try[Out]): Unit = { + inFlight -= 1 + result match { + case Success(elem) if elem != null ⇒ + if (isAvailable(out)) { + if (!hasBeenPulled(in)) tryPull(in) + push(out, elem) + } else buffer.enqueue(elem) + case other ⇒ + val ex = other match { + case Failure(t) ⇒ t + case Success(s) if s == null ⇒ ReactiveStreamsCompliance.elementMustNotBeNullException + } + if (decider(ex) == Supervision.Stop) failStage(ex) + else if (isClosed(in) && todo == 0) completeStage() + else if (!hasBeenPulled(in)) tryPull(in) + } + } + private val futureCB = getAsyncCallback(futureCompleted) + private val invokeFutureCB: Try[Out] ⇒ Unit = futureCB.invoke override def onPush(): Unit = { try { val future = f(grab(in)) inFlight += 1 future.value match { - case None ⇒ future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) - case Some(f) ⇒ futureCB.apply(f) + case None ⇒ future.onComplete(invokeFutureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + case Some(v) ⇒ futureCompleted(v) } } catch { case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex) } - if (todo < parallelism) tryPull(in) + if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) } override def onUpstreamFinish(): Unit = {