=htp Fixes early error also being signalled as error via event

This commit is contained in:
Konrad Malawski 2015-11-19 17:19:45 +01:00
parent f2003cfe23
commit 5b3c04572f
10 changed files with 235 additions and 55 deletions

View file

@ -5,18 +5,19 @@
package akka.http.impl.engine.client
import java.net.InetSocketAddress
import akka.event.Logging
import scala.annotation.tailrec
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import akka.actor._
import akka.stream.Materializer
import akka.stream.{ ActorAttributes, Materializer }
import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy }
import akka.stream.actor.ActorPublisherMessage._
import akka.stream.actor.ActorSubscriberMessage._
import akka.stream.impl.FixedSizeBuffer
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.impl.{ SeqActorName, FixedSizeBuffer }
import akka.stream.scaladsl.{ Keep, Flow, Sink, Source }
import akka.http.HostConnectionPoolSetup
import akka.http.impl.util.SeqActorName
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import PoolFlow._
@ -54,14 +55,23 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
log.debug("(Re-)starting host connection pool to {}:{}", hcps.host, hcps.port)
{ // start the pool flow with this actor acting as source as well as sink
initConnectionFlow()
/** Start the pool flow with this actor acting as source as well as sink */
private def initConnectionFlow() = {
import context.system
import hcps._
import setup._
val connectionFlow =
if (httpsContext.isEmpty) Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log)
else Http().outgoingConnectionTls(host, port, None, settings.connectionSettings, httpsContext, setup.log)
val poolFlow = PoolFlow(connectionFlow, new InetSocketAddress(host, port), settings, setup.log)
val poolFlow = PoolFlow(
Flow[HttpRequest].viaMat(connectionFlow)(Keep.right),
new InetSocketAddress(host, port), settings, setup.log)
.named("PoolFlow")
Source(ActorPublisher(self)).via(poolFlow).runWith(Sink(ActorSubscriber[ResponseContext](self)))
}
@ -106,7 +116,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
// if we can't dispatch right now we buffer and dispatch when demand from the pool arrives
if (inputBuffer.isFull) {
x.responsePromise.failure(
new RuntimeException(s"Exceeded configured max-open-requests value of [${inputBuffer.size}]"))
new RuntimeException(s"Exceeded configured max-open-requests value of [${inputBuffer.size}]")) // TODO maybe named exception?
} else inputBuffer.enqueue(x)
} else dispatchRequest(x) // if we can dispatch right now, do it
request(1) // for every incoming request we demand one response from the pool

View file

@ -4,20 +4,22 @@
package akka.http.impl.engine.client
import language.existentials
import java.net.InetSocketAddress
import scala.concurrent.Future
import scala.util.{ Failure, Success }
import scala.collection.immutable
import akka.actor._
import akka.http.scaladsl.model.{ HttpEntity, HttpResponse, HttpRequest }
import akka.http.scaladsl.util.FastFuture
import akka.http.ConnectionPoolSettings
import akka.http.impl.util._
import akka.stream.impl.{ SubscribePending, ExposedPublisher, ActorProcessor }
import akka.stream.actor._
import akka.stream.scaladsl._
import akka.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse }
import akka.http.scaladsl.util.FastFuture
import akka.stream._
import akka.stream.actor._
import akka.stream.impl.{ ActorProcessor, ExposedPublisher, SeqActorName, SubscribePending }
import akka.stream.scaladsl._
import scala.collection.immutable
import scala.concurrent.Future
import scala.language.existentials
import scala.util.{ Failure, Success }
private object PoolSlot {
import PoolFlow.{ RequestContext, ResponseContext }
@ -56,10 +58,12 @@ private object PoolSlot {
FlowGraph.create() { implicit b
import FlowGraph.Implicits._
// TODO wouldn't be better to have them under a known parent? /user/SlotProcessor-0 seems weird
val name = slotProcessorActorName.next()
val slotProcessor = b.add {
Flow.fromProcessor { ()
val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)).withDeploy(Deploy.local),
slotProcessorActorName.next())
name)
ActorProcessor[RequestContext, List[ProcessorOut]](actor)
}.mapConcat(identity)
}
@ -72,8 +76,8 @@ private object PoolSlot {
split.out(1).collect { case r: RawSlotEvent r }.outlet)
}
import ActorSubscriberMessage._
import ActorPublisherMessage._
import ActorSubscriberMessage._
/**
* An actor mananging a series of materializations of the given `connectionFlow`.
@ -87,15 +91,15 @@ private object PoolSlot {
private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any],
settings: ConnectionPoolSettings)(implicit fm: Materializer)
extends ActorSubscriber with ActorPublisher[List[ProcessorOut]] with ActorLogging {
var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _
var inflightRequests = immutable.Queue.empty[RequestContext]
val runnableGraph = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)).withDeploy(Deploy.local))
.via(connectionFlow)
.toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self)).withDeploy(Deploy.local)))(Keep.both)
.named("SlotProcessorInternalConnectionFlow")
def requestStrategy = ZeroRequestStrategy
def receive = waitingExposedPublisher
override def requestStrategy = ZeroRequestStrategy
override def receive = waitingExposedPublisher
def waitingExposedPublisher: Receive = {
case ExposedPublisher(publisher)
@ -107,20 +111,25 @@ private object PoolSlot {
def waitingForSubscribePending: Receive = {
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach (s self ! ActorPublisher.Internal.Subscribe(s))
log.debug("become unconnected, from subscriber pending")
context.become(unconnected)
}
val unconnected: Receive = {
case OnNext(rc: RequestContext)
case m @ OnNext(rc: RequestContext)
val (connInport, connOutport) = runnableGraph.run()
connOutport ! Request(totalDemand)
context.become(waitingForDemandFromConnection(connInport, connOutport, rc))
case Request(_) if (remainingRequested == 0) request(1) // ask for first request if necessary
case m @ Request(_) if (remainingRequested == 0) request(1) // ask for first request if necessary
case Cancel { cancel(); shutdown() }
case OnComplete onComplete()
case OnError(e) onError(e)
case m @ OnComplete onComplete()
case m @ OnError(e) onError(e)
case m @ Cancel
cancel()
shutdown()
case c @ FromConnection(msg) // ignore ...
}
def waitingForDemandFromConnection(connInport: ActorRef, connOutport: ActorRef,
@ -136,8 +145,8 @@ private object PoolSlot {
context.become(running(connInport, connOutport))
case FromConnection(Cancel) if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError
case FromConnection(OnComplete) handleDisconnect(None)
case FromConnection(OnError(e)) handleDisconnect(Some(e))
case FromConnection(OnComplete) handleDisconnect(sender(), None, Some(firstRequest))
case FromConnection(OnError(e)) handleDisconnect(sender(), Some(e), Some(firstRequest))
case FromConnection(OnNext(x)) throw new IllegalStateException("Unexpected HttpResponse: " + x)
}
@ -171,18 +180,29 @@ private object PoolSlot {
val requestCompleted = SlotEvent.RequestCompletedFuture(whenCompleted.map(_ SlotEvent.RequestCompleted(slotIx)))
onNext(delivery :: requestCompleted :: Nil)
case FromConnection(OnComplete) handleDisconnect(None)
case FromConnection(OnError(e)) handleDisconnect(Some(e))
case FromConnection(OnComplete) handleDisconnect(sender(), None)
case FromConnection(OnError(e)) handleDisconnect(sender(), Some(e))
}
def handleDisconnect(error: Option[Throwable]): Unit = {
def handleDisconnect(connInport: ActorRef, error: Option[Throwable], firstContext: Option[RequestContext] = None): Unit = {
log.debug("Slot {} disconnected after {}", slotIx, error getOrElse "regular connection close")
val results: List[ProcessorOut] = inflightRequests.map { rc
if (rc.retriesLeft == 0) {
val reason = error.fold[Throwable](new RuntimeException("Unexpected disconnect"))(identityFunc)
ResponseDelivery(ResponseContext(rc, Failure(reason)))
} else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1))
}(collection.breakOut)
val results: List[ProcessorOut] = {
if (inflightRequests.isEmpty && firstContext.isDefined) {
(error match {
case Some(err) ResponseDelivery(ResponseContext(firstContext.get, Failure(new RuntimeException("Unexpected (early) disconnect", err))))
case _ ResponseDelivery(ResponseContext(firstContext.get, Failure(new RuntimeException("Unexpected (early) disconnect"))))
}) :: Nil
} else {
inflightRequests.map { rc
if (rc.retriesLeft == 0) {
val reason = error.fold[Throwable](new RuntimeException("Unexpected disconnect"))(identityFunc)
connInport ! ActorPublisherMessage.Cancel
ResponseDelivery(ResponseContext(rc, Failure(reason)))
} else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1))
}(collection.breakOut)
}
}
inflightRequests = immutable.Queue.empty
onNext(SlotEvent.Disconnected(slotIx, results.size) :: results)
if (canceled) onComplete()
@ -207,23 +227,27 @@ private object PoolSlot {
private case class FromConnection(ev: Any) extends NoSerializationVerificationNeeded
private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] {
private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] with ActorLogging {
def receive: Receive = {
case ev: Request slotProcessor ! FromConnection(ev)
case Cancel { slotProcessor ! FromConnection(Cancel); context.stop(self) }
case OnNext(r: HttpRequest) onNext(r)
case OnComplete { onComplete(); context.stop(self) }
case OnError(e) { onError(e); context.stop(self) }
case OnComplete onCompleteThenStop()
case OnError(e) onErrorThenStop(e)
case Cancel
slotProcessor ! FromConnection(Cancel)
context.stop(self)
}
}
private class FlowOutportActor(slotProcessor: ActorRef) extends ActorSubscriber {
private class FlowOutportActor(slotProcessor: ActorRef) extends ActorSubscriber with ActorLogging {
def requestStrategy = ZeroRequestStrategy
def receive: Receive = {
case Request(n) request(n)
case Cancel cancel()
case ev: OnNext slotProcessor ! FromConnection(ev)
case ev @ (OnComplete | OnError(_)) { slotProcessor ! FromConnection(ev); context.stop(self) }
case Request(n) request(n)
case Cancel cancel()
case ev: OnNext slotProcessor ! FromConnection(ev)
case ev @ (OnComplete | OnError(_))
slotProcessor ! FromConnection(ev)
context.stop(self)
}
}
}

View file

@ -177,11 +177,6 @@ package util {
def receive = { case x log.warning(x.toString) }
}
// Provisioning of actor names composed of a common prefix + a counter. According to #16613 not in scope as public API.
private[http] final class SeqActorName(prefix: String) extends AtomicInteger {
def next(): String = prefix + '-' + getAndIncrement()
}
private[http] trait LogMessages extends ActorLogging { this: Actor
def logMessages(mark: String = "")(r: Receive): Receive =
new Receive {

View file

@ -5,6 +5,8 @@
package akka.http.scaladsl
import java.io.{ FileOutputStream, File }
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel
object TestUtils {
def writeAllText(text: String, file: File): Unit = {
@ -13,4 +15,20 @@ object TestUtils {
fos.write(text.getBytes("UTF-8"))
} finally fos.close()
}
// TODO duplicated code from akka-http-core-tests
def temporaryServerAddress(interface: String = "127.0.0.1"): InetSocketAddress = {
val serverSocket = ServerSocketChannel.open()
try {
serverSocket.socket.bind(new InetSocketAddress(interface, 0))
val port = serverSocket.socket.getLocalPort
new InetSocketAddress(interface, port)
} finally serverSocket.close()
}
// TODO duplicated code from akka-http-core-tests
def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (InetSocketAddress, String, Int) = {
val socketAddress = temporaryServerAddress(interface)
(socketAddress, socketAddress.getHostName, socketAddress.getPort)
}
}

View file

@ -0,0 +1,102 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.scaladsl.server
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.{ TestUtils, Http }
import akka.http.scaladsl.model.{ HttpResponse, Uri, HttpRequest }
import akka.stream.impl.{ StreamSupervisor, ActorMaterializerImpl }
import akka.stream.{ ActorMaterializer, Materializer, OverflowStrategy }
import akka.stream.scaladsl.{ Sink, Source }
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, BeforeAndAfterAll, WordSpecLike }
import sun.security.provider.ConfigFile
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
class DontLeakActorsOnFailingConnectionSpecs extends WordSpecLike with Matchers with BeforeAndAfterAll {
val config = ConfigFactory.parseString("""
akka {
# disable logs (very noisy tests - 100 exepected errors)
loglevel = OFF
stdout-loglevel = OFF
}""").withFallback(ConfigFactory.load())
implicit val system = ActorSystem("DontLeakActorsOnFailingConnectionSpecs", config)
import system.dispatcher
implicit val mat = ActorMaterializer()
val log = Logging(system, getClass)
// TODO DUPLICATED
def assertAllStagesStopped[T](name: String)(block: T)(implicit materializer: Materializer): T =
materializer match {
case impl: ActorMaterializerImpl
val probe = TestProbe()(impl.system)
probe.send(impl.supervisor, StreamSupervisor.StopChildren)
probe.expectMsg(StreamSupervisor.StoppedChildren)
val result = block
probe.within(5.seconds) {
probe.awaitAssert {
impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
val children = probe.expectMsgType[StreamSupervisor.Children].children.filter { c
c.path.toString contains name
}
assert(children.isEmpty,
s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]")
}
}
result
case _ block
}
"Http.superPool" should {
"not leak connection Actors when hitting non-existing endpoint" in {
assertAllStagesStopped("InternalConnectionFlow") {
val reqsCount = 100
val clientFlow = Http().superPool[Int]()
val (_, _, port) = TestUtils.temporaryServerHostnameAndPort()
val source = Source(1 to reqsCount).map(i HttpRequest(uri = Uri(s"http://127.0.0.1:$port/test/$i")) -> i)
val countDown = new CountDownLatch(reqsCount)
val sink = Sink.foreach[(Try[HttpResponse], Int)] {
case (resp, id) handleResponse(resp, id)
}
val resps = source.via(clientFlow).runWith(sink)
resps.onComplete({ case _ countDown.countDown() })
countDown.await(10, TimeUnit.SECONDS)
}
}
}
private def handleResponse(httpResp: Try[HttpResponse], id: Int): Unit = {
httpResp match {
case Success(httpRes)
log.warning(s"$id: OK: (${httpRes.status.intValue}")
httpRes.entity.dataBytes.runWith(Sink.ignore)
case Failure(ex)
log.warning(s"$id: FAIL: $ex")
}
}
override def afterAll = {
system.shutdown()
system.awaitTermination(3.seconds)
}
}

View file

@ -4,8 +4,15 @@
package akka.http.scaladsl.server
import java.io.File
import akka.http.scaladsl.model.Multipart
import akka.stream.scaladsl.{ Source, Sink }
import akka.util.ByteString
import directives._
import scala.util.Try
trait Directives extends RouteConcatenation
with BasicDirectives
with CacheConditionDirectives

View file

@ -5,7 +5,7 @@ package akka.stream
import java.util.Locale
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean }
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props }
import akka.stream.impl._
@ -57,7 +57,7 @@ object ActorMaterializer {
system,
materializerSettings,
system.dispatchers,
context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher)),
context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()),
haveShutDown,
FlowNameCounter(system).counter,
namePrefix)

View file

@ -44,7 +44,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
override def withNamePrefix(name: String): Materializer = this.copy(namePrefix = name)
private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() // TODO use SeqActorName instead
private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
@ -215,6 +215,9 @@ private[akka] object StreamSupervisor {
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
private val actorName = new SeqActorName("StreamSupervisor")
def nextName(): String = actorName.next()
final case class Materialize(props: Props, name: String)
extends DeadLetterSuppression with NoSerializationVerificationNeeded

View file

@ -3,6 +3,8 @@
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.stream._
import akka.stream.impl.AcknowledgePublisher.{ Ok, Rejected }
@ -96,7 +98,8 @@ private[akka] final class ActorPublisherSource[Out](props: Props, val attributes
(akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef)
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new ActorPublisherSource[Out](props, attributes, shape)
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
new ActorPublisherSource[Out](props, attributes, shape)
override def withAttributes(attr: Attributes): Module = new ActorPublisherSource(props, attr, amendShape(attr))
}

View file

@ -0,0 +1,18 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicLong
/**
* INTERNAL API
* As discussed in https://github.com/akka/akka/issues/16613
*
* Generator of sequentially numbered actor names.
* Pulled out from HTTP internals, most often used used by streams which materialize actors directly
*/
private[akka] final class SeqActorName(prefix: String) extends AtomicLong {
def next(): String = prefix + '-' + getAndIncrement()
}