+str,htc #16071, #16072 New Stream Tcp and Http API

* StreamTcp and Http extensions now return Flows and Sources that can be materialized later
* Flow can now be completed with another flow to be turned into a runnable flow
This commit is contained in:
Björn Antonsson 2014-11-28 10:41:57 +01:00
parent cac9137aa9
commit 672d4ed091
43 changed files with 1327 additions and 1236 deletions

View file

@ -15,12 +15,4 @@ public final class Http {
public static HttpExt get(ActorSystem system) {
return (HttpExt) akka.http.Http.get(system);
}
/** Create a Bind message to send to the Http Manager */
public static Object bind(String host, int port) {
return Accessors$.MODULE$.Bind(host, port);
}
/** Create a Bind message to send to the Http Manager */
public static Object bind(String host, int port, MaterializerSettings materializerSettings) {
return Accessors$.MODULE$.Bind(host, port, materializerSettings);
}
}

View file

@ -6,150 +6,107 @@ package akka.http
import java.io.Closeable
import java.net.InetSocketAddress
import com.typesafe.config.Config
import org.reactivestreams.{ Publisher, Subscriber }
import akka.stream.io.StreamTcp
import akka.stream.scaladsl._
import scala.collection.immutable
import akka.io.Inet
import akka.stream.MaterializerSettings
import akka.http.engine.client.ClientConnectionSettings
import akka.http.engine.server.ServerSettings
import akka.http.model.{ ErrorInfo, HttpResponse, HttpRequest, japi }
import akka.http.util._
import akka.http.engine.client.{ HttpClientPipeline, ClientConnectionSettings }
import akka.http.engine.server.{ HttpServerPipeline, ServerSettings }
import akka.http.model.{ ErrorInfo, HttpResponse, HttpRequest }
import akka.actor._
object Http extends ExtensionKey[HttpExt] {
import scala.concurrent.Future
object Http extends ExtensionKey[HttpExt] with ExtensionIdProvider {
/**
* Command that can be sent to `IO(Http)` to trigger the setup of an HTTP client facility at
* a certain API level (connection, host or request).
* The HTTP layer will respond with an `Http.OutgoingChannel` reply (or `Status.Failure`).
* The sender `ActorRef`of this response can then be sent `HttpRequest` instances to which
* it will respond with `HttpResponse` instances (or `Status.Failure`).
* A flow representing an outgoing HTTP connection, and the key used to get information about
* the materialized connection. The flow takes pairs of a ``HttpRequest`` and a user definable
* context that will be correlated with the corresponding ``HttpResponse``.
*/
sealed trait SetupOutgoingChannel
final case class Connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress],
options: immutable.Traversable[Inet.SocketOption],
settings: Option[ClientConnectionSettings],
materializerSettings: Option[MaterializerSettings]) extends SetupOutgoingChannel
object Connect {
def apply(host: String, port: Int = 80,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ClientConnectionSettings] = None,
materializerSettings: Option[MaterializerSettings] = None): Connect =
apply(new InetSocketAddress(host, port), localAddress, options, settings, materializerSettings)
}
// PREVIEW OF COMING API HERE:
//
// case class SetupHostConnector(host: String, port: Int = 80,
// options: immutable.Traversable[Inet.SocketOption] = Nil,
// settings: Option[HostConnectorSettings] = None,
// connectionType: ClientConnectionType = ClientConnectionType.AutoProxied,
// defaultHeaders: immutable.Seq[HttpHeader] = Nil) extends SetupOutgoingChannel {
// private[http] def normalized(implicit refFactory: ActorRefFactory) =
// if (settings.isDefined) this
// else copy(settings = Some(HostConnectorSettings(actorSystem)))
// }
// object SetupHostConnector {
// def apply(host: String, port: Int, sslEncryption: Boolean)(implicit refFactory: ActorRefFactory): SetupHostConnector =
// apply(host, port, sslEncryption).normalized
// }
// sealed trait ClientConnectionType
// object ClientConnectionType {
// object Direct extends ClientConnectionType
// object AutoProxied extends ClientConnectionType
// final case class Proxied(proxyHost: String, proxyPort: Int) extends ClientConnectionType
// }
//
// case object SetupRequestChannel extends SetupOutgoingChannel
final case class OutgoingFlow(flow: Flow[(HttpRequest, Any), (HttpResponse, Any)],
key: Key { type MaterializedType = Future[Http.OutgoingConnection] })
/**
* An `OutgoingHttpChannel` with a single outgoing HTTP connection as the underlying transport.
* The materialized result of an outgoing HTTP connection stream with a single connection as the underlying transport.
*/
// FIXME: hook up with new style IO
final case class OutgoingConnection(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
responsePublisher: Publisher[(HttpResponse, Any)],
requestSubscriber: Subscriber[(HttpRequest, Any)]) {
}
localAddress: InetSocketAddress)
// PREVIEW OF COMING API HERE:
//
// /**
// * An `OutgoingHttpChannel` with a connection pool to a specific host/port as the underlying transport.
// */
// final case class HostChannel(host: String, port: Int,
// untypedProcessor: HttpClientProcessor[Any]) extends OutgoingChannel {
// def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]]
// }
//
// /**
// * A general `OutgoingHttpChannel` with connection pools to all possible host/port combinations
// * as the underlying transport.
// */
// final case class RequestChannel(untypedProcessor: HttpClientProcessor[Any]) extends OutgoingChannel {
// def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]]
// }
/**
* A source representing an bound HTTP server socket, and the key to get information about
* the materialized bound socket.
*/
final case class ServerSource(source: Source[IncomingConnection],
key: Key { type MaterializedType = Future[ServerBinding] })
final case class Bind(endpoint: InetSocketAddress,
backlog: Int,
options: immutable.Traversable[Inet.SocketOption],
serverSettings: Option[ServerSettings],
materializerSettings: Option[MaterializerSettings])
object Bind {
def apply(interface: String, port: Int = 80, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
serverSettings: Option[ServerSettings] = None,
materializerSettings: Option[MaterializerSettings] = None): Bind =
apply(new InetSocketAddress(interface, port), backlog, options, serverSettings, materializerSettings)
}
/**
* An incoming HTTP connection.
*/
final case class IncomingConnection(remoteAddress: InetSocketAddress, stream: Flow[HttpResponse, HttpRequest])
sealed abstract case class ServerBinding(localAddress: InetSocketAddress,
connectionStream: Publisher[IncomingConnection]) extends model.japi.ServerBinding with Closeable {
/** Java API */
def getConnectionStream: Publisher[japi.IncomingConnection] = connectionStream.asInstanceOf[Publisher[japi.IncomingConnection]]
}
class StreamException(val info: ErrorInfo) extends RuntimeException(info.summary)
object ServerBinding {
def apply(localAddress: InetSocketAddress, connectionStream: Publisher[IncomingConnection]): ServerBinding =
new ServerBinding(localAddress, connectionStream) {
override def close() = ()
}
/**
* The materialized result of a bound HTTP server socket.
*/
private[akka] sealed abstract case class ServerBinding(localAddress: InetSocketAddress) extends Closeable
def apply(localAddress: InetSocketAddress, connectionStream: Publisher[IncomingConnection], closeable: Closeable): ServerBinding =
new ServerBinding(localAddress, connectionStream) {
/**
* INTERNAL API
*/
private[akka] object ServerBinding {
def apply(localAddress: InetSocketAddress, closeable: Closeable): ServerBinding =
new ServerBinding(localAddress) {
override def close() = closeable.close()
}
}
final case class IncomingConnection(remoteAddress: InetSocketAddress,
requestPublisher: Publisher[HttpRequest],
responseSubscriber: Subscriber[HttpResponse]) extends model.japi.IncomingConnection {
/** Java API */
def getRequestPublisher: Publisher[japi.HttpRequest] = requestPublisher.asInstanceOf[Publisher[japi.HttpRequest]]
/** Java API */
def getResponseSubscriber: Subscriber[japi.HttpResponse] = responseSubscriber.asInstanceOf[Subscriber[japi.HttpResponse]]
}
case object BindFailedException extends SingletonException
class ConnectionException(message: String) extends RuntimeException(message)
class ConnectionAttemptFailedException(val endpoint: InetSocketAddress) extends ConnectionException(s"Connection attempt to $endpoint failed")
class RequestTimeoutException(val request: HttpRequest, message: String) extends ConnectionException(message)
class StreamException(val info: ErrorInfo) extends RuntimeException(info.summary)
}
class HttpExt(system: ExtendedActorSystem) extends akka.io.IO.Extension {
val Settings = new Settings(system.settings.config getConfig "akka.http")
class Settings private[HttpExt] (config: Config) {
val ManagerDispatcher = config getString "manager-dispatcher"
class HttpExt(system: ExtendedActorSystem) extends Extension {
@volatile private[this] var clientPipelines = Map.empty[ClientConnectionSettings, HttpClientPipeline]
def connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress],
options: immutable.Traversable[Inet.SocketOption],
settings: Option[ClientConnectionSettings]): Http.OutgoingFlow = {
// FIXME #16378 Where to do logging? log.debug("Attempting connection to {}", remoteAddress)
val effectiveSettings = ClientConnectionSettings(settings)(system)
val tcpFlow = StreamTcp(system).connect(remoteAddress, localAddress, options, effectiveSettings.connectingTimeout)
val pipeline = clientPipelines.getOrElse(effectiveSettings, {
val pl = new HttpClientPipeline(effectiveSettings, system.log)(system.dispatcher)
clientPipelines = clientPipelines.updated(effectiveSettings, pl)
pl
})
pipeline(tcpFlow, remoteAddress)
}
val manager = system.actorOf(props = HttpManager.props(Settings), name = "IO-HTTP")
}
def connect(host: String, port: Int = 80,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ClientConnectionSettings] = None): Http.OutgoingFlow =
connect(new InetSocketAddress(host, port), localAddress, options, settings)
def bind(endpoint: InetSocketAddress,
backlog: Int,
options: immutable.Traversable[Inet.SocketOption],
serverSettings: Option[ServerSettings]): Http.ServerSource = {
import system.dispatcher
// FIXME IdleTimeout?
val src = StreamTcp(system).bind(endpoint, backlog, options)
val key = new Key {
override type MaterializedType = Future[Http.ServerBinding]
override def materialize(map: MaterializedMap) = map.get(src).map(s Http.ServerBinding(s.localAddress, s))
}
val log = system.log
val effectiveSettings = ServerSettings(serverSettings)(system)
Http.ServerSource(src.withKey(key).map(new HttpServerPipeline(effectiveSettings, log)), key)
}
def bind(interface: String, port: Int = 80, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
serverSettings: Option[ServerSettings] = None): Http.ServerSource =
bind(new InetSocketAddress(interface, port), backlog, options, serverSettings)
}

View file

@ -1,84 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http
import scala.util.{ Failure, Success }
import scala.concurrent.duration._
import akka.actor._
import akka.http.engine.client._
import akka.http.engine.server.{ HttpServerPipeline, ServerSettings }
import akka.io.IO
import akka.pattern.ask
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.io.StreamTcp
import akka.util.Timeout
/**
* INTERNAL API
*
* The gateway actor into the low-level HTTP layer.
*/
private[http] class HttpManager(httpSettings: HttpExt#Settings) extends Actor with ActorLogging {
import context.dispatcher
private[this] var clientPipelines = Map.empty[ClientConnectionSettings, HttpClientPipeline]
def receive = {
case connect @ Http.Connect(remoteAddress, localAddress, options, clientConnectionSettings, materializerSettings)
log.debug("Attempting connection to {}", remoteAddress)
val commander = sender()
val effectiveSettings = ClientConnectionSettings(clientConnectionSettings)
val tcpConnect = StreamTcp.Connect(remoteAddress, localAddress, materializerSettings, options,
effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout)
val askTimeout = Timeout(effectiveSettings.connectingTimeout + 5.seconds) // FIXME: how can we improve this?
val tcpConnectionFuture = IO(StreamTcp)(context.system).ask(tcpConnect)(askTimeout)
tcpConnectionFuture onComplete {
case Success(tcpConn: StreamTcp.OutgoingTcpConnection)
val pipeline = clientPipelines.getOrElse(effectiveSettings, {
val pl = new HttpClientPipeline(effectiveSettings, log)(FlowMaterializer(materializerSettings))
clientPipelines = clientPipelines.updated(effectiveSettings, pl)
pl
})
commander ! pipeline(tcpConn)
case Failure(error)
log.debug("Could not connect to {} due to {}", remoteAddress, error)
commander ! Status.Failure(new Http.ConnectionAttemptFailedException(remoteAddress))
case x throw new IllegalStateException("Unexpected response to `Connect` from StreamTcp: " + x)
}
case Http.Bind(endpoint, backlog, options, serverSettings, materializerSettings)
log.debug("Binding to {}", endpoint)
val commander = sender()
val effectiveSettings = ServerSettings(serverSettings)
val tcpBind = StreamTcp.Bind(endpoint, materializerSettings, backlog, options)
val askTimeout = Timeout(effectiveSettings.bindTimeout + 5.seconds) // FIXME: how can we improve this?
val tcpServerBindingFuture = IO(StreamTcp)(context.system).ask(tcpBind)(askTimeout)
tcpServerBindingFuture onComplete {
case Success(tcpServerBinding @ StreamTcp.TcpServerBinding(localAddress, connectionStream))
log.info("Bound to {}", endpoint)
implicit val materializer = FlowMaterializer()
val httpServerPipeline = new HttpServerPipeline(effectiveSettings, log)
val httpConnectionStream = Source(connectionStream)
.map(httpServerPipeline)
.runWith(Sink.publisher)
commander ! Http.ServerBinding(localAddress, httpConnectionStream, tcpServerBinding)
case Failure(error)
log.warning("Bind to {} failed due to {}", endpoint, error)
commander ! Status.Failure(Http.BindFailedException)
case x throw new IllegalStateException("Unexpected response to `Bind` from StreamTcp: " + x)
}
}
}
private[http] object HttpManager {
def props(httpSettings: HttpExt#Settings) =
Props(classOf[HttpManager], httpSettings) withDispatcher httpSettings.ManagerDispatcher
}

View file

@ -8,7 +8,6 @@ import java.net.InetSocketAddress
import scala.collection.immutable.Queue
import akka.stream.scaladsl._
import akka.event.LoggingAdapter
import akka.stream.FlowMaterializer
import akka.stream.FlattenStrategy
import akka.stream.io.StreamTcp
import akka.util.ByteString
@ -19,12 +18,14 @@ import akka.http.engine.parsing.{ HttpRequestParser, HttpHeaderParser, HttpRespo
import akka.http.engine.parsing.ParserOutput._
import akka.http.util._
import scala.concurrent.{ ExecutionContext, Future }
/**
* INTERNAL API
*/
private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettings,
log: LoggingAdapter)(implicit fm: FlowMaterializer)
extends (StreamTcp.OutgoingTcpConnection Http.OutgoingConnection) {
log: LoggingAdapter)(implicit ec: ExecutionContext)
extends ((StreamTcp.OutgoingTcpFlow, InetSocketAddress) Http.OutgoingFlow) {
import effectiveSettings._
@ -38,23 +39,24 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin
val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log)
def apply(tcpConn: StreamTcp.OutgoingTcpConnection): Http.OutgoingConnection = {
def apply(tcpFlow: StreamTcp.OutgoingTcpFlow, remoteAddress: InetSocketAddress): Http.OutgoingFlow = {
import FlowGraphImplicits._
val requestMethodByPass = new RequestMethodByPass(tcpConn.remoteAddress)
val httpKey = new HttpKey(tcpFlow.key)
val userIn = Source.subscriber[(HttpRequest, Any)]
val userOut = Sink.publisher[(HttpResponse, Any)]
val flowWithHttpKey = tcpFlow.flow.withKey(httpKey)
val netOut = Sink(tcpConn.outputStream)
val netIn = Source(tcpConn.inputStream)
val requestMethodByPass = new RequestMethodByPass(remoteAddress)
val pipeline = FlowGraph { implicit b
val bypassFanout = Broadcast[(HttpRequest, Any)]("bypassFanout")
val pipeline = Flow() { implicit b
val userIn = UndefinedSource[(HttpRequest, Any)]
val userOut = UndefinedSink[(HttpResponse, Any)]
val bypassFanout = Unzip[HttpRequest, Any]("bypassFanout")
val bypassFanin = Zip[HttpResponse, Any]("bypassFanin")
val requestPipeline =
Flow[(HttpRequest, Any)]
Flow[HttpRequest]
.map(requestMethodByPass)
.transform("renderer", () requestRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
@ -74,24 +76,21 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin
}
//FIXME: the graph is unnecessary after fixing #15957
userIn ~> bypassFanout ~> requestPipeline ~> netOut
bypassFanout ~> Flow[(HttpRequest, Any)].map(_._2) ~> bypassFanin.right
netIn ~> responsePipeline ~> bypassFanin.left
userIn ~> bypassFanout.in
bypassFanout.left ~> requestPipeline ~> flowWithHttpKey ~> responsePipeline ~> bypassFanin.left
bypassFanout.right ~> bypassFanin.right
bypassFanin.out ~> userOut
}.run()
Http.OutgoingConnection(
tcpConn.remoteAddress,
tcpConn.localAddress,
pipeline.get(userOut),
pipeline.get(userIn))
userIn -> userOut
}
Http.OutgoingFlow(pipeline, httpKey)
}
class RequestMethodByPass(serverAddress: InetSocketAddress)
extends (((HttpRequest, Any)) RequestRenderingContext) with (() HttpMethod) {
extends ((HttpRequest) RequestRenderingContext) with (() HttpMethod) {
private[this] var requestMethods = Queue.empty[HttpMethod]
def apply(tuple: (HttpRequest, Any)) = {
val request = tuple._1
def apply(request: HttpRequest) = {
requestMethods = requestMethods.enqueue(request.method)
RequestRenderingContext(request, serverAddress)
}
@ -102,4 +101,11 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin
method
} else HttpResponseParser.NoMethod
}
class HttpKey(tcpKey: Key { type MaterializedType = Future[StreamTcp.OutgoingTcpConnection] }) extends Key {
type MaterializedType = Future[Http.OutgoingConnection]
override def materialize(map: MaterializedMap): MaterializedType =
map.get(tcpKey).map(tcp Http.OutgoingConnection(tcp.remoteAddress, tcp.localAddress))
}
}

View file

@ -24,8 +24,7 @@ import scala.util.control.NonFatal
/**
* INTERNAL API
*/
private[http] class HttpServerPipeline(settings: ServerSettings,
log: LoggingAdapter)(implicit fm: FlowMaterializer)
private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAdapter)
extends (StreamTcp.IncomingTcpConnection Http.IncomingConnection) {
import settings.parserSettings
@ -40,55 +39,62 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log)
val bypassFanout = Broadcast[RequestOutput]("bypassFanout")
val bypassMerge = new BypassMerge
val requestPreparation =
Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail
.collect {
case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts)
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
}
val rendererPipeline =
Flow[ResponseRenderingContext]
.transform("recover", () new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed
.transform("renderer", () responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing response stream error"))
def apply(tcpConn: StreamTcp.IncomingTcpConnection): Http.IncomingConnection = {
import FlowGraphImplicits._
val networkIn = Source(tcpConn.inputStream)
val networkOut = Sink(tcpConn.outputStream)
val userIn = UndefinedSink[HttpRequest]
val userOut = UndefinedSource[HttpResponse]
val userIn = Sink.publisher[HttpRequest]
val userOut = Source.subscriber[HttpResponse]
val oneHundredContinueSource = Source[OneHundredContinue.type](Props[OneHundredContinueSourceActor])
@volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168
val oneHundredContinueSource = Source[OneHundredContinue.type] {
Props {
val actor = new OneHundredContinueSourceActor
oneHundredContinueRef = Some(actor.context.self)
actor
}
}
val pipeline = FlowGraph { implicit b
val bypassFanout = Broadcast[RequestOutput]("bypassFanout")
val bypassMerge = new BypassMerge
// FIXME The whole pipeline can maybe be created up front when #16168 is fixed
val pipeline = Flow() { implicit b
val requestParsing = Flow[ByteString].transform("rootParser", ()
// each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() oneHundredContinueRef))
val requestPreparation =
Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail
.collect {
case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts)
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
}
val rendererPipeline =
Flow[ResponseRenderingContext]
.transform("recover", () new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed
.transform("renderer", () responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing response stream error"))
//FIXME: the graph is unnecessary after fixing #15957
networkIn ~> requestParsing ~> bypassFanout ~> requestPreparation ~> userIn
userOut ~> bypassMerge.applicationInput ~> rendererPipeline ~> tcpConn.stream ~> requestParsing ~> bypassFanout ~> requestPreparation ~> userIn
bypassFanout ~> bypassMerge.bypassInput
userOut ~> bypassMerge.applicationInput ~> rendererPipeline ~> networkOut
oneHundredContinueSource ~> bypassMerge.oneHundredContinueInput
}.run()
oneHundredContinueRef = Some(pipeline.get(oneHundredContinueSource))
b.allowCycles()
Http.IncomingConnection(tcpConn.remoteAddress, pipeline.get(userIn), pipeline.get(userOut))
userOut -> userIn
}
Http.IncomingConnection(tcpConn.remoteAddress, pipeline)
}
class BypassMerge extends FlexiMerge[ResponseRenderingContext]("BypassMerge") {

View file

@ -20,11 +20,4 @@ private[http] object Accessors {
/** INTERNAL API */
private[http] def Uri(uri: model.Uri): Uri = JavaUri(uri)
/** INTERNAL API */
private[http] def Bind(host: String, port: Int): AnyRef =
akka.http.Http.Bind(host, port, materializerSettings = None)
/** INTERNAL API */
private[http] def Bind(host: String, port: Int, materializerSettings: MaterializerSettings): AnyRef =
akka.http.Http.Bind(host, port, materializerSettings = Some(materializerSettings))
}

View file

@ -181,7 +181,8 @@ private[http] object StreamUtils {
def runStrict(sourceData: ByteString, transformer: Flow[ByteString, ByteString], maxByteSize: Long, maxElements: Int): Try[Option[ByteString]] =
Try {
transformer match {
case Pipe(ops)
// FIXME #16382 right now the flow can't use keys, should that be allowed?
case Pipe(ops, keys) if keys.isEmpty
if (ops.isEmpty)
Some(sourceData)
else {

View file

@ -19,39 +19,41 @@ import java.io.IOException;
import java.io.InputStreamReader;
public abstract class JavaTestServer {
public static void main(String[] args) throws IOException, InterruptedException {
ActorSystem system = ActorSystem.create();
final FlowMaterializer materializer = FlowMaterializer.create(system);
ActorRef httpManager = Http.get(system).manager();
Future<Object> binding = ask(httpManager, Http.bind("localhost", 8080), 1000);
binding.foreach(new Foreach<Object>() {
@Override
public void each(Object result) throws Throwable {
ServerBinding binding = (ServerBinding) result;
System.out.println("Bound to " + binding.localAddress());
Source.from(binding.getConnectionStream()).foreach(new akka.stream.javadsl.japi.Procedure<IncomingConnection>() {
@Override
public void apply(IncomingConnection conn) throws Exception {
System.out.println("New incoming connection from " + conn.remoteAddress());
Source.from(conn.getRequestPublisher()).map(new akka.stream.javadsl.japi.Function<HttpRequest, HttpResponse>() {
@Override
public HttpResponse apply(HttpRequest request) throws Exception {
System.out.println("Handling request to " + request.getUri());
return JavaApiTestCases.handleRequest(request);
}
}).runWith(Sink.create(conn.getResponseSubscriber()), materializer);
}
}, materializer);
}
}, system.dispatcher());
System.out.println("Press ENTER to stop.");
new BufferedReader(new InputStreamReader(System.in)).readLine();
system.shutdown();
}
// FIXME Java Http API
// public static void main(String[] args) throws IOException, InterruptedException {
// ActorSystem system = ActorSystem.create();
//
// final FlowMaterializer materializer = FlowMaterializer.create(system);
//
// ActorRef httpManager = Http.get(system).manager();
// Future<Object> binding = ask(httpManager, Http.bind("localhost", 8080), 1000);
// binding.foreach(new Foreach<Object>() {
// @Override
// public void each(Object result) throws Throwable {
// ServerBinding binding = (ServerBinding) result;
// System.out.println("Bound to " + binding.localAddress());
//
// Source.from(binding.getConnectionStream()).foreach(new akka.stream.javadsl.japi.Procedure<IncomingConnection>() {
// @Override
// public void apply(IncomingConnection conn) throws Exception {
// System.out.println("New incoming connection from " + conn.remoteAddress());
//
// Source.from(conn.getRequestPublisher()).map(new akka.stream.javadsl.japi.Function<HttpRequest, HttpResponse>() {
// @Override
// public HttpResponse apply(HttpRequest request) throws Exception {
// System.out.println("Handling request to " + request.getUri());
// return JavaApiTestCases.handleRequest(request);
// }
// }).runWith(Sink.create(conn.getResponseSubscriber()), materializer);
// }
// }, materializer);
// }
// }, system.dispatcher());
//
// System.out.println("Press ENTER to stop.");
// new BufferedReader(new InputStreamReader(System.in)).readLine();
//
// system.shutdown();
// }
}

View file

@ -7,16 +7,12 @@ package akka.http
import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
import java.net.Socket
import com.typesafe.config.{ Config, ConfigFactory }
import org.reactivestreams.Publisher
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import akka.actor.{ Status, ActorSystem }
import akka.io.IO
import akka.testkit.TestProbe
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.impl.SynchronousIterablePublisher
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe }
import akka.stream.scaladsl._
@ -42,33 +38,34 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
"properly bind and unbind a server" in {
val (hostname, port) = temporaryServerHostnameAndPort()
val commander = TestProbe()
commander.send(IO(Http), Http.Bind(hostname, port))
val Http.ServerBinding(localAddress, connectionStream) = commander.expectMsgType[Http.ServerBinding]
val Http.ServerSource(source, key) = Http(system).bind(hostname, port)
val c = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm = source.to(Sink(c)).run()
val Http.ServerBinding(localAddress) = Await.result(mm.get(key), 3.seconds)
val sub = c.expectSubscription()
localAddress.getHostName shouldEqual hostname
localAddress.getPort shouldEqual port
val c = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
connectionStream.subscribe(c)
val sub = c.expectSubscription()
sub.cancel()
// TODO: verify unbinding effect
}
"report failure if bind fails" in {
val (hostname, port) = temporaryServerHostnameAndPort()
val commander = TestProbe()
commander.send(IO(Http), Http.Bind(hostname, port))
val binding = commander.expectMsgType[Http.ServerBinding]
commander.send(IO(Http), Http.Bind(hostname, port))
commander.expectMsgType[Status.Failure]
// Clean up
val c = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
binding.connectionStream.subscribe(c)
val sub = c.expectSubscription()
val Http.ServerSource(source, key) = Http(system).bind(hostname, port)
val c1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm1 = source.to(Sink(c1)).run()
val sub = c1.expectSubscription()
val Http.ServerBinding(localAddress) = Await.result(mm1.get(key), 3.seconds)
localAddress.getHostName shouldEqual hostname
localAddress.getPort shouldEqual port
val c2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm2 = source.to(Sink(c2)).run()
val failure = intercept[akka.stream.io.StreamTcp.IncomingTcpException] {
val serverBinding = Await.result(mm2.get(key), 3.seconds)
}
failure.getMessage should be("Bind failed")
sub.cancel()
}
@ -126,40 +123,38 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
class TestSetup {
val (hostname, port) = temporaryServerHostnameAndPort()
val bindHandler = TestProbe()
def configOverrides = ""
// automatically bind a server
val connectionStream: SubscriberProbe[Http.IncomingConnection] = {
val commander = TestProbe()
val settings = configOverrides.toOption.map(ServerSettings.apply)
commander.send(IO(Http), Http.Bind(hostname, port, serverSettings = settings))
val Http.ServerSource(source, key) = Http(system).bind(hostname, port, serverSettings = settings)
val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]
commander.expectMsgType[Http.ServerBinding].connectionStream.subscribe(probe)
source.to(Sink(probe)).run()
probe
}
val connectionStreamSub = connectionStream.expectSubscription()
def openNewClientConnection[T](settings: Option[ClientConnectionSettings] = None): (PublisherProbe[(HttpRequest, T)], SubscriberProbe[(HttpResponse, T)]) = {
val commander = TestProbe()
commander.send(IO(Http), Http.Connect(hostname, port, settings = settings))
val connection = commander.expectMsgType[Http.OutgoingConnection]
connection.remoteAddress.getPort shouldEqual port
connection.remoteAddress.getHostName shouldEqual hostname
val outgoingFlow = Http(system).connect(hostname, port, settings = settings)
val requestPublisherProbe = StreamTestKit.PublisherProbe[(HttpRequest, T)]()
val responseSubscriberProbe = StreamTestKit.SubscriberProbe[(HttpResponse, T)]()
requestPublisherProbe.subscribe(connection.requestSubscriber)
connection.responsePublisher.asInstanceOf[Publisher[(HttpResponse, T)]].subscribe(responseSubscriberProbe)
val tflow = outgoingFlow.flow.asInstanceOf[Flow[((HttpRequest, T)), ((HttpResponse, T))]]
val mm = Flow(Sink(responseSubscriberProbe), Source(requestPublisherProbe)).join(tflow).run()
val connection = Await.result(mm.get(outgoingFlow.key), 3.seconds)
connection.remoteAddress.getPort shouldEqual port
connection.remoteAddress.getHostName shouldEqual hostname
requestPublisherProbe -> responseSubscriberProbe
}
def acceptConnection(): (SubscriberProbe[HttpRequest], PublisherProbe[HttpResponse]) = {
connectionStreamSub.request(1)
val Http.IncomingConnection(_, requestPublisher, responseSubscriber) = connectionStream.expectNext()
val Http.IncomingConnection(remoteAddress, flow) = connectionStream.expectNext()
val requestSubscriberProbe = StreamTestKit.SubscriberProbe[HttpRequest]()
val responsePublisherProbe = StreamTestKit.PublisherProbe[HttpResponse]()
requestPublisher.subscribe(requestSubscriberProbe)
responsePublisherProbe.subscribe(responseSubscriber)
Flow(Sink(requestSubscriberProbe), Source(responsePublisherProbe)).join(flow).run()
requestSubscriberProbe -> responsePublisherProbe
}

View file

@ -5,17 +5,10 @@
package akka.http
import com.typesafe.config.{ Config, ConfigFactory }
import org.reactivestreams.Subscriber
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.io.IO
import akka.http.model.HttpMethods._
import akka.http.model._
object TestClient extends App {
@ -27,26 +20,16 @@ object TestClient extends App {
import akka.http.TestClient.system.dispatcher
implicit val materializer = FlowMaterializer()
implicit val askTimeout: Timeout = 500.millis
val host = "spray.io"
println(s"Fetching HTTP server version of host `$host` ...")
val result = for {
connection IO(Http).ask(Http.Connect(host)).mapTo[Http.OutgoingConnection]
response sendRequest(HttpRequest(GET, uri = "/"), connection)
} yield response.header[headers.Server]
val outgoingFlow = Http(system).connect(host)
val result = Source.singleton(HttpRequest() -> 'NoContext).via(outgoingFlow.flow).map(_._1).runWith(Sink.head)
def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = {
Source.singleton(HttpRequest() -> 'NoContext)
.to(Sink(connection.requestSubscriber))
.run()
Source(connection.responsePublisher).map(_._1).runWith(Sink.head)
}
result onComplete {
result.map(_.header[headers.Server]) onComplete {
case Success(res) println(s"$host is running ${res mkString ", "}")
case Failure(error) println(s"Error: $error")
}
result onComplete { _ system.shutdown() }
}
}

View file

@ -4,16 +4,14 @@
package akka.http
import com.typesafe.config.{ ConfigFactory, Config }
import scala.concurrent.duration._
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.io.IO
import akka.util.Timeout
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.http.model._
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import com.typesafe.config.{ ConfigFactory, Config }
import HttpMethods._
import scala.concurrent.Await
import scala.concurrent.duration._
object TestServer extends App {
val testConf: Config = ConfigFactory.parseString("""
@ -21,7 +19,6 @@ object TestServer extends App {
akka.log-dead-letters = off
""")
implicit val system = ActorSystem("ServerTest", testConf)
import system.dispatcher
val requestHandler: HttpRequest HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) index
@ -32,20 +29,20 @@ object TestServer extends App {
implicit val materializer = FlowMaterializer()
implicit val askTimeout: Timeout = 500.millis
val bindingFuture = IO(Http) ? Http.Bind(interface = "localhost", port = 8080)
bindingFuture foreach {
case Http.ServerBinding(localAddress, connectionStream)
Source(connectionStream).foreach {
case Http.IncomingConnection(remoteAddress, requestPublisher, responseSubscriber)
println("Accepted new connection from " + remoteAddress)
Source(requestPublisher).map(requestHandler).to(Sink(responseSubscriber)).run()
}
}
val Http.ServerSource(source, key) = Http(system).bind(interface = "localhost", port = 8080)
val materializedMap = source.to(Sink.foreach {
case Http.IncomingConnection(remoteAddress, flow)
println("Accepted new connection from " + remoteAddress)
flow.join(Flow[HttpRequest].map(requestHandler)).run()
}).run()
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
val serverBinding = Await.result(materializedMap.get(key), 3 seconds)
println(s"Server online at http://${serverBinding.localAddress.getHostName}:${serverBinding.localAddress.getPort}")
println("Press RETURN to stop...")
Console.readLine()
serverBinding.close()
system.shutdown()
////////////// helpers //////////////

View file

@ -614,12 +614,12 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
class TestSetup {
val netIn = StreamTestKit.PublisherProbe[ByteString]
val netOut = StreamTestKit.SubscriberProbe[ByteString]
val tcpConnection = StreamTcp.IncomingTcpConnection(null, netIn, netOut)
val tcpConnection = StreamTcp.IncomingTcpConnection(null, Flow(Sink(netOut), Source(netIn)))
def settings = ServerSettings(system).copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test")))))
val pipeline = new HttpServerPipeline(settings, NoLogging)
val Http.IncomingConnection(_, requestsIn, responsesOut) = pipeline(tcpConnection)
val Http.IncomingConnection(_, httpPipelineFlow) = pipeline(tcpConnection)
def wipeDate(string: String) =
string.fastSplit('\n').map {
@ -627,14 +627,13 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
case s s
}.mkString("\n")
val netInSub = netIn.expectSubscription()
val netOutSub = netOut.expectSubscription()
val requests = StreamTestKit.SubscriberProbe[HttpRequest]
val responses = StreamTestKit.PublisherProbe[HttpResponse]
requestsIn.subscribe(requests)
Flow(Sink(requests), Source(responses)).join(httpPipelineFlow).run()
val netInSub = netIn.expectSubscription()
val netOutSub = netOut.expectSubscription()
val requestsSub = requests.expectSubscription()
responses.subscribe(responsesOut)
val responsesSub = responses.expectSubscription()
def expectRequest: HttpRequest = {