Merge branch 'master' into wip-sync-2.4.8-artery-patriknw

This commit is contained in:
Patrik Nordwall 2016-07-08 15:38:33 +02:00
commit ccb5d1ba04
358 changed files with 9913 additions and 2030 deletions

View file

@ -87,7 +87,7 @@ public final class HttpEntities {
(akka.http.scaladsl.model.ContentType) contentType,
toScala(data));
}
private static akka.stream.scaladsl.Source<ByteString,Object> toScala(Source<ByteString, ?> javaSource) {
return (akka.stream.scaladsl.Source<ByteString,Object>)javaSource.asScala();
}

View file

@ -4,10 +4,16 @@
package akka.http.javadsl.model;
import akka.Done;
import akka.stream.Materializer;
import akka.http.javadsl.model.headers.HttpCredentials;
import akka.util.ByteString;
import scala.concurrent.Future;
import java.io.File;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
/**
* The base type for an Http message (request or response).
@ -55,7 +61,44 @@ public interface HttpMessage {
*/
ResponseEntity entity();
public static interface MessageTransformations<Self> {
/**
* Discards the entities data bytes by running the {@code dataBytes} Source contained by the {@code entity}
* of this HTTP message.
*
* Note: It is crucial that entities are either discarded, or consumed by running the underlying [[Source]]
* as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection
* (as designed), however possibly leading to an idle-timeout that will close the connection, instead of
* just having ignored the data.
*
* Warning: It is not allowed to discard and/or consume the the {@code entity.dataBytes} more than once
* as the stream is directly attached to the "live" incoming data source from the underlying TCP connection.
* Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose
* of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an
* "stream can cannot be materialized more than once" exception.
*
* In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716.
*/
DiscardedEntity discardEntityBytes(Materializer materializer);
/**
* Represents the the currently being-drained HTTP Entity which triggers completion of the contained
* Future once the entity has been drained for the given HttpMessage completely.
*/
interface DiscardedEntity {
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
Future<Done> future();
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
CompletionStage<Done> completionStage();
}
interface MessageTransformations<Self> {
/**
* Returns a copy of this message with a new protocol.
*/
@ -71,6 +114,11 @@ public interface HttpMessage {
*/
Self addHeaders(Iterable<HttpHeader> headers);
/**
* Returns a copy of this message with the given http credential header added to the list of headers.
*/
Self addCredentials(HttpCredentials credentials);
/**
* Returns a copy of this message with all headers of the given name (case-insensitively) removed.
*/

View file

@ -4,7 +4,12 @@
package akka.http.javadsl.model;
import akka.Done;
import akka.http.impl.util.JavaAccessors;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import java.util.concurrent.CompletionStage;
/**
* Represents an Http request.

View file

@ -4,7 +4,12 @@
package akka.http.javadsl.model;
import akka.Done;
import akka.http.impl.util.JavaAccessors;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import java.util.concurrent.CompletionStage;
/**
* Represents an Http response.
@ -16,7 +21,7 @@ public abstract class HttpResponse implements HttpMessage, HttpMessage.MessageTr
public abstract StatusCode status();
/**
* Returns the entity of this request.
* Returns the entity of this response.
*/
public abstract ResponseEntity entity();

View file

@ -0,0 +1,140 @@
/*
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.model;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import static akka.http.impl.util.Util.convertArray;
import static akka.http.impl.util.Util.convertMapToScala;
import static akka.http.impl.util.Util.emptyMap;
/**
* Constructors for Multipart instances
*/
public final class Multiparts {
/**
* Constructor for `multipart/form-data` content as defined in http://tools.ietf.org/html/rfc2388.
* All parts must have distinct names. (This is not verified!)
*/
public static Multipart.FormData createFormDataFromParts(Multipart.FormData.BodyPart... parts) {
return akka.http.scaladsl.model.Multipart.FormData$.MODULE$.createNonStrict(convertArray(parts));
}
/**
* Constructor for `multipart/form-data` content as defined in http://tools.ietf.org/html/rfc2388.
* All parts must have distinct names. (This is not verified!)
*/
public static Multipart.FormData.Strict createStrictFormDataFromParts(Multipart.FormData.BodyPart.Strict... parts) {
return akka.http.scaladsl.model.Multipart.FormData$.MODULE$.createStrict(convertArray(parts));
}
/**
* Constructor for `multipart/form-data` content as defined in http://tools.ietf.org/html/rfc2388.
* All parts must have distinct names. (This is not verified!)
*/
public static Multipart.FormData.Strict createFormDataFromFields(Map<String, HttpEntity.Strict> fields) {
return akka.http.scaladsl.model.Multipart.FormData$.MODULE$.createStrict(toScalaMap(fields));
}
/**
* Creates a FormData instance that contains a single part backed by the given file.
*
* To create an instance with several parts or for multiple files, use
* `Multiparts.createFormDataFromParts(Multiparts.createFormDataPartFromPath("field1", ...), Multiparts.createFormDataPartFromPath("field2", ...)`
*/
public static Multipart.FormData createFormDataFromPath(String name, ContentType contentType, Path path, int chunkSize) {
return akka.http.scaladsl.model.Multipart.FormData$.MODULE$.fromPath(name, (akka.http.scaladsl.model.ContentType) contentType, path, chunkSize);
}
/**
* Creates a FormData instance that contains a single part backed by the given file.
*
* To create an instance with several parts or for multiple files, use
* `Multiparts.createFormDataFromParts(Multiparts.createFormDataPartFromPath("field1", ...), Multiparts.createFormDataPartFromPath("field2", ...)`
*/
public static Multipart.FormData createFormDataFromPath(String name, ContentType contentType, Path path) {
return akka.http.scaladsl.model.Multipart.FormData$.MODULE$.fromPath(name, (akka.http.scaladsl.model.ContentType) contentType, path, -1);
}
/**
* Creates a BodyPart backed by a file that will be streamed using a FileSource.
*/
public static Multipart.FormData.BodyPart createFormDataPartFromPath(String name, ContentType contentType, Path path, int chunkSize) {
return akka.http.scaladsl.model.Multipart$FormData$BodyPart$.MODULE$.fromPath(name, (akka.http.scaladsl.model.ContentType) contentType, path, chunkSize);
}
/**
* Creates a BodyPart backed by a file that will be streamed using a FileSource.
*/
public static Multipart.FormData.BodyPart createFormDataPartFromPath(String name, ContentType contentType, Path path) {
return akka.http.scaladsl.model.Multipart$FormData$BodyPart$.MODULE$.fromPath(name, (akka.http.scaladsl.model.ContentType) contentType, path, -1);
}
/**
* Creates a BodyPart.
*/
public static Multipart.FormData.BodyPart createFormDataBodyPart(String name, BodyPartEntity entity) {
List nil = Nil$.MODULE$;
Map<String, String> additionalDispositionParams = Collections.emptyMap();
return akka.http.scaladsl.model.Multipart$FormData$BodyPart$Builder$.MODULE$.create(name, (akka.http.scaladsl.model.BodyPartEntity) entity,
convertMapToScala(additionalDispositionParams), nil);
}
/**
* Creates a BodyPart.
*/
public static Multipart.FormData.BodyPart createFormDataBodyPart(String name, BodyPartEntity entity, Map<String, String> additionalDispositionParams) {
List nil = Nil$.MODULE$;
return akka.http.scaladsl.model.Multipart$FormData$BodyPart$Builder$.MODULE$.create(name, (akka.http.scaladsl.model.BodyPartEntity) entity,
convertMapToScala(additionalDispositionParams), nil);
}
/**
* Creates a BodyPart.
*/
public static Multipart.FormData.BodyPart createFormDataBodyPart(String name, BodyPartEntity entity, Map<String, String> additionalDispositionParams, java.util.List<HttpHeader> headers) {
return akka.http.scaladsl.model.Multipart$FormData$BodyPart$Builder$.MODULE$.create(name, (akka.http.scaladsl.model.BodyPartEntity) entity,
convertMapToScala(additionalDispositionParams), toScalaSeq(headers));
}
/**
* Creates a BodyPart.Strict.
*/
public static Multipart.FormData.BodyPart.Strict createFormDataBodyPartStrict(String name, HttpEntity.Strict entity) {
List nil = Nil$.MODULE$;
Map<String, String> additionalDispositionParams = Collections.emptyMap();
return akka.http.scaladsl.model.Multipart$FormData$BodyPart$StrictBuilder$.MODULE$.createStrict(name, (akka.http.scaladsl.model.HttpEntity.Strict) entity,
convertMapToScala(additionalDispositionParams), nil);
}
/**
* Creates a BodyPart.Strict.
*/
public static Multipart.FormData.BodyPart.Strict createFormDataBodyPartStrict(String name, HttpEntity.Strict entity, Map<String, String> additionalDispositionParams) {
List nil = Nil$.MODULE$;
return akka.http.scaladsl.model.Multipart$FormData$BodyPart$StrictBuilder$.MODULE$.createStrict(name, (akka.http.scaladsl.model.HttpEntity.Strict) entity,
convertMapToScala(additionalDispositionParams), nil);
}
/**
* Creates a BodyPart.Strict.
*/
public static Multipart.FormData.BodyPart.Strict createFormDataBodyPartStrict(String name, HttpEntity.Strict entity, Map<String, String> additionalDispositionParams, java.util.List<HttpHeader> headers) {
return akka.http.scaladsl.model.Multipart$FormData$BodyPart$StrictBuilder$.MODULE$.createStrict(name, (akka.http.scaladsl.model.HttpEntity.Strict) entity,
convertMapToScala(additionalDispositionParams), toScalaSeq(headers));
}
private static scala.collection.immutable.Map<String, HttpEntity.Strict> toScalaMap(Map<String, HttpEntity.Strict> map) {
return emptyMap.$plus$plus(scala.collection.JavaConverters.mapAsScalaMapConverter(map).asScala());
}
private static scala.collection.Iterable<HttpHeader> toScalaSeq(java.util.List<HttpHeader> _headers) {
return scala.collection.JavaConverters.collectionAsScalaIterableConverter(_headers).asScala();
}
}

View file

@ -0,0 +1,17 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.model.headers;
/**
* Model for the `Connection` header.
* Specification: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.10
*/
public abstract class Connection extends akka.http.scaladsl.model.HttpHeader {
public abstract Iterable<String> getTokens();
public static Connection create(String... directives) {
return new akka.http.scaladsl.model.headers.Connection(akka.http.impl.util.Util.convertArray(directives));
}
}

View file

@ -20,4 +20,12 @@ public abstract class HttpChallenge {
public static HttpChallenge create(String scheme, String realm, Map<String, String> params) {
return new akka.http.scaladsl.model.headers.HttpChallenge(scheme, realm, Util.convertMapToScala(params));
}
}
public static HttpChallenge createBasic(String realm) {
return create("Basic", realm);
}
public static HttpChallenge createOAuth2(String realm) {
return create("Bearer", realm);
}
}

View file

@ -177,6 +177,13 @@ akka.http {
# single host endpoint is allowed to establish. Must be greater than zero.
max-connections = 4
# The minimum number of parallel connections that a pool should keep alive ("hot").
# If the number of connections is falling below the given threshold, new ones are being spawned.
# You can use this setting to build a hot pool of "always on" connections.
# Default is 0, meaning there might be no active connection at given moment.
# Keep in mind that `min-connections` should be smaller than `max-connections` or equal
min-connections = 0
# The maximum number of times failed requests are attempted again,
# (if the request can be safely retried) before giving up and returning an error.
# Set to zero to completely disable request retries.

View file

@ -23,7 +23,7 @@ private object PoolConductor {
case class Ports(
requestIn: Inlet[RequestContext],
slotEventIn: Inlet[RawSlotEvent],
slotOuts: immutable.Seq[Outlet[RequestContext]]) extends Shape {
slotOuts: immutable.Seq[Outlet[SlotCommand]]) extends Shape {
override val inlets = requestIn :: slotEventIn :: Nil
override def outlets = slotOuts
@ -38,14 +38,18 @@ private object PoolConductor {
Ports(
inlets.head.asInstanceOf[Inlet[RequestContext]],
inlets.last.asInstanceOf[Inlet[RawSlotEvent]],
outlets.asInstanceOf[immutable.Seq[Outlet[RequestContext]]])
outlets.asInstanceOf[immutable.Seq[Outlet[SlotCommand]]])
}
final case class PoolSlotsSetting(minSlots: Int, maxSlots: Int) {
require(minSlots <= maxSlots, "min-connections must be <= max-connections")
}
/*
Stream Setup
============
Request-
Request- +-----------+ +-----------+ Switch- +-------------+ +-----------+ Context
Slot-
Request- +-----------+ +-----------+ Switch- +-------------+ +-----------+ Command
Context | retry | | slot- | Command | doubler | | route +-------------->
+--------->| Merge +---->| Selector +-------------->| (MapConcat) +---->| (Flexi +-------------->
| | | | | | | Route) +-------------->
@ -63,17 +67,18 @@ private object PoolConductor {
+---------+
*/
def apply(slotCount: Int, pipeliningLimit: Int, log: LoggingAdapter): Graph[Ports, Any] =
def apply(slotSettings: PoolSlotsSetting, pipeliningLimit: Int, log: LoggingAdapter): Graph[Ports, Any] =
GraphDSL.create() { implicit b
import GraphDSL.Implicits._
val retryMerge = b.add(MergePreferred[RequestContext](1, eagerComplete = true))
val slotSelector = b.add(new SlotSelector(slotCount, pipeliningLimit, log))
val route = b.add(new Route(slotCount))
val slotSelector = b.add(new SlotSelector(slotSettings, pipeliningLimit, log))
val route = b.add(new Route(slotSettings.maxSlots))
val retrySplit = b.add(Broadcast[RawSlotEvent](2))
val flatten = Flow[RawSlotEvent].mapAsyncUnordered(slotCount) {
val flatten = Flow[RawSlotEvent].mapAsyncUnordered(slotSettings.maxSlots) {
case x: SlotEvent.Disconnected FastFuture.successful(x)
case SlotEvent.RequestCompletedFuture(future) future
case x: SlotEvent.ConnectedEagerly FastFuture.successful(x)
case x throw new IllegalStateException("Unexpected " + x)
}
@ -85,7 +90,11 @@ private object PoolConductor {
Ports(retryMerge.in(0), retrySplit.in, route.outArray.toList)
}
private case class SwitchCommand(rc: RequestContext, slotIx: Int)
sealed trait SlotCommand
final case class DispatchCommand(rc: RequestContext) extends SlotCommand
final case object ConnectEagerlyCommand extends SlotCommand
final case class SwitchSlotCommand(cmd: SlotCommand, slotIx: Int)
// the SlotSelector keeps the state of all slots as instances of this ADT
private sealed trait SlotState
@ -105,19 +114,19 @@ private object PoolConductor {
private case class Busy(openRequests: Int) extends SlotState { require(openRequests > 0) }
private object Busy extends Busy(1)
private class SlotSelector(slotCount: Int, pipeliningLimit: Int, log: LoggingAdapter)
extends GraphStage[FanInShape2[RequestContext, SlotEvent, SwitchCommand]] {
private class SlotSelector(slotSettings: PoolSlotsSetting, pipeliningLimit: Int, log: LoggingAdapter)
extends GraphStage[FanInShape2[RequestContext, SlotEvent, SwitchSlotCommand]] {
private val ctxIn = Inlet[RequestContext]("requestContext")
private val slotIn = Inlet[SlotEvent]("slotEvents")
private val out = Outlet[SwitchCommand]("switchCommand")
private val out = Outlet[SwitchSlotCommand]("slotCommand")
override def initialAttributes = Attributes.name("SlotSelector")
override val shape = new FanInShape2(ctxIn, slotIn, out)
override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
val slotStates = Array.fill[SlotState](slotCount)(Unconnected)
val slotStates = Array.fill[SlotState](slotSettings.maxSlots)(Unconnected)
var nextSlot = 0
setHandler(ctxIn, new InHandler {
@ -126,7 +135,7 @@ private object PoolConductor {
val slot = nextSlot
slotStates(slot) = slotStateAfterDispatch(slotStates(slot), ctx.request.method)
nextSlot = bestSlot()
emit(out, SwitchCommand(ctx, slot), tryPullCtx)
emit(out, SwitchSlotCommand(DispatchCommand(ctx), slot), tryPullCtx)
}
})
@ -137,6 +146,9 @@ private object PoolConductor {
slotStates(slotIx) = slotStateAfterRequestCompleted(slotStates(slotIx))
case SlotEvent.Disconnected(slotIx, failed)
slotStates(slotIx) = slotStateAfterDisconnect(slotStates(slotIx), failed)
reconnectIfNeeded()
case SlotEvent.ConnectedEagerly(slotIx)
// do nothing ...
}
pull(slotIn)
val wasBlocked = nextSlot == -1
@ -153,8 +165,21 @@ private object PoolConductor {
override def preStart(): Unit = {
pull(ctxIn)
pull(slotIn)
// eagerly start at least slotSettings.minSlots connections
(0 until slotSettings.minSlots).foreach { connect }
}
def connect(slotIx: Int): Unit = {
emit(out, SwitchSlotCommand(ConnectEagerlyCommand, slotIx))
slotStates(slotIx) = Idle
}
private def reconnectIfNeeded(): Unit =
if (slotStates.count(_ != Unconnected) < slotSettings.minSlots) {
connect(slotStates.indexWhere(_ == Unconnected))
}
def slotStateAfterDispatch(slotState: SlotState, method: HttpMethod): SlotState =
slotState match {
case Unconnected | Idle if (method.isIdempotent) Loaded(1) else Busy(1)
@ -205,11 +230,11 @@ private object PoolConductor {
}
}
private class Route(slotCount: Int) extends GraphStage[UniformFanOutShape[SwitchCommand, RequestContext]] {
private class Route(slotCount: Int) extends GraphStage[UniformFanOutShape[SwitchSlotCommand, SlotCommand]] {
override def initialAttributes = Attributes.name("PoolConductor.Route")
override val shape = new UniformFanOutShape[SwitchCommand, RequestContext](slotCount)
override val shape = new UniformFanOutShape[SwitchSlotCommand, SlotCommand](slotCount)
override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
shape.outArray foreach { setHandler(_, ignoreTerminateOutput) }
@ -217,8 +242,8 @@ private object PoolConductor {
val in = shape.in
setHandler(in, new InHandler {
override def onPush(): Unit = {
val cmd = grab(in)
emit(shape.outArray(cmd.slotIx), cmd.rc, pullIn)
val switchCommand = grab(in)
emit(shape.outArray(switchCommand.slotIx), switchCommand.cmd, pullIn)
}
})
val pullIn = () pull(in)

View file

@ -5,6 +5,7 @@
package akka.http.impl.engine.client
import akka.NotUsed
import akka.http.impl.engine.client.PoolConductor.PoolSlotsSetting
import akka.http.scaladsl.settings.ConnectionPoolSettings
import scala.concurrent.{ Promise, Future }
@ -76,10 +77,14 @@ private object PoolFlow {
import settings._
import GraphDSL.Implicits._
val conductor = b.add(PoolConductor(maxConnections, pipeliningLimit, log))
val conductor = b.add(
PoolConductor(PoolSlotsSetting(maxSlots = maxConnections, minSlots = minConnections), pipeliningLimit, log)
)
val slots = Vector
.tabulate(maxConnections)(PoolSlot(_, connectionFlow, settings))
.tabulate(maxConnections)(PoolSlot(_, connectionFlow))
.map(b.add)
val responseMerge = b.add(Merge[ResponseContext](maxConnections))
val slotEventMerge = b.add(Merge[PoolSlot.RawSlotEvent](maxConnections))

View file

@ -25,6 +25,8 @@ private object PoolInterfaceActor {
case object Shutdown extends DeadLetterSuppression
val name = SeqActorName("PoolInterfaceActor")
def props(gateway: PoolGateway)(implicit fm: Materializer) = Props(new PoolInterfaceActor(gateway)).withDeploy(Deploy.local)
}
/**
@ -122,7 +124,7 @@ private class PoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer
case Shutdown // signal coming in from gateway
log.debug("Shutting down host connection pool to {}:{}", hcps.host, hcps.port)
onComplete()
onCompleteThenStop()
while (!inputBuffer.isEmpty) {
val PoolRequest(request, responsePromise) = inputBuffer.dequeue()
responsePromise.completeWith(gateway(request))
@ -147,9 +149,12 @@ private class PoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer
}
def activateIdleTimeoutIfNecessary(): Unit =
if (remainingRequested == 0 && hcps.setup.settings.idleTimeout.isFinite) {
if (shouldStopOnIdle()) {
import context.dispatcher
val timeout = hcps.setup.settings.idleTimeout.asInstanceOf[FiniteDuration]
activeIdleTimeout = Some(context.system.scheduler.scheduleOnce(timeout)(gateway.shutdown()))
}
private def shouldStopOnIdle(): Boolean =
remainingRequested == 0 && hcps.setup.settings.idleTimeout.isFinite && hcps.setup.settings.minConnections == 0
}

View file

@ -51,8 +51,7 @@ private[http] final class PoolMasterActor extends Actor with ActorLogging {
if (poolStatus.contains(gateway)) {
throw new IllegalStateException(s"pool interface actor for $gateway already exists")
}
val props = Props(new PoolInterfaceActor(gateway)).withDeploy(Deploy.local)
val ref = context.actorOf(props, PoolInterfaceActor.name.next())
val ref = context.actorOf(PoolInterfaceActor.props(gateway), PoolInterfaceActor.name.next())
poolStatus += gateway PoolInterfaceRunning(ref)
poolInterfaces += ref gateway
context.watch(ref)
@ -133,7 +132,6 @@ private[http] final class PoolMasterActor extends Actor with ActorLogging {
// Testing only.
case PoolSize(sizePromise)
sizePromise.success(poolStatus.size)
}
}

View file

@ -5,7 +5,7 @@
package akka.http.impl.engine.client
import akka.actor._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.impl.engine.client.PoolConductor.{ ConnectEagerlyCommand, DispatchCommand, SlotCommand }
import akka.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse }
import akka.stream._
import akka.stream.actor._
@ -29,6 +29,11 @@ private object PoolSlot {
final case class RetryRequest(rc: RequestContext) extends RawSlotEvent
final case class RequestCompleted(slotIx: Int) extends SlotEvent
final case class Disconnected(slotIx: Int, failedRequests: Int) extends SlotEvent
/**
* Slot with id "slotIx" has responded to request from PoolConductor and connected immediately
* Ordinary connections from slots don't produce this event
*/
final case class ConnectedEagerly(slotIx: Int) extends SlotEvent
}
private val slotProcessorActorName = SeqActorName("SlotProcessor")
@ -47,21 +52,19 @@ private object PoolSlot {
| via slotEventMerge)
v
*/
def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any],
settings: ConnectionPoolSettings)(implicit
system: ActorSystem,
fm: Materializer): Graph[FanOutShape2[RequestContext, ResponseContext, RawSlotEvent], Any] =
def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any])(implicit system: ActorSystem, fm: Materializer): Graph[FanOutShape2[SlotCommand, ResponseContext, RawSlotEvent], Any] =
GraphDSL.create() { implicit b
import GraphDSL.Implicits._
// TODO wouldn't be better to have them under a known parent? /user/SlotProcessor-0 seems weird
val name = slotProcessorActorName.next()
val slotProcessor = b.add {
Flow.fromProcessor { ()
val actor = system.actorOf(
Props(new SlotProcessor(slotIx, connectionFlow, settings)).withDeploy(Deploy.local),
Props(new SlotProcessor(slotIx, connectionFlow)).withDeploy(Deploy.local),
name)
ActorProcessor[RequestContext, List[ProcessorOut]](actor)
ActorProcessor[SlotCommand, List[ProcessorOut]](actor)
}.mapConcat(ConstantFun.scalaIdentityFunction)
}
val split = b.add(Broadcast[ProcessorOut](2))
@ -78,25 +81,36 @@ private object PoolSlot {
import ActorSubscriberMessage._
/**
* An actor mananging a series of materializations of the given `connectionFlow`.
* To the outside it provides a stable flow stage, consuming `RequestContext` instances on its
* An actor managing a series of materializations of the given `connectionFlow`.
* To the outside it provides a stable flow stage, consuming `SlotCommand` instances on its
* input (ActorSubscriber) side and producing `List[ProcessorOut]` instances on its output
* (ActorPublisher) side.
* The given `connectionFlow` is materialized into a running flow whenever required.
* Completion and errors from the connection are not surfaced to the outside (unless we are
* shutting down completely).
*/
private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any],
settings: ConnectionPoolSettings)(implicit fm: Materializer)
private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any])(implicit fm: Materializer)
extends ActorSubscriber with ActorPublisher[List[ProcessorOut]] with ActorLogging {
var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _
var inflightRequests = immutable.Queue.empty[RequestContext]
val runnableGraph = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)).withDeploy(Deploy.local))
val runnableGraph = Source.actorPublisher[HttpRequest](flowInportProps(self))
.via(connectionFlow)
.toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self)).withDeploy(Deploy.local)))(Keep.both)
.toMat(Sink.actorSubscriber[HttpResponse](flowOutportProps(self)))(Keep.both)
.named("SlotProcessorInternalConnectionFlow")
override def requestStrategy = ZeroRequestStrategy
/**
* How PoolProcessor changes its `receive`:
* waitingExposedPublisher -> waitingForSubscribePending -> unconnected ->
* waitingForDemandFromConnection OR waitingEagerlyConnected -> running
* Given slot can become get to 'running' state via 'waitingForDemandFromConnection' or 'waitingEagerlyConnected'.
* The difference between those two paths is that the first one is lazy - reacts to DispatchCommand and then uses
* inport and outport actors to obtain more items.
* Where the second one is eager - reacts to SlotShouldConnectCommand from PoolConductor, sends SlotEvent.ConnectedEagerly
* back to conductor and then waits for the first DispatchCommand
*/
override def receive = waitingExposedPublisher
def waitingExposedPublisher: Receive = {
@ -114,10 +128,16 @@ private object PoolSlot {
}
val unconnected: Receive = {
case OnNext(rc: RequestContext)
case OnNext(DispatchCommand(rc: RequestContext))
val (connInport, connOutport) = runnableGraph.run()
connOutport ! Request(totalDemand)
context.become(waitingForDemandFromConnection(connInport, connOutport, rc))
context.become(waitingForDemandFromConnection(connInport = connInport, connOutport = connOutport, rc))
case OnNext(ConnectEagerlyCommand)
val (in, out) = runnableGraph.run()
onNext(SlotEvent.ConnectedEagerly(slotIx) :: Nil)
out ! Request(totalDemand)
context.become(waitingEagerlyConnected(connInport = in, connOutport = out))
case Request(_) if (remainingRequested == 0) request(1) // ask for first request if necessary
@ -130,6 +150,17 @@ private object PoolSlot {
case c @ FromConnection(msg) // ignore ...
}
def waitingEagerlyConnected(connInport: ActorRef, connOutport: ActorRef): Receive = {
case FromConnection(Request(n))
request(n)
case OnNext(DispatchCommand(rc: RequestContext))
inflightRequests = inflightRequests.enqueue(rc)
request(1)
connInport ! OnNext(rc.request)
context.become(running(connInport, connOutport))
}
def waitingForDemandFromConnection(connInport: ActorRef, connOutport: ActorRef,
firstRequest: RequestContext): Receive = {
case ev @ (Request(_) | Cancel) connOutport ! ev
@ -151,7 +182,7 @@ private object PoolSlot {
def running(connInport: ActorRef, connOutport: ActorRef): Receive = {
case ev @ (Request(_) | Cancel) connOutport ! ev
case ev @ (OnComplete | OnError(_)) connInport ! ev
case OnNext(rc: RequestContext)
case OnNext(DispatchCommand(rc: RequestContext))
inflightRequests = inflightRequests.enqueue(rc)
connInport ! OnNext(rc.request)
@ -225,6 +256,7 @@ private object PoolSlot {
context.stop(self)
}
}
def flowInportProps(s: ActorRef) = Props(new FlowInportActor(s)).withDeploy(Deploy.local)
private class FlowOutportActor(slotProcessor: ActorRef) extends ActorSubscriber with ActorLogging {
def requestStrategy = ZeroRequestStrategy
@ -237,6 +269,7 @@ private object PoolSlot {
context.stop(self)
}
}
def flowOutportProps(s: ActorRef) = Props(new FlowOutportActor(s)).withDeploy(Deploy.local)
final class UnexpectedDisconnectException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) {
def this(msg: String) = this(msg, null)

View file

@ -84,6 +84,8 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser
case NotEnoughDataException
// we are missing a try/catch{continue} wrapper somewhere
throw new IllegalStateException("unexpected NotEnoughDataException", NotEnoughDataException)
case IllegalHeaderException(error)
failMessageStart(StatusCodes.BadRequest, error)
}) match {
case Trampoline(x) run(x)
case x x

View file

@ -17,8 +17,9 @@ import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.util.ByteString
import HttpEntity._
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import scala.concurrent.forkjoin.ThreadLocalRandom
import java.util.concurrent.ThreadLocalRandom
/**
* INTERNAL API
@ -29,46 +30,60 @@ private[http] object BodyPartRenderer {
boundary: String,
nioCharset: Charset,
partHeadersSizeHint: Int,
log: LoggingAdapter): PushPullStage[Multipart.BodyPart, Source[ChunkStreamPart, Any]] =
new PushPullStage[Multipart.BodyPart, Source[ChunkStreamPart, Any]] {
log: LoggingAdapter): GraphStage[FlowShape[Multipart.BodyPart, Source[ChunkStreamPart, Any]]] =
new GraphStage[FlowShape[Multipart.BodyPart, Source[ChunkStreamPart, Any]]] {
var firstBoundaryRendered = false
override def onPush(bodyPart: Multipart.BodyPart, ctx: Context[Source[ChunkStreamPart, Any]]): SyncDirective = {
val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint)
val in: Inlet[Multipart.BodyPart] = Inlet("BodyPartRenderer.in")
val out: Outlet[Source[ChunkStreamPart, Any]] = Outlet("BodyPartRenderer.out")
override val shape: FlowShape[Multipart.BodyPart, Source[ChunkStreamPart, Any]] = FlowShape(in, out)
def bodyPartChunks(data: Source[ByteString, Any]): Source[ChunkStreamPart, Any] = {
val entityChunks = data.map[ChunkStreamPart](Chunk(_))
(chunkStream(r.get) ++ entityChunks).mapMaterializedValue((_) ())
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint)
def completePartRendering(): Source[ChunkStreamPart, Any] =
bodyPart.entity match {
case x if x.isKnownEmpty chunkStream(r.get)
case Strict(_, data) chunkStream((r ~~ data).get)
case Default(_, _, data) bodyPartChunks(data)
case IndefiniteLength(_, data) bodyPartChunks(data)
def bodyPartChunks(data: Source[ByteString, Any]): Source[ChunkStreamPart, Any] = {
val entityChunks = data.map[ChunkStreamPart](Chunk(_))
(chunkStream(r.get) ++ entityChunks).mapMaterializedValue((_) ())
}
def completePartRendering(entity: HttpEntity): Source[ChunkStreamPart, Any] =
entity match {
case x if x.isKnownEmpty chunkStream(r.get)
case Strict(_, data) chunkStream((r ~~ data).get)
case Default(_, _, data) bodyPartChunks(data)
case IndefiniteLength(_, data) bodyPartChunks(data)
}
renderBoundary(r, boundary, suppressInitialCrLf = !firstBoundaryRendered)
firstBoundaryRendered = true
val bodyPart = grab(in)
renderEntityContentType(r, bodyPart.entity)
renderHeaders(r, bodyPart.headers, log)
push(out, completePartRendering(bodyPart.entity))
}
renderBoundary(r, boundary, suppressInitialCrLf = !firstBoundaryRendered)
firstBoundaryRendered = true
renderEntityContentType(r, bodyPart.entity)
renderHeaders(r, bodyPart.headers, log)
ctx.push(completePartRendering())
}
override def onPull(): Unit =
if (isClosed(in) && firstBoundaryRendered)
completeRendering()
else if (isClosed(in)) completeStage()
else pull(in)
override def onPull(ctx: Context[Source[ChunkStreamPart, Any]]): SyncDirective = {
val finishing = ctx.isFinishing
if (finishing && firstBoundaryRendered) {
val r = new ByteStringRendering(boundary.length + 4)
renderFinalBoundary(r, boundary)
ctx.pushAndFinish(chunkStream(r.get))
} else if (finishing)
ctx.finish()
else
ctx.pull()
}
override def onUpstreamFinish(): Unit =
if (isAvailable(out) && firstBoundaryRendered) completeRendering()
override def onUpstreamFinish(ctx: Context[Source[ChunkStreamPart, Any]]): TerminationDirective = ctx.absorbTermination()
private def completeRendering(): Unit = {
val r = new ByteStringRendering(boundary.length + 4)
renderFinalBoundary(r, boundary)
push(out, chunkStream(r.get))
completeStage()
}
setHandlers(in, out, this)
}
private def chunkStream(byteString: ByteString): Source[ChunkStreamPart, Any] =
Source.single(Chunk(byteString))
@ -124,4 +139,14 @@ private[http] object BodyPartRenderer {
random.nextBytes(array)
Base64.custom.encodeToString(array, false)
}
/**
* Creates a new random number of default length and base64 encodes it (using a custom "safe" alphabet).
*/
def randomBoundaryWithDefaults(): String = randomBoundary()
/**
* Creates a new random number of the given length and base64 encodes it (using a custom "safe" alphabet).
*/
def randomBoundaryWithDefaultRandom(length: Int): String = randomBoundary(length)
}

View file

@ -15,6 +15,9 @@ import akka.http.scaladsl.model._
import akka.http.impl.util._
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.stream.stage.{ Context, GraphStage, SyncDirective, TerminationDirective }
import akka.stream._
import akka.stream.scaladsl.{ Sink, Source, Flow, Keep }
/**
* INTERNAL API
*/
@ -53,19 +56,31 @@ private object RenderSupport {
}
object ChunkTransformer {
val flow = Flow[ChunkStreamPart].transform(() new ChunkTransformer).named("renderChunks")
val flow = Flow.fromGraph(new ChunkTransformer).named("renderChunks")
}
class ChunkTransformer extends StatefulStage[HttpEntity.ChunkStreamPart, ByteString] {
override def initial = new State {
override def onPush(chunk: HttpEntity.ChunkStreamPart, ctx: Context[ByteString]): SyncDirective = {
val bytes = renderChunk(chunk)
if (chunk.isLastChunk) ctx.pushAndFinish(bytes)
else ctx.push(bytes)
class ChunkTransformer extends GraphStage[FlowShape[HttpEntity.ChunkStreamPart, ByteString]] {
val out: Outlet[ByteString] = Outlet("ChunkTransformer.out")
val in: Inlet[HttpEntity.ChunkStreamPart] = Inlet("ChunkTransformer.in")
val shape: FlowShape[HttpEntity.ChunkStreamPart, ByteString] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val chunk = grab(in)
val bytes = renderChunk(chunk)
push(out, bytes)
if (chunk.isLastChunk) completeStage()
}
override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = {
emit(out, defaultLastChunkBytes)
completeStage()
}
setHandlers(in, out, this)
}
}
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective =
terminationEmit(Iterator.single(defaultLastChunkBytes), ctx)
}
object CheckContentLengthTransformer {

View file

@ -392,7 +392,8 @@ private[http] object HttpServerBluePrint {
case x: EntityStreamError if messageEndPending && openRequests.isEmpty
// client terminated the connection after receiving an early response to 100-continue
completeStage()
case x push(requestPrepOut, x)
case x
push(requestPrepOut, x)
}
override def onUpstreamFinish() =
if (openRequests.isEmpty) completeStage()
@ -414,19 +415,17 @@ private[http] object HttpServerBluePrint {
val isEarlyResponse = messageEndPending && openRequests.isEmpty
if (isEarlyResponse && response.status.isSuccess)
log.warning(
"""Sending 2xx response before end of request was received...
|Note that the connection will be closed after this response. Also, many clients will not read early responses!
|Consider waiting for the request end before dispatching this response!""".stripMargin)
"Sending an 2xx 'early' response before end of request was received... " +
"Note that the connection will be closed after this response. Also, many clients will not read early responses! " +
"Consider only issuing this response after the request data has been completely read!")
val close = requestStart.closeRequested ||
requestStart.expect100Continue && oneHundredContinueResponsePending ||
isClosed(requestParsingIn) && openRequests.isEmpty ||
(requestStart.expect100Continue && oneHundredContinueResponsePending) ||
(isClosed(requestParsingIn) && openRequests.isEmpty) ||
isEarlyResponse
emit(responseCtxOut, ResponseRenderingContext(response, requestStart.method, requestStart.protocol, close),
pullHttpResponseIn)
if (close) complete(responseCtxOut)
// when the client closes the connection, we need to pull onc more time to get the
// request parser to complete
if (close && isEarlyResponse) pull(requestParsingIn)
if (close && requestStart.expect100Continue) pull(requestParsingIn)
}
override def onUpstreamFinish() =
if (openRequests.isEmpty && isClosed(requestParsingIn)) completeStage()
@ -609,7 +608,7 @@ private[http] object HttpServerBluePrint {
})
private var activeTimers = 0
private def timeout = ActorMaterializer.downcast(materializer).settings.subscriptionTimeoutSettings.timeout
private def timeout = ActorMaterializerHelper.downcast(materializer).settings.subscriptionTimeoutSettings.timeout
private def addTimeout(s: SubscriptionTimeout): Unit = {
if (activeTimers == 0) setKeepGoing(true)
activeTimers += 1

View file

@ -6,9 +6,11 @@ package akka.http.impl.engine.ws
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.stream.stage.{ SyncDirective, Context, StatefulStage }
import akka.util.ByteString
import Protocol.Opcode
import akka.event.Logging
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import scala.util.control.NonFatal
@ -21,158 +23,163 @@ import scala.util.control.NonFatal
private[http] object FrameHandler {
def create(server: Boolean): Flow[FrameEventOrError, Output, NotUsed] =
Flow[FrameEventOrError].transform(() new HandlerStage(server))
Flow[FrameEventOrError].via(new HandlerStage(server))
private class HandlerStage(server: Boolean) extends StatefulStage[FrameEventOrError, Output] {
type Ctx = Context[Output]
def initial: State = Idle
private class HandlerStage(server: Boolean) extends GraphStage[FlowShape[FrameEventOrError, Output]] {
val in = Inlet[FrameEventOrError](Logging.simpleName(this) + ".in")
val out = Outlet[Output](Logging.simpleName(this) + ".out")
override val shape = FlowShape(in, out)
override def toString: String = s"HandlerStage(server=$server)"
private object Idle extends StateWithControlFrameHandling {
def handleRegularFrameStart(start: FrameStart)(implicit ctx: Ctx): SyncDirective =
(start.header.opcode, start.isFullMessage) match {
case (Opcode.Binary, true) publishMessagePart(BinaryMessagePart(start.data, last = true))
case (Opcode.Binary, false) becomeAndHandleWith(new CollectingBinaryMessage, start)
case (Opcode.Text, _) becomeAndHandleWith(new CollectingTextMessage, start)
case x protocolError()
override def createLogic(attributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
setHandler(out, this)
setHandler(in, IdleHandler)
override def onPull(): Unit = pull(in)
private object IdleHandler extends ControlFrameStartHandler {
def setAndHandleFrameStartWith(newHandler: ControlFrameStartHandler, start: FrameStart): Unit = {
setHandler(in, newHandler)
newHandler.handleFrameStart(start)
}
override def handleRegularFrameStart(start: FrameStart): Unit =
(start.header.opcode, start.isFullMessage) match {
case (Opcode.Binary, true) publishMessagePart(BinaryMessagePart(start.data, last = true))
case (Opcode.Binary, false) setAndHandleFrameStartWith(new BinaryMessagehandler, start)
case (Opcode.Text, _) setAndHandleFrameStartWith(new TextMessageHandler, start)
case x pushProtocolError()
}
}
}
private class CollectingBinaryMessage extends CollectingMessageFrame(Opcode.Binary) {
def createMessagePart(data: ByteString, last: Boolean): MessageDataPart = BinaryMessagePart(data, last)
}
private class CollectingTextMessage extends CollectingMessageFrame(Opcode.Text) {
val decoder = Utf8Decoder.create()
private class BinaryMessagehandler extends MessageHandler(Opcode.Binary) {
override def createMessagePart(data: ByteString, last: Boolean): MessageDataPart =
BinaryMessagePart(data, last)
}
def createMessagePart(data: ByteString, last: Boolean): MessageDataPart =
TextMessagePart(decoder.decode(data, endOfInput = last).get, last)
}
private class TextMessageHandler extends MessageHandler(Opcode.Text) {
val decoder = Utf8Decoder.create()
private abstract class CollectingMessageFrame(expectedOpcode: Opcode) extends StateWithControlFrameHandling {
var expectFirstHeader = true
var finSeen = false
def createMessagePart(data: ByteString, last: Boolean): MessageDataPart
override def createMessagePart(data: ByteString, last: Boolean): MessageDataPart =
TextMessagePart(decoder.decode(data, endOfInput = last).get, last)
}
def handleRegularFrameStart(start: FrameStart)(implicit ctx: Ctx): SyncDirective = {
if ((expectFirstHeader && start.header.opcode == expectedOpcode) // first opcode must be the expected
|| start.header.opcode == Opcode.Continuation) { // further ones continuations
expectFirstHeader = false
private abstract class MessageHandler(expectedOpcode: Opcode) extends ControlFrameStartHandler {
var expectFirstHeader = true
var finSeen = false
def createMessagePart(data: ByteString, last: Boolean): MessageDataPart
if (start.header.fin) finSeen = true
publish(start)
} else protocolError()
override def handleRegularFrameStart(start: FrameStart): Unit = {
if ((expectFirstHeader && start.header.opcode == expectedOpcode) // first opcode must be the expected
|| start.header.opcode == Opcode.Continuation) { // further ones continuations
expectFirstHeader = false
if (start.header.fin) finSeen = true
publish(start)
} else pushProtocolError()
}
override def handleFrameData(data: FrameData): Unit = publish(data)
def publish(part: FrameEvent): Unit = try {
publishMessagePart(createMessagePart(part.data, last = finSeen && part.lastPart))
} catch {
case NonFatal(e) closeWithCode(Protocol.CloseCodes.InconsistentData)
}
}
private trait ControlFrameStartHandler extends FrameHandler {
def handleRegularFrameStart(start: FrameStart): Unit
override def handleFrameStart(start: FrameStart): Unit = start.header match {
case h: FrameHeader if h.mask.isDefined && !server pushProtocolError()
case h: FrameHeader if h.rsv1 || h.rsv2 || h.rsv3 pushProtocolError()
case FrameHeader(op, _, length, fin, _, _, _) if op.isControl && (length > 125 || !fin) pushProtocolError()
case h: FrameHeader if h.opcode.isControl
if (start.isFullMessage) handleControlFrame(h.opcode, start.data, this)
else collectControlFrame(start, this)
case _ handleRegularFrameStart(start)
}
override def handleFrameData(data: FrameData): Unit =
throw new IllegalStateException("Expected FrameStart")
}
private class ControlFrameDataHandler(opcode: Opcode, _data: ByteString, nextHandler: InHandler) extends FrameHandler {
var data = _data
override def handleFrameData(data: FrameData): Unit = {
this.data ++= data.data
if (data.lastPart) handleControlFrame(opcode, this.data, nextHandler)
else pull(in)
}
override def handleFrameStart(start: FrameStart): Unit =
throw new IllegalStateException("Expected FrameData")
}
private trait FrameHandler extends InHandler {
def handleFrameData(data: FrameData): Unit
def handleFrameStart(start: FrameStart): Unit
def handleControlFrame(opcode: Opcode, data: ByteString, nextHandler: InHandler): Unit = {
setHandler(in, nextHandler)
opcode match {
case Opcode.Ping publishDirectResponse(FrameEvent.fullFrame(Opcode.Pong, None, data, fin = true))
case Opcode.Pong
// ignore unsolicited Pong frame
pull(in)
case Opcode.Close
setHandler(in, WaitForPeerTcpClose)
push(out, PeerClosed.parse(data))
case Opcode.Other(o) closeWithCode(Protocol.CloseCodes.ProtocolError, "Unsupported opcode")
case other failStage(
new IllegalStateException(s"unexpected message of type [${other.getClass.getName}] when expecting ControlFrame")
)
}
}
def pushProtocolError(): Unit = closeWithCode(Protocol.CloseCodes.ProtocolError)
def closeWithCode(closeCode: Int, reason: String = ""): Unit = {
setHandler(in, CloseAfterPeerClosed)
push(out, ActivelyCloseWithCode(Some(closeCode), reason))
}
def collectControlFrame(start: FrameStart, nextHandler: InHandler): Unit = {
require(!start.isFullMessage)
setHandler(in, new ControlFrameDataHandler(start.header.opcode, start.data, nextHandler))
pull(in)
}
def publishMessagePart(part: MessageDataPart): Unit =
if (part.last) emitMultiple(out, Iterator(part, MessageEnd), () setHandler(in, IdleHandler))
else push(out, part)
def publishDirectResponse(frame: FrameStart): Unit = push(out, DirectAnswer(frame))
override def onPush(): Unit = grab(in) match {
case data: FrameData handleFrameData(data)
case start: FrameStart handleFrameStart(start)
case FrameError(ex) failStage(ex)
}
}
private object CloseAfterPeerClosed extends InHandler {
override def onPush(): Unit = grab(in) match {
case FrameStart(FrameHeader(Opcode.Close, _, length, _, _, _, _), data)
setHandler(in, WaitForPeerTcpClose)
push(out, PeerClosed.parse(data))
case _ pull(in) // ignore all other data
}
}
private object WaitForPeerTcpClose extends InHandler {
override def onPush(): Unit = pull(in) // ignore
}
}
override def handleFrameData(data: FrameData)(implicit ctx: Ctx): SyncDirective = publish(data)
private def publish(part: FrameEvent)(implicit ctx: Ctx): SyncDirective =
try publishMessagePart(createMessagePart(part.data, last = finSeen && part.lastPart))
catch {
case NonFatal(e) closeWithCode(Protocol.CloseCodes.InconsistentData)
}
}
private class CollectingControlFrame(opcode: Opcode, _data: ByteString, nextState: State) extends InFrameState {
var data = _data
def handleFrameData(data: FrameData)(implicit ctx: Ctx): SyncDirective = {
this.data ++= data.data
if (data.lastPart) handleControlFrame(opcode, this.data, nextState)
else ctx.pull()
}
}
private def becomeAndHandleWith(newState: State, part: FrameEvent)(implicit ctx: Ctx): SyncDirective = {
become(newState)
current.onPush(part, ctx)
}
/** Returns a SyncDirective if it handled the message */
private def validateHeader(header: FrameHeader)(implicit ctx: Ctx): Option[SyncDirective] = header match {
case h: FrameHeader if h.mask.isDefined && !server Some(protocolError())
case h: FrameHeader if h.rsv1 || h.rsv2 || h.rsv3 Some(protocolError())
case FrameHeader(op, _, length, fin, _, _, _) if op.isControl && (length > 125 || !fin) Some(protocolError())
case _ None
}
private def handleControlFrame(opcode: Opcode, data: ByteString, nextState: State)(implicit ctx: Ctx): SyncDirective = {
become(nextState)
opcode match {
case Opcode.Ping publishDirectResponse(FrameEvent.fullFrame(Opcode.Pong, None, data, fin = true))
case Opcode.Pong
// ignore unsolicited Pong frame
ctx.pull()
case Opcode.Close
become(WaitForPeerTcpClose)
ctx.push(PeerClosed.parse(data))
case Opcode.Other(o) closeWithCode(Protocol.CloseCodes.ProtocolError, "Unsupported opcode")
case other ctx.fail(new IllegalStateException(s"unexpected message of type [${other.getClass.getName}] when expecting ControlFrame"))
}
}
private def collectControlFrame(start: FrameStart, nextState: State)(implicit ctx: Ctx): SyncDirective = {
require(!start.isFullMessage)
become(new CollectingControlFrame(start.header.opcode, start.data, nextState))
ctx.pull()
}
private def publishMessagePart(part: MessageDataPart)(implicit ctx: Ctx): SyncDirective =
if (part.last) emit(Iterator(part, MessageEnd), ctx, Idle)
else ctx.push(part)
private def publishDirectResponse(frame: FrameStart)(implicit ctx: Ctx): SyncDirective =
ctx.push(DirectAnswer(frame))
private def protocolError(reason: String = "")(implicit ctx: Ctx): SyncDirective =
closeWithCode(Protocol.CloseCodes.ProtocolError, reason)
private def closeWithCode(closeCode: Int, reason: String = "", cause: Throwable = null)(implicit ctx: Ctx): SyncDirective = {
become(CloseAfterPeerClosed)
ctx.push(ActivelyCloseWithCode(Some(closeCode), reason))
}
private object CloseAfterPeerClosed extends State {
def onPush(elem: FrameEventOrError, ctx: Context[Output]): SyncDirective =
elem match {
case FrameStart(FrameHeader(Opcode.Close, _, length, _, _, _, _), data)
become(WaitForPeerTcpClose)
ctx.push(PeerClosed.parse(data))
case _ ctx.pull() // ignore all other data
}
}
private object WaitForPeerTcpClose extends State {
def onPush(elem: FrameEventOrError, ctx: Context[Output]): SyncDirective =
ctx.pull() // ignore
}
private abstract class StateWithControlFrameHandling extends BetweenFrameState {
def handleRegularFrameStart(start: FrameStart)(implicit ctx: Ctx): SyncDirective
def handleFrameStart(start: FrameStart)(implicit ctx: Ctx): SyncDirective =
validateHeader(start.header).getOrElse {
if (start.header.opcode.isControl)
if (start.isFullMessage) handleControlFrame(start.header.opcode, start.data, this)
else collectControlFrame(start, this)
else handleRegularFrameStart(start)
}
}
private abstract class BetweenFrameState extends ImplicitContextState {
def handleFrameData(data: FrameData)(implicit ctx: Ctx): SyncDirective =
throw new IllegalStateException("Expected FrameStart")
}
private abstract class InFrameState extends ImplicitContextState {
def handleFrameStart(start: FrameStart)(implicit ctx: Ctx): SyncDirective =
throw new IllegalStateException("Expected FrameData")
}
private abstract class ImplicitContextState extends State {
def handleFrameData(data: FrameData)(implicit ctx: Ctx): SyncDirective
def handleFrameStart(start: FrameStart)(implicit ctx: Ctx): SyncDirective
def onPush(part: FrameEventOrError, ctx: Ctx): SyncDirective =
part match {
case data: FrameData handleFrameData(data)(ctx)
case start: FrameStart handleFrameStart(start)(ctx)
case FrameError(ex) ctx.fail(ex)
}
}
}
sealed trait Output

View file

@ -8,26 +8,23 @@ import akka.NotUsed
import akka.http.scaladsl.model.ws._
import scala.concurrent.{ Future, Promise }
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.stage._
import akka.stream._
import akka.stream.TLSProtocol._
import akka.stream.scaladsl._
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpResponse, HttpMethods }
import akka.http.scaladsl.model.{ HttpMethods, HttpResponse }
import akka.http.scaladsl.model.headers.Host
import akka.http.impl.engine.parsing.HttpMessageParser.StateResult
import akka.http.impl.engine.parsing.ParserOutput.{ RemainingBytes, ResponseStart, NeedMoreData }
import akka.http.impl.engine.parsing.{ ParserOutput, HttpHeaderParser, HttpResponseParser }
import akka.http.impl.engine.parsing.ParserOutput.{ NeedMoreData, RemainingBytes, ResponseStart }
import akka.http.impl.engine.parsing.{ HttpHeaderParser, HttpResponseParser, ParserOutput }
import akka.http.impl.engine.rendering.{ HttpRequestRendererFactory, RequestRenderingContext }
import akka.http.impl.engine.ws.Handshake.Client.NegotiatedWebSocketSettings
import akka.http.impl.util.StreamUtils
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
object WebSocketClientBlueprint {
/**
@ -59,68 +56,70 @@ object WebSocketClientBlueprint {
val renderedInitialRequest =
HttpRequestRendererFactory.renderStrict(RequestRenderingContext(initialRequest, hostHeader), settings, log)
class UpgradeStage extends StatefulStage[ByteString, ByteString] {
type State = StageState[ByteString, ByteString]
class UpgradeStage extends SimpleLinearGraphStage[ByteString] {
def initial: State = parsingResponse
def parsingResponse: State = new State {
// a special version of the parser which only parses one message and then reports the remaining data
// if some is available
val parser = new HttpResponseParser(settings.parserSettings, HttpHeaderParser(settings.parserSettings)()) {
var first = true
override def handleInformationalResponses = false
override protected def parseMessage(input: ByteString, offset: Int): StateResult = {
if (first) {
first = false
super.parseMessage(input, offset)
} else {
emit(RemainingBytes(input.drop(offset)))
terminate()
override def createLogic(attributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
// a special version of the parser which only parses one message and then reports the remaining data
// if some is available
val parser = new HttpResponseParser(settings.parserSettings, HttpHeaderParser(settings.parserSettings)()) {
var first = true
override def handleInformationalResponses = false
override protected def parseMessage(input: ByteString, offset: Int): StateResult = {
if (first) {
first = false
super.parseMessage(input, offset)
} else {
emit(RemainingBytes(input.drop(offset)))
terminate()
}
}
}
}
parser.setContextForNextResponse(HttpResponseParser.ResponseContext(HttpMethods.GET, None))
parser.setContextForNextResponse(HttpResponseParser.ResponseContext(HttpMethods.GET, None))
def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
parser.parseBytes(elem) match {
case NeedMoreData ctx.pull()
case ResponseStart(status, protocol, headers, entity, close)
val response = HttpResponse(status, headers, protocol = protocol)
Handshake.Client.validateResponse(response, subprotocol.toList, key) match {
case Right(NegotiatedWebSocketSettings(protocol))
result.success(ValidUpgrade(response, protocol))
override def onPush(): Unit = {
parser.parseBytes(grab(in)) match {
case NeedMoreData pull(in)
case ResponseStart(status, protocol, headers, entity, close)
val response = HttpResponse(status, headers, protocol = protocol)
Handshake.Client.validateResponse(response, subprotocol.toList, key) match {
case Right(NegotiatedWebSocketSettings(protocol))
result.success(ValidUpgrade(response, protocol))
become(transparent)
valve.open()
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
})
valve.open()
val parseResult = parser.onPull()
require(parseResult == ParserOutput.MessageEnd, s"parseResult should be MessageEnd but was $parseResult")
parser.onPull() match {
case NeedMoreData ctx.pull()
case RemainingBytes(bytes) ctx.push(bytes)
case other
throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
}
case Left(problem)
result.success(InvalidUpgradeResponse(response, s"WebSocket server at $uri returned $problem"))
ctx.fail(new IllegalArgumentException(s"WebSocket upgrade did not finish because of '$problem'"))
}
case other
throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
val parseResult = parser.onPull()
require(parseResult == ParserOutput.MessageEnd, s"parseResult should be MessageEnd but was $parseResult")
parser.onPull() match {
case NeedMoreData pull(in)
case RemainingBytes(bytes) push(out, bytes)
case other
throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
}
case Left(problem)
result.success(InvalidUpgradeResponse(response, s"WebSocket server at $uri returned $problem"))
failStage(new IllegalArgumentException(s"WebSocket upgrade did not finish because of '$problem'"))
}
case other
throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
}
}
}
}
def transparent: State = new State {
def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = ctx.push(elem)
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def toString = "UpgradeStage"
}
BidiFlow.fromGraph(GraphDSL.create() { implicit b
import GraphDSL.Implicits._
val networkIn = b.add(Flow[ByteString].transform(() new UpgradeStage))
val networkIn = b.add(Flow[ByteString].via(new UpgradeStage))
val wsIn = b.add(Flow[ByteString])
val handshakeRequestSource = b.add(Source.single(renderedInitialRequest) ++ valve.source)

View file

@ -16,11 +16,17 @@ import Uri._
// http://tools.ietf.org/html/rfc3986
private[http] class UriParser(val input: ParserInput,
val uriParsingCharset: Charset = UTF8,
val uriParsingMode: Uri.ParsingMode = Uri.ParsingMode.Relaxed) extends Parser
val uriParsingCharset: Charset,
val uriParsingMode: Uri.ParsingMode,
val maxValueStackSize: Int) extends Parser(maxValueStackSize)
with IpAddressParsing with StringBuilding {
import CharacterClasses._
def this(input: ParserInput,
uriParsingCharset: Charset = UTF8,
uriParsingMode: Uri.ParsingMode = Uri.ParsingMode.Relaxed) =
this(input, uriParsingCharset, uriParsingMode, 1024)
def parseAbsoluteUri(): Uri =
rule(`absolute-URI` ~ EOI).run() match {
case Right(_) => create(_scheme, _userinfo, _host, _port, collapseDotSegments(_path), _rawQueryString, _fragment)
@ -170,16 +176,33 @@ private[http] class UriParser(val input: ParserInput,
clearSBForDecoding() ~ oneOrMore('+' ~ appendSB(' ') | `query-char` ~ appendSB() | `pct-encoded`) ~ push(getDecodedString())
| push(""))
def keyValuePair: Rule2[String, String] = rule {
part ~ ('=' ~ part | push(Query.EmptyValue))
}
// has a max value-stack depth of 3
def keyValuePairsWithLimitedStackUse: Rule1[Query] = rule {
keyValuePair ~> { (key, value) => Query.Cons(key, value, Query.Empty) } ~ {
zeroOrMore('&' ~ keyValuePair ~> { (prefix: Query, key, value) => Query.Cons(key, value, prefix) }) ~>
(_.reverse)
}
}
// non-tail recursion, which we accept because it allows us to directly build the query
// without having to reverse it at the end.
// Also: request queries usually do not have hundreds of elements, so we should get away with
// putting some pressure onto the JVM and value stack
def keyValuePairs: Rule1[Query] = rule {
part ~ ('=' ~ part | push(Query.EmptyValue)) ~ ('&' ~ keyValuePairs | push(Query.Empty)) ~> { (key, value, tail) =>
// Adds 2 values to the value stack for the first pair, then parses the remaining pairs.
def keyValuePairsWithReversalAvoidance: Rule1[Query] = rule {
keyValuePair ~ ('&' ~ keyValuePairs | push(Query.Empty)) ~> { (key, value, tail) =>
Query.Cons(key, value, tail)
}
}
// Uses a reversal-free parsing approach as long as there is enough space on the value stack,
// switching to a limited-stack approach when necessary.
def keyValuePairs: Rule1[Query] =
if (valueStack.size + 5 <= maxValueStackSize) keyValuePairsWithReversalAvoidance
else keyValuePairsWithLimitedStackUse
rule { keyValuePairs }
}

View file

@ -13,6 +13,7 @@ import scala.concurrent.duration.Duration
/** INTERNAL API */
private[akka] final case class ConnectionPoolSettingsImpl(
val maxConnections: Int,
val minConnections: Int,
val maxRetries: Int,
val maxOpenRequests: Int,
val pipeliningLimit: Int,
@ -21,6 +22,8 @@ private[akka] final case class ConnectionPoolSettingsImpl(
extends ConnectionPoolSettings {
require(maxConnections > 0, "max-connections must be > 0")
require(minConnections >= 0, "min-connections must be >= 0")
require(minConnections <= maxConnections, "min-connections must be <= max-connections")
require(maxRetries >= 0, "max-retries must be >= 0")
require(maxOpenRequests > 0 && (maxOpenRequests & (maxOpenRequests - 1)) == 0, "max-open-requests must be a power of 2 > 0")
require(pipeliningLimit > 0, "pipelining-limit must be > 0")
@ -33,6 +36,7 @@ object ConnectionPoolSettingsImpl extends SettingsCompanion[ConnectionPoolSettin
def fromSubConfig(root: Config, c: Config) = {
ConnectionPoolSettingsImpl(
c getInt "max-connections",
c getInt "min-connections",
c getInt "max-retries",
c getInt "max-open-requests",
c getInt "pipelining-limit",

View file

@ -59,7 +59,7 @@ private[http] object StreamUtils {
override def onPull(): Unit = pull(in)
override def onUpstreamFailure(ex: Throwable): Unit = {
promise.failure(ex)
promise.tryFailure(ex)
failStage(ex)
}

View file

@ -5,6 +5,7 @@ package akka.http.javadsl.settings
import java.util.{ Optional, Random }
import akka.actor.ActorSystem
import akka.http.impl.settings.ClientConnectionSettingsImpl
import akka.http.javadsl.model.headers.UserAgent
import akka.io.Inet.SocketOption
@ -42,4 +43,5 @@ abstract class ClientConnectionSettings private[akka] () { self: ClientConnectio
object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettings] {
def create(config: Config): ClientConnectionSettings = ClientConnectionSettingsImpl(config)
def create(configOverrides: String): ClientConnectionSettings = ClientConnectionSettingsImpl(configOverrides)
override def create(system: ActorSystem): ClientConnectionSettings = create(system.settings.config)
}

View file

@ -3,6 +3,7 @@
*/
package akka.http.javadsl.settings
import akka.actor.ActorSystem
import akka.http.impl.settings.ConnectionPoolSettingsImpl
import com.typesafe.config.Config
@ -14,6 +15,7 @@ import akka.http.impl.util.JavaMapping.Implicits._
*/
abstract class ConnectionPoolSettings private[akka] () { self: ConnectionPoolSettingsImpl
def getMaxConnections: Int
def getMinConnections: Int
def getMaxRetries: Int
def getMaxOpenRequests: Int
def getPipeliningLimit: Int
@ -33,4 +35,5 @@ abstract class ConnectionPoolSettings private[akka] () { self: ConnectionPoolSet
object ConnectionPoolSettings extends SettingsCompanion[ConnectionPoolSettings] {
override def create(config: Config): ConnectionPoolSettings = ConnectionPoolSettingsImpl(config)
override def create(configOverrides: String): ConnectionPoolSettings = ConnectionPoolSettingsImpl(configOverrides)
override def create(system: ActorSystem): ConnectionPoolSettings = create(system.settings.config)
}

View file

@ -5,6 +5,7 @@ package akka.http.javadsl.settings
import java.util.Optional
import akka.actor.ActorSystem
import akka.http.impl.engine.parsing.BodyPartParser
import akka.http.impl.settings.ParserSettingsImpl
import java.{ util ju }
@ -83,4 +84,5 @@ object ParserSettings extends SettingsCompanion[ParserSettings] {
override def create(config: Config): ParserSettings = ParserSettingsImpl(config)
override def create(configOverrides: String): ParserSettings = ParserSettingsImpl(configOverrides)
override def create(system: ActorSystem): ParserSettings = create(system.settings.config)
}

View file

@ -3,6 +3,7 @@
*/
package akka.http.javadsl.settings
import akka.actor.ActorSystem
import akka.http.impl.settings.RoutingSettingsImpl
import com.typesafe.config.Config
@ -30,4 +31,5 @@ abstract class RoutingSettings private[akka] () { self: RoutingSettingsImpl ⇒
object RoutingSettings extends SettingsCompanion[RoutingSettings] {
override def create(config: Config): RoutingSettings = RoutingSettingsImpl(config)
override def create(configOverrides: String): RoutingSettings = RoutingSettingsImpl(configOverrides)
override def create(system: ActorSystem): RoutingSettings = create(system.settings.config)
}

View file

@ -5,6 +5,7 @@ package akka.http.javadsl.settings
import java.util.{ Optional, Random }
import akka.actor.ActorSystem
import akka.http.impl.settings.ServerSettingsImpl
import akka.http.javadsl.model.headers.Host
import akka.http.javadsl.model.headers.Server
@ -71,4 +72,5 @@ object ServerSettings extends SettingsCompanion[ServerSettings] {
override def create(config: Config): ServerSettings = ServerSettingsImpl(config)
override def create(configOverrides: String): ServerSettings = ServerSettingsImpl(configOverrides)
override def create(system: ActorSystem): ServerSettings = create(system.settings.config)
}

View file

@ -3,14 +3,16 @@ package akka.http.javadsl.settings
import akka.actor.ActorSystem
import com.typesafe.config.Config
/** INTERNAL API */
trait SettingsCompanion[T] {
/**
* WARNING: This MUST overriden in sub-classes as otherwise won't be usable (return type) from Java.
* Creates an instance of settings using the configuration provided by the given ActorSystem.
*
* Java API
*/
final def create(system: ActorSystem): T = create(system.settings.config)
def create(system: ActorSystem): T = create(system.settings.config)
/**
* Creates an instance of settings using the given Config.

View file

@ -244,7 +244,11 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
settings: ClientConnectionSettings, connectionContext: ConnectionContext,
log: LoggingAdapter): Flow[SslTlsOutbound, SslTlsInbound, Future[OutgoingConnection]] = {
val tlsStage = sslTlsStage(connectionContext, Client, Some(host port))
val transportFlow = Tcp().outgoingConnection(new InetSocketAddress(host, port), localAddress,
// The InetSocketAddress representing the remote address must be created unresolved because akka.io.TcpOutgoingConnection will
// not attempt DNS resolution if the InetSocketAddress is already resolved. That behavior is problematic when it comes to
// connection pools since it means that new connections opened by the pool in the future can end up using a stale IP address.
// By passing an unresolved InetSocketAddress instead, we ensure that DNS resolution is performed for every new connection.
val transportFlow = Tcp().outgoingConnection(InetSocketAddress.createUnresolved(host, port), localAddress,
settings.socketOptions, halfClose = true, settings.connectingTimeout, settings.idleTimeout)
tlsStage.joinMat(transportFlow) { (_, tcpConnFuture)

View file

@ -8,18 +8,22 @@ import java.io.File
import java.nio.file.Path
import java.lang.{ Iterable JIterable }
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.{ ExecutionContext, Future }
import scala.collection.immutable
import scala.compat.java8.OptionConverters._
import scala.reflect.{ classTag, ClassTag }
import scala.reflect.{ ClassTag, classTag }
import akka.Done
import akka.parboiled2.CharUtils
import akka.stream.Materializer
import akka.util.{ HashCode, ByteString }
import akka.util.{ ByteString, HashCode }
import akka.http.impl.util._
import akka.http.javadsl.{ model jm }
import akka.http.scaladsl.util.FastFuture._
import akka.stream.scaladsl.Sink
import headers._
import akka.http.impl.util.JavaMapping.Implicits._
@ -37,6 +41,10 @@ sealed trait HttpMessage extends jm.HttpMessage {
def entity: ResponseEntity
def protocol: HttpProtocol
/** Drains entity stream */
def discardEntityBytes(mat: Materializer): HttpMessage.DiscardedEntity =
new HttpMessage.DiscardedEntity(entity.dataBytes.runWith(Sink.ignore)(mat))
/** Returns a copy of this message with the list of headers set to the given ones. */
def withHeaders(headers: HttpHeader*): Self = withHeaders(headers.toList)
@ -101,6 +109,8 @@ sealed trait HttpMessage extends jm.HttpMessage {
def addHeader(header: jm.HttpHeader): Self = mapHeaders(_ :+ header.asInstanceOf[HttpHeader])
def addCredentials(credentials: jm.headers.HttpCredentials): Self = addHeader(jm.headers.Authorization.create(credentials))
/** Removes the header with the given name (case-insensitive) */
def removeHeader(headerName: String): Self = {
val lowerHeaderName = headerName.toRootLowerCase
@ -139,6 +149,46 @@ object HttpMessage {
case HttpProtocols.`HTTP/1.1` connectionHeader.isDefined && connectionHeader.get.hasClose
case HttpProtocols.`HTTP/1.0` connectionHeader.isEmpty || !connectionHeader.get.hasKeepAlive
}
/**
* Represents the the currently being-drained HTTP Entity which triggers completion of the contained
* Future once the entity has been drained for the given HttpMessage completely.
*/
final class DiscardedEntity(f: Future[Done]) extends akka.http.javadsl.model.HttpMessage.DiscardedEntity {
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
def future: Future[Done] = f
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
def completionStage: CompletionStage[Done] = FutureConverters.toJava(f)
}
/** Adds Scala DSL idiomatic methods to [[HttpMessage]], e.g. versions of methods with an implicit [[Materializer]]. */
implicit final class HttpMessageScalaDSLSugar(val httpMessage: HttpMessage) extends AnyVal {
/**
* Discards the entities data bytes by running the `dataBytes` Source contained by the `entity` of this HTTP message.
*
* Note: It is crucial that entities are either discarded, or consumed by running the underlying [[akka.stream.scaladsl.Source]]
* as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection
* (as designed), however possibly leading to an idle-timeout that will close the connection, instead of
* just having ignored the data.
*
* Warning: It is not allowed to discard and/or consume the the `entity.dataBytes` more than once
* as the stream is directly attached to the "live" incoming data source from the underlying TCP connection.
* Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose
* of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an
* "stream can cannot be materialized more than once" exception.
*
* In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716.
*/
def discardEntityBytes()(implicit mat: Materializer): HttpMessage.DiscardedEntity =
httpMessage.discardEntityBytes(mat)
}
}
/**

View file

@ -61,7 +61,7 @@ sealed trait Multipart extends jm.Multipart {
boundary: String = BodyPartRenderer.randomBoundary())(implicit log: LoggingAdapter = NoLogging): MessageEntity = {
val chunks =
parts
.transform(() BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log))
.via(BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log))
.flatMapConcat(ConstantFun.scalaIdentityFunction)
HttpEntity.Chunked(mediaType withBoundary boundary withCharset charset, chunks)
}
@ -212,7 +212,7 @@ object Multipart {
/** Java API */
override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.Multipart.General.Strict] =
super.toStrict(timeoutMillis, materializer).asInstanceOf[Future[jm.Multipart.General.Strict]].toJava
super.toStrict(timeoutMillis, materializer).toScala.asInstanceOf[Future[jm.Multipart.General.Strict]].toJava
}
object General {
def apply(mediaType: MediaType.Multipart, parts: BodyPart.Strict*): Strict = Strict(mediaType, parts.toVector)
@ -258,7 +258,7 @@ object Multipart {
/** Java API */
override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.Multipart.General.BodyPart.Strict] =
super.toStrict(timeoutMillis, materializer).asInstanceOf[Future[jm.Multipart.General.BodyPart.Strict]].toJava
super.toStrict(timeoutMillis, materializer).toScala.asInstanceOf[Future[jm.Multipart.General.BodyPart.Strict]].toJava
private[BodyPart] def tryCreateFormDataBodyPart[T](f: (String, Map[String, String], immutable.Seq[HttpHeader]) T): Try[T] = {
val params = dispositionParams
@ -323,12 +323,22 @@ object Multipart {
/** Java API */
override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.Multipart.FormData.Strict] =
super.toStrict(timeoutMillis, materializer).asInstanceOf[Future[jm.Multipart.FormData.Strict]].toJava
super.toStrict(timeoutMillis, materializer).toScala.asInstanceOf[Future[jm.Multipart.FormData.Strict]].toJava
}
object FormData {
def apply(parts: Multipart.FormData.BodyPart.Strict*): Multipart.FormData.Strict = Strict(parts.toVector)
def apply(parts: Multipart.FormData.BodyPart*): Multipart.FormData = Multipart.FormData(Source(parts.toVector))
// FIXME: SI-2991 workaround - two functions below. Remove when (hopefully) this issue is fixed
/** INTERNAL API */
private[akka] def createStrict(parts: Multipart.FormData.BodyPart.Strict*): Multipart.FormData.Strict = Strict(parts.toVector)
/** INTERNAL API */
private[akka] def createNonStrict(parts: Multipart.FormData.BodyPart*): Multipart.FormData = Multipart.FormData(Source(parts.toVector))
/** INTERNAL API */
private[akka] def createStrict(fields: Map[String, akka.http.javadsl.model.HttpEntity.Strict]): Multipart.FormData.Strict = Multipart.FormData.Strict {
fields.map { case (name, entity: akka.http.scaladsl.model.HttpEntity.Strict) Multipart.FormData.BodyPart.Strict(name, entity) }(collection.breakOut)
}
def apply(fields: Map[String, HttpEntity.Strict]): Multipart.FormData.Strict = Multipart.FormData.Strict {
fields.map { case (name, entity) Multipart.FormData.BodyPart.Strict(name, entity) }(collection.breakOut)
}
@ -426,7 +436,7 @@ object Multipart {
/** Java API */
override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.Multipart.FormData.BodyPart.Strict] =
super.toStrict(timeoutMillis, materializer).asInstanceOf[Future[jm.Multipart.FormData.BodyPart.Strict]].toJava
super.toStrict(timeoutMillis, materializer).toScala.asInstanceOf[Future[jm.Multipart.FormData.BodyPart.Strict]].toJava
}
object BodyPart {
def apply(_name: String, _entity: BodyPartEntity,
@ -467,6 +477,26 @@ object Multipart {
FastFuture.successful(this)
override def productPrefix = "FormData.BodyPart.Strict"
}
/** INTERNAL API */
private[akka] object Builder {
def create(_name: String, _entity: BodyPartEntity,
_additionalDispositionParams: Map[String, String],
_additionalHeaders: Iterable[akka.http.javadsl.model.HttpHeader]): Multipart.FormData.BodyPart = {
val _headers = _additionalHeaders.to[immutable.Seq] map { case h: akka.http.scaladsl.model.HttpHeader h }
apply(_name, _entity, _additionalDispositionParams, _headers)
}
}
/** INTERNAL API */
private[akka] object StrictBuilder {
def createStrict(_name: String, _entity: HttpEntity.Strict,
_additionalDispositionParams: Map[String, String],
_additionalHeaders: Iterable[akka.http.javadsl.model.HttpHeader]): Multipart.FormData.BodyPart.Strict = {
val _headers = _additionalHeaders.to[immutable.Seq] map { case h: akka.http.scaladsl.model.HttpHeader h }
Strict(_name, _entity, _additionalDispositionParams, _headers)
}
}
}
}
@ -488,7 +518,7 @@ object Multipart {
/** Java API */
override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.Multipart.ByteRanges.Strict] =
super.toStrict(timeoutMillis, materializer).asInstanceOf[Future[jm.Multipart.ByteRanges.Strict]].toJava
super.toStrict(timeoutMillis, materializer).toScala.asInstanceOf[Future[jm.Multipart.ByteRanges.Strict]].toJava
}
object ByteRanges {
def apply(parts: Multipart.ByteRanges.BodyPart.Strict*): Strict = Strict(parts.toVector)
@ -563,7 +593,7 @@ object Multipart {
/** Java API */
override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.Multipart.ByteRanges.BodyPart.Strict] =
super.toStrict(timeoutMillis, materializer).asInstanceOf[Future[jm.Multipart.ByteRanges.BodyPart.Strict]].toJava
super.toStrict(timeoutMillis, materializer).toScala.asInstanceOf[Future[jm.Multipart.ByteRanges.BodyPart.Strict]].toJava
}
object BodyPart {
def apply(_contentRange: ContentRange, _entity: BodyPartEntity, _rangeUnit: RangeUnit = RangeUnits.Bytes,

View file

@ -21,3 +21,10 @@ final case class HttpChallenge(scheme: String, realm: String,
/** Java API */
def getParams: util.Map[String, String] = params.asJava
}
object HttpChallenges {
def basic(realm: String): HttpChallenge = HttpChallenge("Basic", realm)
def oAuth2(realm: String): HttpChallenge = HttpChallenge("Bearer", realm)
}

View file

@ -336,7 +336,8 @@ object Connection extends ModeledCompanion[Connection] {
def apply(first: String, more: String*): Connection = apply(immutable.Seq(first +: more: _*))
implicit val tokensRenderer = Renderer.defaultSeqRenderer[String] // cache
}
final case class Connection(tokens: immutable.Seq[String]) extends RequestResponseHeader {
final case class Connection(tokens: immutable.Seq[String]) extends jm.headers.Connection
with RequestResponseHeader {
require(tokens.nonEmpty, "tokens must not be empty")
import Connection.tokensRenderer
def renderValue[R <: Rendering](r: R): r.type = r ~~ tokens

View file

@ -14,6 +14,7 @@ import scala.concurrent.duration.Duration
*/
abstract class ConnectionPoolSettings extends js.ConnectionPoolSettings { self: ConnectionPoolSettingsImpl
def maxConnections: Int
def minConnections: Int
def maxRetries: Int
def maxOpenRequests: Int
def pipeliningLimit: Int
@ -26,6 +27,7 @@ abstract class ConnectionPoolSettings extends js.ConnectionPoolSettings { self:
final override def getPipeliningLimit: Int = pipeliningLimit
final override def getIdleTimeout: Duration = idleTimeout
final override def getMaxConnections: Int = maxConnections
final override def getMinConnections: Int = minConnections
final override def getMaxOpenRequests: Int = maxOpenRequests
final override def getMaxRetries: Int = maxRetries

View file

@ -0,0 +1,71 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.model;
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.function.Procedure;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.util.Try;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertEquals;
public class EntityDiscardingTest extends JUnitSuite {
private ActorSystem sys = ActorSystem.create("test");
private ActorMaterializer mat = ActorMaterializer.create(sys);
private Iterable<ByteString> testData = Arrays.asList(ByteString.fromString("abc"), ByteString.fromString("def"));
@Test
public void testHttpRequestDiscardEntity() {
CompletableFuture<Done> f = new CompletableFuture<>();
Source<ByteString, ?> s = Source.from(testData).alsoTo(Sink.onComplete(completeDone(f)));
RequestEntity reqEntity = HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, s);
HttpRequest req = HttpRequest.create().withEntity(reqEntity);
HttpMessage.DiscardedEntity de = req.discardEntityBytes(mat);
assertEquals(Done.getInstance(), f.join());
assertEquals(Done.getInstance(), de.completionStage().toCompletableFuture().join());
}
@Test
public void testHttpResponseDiscardEntity() {
CompletableFuture<Done> f = new CompletableFuture<>();
Source<ByteString, ?> s = Source.from(testData).alsoTo(Sink.onComplete(completeDone(f)));
ResponseEntity respEntity = HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, s);
HttpResponse resp = HttpResponse.create().withEntity(respEntity);
HttpMessage.DiscardedEntity de = resp.discardEntityBytes(mat);
assertEquals(Done.getInstance(), f.join());
assertEquals(Done.getInstance(), de.completionStage().toCompletableFuture().join());
}
private Procedure<Try<Done>> completeDone(CompletableFuture<Done> p) {
return new Procedure<Try<Done>>() {
@Override
public void apply(Try<Done> t) throws Exception {
if(t.isSuccess())
p.complete(Done.getInstance());
else
p.completeExceptionally(t.failed().get());
}
};
}
}

View file

@ -0,0 +1,18 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.settings;
import akka.actor.ActorSystem;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
public class ClientConnectionSettingsTest extends JUnitSuite {
@Test
public void testCreateWithActorSystem() {
ActorSystem sys = ActorSystem.create("test");
ClientConnectionSettings settings = ClientConnectionSettings.create(sys);
}
}

View file

@ -0,0 +1,18 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.settings;
import akka.actor.ActorSystem;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
public class ConnectionPoolSettingsTest extends JUnitSuite {
@Test
public void testCreateWithActorSystem() {
ActorSystem sys = ActorSystem.create("test");
ConnectionPoolSettings settings = ConnectionPoolSettings.create(sys);
}
}

View file

@ -0,0 +1,18 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.settings;
import akka.actor.ActorSystem;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
public class ParserSettingsTest extends JUnitSuite {
@Test
public void testCreateWithActorSystem() {
ActorSystem sys = ActorSystem.create("test");
ParserSettings settings = ParserSettings.create(sys);
}
}

View file

@ -0,0 +1,31 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.settings;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
public class RoutingSettingsTest extends JUnitSuite {
@Test
public void testCreateWithActorSystem() {
String testConfig =
"akka.http.routing {\n" +
" verbose-error-messages = off\n" +
" file-get-conditional = on\n" +
" render-vanity-footer = yes\n" +
" range-coalescing-threshold = 80\n" +
" range-count-limit = 16\n" +
" decode-max-bytes-per-chunk = 1m\n" +
" file-io-dispatcher = \"test-only\"\n" +
"}";
Config config = ConfigFactory.parseString(testConfig);
ActorSystem sys = ActorSystem.create("test", config);
RoutingSettings settings = RoutingSettings.create(sys);
}
}

View file

@ -0,0 +1,18 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.settings;
import akka.actor.ActorSystem;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
public class ServerSettingsTest extends JUnitSuite {
@Test
public void testCreateWithActorSystem() {
ActorSystem sys = ActorSystem.create("test");
ServerSettings settings = ServerSettings.create(sys);
}
}

View file

@ -23,6 +23,7 @@ import akka.stream.testkit.{ TestPublisher, TestSubscriber }
import akka.testkit.AkkaSpec
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NonFatal
@ -228,6 +229,56 @@ class ConnectionPoolSpec extends AkkaSpec("""
acceptIncomingConnection()
val (Success(_), 42) = responseOut.expectNext()
}
"never close hot connections when minConnections key is given and >0 (minConnections = 1)" in new TestSetup() {
val close: HttpHeader = Connection("close")
// for lower bound of one connection
val minConnection = 1
val (requestIn, requestOut, responseOutSub, hcpMinConnection) =
cachedHostConnectionPool[Int](idleTimeout = 100.millis, minConnections = minConnection)
val gatewayConnection = hcpMinConnection.gateway
acceptIncomingConnection()
requestIn.sendNext(HttpRequest(uri = "/minimumslots/1", headers = immutable.Seq(close)) 42)
responseOutSub.request(1)
requestOut.expectNextN(1)
condHolds(500.millis) { ()
Await.result(gatewayConnection.poolStatus(), 100.millis).get shouldBe a[PoolInterfaceRunning]
}
}
"never close hot connections when minConnections key is given and >0 (minConnections = 5)" in new TestSetup() {
val close: HttpHeader = Connection("close")
// for lower bound of five connections
val minConnections = 5
val (requestIn, requestOut, responseOutSub, hcpMinConnection) = cachedHostConnectionPool[Int](
idleTimeout = 100.millis,
minConnections = minConnections,
maxConnections = minConnections + 10)
(0 until minConnections) foreach { _ acceptIncomingConnection() }
(0 until minConnections) foreach { i
requestIn.sendNext(HttpRequest(uri = s"/minimumslots/5/$i", headers = immutable.Seq(close)) 42)
}
responseOutSub.request(minConnections)
requestOut.expectNextN(minConnections)
val gatewayConnections = hcpMinConnection.gateway
condHolds(1000.millis) { ()
val status = gatewayConnections.poolStatus()
Await.result(status, 100.millis).get shouldBe a[PoolInterfaceRunning]
}
}
"shutdown if idle and min connection has been set to 0" in new TestSetup() {
val (_, _, _, hcp) = cachedHostConnectionPool[Int](idleTimeout = 1.second, minConnections = 0)
val gateway = hcp.gateway
Await.result(gateway.poolStatus(), 1500.millis).get shouldBe a[PoolInterfaceRunning]
awaitCond({ Await.result(gateway.poolStatus(), 1500.millis).isEmpty }, 2000.millis)
}
}
"The single-request client infrastructure" should {
@ -325,24 +376,30 @@ class ConnectionPoolSpec extends AkkaSpec("""
def cachedHostConnectionPool[T](
maxConnections: Int = 2,
minConnections: Int = 0,
maxRetries: Int = 2,
maxOpenRequests: Int = 8,
pipeliningLimit: Int = 1,
idleTimeout: Duration = 5.seconds,
ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = {
val settings = new ConnectionPoolSettingsImpl(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit,
idleTimeout, ClientConnectionSettings(system))
flowTestBench(Http().cachedHostConnectionPool[T](serverHostName, serverPort, settings))
val settings =
new ConnectionPoolSettingsImpl(maxConnections, minConnections,
maxRetries, maxOpenRequests, pipeliningLimit,
idleTimeout, ccSettings)
flowTestBench(
Http().cachedHostConnectionPool[T](serverHostName, serverPort, settings))
}
def superPool[T](
maxConnections: Int = 2,
minConnections: Int = 0,
maxRetries: Int = 2,
maxOpenRequests: Int = 8,
pipeliningLimit: Int = 1,
idleTimeout: Duration = 5.seconds,
ccSettings: ClientConnectionSettings = ClientConnectionSettings(system)) = {
val settings = new ConnectionPoolSettingsImpl(maxConnections, maxRetries, maxOpenRequests, pipeliningLimit,
val settings = new ConnectionPoolSettingsImpl(maxConnections, minConnections, maxRetries, maxOpenRequests, pipeliningLimit,
idleTimeout, ClientConnectionSettings(system))
flowTestBench(Http().superPool[T](settings = settings))
}
@ -357,6 +414,22 @@ class ConnectionPoolSpec extends AkkaSpec("""
def connNr(r: HttpResponse): Int = r.headers.find(_ is "conn-nr").get.value.toInt
def requestUri(r: HttpResponse): String = r.headers.find(_ is "req-uri").get.value
/**
* Makes sure the given condition "f" holds in the timer period of "in".
* The given condition function should throw if not met.
* Note: Execution of "condHolds" will take at least "in" time, so for big "in" it might drain the ime budget for tests.
*/
def condHolds[T](in: FiniteDuration)(f: () T): T = {
val end = System.nanoTime.nanos + in
var lastR = f()
while (System.nanoTime.nanos < end) {
lastR = f()
Thread.sleep(50)
}
lastR
}
}
case class ConnNrHeader(nr: Int) extends CustomHeader {

View file

@ -129,6 +129,34 @@ class HttpConfigurationSpec extends AkkaSpec {
server.parserSettings.illegalHeaderWarnings should ===(On)
}
}
"set `akka.http.host-connection-pool.min-connections` only" in {
configuredSystem(
"""
akka.http.host-connection-pool.min-connections = 42
akka.http.host-connection-pool.max-connections = 43
""".stripMargin) { sys
val pool = ConnectionPoolSettings(sys)
pool.getMinConnections should ===(42)
pool.getMaxConnections should ===(43)
}
configuredSystem(""" """) { sys
val pool = ConnectionPoolSettings(sys)
pool.minConnections should ===(0)
}
configuredSystem(
"""
akka.http.host-connection-pool.min-connections = 101
akka.http.host-connection-pool.max-connections = 1
""".stripMargin) { sys
intercept[IllegalArgumentException] { ConnectionPoolSettings(sys) }
}
}
}
def configuredSystem(overrides: String)(block: ActorSystem Unit) = {

View file

@ -175,6 +175,28 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|""" should parseTo(HttpRequest(GET, Uri("http://x//foo").toHttpRequestTargetOriginForm, protocol = `HTTP/1.0`))
closeAfterResponseCompletion shouldEqual Seq(true)
}
"with additional fields in Strict-Transport-Security header" in new Test {
"""GET /hsts HTTP/1.1
|Host: x
|Strict-Transport-Security: max-age=1; preload; dummy
|
|""" should parseTo(HttpRequest(
GET,
"/hsts",
headers = List(Host("x"), `Strict-Transport-Security`(1, None)),
protocol = `HTTP/1.1`))
"""GET /hsts HTTP/1.1
|Host: x
|Strict-Transport-Security: max-age=1; dummy; preload
|
|""" should parseTo(HttpRequest(
GET,
"/hsts",
headers = List(Host("x"), `Strict-Transport-Security`(1, None)),
protocol = `HTTP/1.1`))
}
}
"properly parse a chunked request" - {
@ -453,6 +475,36 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|
|""" should parseToError(422: StatusCode, ErrorInfo("TRACE requests must not have an entity"))
}
"with additional fields in headers" in new Test {
"""GET / HTTP/1.1
|Host: x; dummy
|
|""" should parseToError(
BadRequest,
ErrorInfo("Illegal 'host' header: Invalid input ' ', expected 'EOI', ':', UPPER_ALPHA, lower-reg-name-char or pct-encoded (line 1, column 3)", "x; dummy\n ^"))
"""GET / HTTP/1.1
|Content-length: 3; dummy
|
|""" should parseToError(
BadRequest,
ErrorInfo("Illegal `Content-Length` header value"))
"""GET / HTTP/1.1
|Connection:keep-alive; dummy
|
|""" should parseToError(
BadRequest,
ErrorInfo("Illegal 'connection' header: Invalid input ';', expected tchar, OWS, listSep or 'EOI' (line 1, column 11)", "keep-alive; dummy\n ^"))
"""GET / HTTP/1.1
|Transfer-Encoding: chunked; dummy
|
|""" should parseToError(
BadRequest,
ErrorInfo("Illegal 'transfer-encoding' header: Invalid input ';', expected OWS, listSep or 'EOI' (line 1, column 8)", "chunked; dummy\n ^"))
}
}
}

View file

@ -684,7 +684,7 @@ class HttpServerSpec extends AkkaSpec(
// client then closes the connection
netIn.sendComplete()
requests.expectComplete() // this should happen, but never does
requests.expectComplete()
netOut.expectComplete()
})

View file

@ -0,0 +1,62 @@
/*
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.model
import java.util
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Inside, Matchers, WordSpec }
import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import scala.compat.java8.FutureConverters
class MultipartsSpec extends WordSpec with Matchers with Inside with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.loglevel = WARNING""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
implicit val materializer = ActorMaterializer()
override def afterAll() = system.terminate()
"Multiparts.createFormDataFromParts" should {
"create a model from Multiparts.createFormDataBodyPartparts" in {
val streamed = Multiparts.createFormDataFromParts(
Multiparts.createFormDataBodyPart("foo", HttpEntities.create("FOO")),
Multiparts.createFormDataBodyPart("bar", HttpEntities.create("BAR")))
val strictCS = streamed.toStrict(1000, materializer)
val strict = Await.result(FutureConverters.toScala(strictCS), 1.second)
strict shouldEqual akka.http.scaladsl.model.Multipart.FormData(
Map("foo" akka.http.scaladsl.model.HttpEntity("FOO"), "bar" akka.http.scaladsl.model.HttpEntity("BAR")))
}
}
"Multiparts.createFormDataFromFields" should {
"create a model from a map of fields" in {
val fields = new util.HashMap[String, HttpEntity.Strict]
fields.put("foo", HttpEntities.create("FOO"))
val streamed = Multiparts.createFormDataFromFields(fields)
val strictCS = streamed.toStrict(1000, materializer)
val strict = Await.result(FutureConverters.toScala(strictCS), 1.second)
strict shouldEqual akka.http.scaladsl.model.Multipart.FormData(
Map("foo" akka.http.scaladsl.model.HttpEntity("FOO")))
}
}
"Multiparts.createStrictFormDataFromParts" should {
"create a strict model from Multiparts.createFormDataBodyPartStrict parts" in {
val streamed = Multiparts.createStrictFormDataFromParts(
Multiparts.createFormDataBodyPartStrict("foo", HttpEntities.create("FOO")),
Multiparts.createFormDataBodyPartStrict("bar", HttpEntities.create("BAR")))
val strict = streamed
strict shouldEqual akka.http.scaladsl.model.Multipart.FormData(
Map("foo" akka.http.scaladsl.model.HttpEntity("FOO"), "bar" akka.http.scaladsl.model.HttpEntity("BAR")))
}
}
}

View file

@ -0,0 +1,81 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.model
import akka.Done
import akka.http.scaladsl.model.HttpEntity.Chunked
import akka.http.scaladsl.{ Http, TestUtils }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec
import scala.concurrent.duration._
import akka.util.ByteString
import scala.concurrent.{ Await, Promise }
class EntityDiscardingSpec extends AkkaSpec {
implicit val mat = ActorMaterializer()
val testData = Vector.tabulate(200)(i ByteString(s"row-$i"))
"HttpRequest" should {
"discard entity stream after .discardEntityBytes() call" in {
val p = Promise[Done]()
val s = Source
.fromIterator[ByteString](() testData.iterator)
.alsoTo(Sink.onComplete(t p.complete(t)))
val req = HttpRequest(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s))
val de = req.discardEntityBytes()
p.future.futureValue should ===(Done)
de.future.futureValue should ===(Done)
}
}
"HttpResponse" should {
"discard entity stream after .discardEntityBytes() call" in {
val p = Promise[Done]()
val s = Source
.fromIterator[ByteString](() testData.iterator)
.alsoTo(Sink.onComplete(t p.complete(t)))
val resp = HttpResponse(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s))
val de = resp.discardEntityBytes()
p.future.futureValue should ===(Done)
de.future.futureValue should ===(Done)
}
// TODO consider improving this by storing a mutable "already materialized" flag somewhere
// TODO likely this is going to inter-op with the auto-draining as described in #18716
"should not allow draining a second time" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val bound = Http().bindAndHandleSync(
req
HttpResponse(entity = HttpEntity(
ContentTypes.`text/csv(UTF-8)`, Source.fromIterator[ByteString](() testData.iterator))),
host, port).futureValue
try {
val response = Http().singleRequest(HttpRequest(uri = s"http://$host:$port/")).futureValue
val de = response.discardEntityBytes()
de.future.futureValue should ===(Done)
val de2 = response.discardEntityBytes()
val secondRunException = intercept[IllegalStateException] { Await.result(de2.future, 3.seconds) }
secondRunException.getMessage should include("Source cannot be materialized more than once")
} finally bound.unbind().futureValue
}
}
}

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.model
import headers.Host

View file

@ -5,11 +5,12 @@
package akka.http.scaladsl.model
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Inside, Matchers, WordSpec }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import akka.actor.ActorSystem
import headers._
@ -34,6 +35,16 @@ class MultipartSpec extends WordSpec with Matchers with Inside with BeforeAndAft
MediaTypes.`multipart/mixed`,
Multipart.General.BodyPart.Strict(HttpEntity("data"), List(ETag("xzy"))))
}
"support `toEntity`" in {
val streamed = Multipart.General(
MediaTypes.`multipart/mixed`,
Source(Multipart.General.BodyPart(defaultEntity("data"), List(ETag("xzy"))) :: Nil))
val result = streamed.toEntity(boundary = "boundary")
result.contentType shouldBe MediaTypes.`multipart/mixed`.withBoundary("boundary").withCharset(HttpCharsets.`UTF-8`)
val encoding = Await.result(result.dataBytes.runWith(Sink.seq), 1.second)
encoding.map(_.utf8String).mkString shouldBe "--boundary\r\nContent-Type: text/plain; charset=UTF-8\r\nETag: \"xzy\"\r\n\r\ndata\r\n--boundary--"
}
}
"Multipart.FormData" should {

View file

@ -639,5 +639,14 @@ class UriSpec extends WordSpec with Matchers {
val uri = Uri(s"http://foo.bar/$slashes")
uri.toString // was reported to throw StackOverflowException in Spray's URI
}
"survive parsing a URI with thousands of query string values" in {
val uriString = (1 to 2000).map("a=" + _).mkString("http://foo.bar/?", "&", "")
val uri = Uri(uriString)
val query = uri.query()
query.size shouldEqual 2000
query.head._2 shouldEqual "1"
query.last._2 shouldEqual "2000"
}
}
}

View file

@ -68,6 +68,12 @@ class HeaderSpec extends FreeSpec with Matchers {
headers.`Strict-Transport-Security`.parseFromValueString("max-age=30; includeSubDomains") shouldEqual Right(headers.`Strict-Transport-Security`(30, true))
headers.`Strict-Transport-Security`.parseFromValueString("max-age=30; includeSubDomains; preload") shouldEqual Right(headers.`Strict-Transport-Security`(30, true))
}
"successful parse run with additional values" in {
headers.`Strict-Transport-Security`.parseFromValueString("max-age=30; includeSubDomains; preload; dummy") shouldEqual
Right(headers.`Strict-Transport-Security`(30, true))
headers.`Strict-Transport-Security`.parseFromValueString("max-age=30; includeSubDomains; dummy; preload") shouldEqual
Right(headers.`Strict-Transport-Security`(30, true))
}
"failing parse run" in {
val Left(List(ErrorInfo(summary, detail))) = `Strict-Transport-Security`.parseFromValueString("max-age=30; includeSubDomains; preload;")
summary shouldEqual "Illegal HTTP header 'Strict-Transport-Security': Invalid input 'EOI', expected OWS or token0 (line 1, column 40)"

View file

@ -150,6 +150,7 @@ class HttpModelIntegrationSpec extends WordSpec with Matchers with BeforeAndAfte
"be able to wrap HttpHeaders with custom typed headers" in {
// TODO potentially use the integration for Play / Lagom APIs?
// This HTTP model is typed. It uses Akka HTTP types internally, but
// no Akka HTTP types are visible to users. This typed model is a
// model that Play Framework may eventually move to.