- Fix accidentally passing ClientServerSpec tests

- Making materializer more robust against exceptions
This commit is contained in:
Endre Sándor Varga 2015-06-22 18:02:42 +02:00
parent 3f92387ffc
commit 21d45ea1b8
7 changed files with 86 additions and 42 deletions

View file

@ -15,7 +15,7 @@ import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import akka.actor.ActorSystem
import akka.testkit.EventFilter
import akka.stream.{ ActorFlowMaterializer, BindFailedException }
import akka.stream.{ StreamTcpException, ActorFlowMaterializer, BindFailedException }
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.http.scaladsl.model.HttpEntity._
@ -31,7 +31,9 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
akka.log-dead-letters = OFF""")
akka.log-dead-letters = OFF
akka.io.tcp.windows-connection-abort-workaround-enabled = auto
""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
@ -131,30 +133,40 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
}
"log materialization errors in `bindAndHandle`" which {
"are triggered in `transform`" in {
val testConf2: Config =
ConfigFactory.parseString("akka.stream.materializer.subscription-timeout.timeout = 1 s")
.withFallback(testConf)
val system2 = ActorSystem(getClass.getSimpleName, testConf2)
import system2.dispatcher
val materializer2 = ActorFlowMaterializer.create(system2)
"are triggered in `transform`" in Utils.assertAllStagesStopped {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
val flow = Flow[HttpRequest].transform[HttpResponse](() sys.error("BOOM"))
val binding = Http().bindAndHandle(flow, hostname, port)
val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2)
val b1 = Await.result(binding, 3.seconds)
EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept {
val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)
Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException]
}
EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept {
val (_, responseFuture) =
Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2)
Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException]
}(system2)
Await.result(b1.unbind(), 1.second)
}
"are triggered in `mapMaterialized`" in {
}(materializer2)
"are triggered in `mapMaterialized`" in Utils.assertAllStagesStopped {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
val flow = Flow[HttpRequest].map(_ HttpResponse()).mapMaterializedValue(_ sys.error("BOOM"))
val binding = Http().bindAndHandle(flow, hostname, port)
val b1 = Await.result(binding, 3.seconds)
val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2)
val b1 = Await.result(binding, 1.seconds)
EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept {
val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)
Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException]
}
EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept {
val (_, responseFuture) =
Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2)
Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException]
}(system2)
Await.result(b1.unbind(), 1.second)
}
}(materializer2)
}
"properly complete a simple request/response cycle" in Utils.assertAllStagesStopped {

View file

@ -150,6 +150,13 @@ class ActorInterpreterSpec extends AkkaSpec {
}
}
"handle failed stage factories" in {
a[RuntimeException] should be thrownBy
Await.result(
Source.empty[Int].transform(() sys.error("test error")).runWith(Sink.head),
3.seconds)
}
def largeDemand(extra: Int): Unit = {
val N = 3 * system.settings.config.getInt("akka.stream.materializer.output-burst-limit")
val large = new PushPullStage[Int, Int] {

View file

@ -4,33 +4,20 @@
package akka.stream.io
import akka.actor.{ActorSystem, Kill}
import akka.stream.scaladsl.Tcp.OutgoingConnection
import scala.collection.immutable
import scala.concurrent.{ Future, Await }
import akka.io.Tcp._
import akka.stream.{ BindFailedException, ActorFlowMaterializer, ActorFlowMaterializerSettings, StreamTcpException }
import akka.stream.scaladsl.Tcp.IncomingConnection
import akka.stream.scaladsl.{Flow, _}
import akka.stream.testkit.TestUtils.temporaryServerAddress
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.util.{ Helpers, ByteString }
import akka.stream.{ActorFlowMaterializer, BindFailedException, StreamTcpException}
import akka.util.{ByteString, Helpers}
import scala.collection.immutable
import akka.stream.{ ActorFlowMaterializer, StreamTcpException, BindFailedException }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.util.ByteString
import akka.stream.scaladsl.Flow
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.scaladsl._
import akka.stream.testkit.TestUtils.temporaryServerAddress
class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.subscription-timeout.timeout = 3s") with TcpHelper {
import akka.stream.io.TcpHelper._
class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.materializer.subscription-timeout.timeout = 3s") with TcpHelper {
var demand = 0L
"Outgoing TCP stream" must {

View file

@ -4,12 +4,13 @@
package akka.stream.impl
import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference }
import akka.stream.impl.MaterializerSession.MaterializationPanic
import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl.Keep
import akka.stream._
import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber }
import scala.collection.mutable
import scala.util.control.NonFatal
import scala.util.control.{ NoStackTrace, NonFatal }
import akka.event.Logging.simpleName
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicLong
@ -540,6 +541,13 @@ private[stream] class MaterializedValuePublisher extends Publisher[Any] {
}
/**
* INERNAL API
*/
private[stream] object MaterializerSession {
class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace
}
/**
* INTERNAL API
*/
@ -599,12 +607,38 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
}
// Cancels all intermediate Publishers and fails all intermediate Subscribers.
// (This is an attempt to clean up after an exception during materialization)
private def panic(cause: Throwable): Unit = {
val panicError = new MaterializationPanic(cause)
for (subMap subscribersStack; sub subMap.valuesIterator) {
sub.onSubscribe(new Subscription {
override def cancel(): Unit = ()
override def request(n: Long): Unit = sub.onError(panicError)
})
}
for (pubMap publishersStack; pub pubMap.valuesIterator) {
pub.subscribe(new Subscriber[Any] {
override def onSubscribe(s: Subscription): Unit = s.cancel()
override def onComplete(): Unit = ()
override def onError(t: Throwable): Unit = ()
override def onNext(t: Any): Unit = ()
})
}
}
final def materialize(): Any = {
require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)")
require(
topLevel.isRunnable,
s"The top level module cannot be materialized because it has unconnected ports: ${(topLevel.inPorts ++ topLevel.outPorts).mkString(", ")}")
materializeModule(topLevel, topLevel.attributes)
try materializeModule(topLevel, topLevel.attributes)
catch {
case NonFatal(e)
panic(e)
throw e
}
}
protected def mergeAttributes(parent: OperationAttributes, current: OperationAttributes): OperationAttributes =

View file

@ -377,9 +377,11 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings
try upstream.onInternalError(AbruptTerminationException(self))
// Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream
// otherwise this will have no effect
finally downstream.fail(AbruptTerminationException(self))
finally {
downstream.fail(AbruptTerminationException(self))
upstream.cancel()
}
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)

View file

@ -10,7 +10,7 @@ import scala.concurrent.Promise
import akka.actor._
import akka.util.ByteString
import akka.io.Tcp._
import akka.stream.{ StreamSubscriptionTimeoutSettings, ActorFlowMaterializerSettings, StreamTcpException }
import akka.stream.{ AbruptTerminationException, StreamSubscriptionTimeoutSettings, ActorFlowMaterializerSettings, StreamTcpException }
import org.reactivestreams.{ Publisher, Processor }
import akka.stream.impl._
@ -279,10 +279,11 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
override def postStop(): Unit = {
// Close if it has not yet been done
val abruptTermination = AbruptTerminationException(self)
tcpInputs.cancel()
tcpOutputs.complete()
tcpOutputs.error(abruptTermination)
primaryInputs.cancel()
primaryOutputs.complete()
primaryOutputs.error(abruptTermination)
subscriptionTimer.foreach(_.cancel())
super.postStop() // Remember, we have a Stash
}

View file

@ -160,6 +160,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
override def postStop(): Unit = {
unboundPromise.trySuccess(())
primaryOutputs.complete()
super.postStop()
}