=str #16597 initial steps with idleTimeout

This commit is contained in:
Konrad Malawski 2015-10-23 06:41:02 -07:00
parent 99a9c5964e
commit 2c2228c241
12 changed files with 124 additions and 72 deletions

View file

@ -7,31 +7,26 @@ package akka.http.impl.engine.server
import java.net.InetSocketAddress
import java.util.Random
import akka.http.ServerSettings
import akka.http.scaladsl.model.ws.Message
import akka.stream.io._
import org.reactivestreams.{ Subscriber, Publisher }
import scala.util.control.NonFatal
import akka.util.ByteString
import akka.actor.{ ActorRef, Deploy, Props }
import akka.event.LoggingAdapter
import akka.actor.{ Deploy, ActorRef, Props }
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.PushPullStage
import akka.http.impl.engine.parsing._
import akka.http.impl.engine.rendering.{ ResponseRenderingOutput, ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.ServerSettings
import akka.http.impl.engine.TokenSourceActor
import akka.http.impl.engine.parsing.ParserOutput._
import akka.http.impl.engine.parsing._
import akka.http.impl.engine.rendering.{ HttpResponseRendererFactory, ResponseRenderingContext, ResponseRenderingOutput }
import akka.http.impl.engine.ws.Websocket.SwitchToWebsocketToken
import akka.http.impl.engine.ws._
import akka.http.impl.util._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.impl.util._
import akka.http.impl.engine.ws._
import Websocket.SwitchToWebsocketToken
import ParserOutput._
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.OutHandler
import akka.stream.stage.InHandler
import akka.http.impl.engine.rendering.ResponseRenderingContext
import akka.stream._
import akka.stream.io._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.ByteString
import org.reactivestreams.{ Publisher, Subscriber }
import scala.util.control.NonFatal
/**
* INTERNAL API

View file

@ -4,18 +4,19 @@
package akka.http.impl.util
import java.io.InputStream
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.{ SourceModule, SinkModule, PublisherSink }
import org.reactivestreams.{ Subscription, Processor, Subscriber, Publisher }
import scala.collection.immutable
import scala.concurrent.{ Promise, ExecutionContext, Future }
import akka.util.ByteString
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import akka.http.scaladsl.model.RequestEntity
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule }
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.ByteString
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future, Promise }
/**
* INTERNAL API
@ -305,7 +306,7 @@ private[http] object StreamUtils {
}
/**
* Similar to Source.lazyEmpty but doesn't rely on materialization. Can only be used once.
* Similar to Source.maybe but doesn't rely on materialization. Can only be used once.
*/
trait OneTimeValve {
def source[T]: Source[T, Unit]

View file

@ -5,9 +5,8 @@
package akka.http.scaladsl
import java.net.InetSocketAddress
import java.security.SecureRandom
import java.util.concurrent.ConcurrentHashMap
import java.util.{ Collection JCollection, Random }
import java.util.{ Collection JCollection }
import javax.net.ssl.{ SSLContext, SSLParameters }
import akka.actor._
@ -15,8 +14,8 @@ import akka.event.LoggingAdapter
import akka.http._
import akka.http.impl.engine.client._
import akka.http.impl.engine.server._
import akka.http.impl.util.{ ReadTheDocumentationException, Java6Compat, StreamUtils }
import akka.http.impl.engine.ws.WebsocketClientBlueprint
import akka.http.impl.util.{ Java6Compat, ReadTheDocumentationException, StreamUtils }
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Host
import akka.http.scaladsl.model.ws.{ WebsocketUpgradeResponse, WebsocketRequest, Message }
@ -25,7 +24,6 @@ import akka.japi
import akka.stream.Materializer
import akka.stream.io._
import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.config.Config
import scala.collection.immutable

View file

@ -10,6 +10,7 @@ import akka.http.impl.engine.ws.ByteStringSinkProbe
import akka.stream.io.{ SendBytes, SslTlsOutbound, SessionBytes }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.event.NoLogging
@ -32,7 +33,8 @@ abstract class HttpServerTestSetupBase {
val requests = TestSubscriber.probe[HttpRequest]
val responses = TestPublisher.probe[HttpResponse]()
def settings = ServerSettings(system).copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test")))))
def settings = ServerSettings(system)
.copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test")))))
def remoteAddress: Option[InetSocketAddress] = None
val (netIn, netOut) = {
@ -68,6 +70,8 @@ abstract class HttpServerTestSetupBase {
def expectRequest: HttpRequest = requests.requestNext()
def expectNoRequest(max: FiniteDuration): Unit = requests.expectNoMsg(max)
def expectSubscribe(): Unit = netOut.expectComplete()
def expectSubscribeAndNetworkClose(): Unit = netOut.expectSubscriptionAndComplete()
def expectNetworkClose(): Unit = netOut.expectComplete()
def send(data: ByteString): Unit = netIn.sendNext(data)

View file

@ -5,7 +5,7 @@
package akka.http.impl.engine.ws
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Source, Sink }
import akka.stream.scaladsl.Sink
import akka.stream.testkit.TestSubscriber
import akka.util.ByteString
@ -23,6 +23,7 @@ trait ByteStringSinkProbe {
def expectNoBytes(): Unit
def expectNoBytes(timeout: FiniteDuration): Unit
def expectSubscriptionAndComplete(): Unit
def expectComplete(): Unit
def expectError(): Throwable
def expectError(cause: Throwable): Unit
@ -62,6 +63,7 @@ object ByteStringSinkProbe {
def expectUtf8EncodedString(string: String): Unit =
expectBytes(ByteString(string, "utf8"))
def expectSubscriptionAndComplete(): Unit = probe.expectSubscriptionAndComplete()
def expectComplete(): Unit = probe.expectComplete()
def expectError(): Throwable = probe.expectError()
def expectError(cause: Throwable): Unit = probe.expectError(cause)

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.scaladsl
@ -8,6 +8,7 @@ import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStream
import java.net.Socket
import akka.http.scaladsl.Http.ServerBinding
import akka.http.{ ClientConnectionSettings, ServerSettings }
import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.tailrec
import scala.concurrent.{ Promise, Future, Await }
@ -24,7 +25,7 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.impl.util._
import scala.util.Success
import scala.util.{ Failure, Try, Success }
class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
@ -138,6 +139,43 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
Await.result(b1.unbind(), 1.second)
}
"close connection with idle client after idleTimeout" in {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
val s = ServerSettings(system)
val theIdleTimeout = 300.millis
val settings = s.copy(timeouts = s.timeouts.copy(idleTimeout = theIdleTimeout))
val receivedRequest = Promise[Long]()
def handle(req: HttpRequest): Future[HttpResponse] = {
receivedRequest.complete(Success(System.nanoTime()))
Promise().future // never complete the request with a response; 're waiting for the timeout to happen, nothing else
}
val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings)
val b1 = Await.result(binding, 3.seconds)
def runIdleRequest(uri: Uri): Future[HttpResponse] = {
val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString])
Http().outgoingConnection(hostname, port)
.runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head)
._2
}
val clientsResponseFuture = runIdleRequest("/")
// await for the server to get the request
val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds)
// waiting for the timeout to happen on the client
Try(Await.result(clientsResponseFuture, 2.second)).recoverWith {
case _: StreamTcpException Success(System.nanoTime())
case other: Throwable Failure(other)
}.get
val diff = System.nanoTime() - serverReceivedRequestAtNanos
diff should be > theIdleTimeout.toNanos
}
"log materialization errors in `bindAndHandle`" which {
"are triggered in `transform`" in Utils.assertAllStagesStopped {

View file

@ -6,6 +6,7 @@ package akka.http.scaladsl.server
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport
import akka.http.scaladsl.server.directives.Credentials
import akka.stream.scaladsl._
import com.typesafe.config.{ ConfigFactory, Config }
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

View file

@ -74,18 +74,18 @@ private[akka] class StreamTcpManager extends Actor {
}
def receive: Receive = {
case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, halfClose, options, connectTimeout, _)
case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, halfClose, options, connectTimeout, idleTimeout)
val connTimeout = connectTimeout match {
case x: FiniteDuration Some(x)
case _ None
}
val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise, halfClose,
val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise, halfClose, idleTimeout,
Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true),
materializerSettings = ActorMaterializerSettings(context.system)), name = encName("client", remoteAddress))
processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor))
case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, halfClose, options, _)
val props = TcpListenStreamActor.props(localAddressPromise, unbindPromise, flowSubscriber, halfClose,
case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, halfClose, options, idleTimeout)
val props = TcpListenStreamActor.props(localAddressPromise, unbindPromise, flowSubscriber, halfClose, idleTimeout,
Tcp.Bind(context.system.deadLetters, endpoint, backlog, options, pullMode = true),
ActorMaterializerSettings(context.system))
.withDispatcher(context.props.dispatcher)

View file

@ -13,6 +13,7 @@ import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings, Stre
import org.reactivestreams.Processor
import akka.stream.impl._
import scala.concurrent.duration.Duration
import scala.util.control.NoStackTrace
/**
@ -24,9 +25,10 @@ private[akka] object TcpStreamActor {
def outboundProps(processorPromise: Promise[Processor[ByteString, ByteString]],
localAddressPromise: Promise[InetSocketAddress],
halfClose: Boolean,
idleTimeout: Duration,
connectCmd: Connect,
materializerSettings: ActorMaterializerSettings): Props =
Props(new OutboundTcpStreamActor(processorPromise, localAddressPromise, halfClose, connectCmd,
Props(new OutboundTcpStreamActor(processorPromise, localAddressPromise, halfClose, idleTimeout, connectCmd,
materializerSettings)).withDispatcher(materializerSettings.dispatcher).withDeploy(Deploy.local)
def inboundProps(connection: ActorRef, halfClose: Boolean, settings: ActorMaterializerSettings): Props =
@ -302,6 +304,7 @@ private[akka] class InboundTcpStreamActor(
private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[ByteString, ByteString]],
localAddressPromise: Promise[InetSocketAddress],
_halfClose: Boolean,
idleTimeout: Duration,
val connectCmd: Connect, _settings: ActorMaterializerSettings)
extends TcpStreamActor(_settings, _halfClose) {
import context.system

View file

@ -4,18 +4,19 @@
package akka.stream.impl.io
import java.net.InetSocketAddress
import scala.concurrent.{ Future, Promise }
import akka.actor._
import akka.io.{ IO, Tcp }
import akka.io.Tcp._
import akka.stream.{ Materializer, ActorMaterializerSettings }
import akka.io.{ IO, Tcp }
import akka.stream.impl._
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{ Tcp StreamTcp }
import akka.stream.io.Timeouts
import akka.stream.scaladsl.{ Flow, Tcp StreamTcp }
import akka.stream.{ ActorMaterializerSettings, BindFailedException, ConnectionException }
import akka.util.ByteString
import org.reactivestreams.Subscriber
import akka.stream.ConnectionException
import akka.stream.BindFailedException
import scala.concurrent.duration.Duration
import scala.concurrent.{ Future, Promise }
/**
* INTERNAL API
@ -25,8 +26,9 @@ private[akka] object TcpListenStreamActor {
unbindPromise: Promise[() Future[Unit]],
flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
halfClose: Boolean,
idleTimeout: Duration,
bindCmd: Tcp.Bind, materializerSettings: ActorMaterializerSettings): Props = {
Props(new TcpListenStreamActor(localAddressPromise, unbindPromise, flowSubscriber, halfClose, bindCmd, materializerSettings))
Props(new TcpListenStreamActor(localAddressPromise, unbindPromise, flowSubscriber, halfClose, bindCmd, idleTimeout, materializerSettings))
.withDeploy(Deploy.local)
}
}
@ -38,7 +40,9 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
unbindPromise: Promise[() Future[Unit]],
flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
halfClose: Boolean,
bindCmd: Tcp.Bind, settings: ActorMaterializerSettings) extends Actor
bindCmd: Tcp.Bind,
idleTimeout: Duration,
settings: ActorMaterializerSettings) extends Actor
with Pump with ActorLogging {
import ReactiveStreamsCompliance._
import context.system
@ -151,10 +155,17 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement()
val tcpStreamActor = context.watch(context.actorOf(TcpStreamActor.inboundProps(connection, halfClose, settings)))
val processor = ActorProcessor[ByteString, ByteString](tcpStreamActor)
import scala.concurrent.duration._
val handler = (idleTimeout match {
case d: FiniteDuration Flow[ByteString].join(Timeouts.idleTimeoutBidi[ByteString, ByteString](d))
case _ Flow[ByteString]
}).andThenMat(() (processor, ()))
val conn = StreamTcp.IncomingConnection(
connected.localAddress,
connected.remoteAddress,
Flow[ByteString].andThenMat(() (processor, ())))
handler)
primaryOutputs.enqueueOutputElement(conn)
}

View file

@ -2,11 +2,10 @@ package akka.stream.io
import java.util.concurrent.{ TimeUnit, TimeoutException }
import akka.actor.{ Cancellable, ActorSystem }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.{ FlowShape, Outlet, Inlet, BidiShape }
import akka.stream.scaladsl.{ BidiFlow, Flow }
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
import akka.stream.stage._
import akka.stream.{ BidiShape, Inlet, Outlet }
import scala.concurrent.duration.{ Deadline, FiniteDuration }

View file

@ -3,26 +3,23 @@
*/
package akka.stream.scaladsl
import java.net.{ InetSocketAddress, URLEncoder }
import akka.stream.impl.StreamLayout.Module
import scala.collection.immutable
import scala.concurrent.{ Promise, ExecutionContext, Future }
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }
import scala.util.control.NoStackTrace
import java.net.InetSocketAddress
import akka.actor._
import akka.io.Inet.SocketOption
import akka.io.{ Tcp IoTcp }
import akka.stream._
import akka.stream.impl._
import akka.stream.impl.ReactiveStreamsCompliance._
import akka.stream.scaladsl._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.impl.io.{ DelayedInitProcessor, StreamTcpManager }
import akka.stream.io.Timeouts
import akka.util.ByteString
import org.reactivestreams.{ Publisher, Processor, Subscriber, Subscription }
import akka.stream.impl.io.TcpStreamActor
import akka.stream.impl.io.TcpListenStreamActor
import akka.stream.impl.io.DelayedInitProcessor
import akka.stream.impl.io.StreamTcpManager
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ Future, Promise }
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
@ -206,7 +203,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
connectTimeout: Duration = Duration.Inf,
idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
val remoteAddr = remoteAddress
val timeoutHandling = idleTimeout match {
case d: FiniteDuration Flow[ByteString].join(Timeouts.idleTimeoutBidi[ByteString, ByteString](d))
case _ Flow[ByteString]
}
Flow[ByteString].andThenMat(() {
val processorPromise = Promise[Processor[ByteString, ByteString]]()
@ -216,7 +216,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
import system.dispatcher
val outgoingConnection = localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))
(new DelayedInitProcessor[ByteString, ByteString](processorPromise.future), outgoingConnection)
})
}).via(timeoutHandling)
}