diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 7be5fa76f3..d5b3b20bc8 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -15,7 +15,7 @@ import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import akka.actor.ActorSystem import akka.testkit.EventFilter -import akka.stream.{ ActorFlowMaterializer, BindFailedException } +import akka.stream.{ StreamTcpException, ActorFlowMaterializer, BindFailedException } import akka.stream.scaladsl._ import akka.stream.testkit._ import akka.http.scaladsl.model.HttpEntity._ @@ -31,7 +31,9 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { akka.loggers = ["akka.testkit.TestEventListener"] akka.loglevel = ERROR akka.stdout-loglevel = ERROR - akka.log-dead-letters = OFF""") + akka.log-dead-letters = OFF + akka.io.tcp.windows-connection-abort-workaround-enabled = auto + """) implicit val system = ActorSystem(getClass.getSimpleName, testConf) import system.dispatcher implicit val materializer = ActorFlowMaterializer() @@ -131,30 +133,40 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } "log materialization errors in `bindAndHandle`" which { - "are triggered in `transform`" in { + val testConf2: Config = + ConfigFactory.parseString("akka.stream.materializer.subscription-timeout.timeout = 1 s") + .withFallback(testConf) + val system2 = ActorSystem(getClass.getSimpleName, testConf2) + import system2.dispatcher + val materializer2 = ActorFlowMaterializer.create(system2) + + "are triggered in `transform`" in Utils.assertAllStagesStopped { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() val flow = Flow[HttpRequest].transform[HttpResponse](() ⇒ sys.error("BOOM")) - val binding = Http().bindAndHandle(flow, hostname, port) + val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2) val b1 = Await.result(binding, 3.seconds) - EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept { - val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head) - Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException] - } + EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept { + val (_, responseFuture) = + Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2) + Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException] + }(system2) Await.result(b1.unbind(), 1.second) - } - "are triggered in `mapMaterialized`" in { + }(materializer2) + + "are triggered in `mapMaterialized`" in Utils.assertAllStagesStopped { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() val flow = Flow[HttpRequest].map(_ ⇒ HttpResponse()).mapMaterializedValue(_ ⇒ sys.error("BOOM")) - val binding = Http().bindAndHandle(flow, hostname, port) - val b1 = Await.result(binding, 3.seconds) + val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2) + val b1 = Await.result(binding, 1.seconds) - EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept { - val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head) - Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException] - } + EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept { + val (_, responseFuture) = + Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2) + Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException] + }(system2) Await.result(b1.unbind(), 1.second) - } + }(materializer2) } "properly complete a simple request/response cycle" in Utils.assertAllStagesStopped { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala index 4eca857805..b603829699 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala @@ -150,6 +150,13 @@ class ActorInterpreterSpec extends AkkaSpec { } } + "handle failed stage factories" in { + a[RuntimeException] should be thrownBy + Await.result( + Source.empty[Int].transform(() ⇒ sys.error("test error")).runWith(Sink.head), + 3.seconds) + } + def largeDemand(extra: Int): Unit = { val N = 3 * system.settings.config.getInt("akka.stream.materializer.output-burst-limit") val large = new PushPullStage[Int, Int] { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 40eb176cc9..e73b44e0f2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -3,34 +3,21 @@ */ package akka.stream.io -import akka.actor.{ ActorSystem, Kill } -import akka.stream.scaladsl.Tcp.OutgoingConnection - -import scala.collection.immutable -import scala.concurrent.{ Future, Await } +import akka.actor.{ActorSystem, Kill} import akka.io.Tcp._ -import akka.stream.{ BindFailedException, ActorFlowMaterializer, ActorFlowMaterializerSettings, StreamTcpException } import akka.stream.scaladsl.Tcp.IncomingConnection -import akka.stream.scaladsl.{ Flow, _ } +import akka.stream.scaladsl.{Flow, _} import akka.stream.testkit.TestUtils.temporaryServerAddress import akka.stream.testkit.Utils._ import akka.stream.testkit._ -import akka.util.{ Helpers, ByteString } +import akka.stream.{ActorFlowMaterializer, BindFailedException, StreamTcpException} +import akka.util.{ByteString, Helpers} import scala.collection.immutable -import akka.stream.{ ActorFlowMaterializer, StreamTcpException, BindFailedException } - import scala.concurrent.Await import scala.concurrent.duration._ -import akka.util.ByteString -import akka.stream.scaladsl.Flow -import akka.stream.testkit._ -import akka.stream.testkit.Utils._ -import akka.stream.scaladsl._ -import akka.stream.testkit.TestUtils.temporaryServerAddress -class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.subscription-timeout.timeout = 3s") with TcpHelper { - import akka.stream.io.TcpHelper._ +class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.materializer.subscription-timeout.timeout = 3s") with TcpHelper { var demand = 0L "Outgoing TCP stream" must { diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 473ef4bce4..c2cccbb051 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -4,12 +4,13 @@ package akka.stream.impl import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference } +import akka.stream.impl.MaterializerSession.MaterializationPanic import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.Keep import akka.stream._ import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber } import scala.collection.mutable -import scala.util.control.NonFatal +import scala.util.control.{ NoStackTrace, NonFatal } import akka.event.Logging.simpleName import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicLong @@ -540,6 +541,13 @@ private[stream] class MaterializedValuePublisher extends Publisher[Any] { } +/** + * INERNAL API + */ +private[stream] object MaterializerSession { + class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace +} + /** * INTERNAL API */ @@ -599,12 +607,38 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo } + // Cancels all intermediate Publishers and fails all intermediate Subscribers. + // (This is an attempt to clean up after an exception during materialization) + private def panic(cause: Throwable): Unit = { + val panicError = new MaterializationPanic(cause) + for (subMap ← subscribersStack; sub ← subMap.valuesIterator) { + sub.onSubscribe(new Subscription { + override def cancel(): Unit = () + override def request(n: Long): Unit = sub.onError(panicError) + }) + } + + for (pubMap ← publishersStack; pub ← pubMap.valuesIterator) { + pub.subscribe(new Subscriber[Any] { + override def onSubscribe(s: Subscription): Unit = s.cancel() + override def onComplete(): Unit = () + override def onError(t: Throwable): Unit = () + override def onNext(t: Any): Unit = () + }) + } + } + final def materialize(): Any = { require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)") require( topLevel.isRunnable, s"The top level module cannot be materialized because it has unconnected ports: ${(topLevel.inPorts ++ topLevel.outPorts).mkString(", ")}") - materializeModule(topLevel, topLevel.attributes) + try materializeModule(topLevel, topLevel.attributes) + catch { + case NonFatal(e) ⇒ + panic(e) + throw e + } } protected def mergeAttributes(parent: OperationAttributes, current: OperationAttributes): OperationAttributes = diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 80ef8b52fd..bc53852e10 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -377,8 +377,10 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings try upstream.onInternalError(AbruptTerminationException(self)) // Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream // otherwise this will have no effect - finally downstream.fail(AbruptTerminationException(self)) - upstream.cancel() + finally { + downstream.fail(AbruptTerminationException(self)) + upstream.cancel() + } } override def postRestart(reason: Throwable): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index f1512ae04f..694e708f1a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -10,7 +10,7 @@ import scala.concurrent.Promise import akka.actor._ import akka.util.ByteString import akka.io.Tcp._ -import akka.stream.{ StreamSubscriptionTimeoutSettings, ActorFlowMaterializerSettings, StreamTcpException } +import akka.stream.{ AbruptTerminationException, StreamSubscriptionTimeoutSettings, ActorFlowMaterializerSettings, StreamTcpException } import org.reactivestreams.{ Publisher, Processor } import akka.stream.impl._ @@ -279,10 +279,11 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS override def postStop(): Unit = { // Close if it has not yet been done + val abruptTermination = AbruptTerminationException(self) tcpInputs.cancel() - tcpOutputs.complete() + tcpOutputs.error(abruptTermination) primaryInputs.cancel() - primaryOutputs.complete() + primaryOutputs.error(abruptTermination) subscriptionTimer.foreach(_.cancel()) super.postStop() // Remember, we have a Stash } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 2bd462abaf..306bb2f126 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -160,6 +160,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket override def postStop(): Unit = { unboundPromise.trySuccess(()) + primaryOutputs.complete() super.postStop() }