=str,htp clean up build warnings
- explicitly provide Unit values and place parens around tuple creation - remove structural type usage in TestUtils - fix Java double-casts - use unused Java values by asserting their non-nullness - work around inability to place case class in trait (scripted test) The remaining warnings about using private types in public methods are bogus as reported in https://issues.scala-lang.org/browse/SI-9490.
This commit is contained in:
parent
ae83053a64
commit
68ba0643d6
58 changed files with 160 additions and 132 deletions
|
|
@ -16,6 +16,7 @@ import akka.http.javadsl.model.*;
|
||||||
import akka.http.javadsl.Http;
|
import akka.http.javadsl.Http;
|
||||||
import scala.util.Try;
|
import scala.util.Try;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
public class HttpClientExampleDocTest {
|
public class HttpClientExampleDocTest {
|
||||||
|
|
||||||
// compile only test
|
// compile only test
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import akka.http.javadsl.model.*;
|
||||||
import akka.http.javadsl.model.headers.*;
|
import akka.http.javadsl.model.headers.*;
|
||||||
//#import-model
|
//#import-model
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
public class ModelDocTest {
|
public class ModelDocTest {
|
||||||
@Test
|
@Test
|
||||||
public void testConstructRequest() {
|
public void testConstructRequest() {
|
||||||
|
|
@ -78,7 +79,7 @@ public class ModelDocTest {
|
||||||
//#headers
|
//#headers
|
||||||
|
|
||||||
// a method that extracts basic HTTP credentials from a request
|
// a method that extracts basic HTTP credentials from a request
|
||||||
private Option<BasicHttpCredentials> getCredentialsOfRequest(HttpRequest request) {
|
private Option<BasicHttpCredentials> getCredentialsOfRequest(HttpRequest request) {
|
||||||
Option<Authorization> auth = request.getHeader(Authorization.class);
|
Option<Authorization> auth = request.getHeader(Authorization.class);
|
||||||
if (auth.isDefined() && auth.get().credentials() instanceof BasicHttpCredentials)
|
if (auth.isDefined() && auth.get().credentials() instanceof BasicHttpCredentials)
|
||||||
return Option.some((BasicHttpCredentials) auth.get().credentials());
|
return Option.some((BasicHttpCredentials) auth.get().credentials());
|
||||||
|
|
|
||||||
|
|
@ -10,16 +10,12 @@ import akka.http.javadsl.Http;
|
||||||
import akka.http.javadsl.IncomingConnection;
|
import akka.http.javadsl.IncomingConnection;
|
||||||
import akka.http.javadsl.ServerBinding;
|
import akka.http.javadsl.ServerBinding;
|
||||||
import akka.http.javadsl.model.*;
|
import akka.http.javadsl.model.*;
|
||||||
import akka.http.javadsl.model.ws.Websocket;
|
|
||||||
import akka.japi.function.Function;
|
import akka.japi.function.Function;
|
||||||
import akka.japi.function.Procedure;
|
import akka.japi.function.Procedure;
|
||||||
import akka.stream.ActorMaterializer;
|
import akka.stream.ActorMaterializer;
|
||||||
import akka.stream.Materializer;
|
import akka.stream.Materializer;
|
||||||
import akka.stream.javadsl.Flow;
|
|
||||||
import akka.stream.javadsl.Sink;
|
import akka.stream.javadsl.Sink;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
import org.junit.Test;
|
|
||||||
import scala.Function1;
|
|
||||||
import scala.concurrent.Await;
|
import scala.concurrent.Await;
|
||||||
import scala.concurrent.Future;
|
import scala.concurrent.Future;
|
||||||
import scala.concurrent.duration.FiniteDuration;
|
import scala.concurrent.duration.FiniteDuration;
|
||||||
|
|
@ -29,7 +25,7 @@ import java.io.InputStreamReader;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class HttpServerExampleDocTest {
|
public class HttpServerExampleDocTest {
|
||||||
public static void bindingExample() {
|
public static void bindingExample() throws Exception {
|
||||||
//#binding-example
|
//#binding-example
|
||||||
ActorSystem system = ActorSystem.create();
|
ActorSystem system = ActorSystem.create();
|
||||||
Materializer materializer = ActorMaterializer.create(system);
|
Materializer materializer = ActorMaterializer.create(system);
|
||||||
|
|
@ -47,6 +43,7 @@ public class HttpServerExampleDocTest {
|
||||||
}
|
}
|
||||||
})).run(materializer);
|
})).run(materializer);
|
||||||
//#binding-example
|
//#binding-example
|
||||||
|
Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
public static void fullServerExample() throws Exception {
|
public static void fullServerExample() throws Exception {
|
||||||
//#full-server-example
|
//#full-server-example
|
||||||
|
|
|
||||||
|
|
@ -23,13 +23,13 @@ object MyRejectionHandler {
|
||||||
complete(HttpResponse(BadRequest, entity = "No cookies, no service!!!"))
|
complete(HttpResponse(BadRequest, entity = "No cookies, no service!!!"))
|
||||||
}
|
}
|
||||||
.handle { case AuthorizationFailedRejection ⇒
|
.handle { case AuthorizationFailedRejection ⇒
|
||||||
complete(Forbidden, "You're out of your depth!")
|
complete((Forbidden, "You're out of your depth!"))
|
||||||
}
|
}
|
||||||
.handleAll[MethodRejection] { methodRejections ⇒
|
.handleAll[MethodRejection] { methodRejections ⇒
|
||||||
val names = methodRejections.map(_.supported.name)
|
val names = methodRejections.map(_.supported.name)
|
||||||
complete(MethodNotAllowed, s"Can't do that! Supported: ${names mkString " or "}!")
|
complete((MethodNotAllowed, s"Can't do that! Supported: ${names mkString " or "}!"))
|
||||||
}
|
}
|
||||||
.handleNotFound { complete(NotFound, "Not here!") }
|
.handleNotFound { complete((NotFound, "Not here!")) }
|
||||||
.result()
|
.result()
|
||||||
|
|
||||||
object MyApp extends App {
|
object MyApp extends App {
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ class DebuggingDirectivesExamplesSpec extends RoutingSpec {
|
||||||
DebuggingDirectives.logRequest("get-user")
|
DebuggingDirectives.logRequest("get-user")
|
||||||
|
|
||||||
// marks with "get-user", log with info level, HttpRequest.toString
|
// marks with "get-user", log with info level, HttpRequest.toString
|
||||||
DebuggingDirectives.logRequest("get-user", Logging.InfoLevel)
|
DebuggingDirectives.logRequest(("get-user", Logging.InfoLevel))
|
||||||
|
|
||||||
// logs just the request method at debug level
|
// logs just the request method at debug level
|
||||||
def requestMethod(req: HttpRequest): String = req.method.toString
|
def requestMethod(req: HttpRequest): String = req.method.toString
|
||||||
|
|
@ -46,7 +46,7 @@ class DebuggingDirectivesExamplesSpec extends RoutingSpec {
|
||||||
DebuggingDirectives.logRequestResult("get-user")
|
DebuggingDirectives.logRequestResult("get-user")
|
||||||
|
|
||||||
// marks with "get-user", log with info level, HttpRequest.toString, HttpResponse.toString
|
// marks with "get-user", log with info level, HttpRequest.toString, HttpResponse.toString
|
||||||
DebuggingDirectives.logRequestResult("get-user", Logging.InfoLevel)
|
DebuggingDirectives.logRequestResult(("get-user", Logging.InfoLevel))
|
||||||
|
|
||||||
// logs just the request method and response status at info level
|
// logs just the request method and response status at info level
|
||||||
def requestMethodAndResponseStatusAsInfo(req: HttpRequest): Any => Option[LogEntry] = {
|
def requestMethodAndResponseStatusAsInfo(req: HttpRequest): Any => Option[LogEntry] = {
|
||||||
|
|
@ -72,7 +72,7 @@ class DebuggingDirectivesExamplesSpec extends RoutingSpec {
|
||||||
DebuggingDirectives.logResult("get-user")
|
DebuggingDirectives.logResult("get-user")
|
||||||
|
|
||||||
// marks with "get-user", log with info level, HttpResponse.toString
|
// marks with "get-user", log with info level, HttpResponse.toString
|
||||||
DebuggingDirectives.logResult("get-user", Logging.InfoLevel)
|
DebuggingDirectives.logResult(("get-user", Logging.InfoLevel))
|
||||||
|
|
||||||
// logs just the response status at debug level
|
// logs just the response status at debug level
|
||||||
def responseStatus(res: Any): String = res match {
|
def responseStatus(res: Any): String = res match {
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import akka.http.scaladsl.server._
|
||||||
class ExecutionDirectivesExamplesSpec extends RoutingSpec {
|
class ExecutionDirectivesExamplesSpec extends RoutingSpec {
|
||||||
"handleExceptions" in {
|
"handleExceptions" in {
|
||||||
val divByZeroHandler = ExceptionHandler {
|
val divByZeroHandler = ExceptionHandler {
|
||||||
case _: ArithmeticException => complete(StatusCodes.BadRequest, "You've got your arithmetic wrong, fool!")
|
case _: ArithmeticException => complete((StatusCodes.BadRequest, "You've got your arithmetic wrong, fool!"))
|
||||||
}
|
}
|
||||||
val route =
|
val route =
|
||||||
path("divide" / IntNumber / IntNumber) { (a, b) =>
|
path("divide" / IntNumber / IntNumber) { (a, b) =>
|
||||||
|
|
@ -30,7 +30,7 @@ class ExecutionDirectivesExamplesSpec extends RoutingSpec {
|
||||||
}
|
}
|
||||||
"handleRejections" in {
|
"handleRejections" in {
|
||||||
val totallyMissingHandler = RejectionHandler.newBuilder()
|
val totallyMissingHandler = RejectionHandler.newBuilder()
|
||||||
.handleNotFound { complete(StatusCodes.NotFound, "Oh man, what you are looking for is long gone.") }
|
.handleNotFound { complete((StatusCodes.NotFound, "Oh man, what you are looking for is long gone.")) }
|
||||||
.result()
|
.result()
|
||||||
val route =
|
val route =
|
||||||
pathPrefix("handled") {
|
pathPrefix("handled") {
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ class FutureDirectivesExamplesSpec extends RoutingSpec {
|
||||||
implicit val myExceptionHandler =
|
implicit val myExceptionHandler =
|
||||||
ExceptionHandler {
|
ExceptionHandler {
|
||||||
case TestException => ctx =>
|
case TestException => ctx =>
|
||||||
ctx.complete(InternalServerError, "Unsuccessful future!")
|
ctx.complete((InternalServerError, "Unsuccessful future!"))
|
||||||
}
|
}
|
||||||
|
|
||||||
val resourceActor = system.actorOf(Props(new Actor {
|
val resourceActor = system.actorOf(Props(new Actor {
|
||||||
|
|
@ -40,7 +40,7 @@ class FutureDirectivesExamplesSpec extends RoutingSpec {
|
||||||
path("divide" / IntNumber / IntNumber) { (a, b) =>
|
path("divide" / IntNumber / IntNumber) { (a, b) =>
|
||||||
onComplete(divide(a, b)) {
|
onComplete(divide(a, b)) {
|
||||||
case Success(value) => complete(s"The result was $value")
|
case Success(value) => complete(s"The result was $value")
|
||||||
case Failure(ex) => complete(InternalServerError, s"An error occurred: ${ex.getMessage}")
|
case Failure(ex) => complete((InternalServerError, s"An error occurred: ${ex.getMessage}"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ class PathDirectivesExamplesSpec extends RoutingSpec {
|
||||||
|
|
||||||
//# path-dsl
|
//# path-dsl
|
||||||
// matches /foo/
|
// matches /foo/
|
||||||
path("foo" /)
|
path("foo"./)
|
||||||
|
|
||||||
// matches e.g. /foo/123 and extracts "123" as a String
|
// matches e.g. /foo/123 and extracts "123" as a String
|
||||||
path("foo" / """\d+""".r)
|
path("foo" / """\d+""".r)
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ class RouteDirectivesExamplesSpec extends RoutingSpec {
|
||||||
complete(HttpResponse(entity = "foo"))
|
complete(HttpResponse(entity = "foo"))
|
||||||
} ~
|
} ~
|
||||||
path("b") {
|
path("b") {
|
||||||
complete(StatusCodes.Created, "bar")
|
complete((StatusCodes.Created, "bar"))
|
||||||
} ~
|
} ~
|
||||||
(path("c") & complete("baz")) // `&` also works with `complete` as the 2nd argument
|
(path("c") & complete("baz")) // `&` also works with `complete` as the 2nd argument
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -199,7 +199,7 @@ class FlowDocSpec extends AkkaSpec {
|
||||||
val (promise, cancellable, future) = r11.run()
|
val (promise, cancellable, future) = r11.run()
|
||||||
|
|
||||||
// Type inference works as expected
|
// Type inference works as expected
|
||||||
promise.success(0)
|
promise.success(())
|
||||||
cancellable.cancel()
|
cancellable.cancel()
|
||||||
future.map(_ + 3)
|
future.map(_ + 3)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -141,8 +141,9 @@ class FlowGraphDocSpec extends AkkaSpec {
|
||||||
outlets: immutable.Seq[Outlet[_]]) = {
|
outlets: immutable.Seq[Outlet[_]]) = {
|
||||||
assert(inlets.size == this.inlets.size)
|
assert(inlets.size == this.inlets.size)
|
||||||
assert(outlets.size == this.outlets.size)
|
assert(outlets.size == this.outlets.size)
|
||||||
// This is why order matters when overriding inlets and outlets
|
// This is why order matters when overriding inlets and outlets.
|
||||||
PriorityWorkerPoolShape(inlets(0), inlets(1), outlets(0))
|
// The "[Nothing, Any]" is equivalent to casting the Inlets/Outlets.
|
||||||
|
PriorityWorkerPoolShape[Nothing, Any](inlets(0), inlets(1), outlets(0))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//#flow-graph-components-shape
|
//#flow-graph-components-shape
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
||||||
//#file-source
|
//#file-source
|
||||||
import akka.stream.io._
|
import akka.stream.io._
|
||||||
//#file-source
|
//#file-source
|
||||||
|
Thread.sleep(0) // needs a statement here for valid syntax and to avoid "unused" warnings
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -23,17 +23,17 @@ public abstract class Util {
|
||||||
// needed to provide covariant conversions that the Java interfaces don't provide automatically.
|
// needed to provide covariant conversions that the Java interfaces don't provide automatically.
|
||||||
// The alternative would be having to cast around everywhere instead of doing it here in a central place.
|
// The alternative would be having to cast around everywhere instead of doing it here in a central place.
|
||||||
public static <U, T extends U> Option<U> convertOption(scala.Option<T> o) {
|
public static <U, T extends U> Option<U> convertOption(scala.Option<T> o) {
|
||||||
return (Option<U>)(Option) akka.japi.Option.fromScalaOption(o);
|
return (Option<U>)(Object) akka.japi.Option.fromScalaOption(o);
|
||||||
}
|
}
|
||||||
@SuppressWarnings("unchecked") // no support for covariance of Publisher in Java
|
@SuppressWarnings("unchecked") // no support for covariance of Publisher in Java
|
||||||
// needed to provide covariant conversions that the Java interfaces don't provide automatically.
|
// needed to provide covariant conversions that the Java interfaces don't provide automatically.
|
||||||
// The alternative would be having to cast around everywhere instead of doing it here in a central place.
|
// The alternative would be having to cast around everywhere instead of doing it here in a central place.
|
||||||
public static <U, T extends U> Source<U, scala.Unit> convertPublisher(Source<T, scala.Unit> p) {
|
public static <U, T extends U> Source<U, scala.Unit> convertPublisher(Source<T, scala.Unit> p) {
|
||||||
return (Source<U, scala.Unit>)(Source) p;
|
return (Source<U, scala.Unit>)(Object) p;
|
||||||
}
|
}
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public static <T, U extends T> Source<U, scala.Unit> upcastSource(Source<T, scala.Unit> p) {
|
public static <T, U extends T> Source<U, scala.Unit> upcastSource(Source<T, scala.Unit> p) {
|
||||||
return (Source<U, scala.Unit>)(Source) p;
|
return (Source<U, scala.Unit>)(Object) p;
|
||||||
}
|
}
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public static scala.collection.immutable.Map<String, String> convertMapToScala(Map<String, String> map) {
|
public static scala.collection.immutable.Map<String, String> convertMapToScala(Map<String, String> map) {
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ public abstract class ContentRange {
|
||||||
}
|
}
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public static ContentRange create(long first, long last, Option<Long> instanceLength) {
|
public static ContentRange create(long first, long last, Option<Long> instanceLength) {
|
||||||
return ContentRange$.MODULE$.apply(first, last, ((Option<Object>) (Option) instanceLength).asScala());
|
return ContentRange$.MODULE$.apply(first, last, ((Option<Object>) (Object) instanceLength).asScala());
|
||||||
}
|
}
|
||||||
public static ContentRange createUnsatisfiable(long length) {
|
public static ContentRange createUnsatisfiable(long length) {
|
||||||
return new akka.http.scaladsl.model.ContentRange.Unsatisfiable(length);
|
return new akka.http.scaladsl.model.ContentRange.Unsatisfiable(length);
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@
|
||||||
|
|
||||||
package akka.http.javadsl.model.headers;
|
package akka.http.javadsl.model.headers;
|
||||||
|
|
||||||
import akka.http.scaladsl.model.headers.HttpChallenge$;
|
|
||||||
import akka.http.impl.util.Util;
|
import akka.http.impl.util.Util;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ public abstract class HttpCookie {
|
||||||
return new akka.http.scaladsl.model.headers.HttpCookie(
|
return new akka.http.scaladsl.model.headers.HttpCookie(
|
||||||
name, value,
|
name, value,
|
||||||
Util.<DateTime, akka.http.scaladsl.model.DateTime>convertOptionToScala(expires),
|
Util.<DateTime, akka.http.scaladsl.model.DateTime>convertOptionToScala(expires),
|
||||||
((Option<Object>) (Option) maxAge).asScala(),
|
((Option<Object>) (Object) maxAge).asScala(),
|
||||||
domain.asScala(),
|
domain.asScala(),
|
||||||
path.asScala(),
|
path.asScala(),
|
||||||
secure,
|
secure,
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,6 @@
|
||||||
|
|
||||||
package akka.http.javadsl.model.headers;
|
package akka.http.javadsl.model.headers;
|
||||||
|
|
||||||
import akka.http.javadsl.model.MediaType;
|
|
||||||
import akka.http.javadsl.model.Uri;
|
|
||||||
import akka.http.impl.util.Util;
|
|
||||||
|
|
||||||
public abstract class LinkParam {
|
public abstract class LinkParam {
|
||||||
public abstract String key();
|
public abstract String key();
|
||||||
public abstract Object value();
|
public abstract Object value();
|
||||||
|
|
|
||||||
|
|
@ -126,6 +126,7 @@ private[http] object OutgoingConnectionBlueprint {
|
||||||
case (ctx, `terminationBackchannelInput`) ⇒
|
case (ctx, `terminationBackchannelInput`) ⇒
|
||||||
ctx.finish()
|
ctx.finish()
|
||||||
SameState
|
SameState
|
||||||
|
case (_, _) ⇒ SameState
|
||||||
},
|
},
|
||||||
onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure)
|
onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure)
|
||||||
}
|
}
|
||||||
|
|
@ -220,6 +221,7 @@ private[http] object OutgoingConnectionBlueprint {
|
||||||
ctx.fail(new ResponseParsingError(parser.onPull().asInstanceOf[ErrorOutput]))
|
ctx.fail(new ResponseParsingError(parser.onPull().asInstanceOf[ErrorOutput]))
|
||||||
}
|
}
|
||||||
SameState
|
SameState
|
||||||
|
case (_, _) ⇒ SameState
|
||||||
},
|
},
|
||||||
onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure)
|
onUpstreamFailure = defaultCompletionHandling.onUpstreamFailure)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -204,10 +204,14 @@ private[http] object HttpServerBluePrint {
|
||||||
}
|
}
|
||||||
|
|
||||||
case (ctx, _, OneHundredContinue) ⇒
|
case (ctx, _, OneHundredContinue) ⇒
|
||||||
assert(requestStart.expect100ContinueResponsePending)
|
require(requestStart.expect100ContinueResponsePending)
|
||||||
ctx.emit(ResponseRenderingContext(HttpResponse(StatusCodes.Continue)))
|
ctx.emit(ResponseRenderingContext(HttpResponse(StatusCodes.Continue)))
|
||||||
requestStart = requestStart.copy(expect100ContinueResponsePending = false)
|
requestStart = requestStart.copy(expect100ContinueResponsePending = false)
|
||||||
SameState
|
SameState
|
||||||
|
|
||||||
|
case (ctx, _, msg) ⇒
|
||||||
|
ctx.fail(new IllegalStateException(s"unexpected message of type [${msg.getClass}], expecting only HttpResponse or OneHundredContinue"))
|
||||||
|
SameState
|
||||||
}
|
}
|
||||||
|
|
||||||
val waitingForApplicationResponseCompletionHandling = CompletionHandling(
|
val waitingForApplicationResponseCompletionHandling = CompletionHandling(
|
||||||
|
|
|
||||||
|
|
@ -99,7 +99,7 @@ private[http] class FrameEventParser extends ByteStringParserStage[FrameEvent] {
|
||||||
remaining -= elem.size
|
remaining -= elem.size
|
||||||
ctx.push(FrameData(elem, lastPart = false))
|
ctx.push(FrameData(elem, lastPart = false))
|
||||||
} else {
|
} else {
|
||||||
assert(remaining <= Int.MaxValue) // safe because, remaining <= elem.size <= Int.MaxValue
|
require(remaining <= Int.MaxValue) // safe because, remaining <= elem.size <= Int.MaxValue
|
||||||
val frameData = elem.take(remaining.toInt)
|
val frameData = elem.take(remaining.toInt)
|
||||||
val remainingData = elem.drop(remaining.toInt)
|
val remainingData = elem.drop(remaining.toInt)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,10 +20,13 @@ private[http] class FrameEventRenderer extends StatefulStage[FrameEvent, ByteStr
|
||||||
object Idle extends State {
|
object Idle extends State {
|
||||||
def onPush(elem: FrameEvent, ctx: Context[ByteString]): SyncDirective = elem match {
|
def onPush(elem: FrameEvent, ctx: Context[ByteString]): SyncDirective = elem match {
|
||||||
case start @ FrameStart(header, data) ⇒
|
case start @ FrameStart(header, data) ⇒
|
||||||
assert(header.length >= data.size)
|
require(header.length >= data.size)
|
||||||
if (!start.lastPart && header.length > 0) become(renderData(header.length - data.length, this))
|
if (!start.lastPart && header.length > 0) become(renderData(header.length - data.length, this))
|
||||||
|
|
||||||
ctx.push(renderStart(start))
|
ctx.push(renderStart(start))
|
||||||
|
|
||||||
|
case f: FrameData ⇒
|
||||||
|
ctx.fail(new IllegalStateException("unexpected FrameData (need FrameStart first)"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -43,6 +46,9 @@ private[http] class FrameEventRenderer extends StatefulStage[FrameEvent, ByteStr
|
||||||
remaining -= data.size
|
remaining -= data.size
|
||||||
ctx.push(data)
|
ctx.push(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case f: FrameStart ⇒
|
||||||
|
ctx.fail(new IllegalStateException("unexpected FrameStart (need more FrameData first)"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -106,10 +106,11 @@ private[http] object FrameHandler {
|
||||||
val closeCode = FrameEventParser.parseCloseCode(data)
|
val closeCode = FrameEventParser.parseCloseCode(data)
|
||||||
emit(Iterator(Left(PeerClosed(closeCode)), Right(PeerClosed(closeCode))), ctx, WaitForPeerTcpClose)
|
emit(Iterator(Left(PeerClosed(closeCode)), Right(PeerClosed(closeCode))), ctx, WaitForPeerTcpClose)
|
||||||
case Opcode.Other(o) ⇒ closeWithCode(Protocol.CloseCodes.ProtocolError, "Unsupported opcode")
|
case Opcode.Other(o) ⇒ closeWithCode(Protocol.CloseCodes.ProtocolError, "Unsupported opcode")
|
||||||
|
case other ⇒ ctx.fail(new IllegalStateException(s"unexpected message of type [${other.getClass.getName}] when expecting ControlFrame"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private def collectControlFrame(start: FrameStart, nextState: State)(implicit ctx: Ctx): SyncDirective = {
|
private def collectControlFrame(start: FrameStart, nextState: State)(implicit ctx: Ctx): SyncDirective = {
|
||||||
assert(!start.isFullMessage)
|
require(!start.isFullMessage)
|
||||||
become(new CollectingControlFrame(start.header.opcode, start.data, nextState))
|
become(new CollectingControlFrame(start.header.opcode, start.data, nextState))
|
||||||
ctx.pull()
|
ctx.pull()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -57,6 +57,8 @@ private[http] object Masking {
|
||||||
become(new Running(mask))
|
become(new Running(mask))
|
||||||
current.onPush(start.copy(header = setNewMask(header, mask)), ctx)
|
current.onPush(start.copy(header = setNewMask(header, mask)), ctx)
|
||||||
}
|
}
|
||||||
|
case _: FrameData ⇒
|
||||||
|
ctx.fail(new IllegalStateException("unexpected FrameData (need FrameStart first)"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
class Running(initialMask: Int) extends State {
|
class Running(initialMask: Int) extends State {
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ private[http] class Utf8Encoder extends PushStage[String, ByteString] {
|
||||||
val builder = new ByteStringBuilder
|
val builder = new ByteStringBuilder
|
||||||
|
|
||||||
def b(v: Int): Unit = {
|
def b(v: Int): Unit = {
|
||||||
assert((v & 0xff) == v)
|
|
||||||
builder += v.toByte
|
builder += v.toByte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ package object util {
|
||||||
new EnhancedInetSocketAddress(address)
|
new EnhancedInetSocketAddress(address)
|
||||||
private[http] implicit def enhanceByteStrings(byteStrings: TraversableOnce[ByteString]): EnhancedByteStringTraversableOnce =
|
private[http] implicit def enhanceByteStrings(byteStrings: TraversableOnce[ByteString]): EnhancedByteStringTraversableOnce =
|
||||||
new EnhancedByteStringTraversableOnce(byteStrings)
|
new EnhancedByteStringTraversableOnce(byteStrings)
|
||||||
private[http] implicit def enhanceByteStrings[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] =
|
private[http] implicit def enhanceByteStringsMat[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] =
|
||||||
new EnhancedByteStringSource(byteStrings)
|
new EnhancedByteStringSource(byteStrings)
|
||||||
|
|
||||||
private[http] def headAndTailFlow[T]: Flow[Source[T, Any], (T, Source[T, Unit]), Unit] =
|
private[http] def headAndTailFlow[T]: Flow[Source[T, Any], (T, Source[T, Unit]), Unit] =
|
||||||
|
|
|
||||||
|
|
@ -209,8 +209,8 @@ class ConnectionPoolSpec extends AkkaSpec("""
|
||||||
val PoolGateway.Running(_, shutdownStartedPromise, shutdownCompletedPromise) = gateway.currentState
|
val PoolGateway.Running(_, shutdownStartedPromise, shutdownCompletedPromise) = gateway.currentState
|
||||||
shutdownStartedPromise.isCompleted shouldEqual false
|
shutdownStartedPromise.isCompleted shouldEqual false
|
||||||
shutdownCompletedPromise.isCompleted shouldEqual false
|
shutdownCompletedPromise.isCompleted shouldEqual false
|
||||||
Await.result(shutdownStartedPromise.future, 1500.millis) shouldEqual () // verify shutdown start (after idle)
|
Await.result(shutdownStartedPromise.future, 1500.millis) // verify shutdown start (after idle)
|
||||||
Await.result(shutdownCompletedPromise.future, 1500.millis) shouldEqual () // verify shutdown completed
|
Await.result(shutdownCompletedPromise.future, 1500.millis) // verify shutdown completed
|
||||||
}
|
}
|
||||||
|
|
||||||
"transparently restart after idle shutdown" in new TestSetup() {
|
"transparently restart after idle shutdown" in new TestSetup() {
|
||||||
|
|
@ -218,7 +218,7 @@ class ConnectionPoolSpec extends AkkaSpec("""
|
||||||
|
|
||||||
val gateway = Await.result(hcp.gatewayFuture, 500.millis)
|
val gateway = Await.result(hcp.gatewayFuture, 500.millis)
|
||||||
val PoolGateway.Running(_, _, shutdownCompletedPromise) = gateway.currentState
|
val PoolGateway.Running(_, _, shutdownCompletedPromise) = gateway.currentState
|
||||||
Await.result(shutdownCompletedPromise.future, 1500.millis) shouldEqual () // verify shutdown completed
|
Await.result(shutdownCompletedPromise.future, 1500.millis) // verify shutdown completed
|
||||||
|
|
||||||
requestIn.sendNext(HttpRequest(uri = "/") -> 42)
|
requestIn.sendNext(HttpRequest(uri = "/") -> 42)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ abstract class RouteTest extends AllDirectives {
|
||||||
|
|
||||||
result.awaitResult(awaitDuration) match {
|
result.awaitResult(awaitDuration) match {
|
||||||
case RouteResult.Complete(response) ⇒ createTestResponse(response)
|
case RouteResult.Complete(response) ⇒ createTestResponse(response)
|
||||||
|
case RouteResult.Rejected(ex) ⇒ throw new AssertionError("got unexpected rejection: " + ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,7 @@ public class SimpleServerApp8 extends HttpApp {
|
||||||
@Override
|
@Override
|
||||||
public Route createRoute() {
|
public Route createRoute() {
|
||||||
Handler addHandler = new Handler() {
|
Handler addHandler = new Handler() {
|
||||||
|
static final long serialVersionUID = 1L;
|
||||||
@Override
|
@Override
|
||||||
public RouteResult apply(RequestContext ctx) {
|
public RouteResult apply(RequestContext ctx) {
|
||||||
int xVal = x.get(ctx);
|
int xVal = x.get(ctx);
|
||||||
|
|
@ -49,6 +50,7 @@ public class SimpleServerApp8 extends HttpApp {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Handler2<Integer, Integer> subtractHandler = new Handler2<Integer, Integer>() {
|
Handler2<Integer, Integer> subtractHandler = new Handler2<Integer, Integer>() {
|
||||||
|
static final long serialVersionUID = 1L;
|
||||||
public RouteResult apply(RequestContext ctx, Integer xVal, Integer yVal) {
|
public RouteResult apply(RequestContext ctx, Integer xVal, Integer yVal) {
|
||||||
int result = xVal - yVal;
|
int result = xVal - yVal;
|
||||||
return ctx.complete(String.format("%d - %d = %d", xVal, yVal, result));
|
return ctx.complete(String.format("%d - %d = %d", xVal, yVal, result));
|
||||||
|
|
|
||||||
|
|
@ -11,13 +11,10 @@ import akka.http.javadsl.server.values.Parameters;
|
||||||
import akka.http.javadsl.server.values.PathMatchers;
|
import akka.http.javadsl.server.values.PathMatchers;
|
||||||
import akka.http.javadsl.testkit.JUnitRouteTest;
|
import akka.http.javadsl.testkit.JUnitRouteTest;
|
||||||
import akka.http.javadsl.testkit.TestRoute;
|
import akka.http.javadsl.testkit.TestRoute;
|
||||||
import akka.japi.function.Function;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import scala.concurrent.ExecutionContext;
|
import scala.concurrent.ExecutionContext;
|
||||||
import scala.concurrent.Future;
|
import scala.concurrent.Future;
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
public class HandlerExampleDocTest extends JUnitRouteTest {
|
public class HandlerExampleDocTest extends JUnitRouteTest {
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleHandler() {
|
public void testSimpleHandler() {
|
||||||
|
|
@ -25,6 +22,7 @@ public class HandlerExampleDocTest extends JUnitRouteTest {
|
||||||
class TestHandler extends akka.http.javadsl.server.AllDirectives {
|
class TestHandler extends akka.http.javadsl.server.AllDirectives {
|
||||||
//#simple-handler
|
//#simple-handler
|
||||||
Handler handler = new Handler() {
|
Handler handler = new Handler() {
|
||||||
|
static final long serialVersionUID = 1L;
|
||||||
@Override
|
@Override
|
||||||
public RouteResult apply(RequestContext ctx) {
|
public RouteResult apply(RequestContext ctx) {
|
||||||
return ctx.complete("This was a " + ctx.request().method().value() +
|
return ctx.complete("This was a " + ctx.request().method().value() +
|
||||||
|
|
@ -75,6 +73,7 @@ public class HandlerExampleDocTest extends JUnitRouteTest {
|
||||||
//#handler2
|
//#handler2
|
||||||
final Handler2<Integer, Integer> multiply =
|
final Handler2<Integer, Integer> multiply =
|
||||||
new Handler2<Integer, Integer>() {
|
new Handler2<Integer, Integer>() {
|
||||||
|
static final long serialVersionUID = 1L;
|
||||||
@Override
|
@Override
|
||||||
public RouteResult apply(RequestContext ctx, Integer x, Integer y) {
|
public RouteResult apply(RequestContext ctx, Integer x, Integer y) {
|
||||||
int result = x * y;
|
int result = x * y;
|
||||||
|
|
|
||||||
|
|
@ -146,7 +146,7 @@ class CacheConditionDirectivesSpec extends RoutingSpec {
|
||||||
|
|
||||||
"not filter out a `Range` header if `If-Range` does match the timestamp" in {
|
"not filter out a `Range` header if `If-Range` does match the timestamp" in {
|
||||||
Get() ~> `If-Range`(timestamp) ~> Range(ByteRange(0, 10)) ~> {
|
Get() ~> `If-Range`(timestamp) ~> Range(ByteRange(0, 10)) ~> {
|
||||||
(conditional(tag, timestamp) & optionalHeaderValueByType[Range]()) { echoComplete }
|
(conditional(tag, timestamp) & optionalHeaderValueByType[Range](())) { echoComplete }
|
||||||
} ~> check {
|
} ~> check {
|
||||||
status shouldEqual OK
|
status shouldEqual OK
|
||||||
responseAs[String] should startWith("Some")
|
responseAs[String] should startWith("Some")
|
||||||
|
|
@ -155,7 +155,7 @@ class CacheConditionDirectivesSpec extends RoutingSpec {
|
||||||
|
|
||||||
"filter out a `Range` header if `If-Range` doesn't match the timestamp" in {
|
"filter out a `Range` header if `If-Range` doesn't match the timestamp" in {
|
||||||
Get() ~> `If-Range`(timestamp - 1000) ~> Range(ByteRange(0, 10)) ~> {
|
Get() ~> `If-Range`(timestamp - 1000) ~> Range(ByteRange(0, 10)) ~> {
|
||||||
(conditional(tag, timestamp) & optionalHeaderValueByType[Range]()) { echoComplete }
|
(conditional(tag, timestamp) & optionalHeaderValueByType[Range](())) { echoComplete }
|
||||||
} ~> check {
|
} ~> check {
|
||||||
status shouldEqual OK
|
status shouldEqual OK
|
||||||
responseAs[String] shouldEqual "None"
|
responseAs[String] shouldEqual "None"
|
||||||
|
|
@ -164,7 +164,7 @@ class CacheConditionDirectivesSpec extends RoutingSpec {
|
||||||
|
|
||||||
"not filter out a `Range` header if `If-Range` does match the ETag" in {
|
"not filter out a `Range` header if `If-Range` does match the ETag" in {
|
||||||
Get() ~> `If-Range`(tag) ~> Range(ByteRange(0, 10)) ~> {
|
Get() ~> `If-Range`(tag) ~> Range(ByteRange(0, 10)) ~> {
|
||||||
(conditional(tag, timestamp) & optionalHeaderValueByType[Range]()) { echoComplete }
|
(conditional(tag, timestamp) & optionalHeaderValueByType[Range](())) { echoComplete }
|
||||||
} ~> check {
|
} ~> check {
|
||||||
status shouldEqual OK
|
status shouldEqual OK
|
||||||
responseAs[String] should startWith("Some")
|
responseAs[String] should startWith("Some")
|
||||||
|
|
@ -173,7 +173,7 @@ class CacheConditionDirectivesSpec extends RoutingSpec {
|
||||||
|
|
||||||
"filter out a `Range` header if `If-Range` doesn't match the ETag" in {
|
"filter out a `Range` header if `If-Range` doesn't match the ETag" in {
|
||||||
Get() ~> `If-Range`(EntityTag("other")) ~> Range(ByteRange(0, 10)) ~> {
|
Get() ~> `If-Range`(EntityTag("other")) ~> Range(ByteRange(0, 10)) ~> {
|
||||||
(conditional(tag, timestamp) & optionalHeaderValueByType[Range]()) { echoComplete }
|
(conditional(tag, timestamp) & optionalHeaderValueByType[Range](())) { echoComplete }
|
||||||
} ~> check {
|
} ~> check {
|
||||||
status shouldEqual OK
|
status shouldEqual OK
|
||||||
responseAs[String] shouldEqual "None"
|
responseAs[String] shouldEqual "None"
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ class ExecutionDirectivesSpec extends RoutingSpec {
|
||||||
object MyException extends RuntimeException
|
object MyException extends RuntimeException
|
||||||
val handler =
|
val handler =
|
||||||
ExceptionHandler {
|
ExceptionHandler {
|
||||||
case MyException ⇒ complete(500, "Pling! Plong! Something went wrong!!!")
|
case MyException ⇒ complete((500, "Pling! Plong! Something went wrong!!!"))
|
||||||
}
|
}
|
||||||
|
|
||||||
"The `handleExceptions` directive" should {
|
"The `handleExceptions` directive" should {
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ class FutureDirectivesSpec extends RoutingSpec {
|
||||||
def throwTestException[T](msgPrefix: String): T ⇒ Nothing = t ⇒ throw new TestException(msgPrefix + t)
|
def throwTestException[T](msgPrefix: String): T ⇒ Nothing = t ⇒ throw new TestException(msgPrefix + t)
|
||||||
|
|
||||||
implicit val exceptionHandler = ExceptionHandler {
|
implicit val exceptionHandler = ExceptionHandler {
|
||||||
case e: TestException ⇒ complete(StatusCodes.InternalServerError, "Oops. " + e)
|
case e: TestException ⇒ complete((StatusCodes.InternalServerError, "Oops. " + e))
|
||||||
}
|
}
|
||||||
|
|
||||||
"The `onComplete` directive" should {
|
"The `onComplete` directive" should {
|
||||||
|
|
@ -39,13 +39,13 @@ class FutureDirectivesSpec extends RoutingSpec {
|
||||||
"catch an exception in the success case" in {
|
"catch an exception in the success case" in {
|
||||||
Get() ~> onComplete(Future.successful("ok")) { throwTestException("EX when ") } ~> check {
|
Get() ~> onComplete(Future.successful("ok")) { throwTestException("EX when ") } ~> check {
|
||||||
status shouldEqual StatusCodes.InternalServerError
|
status shouldEqual StatusCodes.InternalServerError
|
||||||
responseAs[String] shouldEqual "Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$TestException: EX when Success(ok)"
|
responseAs[String] shouldEqual s"Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$$TestException: EX when Success(ok)"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"catch an exception in the failure case" in {
|
"catch an exception in the failure case" in {
|
||||||
Get() ~> onComplete(Future.failed[String](new RuntimeException("no"))) { throwTestException("EX when ") } ~> check {
|
Get() ~> onComplete(Future.failed[String](new RuntimeException("no"))) { throwTestException("EX when ") } ~> check {
|
||||||
status shouldEqual StatusCodes.InternalServerError
|
status shouldEqual StatusCodes.InternalServerError
|
||||||
responseAs[String] shouldEqual "Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$TestException: EX when Failure(java.lang.RuntimeException: no)"
|
responseAs[String] shouldEqual s"Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$$TestException: EX when Failure(java.lang.RuntimeException: no)"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -64,7 +64,7 @@ class FutureDirectivesSpec extends RoutingSpec {
|
||||||
"catch an exception in the success case" in {
|
"catch an exception in the success case" in {
|
||||||
Get() ~> onSuccess(Future.successful("ok")) { throwTestException("EX when ") } ~> check {
|
Get() ~> onSuccess(Future.successful("ok")) { throwTestException("EX when ") } ~> check {
|
||||||
status shouldEqual StatusCodes.InternalServerError
|
status shouldEqual StatusCodes.InternalServerError
|
||||||
responseAs[String] shouldEqual "Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$TestException: EX when ok"
|
responseAs[String] shouldEqual s"Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$$TestException: EX when ok"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"catch an exception in the failure case" in {
|
"catch an exception in the failure case" in {
|
||||||
|
|
@ -99,7 +99,7 @@ class FutureDirectivesSpec extends RoutingSpec {
|
||||||
"catch an exception during recovery" in {
|
"catch an exception during recovery" in {
|
||||||
Get() ~> completeOrRecoverWith(Future.failed[String](TestException)) { throwTestException("EX when ") } ~> check {
|
Get() ~> completeOrRecoverWith(Future.failed[String](TestException)) { throwTestException("EX when ") } ~> check {
|
||||||
status shouldEqual StatusCodes.InternalServerError
|
status shouldEqual StatusCodes.InternalServerError
|
||||||
responseAs[String] shouldEqual "Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$TestException: EX when akka.http.scaladsl.server.directives.FutureDirectivesSpec$TestException$: XXX"
|
responseAs[String] shouldEqual s"Oops. akka.http.scaladsl.server.directives.FutureDirectivesSpec$$TestException: EX when akka.http.scaladsl.server.directives.FutureDirectivesSpec$$TestException$$: XXX"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,9 +21,9 @@ class TupleOpsSpec extends WordSpec with Matchers {
|
||||||
}
|
}
|
||||||
|
|
||||||
"support joining tuples" in {
|
"support joining tuples" in {
|
||||||
(1, 'X2, "3") join () shouldEqual (1, 'X2, "3")
|
(1, 'X2, "3") join (()) shouldEqual ((1, 'X2, "3"))
|
||||||
() join (1, 'X2, "3") shouldEqual (1, 'X2, "3")
|
() join ((1, 'X2, "3")) shouldEqual ((1, 'X2, "3"))
|
||||||
(1, 'X2, "3") join (4.0, 5L) shouldEqual (1, 'X2, "3", 4.0, 5L)
|
(1, 'X2, "3") join ((4.0, 5L)) shouldEqual ((1, 'X2, "3", 4.0, 5L))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -151,7 +151,7 @@ private[http] object RouteImplementation extends Directives with server.RouteCon
|
||||||
(requestValToDirective(dyn.value1) & requestValToDirective(dyn.value2))(runToRoute)
|
(requestValToDirective(dyn.value1) & requestValToDirective(dyn.value2))(runToRoute)
|
||||||
|
|
||||||
case o: OpaqueRoute ⇒ (ctx ⇒ o.handle(new RequestContextImpl(ctx)).asInstanceOf[RouteResultImpl].underlying)
|
case o: OpaqueRoute ⇒ (ctx ⇒ o.handle(new RequestContextImpl(ctx)).asInstanceOf[RouteResultImpl].underlying)
|
||||||
case p: Product ⇒ extractExecutionContext { implicit ec ⇒ complete(500, s"Not implemented: ${p.productPrefix}") }
|
case p: Product ⇒ extractExecutionContext { implicit ec ⇒ complete((500, s"Not implemented: ${p.productPrefix}")) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def pathMatcherDirective[T](matchers: immutable.Seq[PathMatcher[_]],
|
def pathMatcherDirective[T](matchers: immutable.Seq[PathMatcher[_]],
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ object Headers {
|
||||||
HeaderImpl[HttpHeader](name, _ ⇒ optionalHeaderInstanceByName(name.toLowerCase()), classTag[HttpHeader])
|
HeaderImpl[HttpHeader](name, _ ⇒ optionalHeaderInstanceByName(name.toLowerCase()), classTag[HttpHeader])
|
||||||
|
|
||||||
def byClass[T <: HttpHeader](clazz: Class[T]): Header[T] =
|
def byClass[T <: HttpHeader](clazz: Class[T]): Header[T] =
|
||||||
HeaderImpl[T](clazz.getSimpleName, ct ⇒ optionalHeaderValueByType(ClassMagnet()(ct)), ClassTag(clazz))
|
HeaderImpl[T](clazz.getSimpleName, ct ⇒ optionalHeaderValueByType(ClassMagnet(ct)), ClassTag(clazz))
|
||||||
|
|
||||||
private def optionalHeaderInstanceByName(lowercaseName: String): Directive1[Option[model.HttpHeader]] =
|
private def optionalHeaderInstanceByName(lowercaseName: String): Directive1[Option[model.HttpHeader]] =
|
||||||
extract(_.request.headers.collectFirst {
|
extract(_.request.headers.collectFirst {
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ class DeflateCompressor extends Compressor {
|
||||||
override final def finish(): ByteString = finishWithBuffer(newTempBuffer())
|
override final def finish(): ByteString = finishWithBuffer(newTempBuffer())
|
||||||
|
|
||||||
protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = {
|
protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = {
|
||||||
assert(deflater.needsInput())
|
require(deflater.needsInput())
|
||||||
deflater.setInput(input.toArray)
|
deflater.setInput(input.toArray)
|
||||||
drainDeflater(deflater, buffer)
|
drainDeflater(deflater, buffer)
|
||||||
}
|
}
|
||||||
|
|
@ -108,7 +108,7 @@ private[http] object DeflateCompressor {
|
||||||
result ++= ByteString.fromArray(buffer, 0, len)
|
result ++= ByteString.fromArray(buffer, 0, len)
|
||||||
drainDeflater(deflater, buffer, result)
|
drainDeflater(deflater, buffer, result)
|
||||||
} else {
|
} else {
|
||||||
assert(deflater.needsInput())
|
require(deflater.needsInput())
|
||||||
result.result()
|
result.result()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -101,7 +101,7 @@ object Directive {
|
||||||
/**
|
/**
|
||||||
* A Directive that always passes the request on to its inner route (i.e. does nothing).
|
* A Directive that always passes the request on to its inner route (i.e. does nothing).
|
||||||
*/
|
*/
|
||||||
val Empty: Directive0 = Directive(_())
|
val Empty: Directive0 = Directive(_(()))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds `apply` to all Directives with 1 or more extractions,
|
* Adds `apply` to all Directives with 1 or more extractions,
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,7 @@ object ExceptionHandler {
|
||||||
case IllegalRequestException(info, status) ⇒ ctx ⇒ {
|
case IllegalRequestException(info, status) ⇒ ctx ⇒ {
|
||||||
ctx.log.warning("Illegal request {}\n\t{}\n\tCompleting with '{}' response",
|
ctx.log.warning("Illegal request {}\n\t{}\n\tCompleting with '{}' response",
|
||||||
ctx.request, info.formatPretty, status)
|
ctx.request, info.formatPretty, status)
|
||||||
ctx.complete(status, info.format(settings.verboseErrorMessages))
|
ctx.complete((status, info.format(settings.verboseErrorMessages)))
|
||||||
}
|
}
|
||||||
case NonFatal(e) ⇒ ctx ⇒ {
|
case NonFatal(e) ⇒ ctx ⇒ {
|
||||||
ctx.log.error(e, "Error during processing of request {}", ctx.request)
|
ctx.log.error(e, "Error during processing of request {}", ctx.request)
|
||||||
|
|
|
||||||
|
|
@ -275,7 +275,7 @@ trait ImplicitPathMatcherConstruction {
|
||||||
*/
|
*/
|
||||||
implicit def valueMap2PathMatcher[T](valueMap: Map[String, T]): PathMatcher1[T] =
|
implicit def valueMap2PathMatcher[T](valueMap: Map[String, T]): PathMatcher1[T] =
|
||||||
if (valueMap.isEmpty) PathMatchers.nothingMatcher
|
if (valueMap.isEmpty) PathMatchers.nothingMatcher
|
||||||
else valueMap.map { case (prefix, value) ⇒ stringExtractionPair2PathMatcher(prefix, value) }.reduceLeft(_ | _)
|
else valueMap.map { case (prefix, value) ⇒ stringExtractionPair2PathMatcher((prefix, value)) }.reduceLeft(_ | _)
|
||||||
}
|
}
|
||||||
|
|
||||||
trait PathMatchers {
|
trait PathMatchers {
|
||||||
|
|
|
||||||
|
|
@ -117,60 +117,60 @@ object RejectionHandler {
|
||||||
newBuilder()
|
newBuilder()
|
||||||
.handleAll[SchemeRejection] { rejections ⇒
|
.handleAll[SchemeRejection] { rejections ⇒
|
||||||
val schemes = rejections.map(_.supported).mkString(", ")
|
val schemes = rejections.map(_.supported).mkString(", ")
|
||||||
complete(BadRequest, "Uri scheme not allowed, supported schemes: " + schemes)
|
complete((BadRequest, "Uri scheme not allowed, supported schemes: " + schemes))
|
||||||
}
|
}
|
||||||
.handleAll[MethodRejection] { rejections ⇒
|
.handleAll[MethodRejection] { rejections ⇒
|
||||||
val (methods, names) = rejections.map(r ⇒ r.supported -> r.supported.name).unzip
|
val (methods, names) = rejections.map(r ⇒ r.supported -> r.supported.name).unzip
|
||||||
complete(MethodNotAllowed, List(Allow(methods)), "HTTP method not allowed, supported methods: " + names.mkString(", "))
|
complete((MethodNotAllowed, List(Allow(methods)), "HTTP method not allowed, supported methods: " + names.mkString(", ")))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case AuthorizationFailedRejection ⇒
|
case AuthorizationFailedRejection ⇒
|
||||||
complete(Forbidden, "The supplied authentication is not authorized to access this resource")
|
complete((Forbidden, "The supplied authentication is not authorized to access this resource"))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case MalformedFormFieldRejection(name, msg, _) ⇒
|
case MalformedFormFieldRejection(name, msg, _) ⇒
|
||||||
complete(BadRequest, "The form field '" + name + "' was malformed:\n" + msg)
|
complete((BadRequest, "The form field '" + name + "' was malformed:\n" + msg))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case MalformedHeaderRejection(headerName, msg, _) ⇒
|
case MalformedHeaderRejection(headerName, msg, _) ⇒
|
||||||
complete(BadRequest, s"The value of HTTP header '$headerName' was malformed:\n" + msg)
|
complete((BadRequest, s"The value of HTTP header '$headerName' was malformed:\n" + msg))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case MalformedQueryParamRejection(name, msg, _) ⇒
|
case MalformedQueryParamRejection(name, msg, _) ⇒
|
||||||
complete(BadRequest, "The query parameter '" + name + "' was malformed:\n" + msg)
|
complete((BadRequest, "The query parameter '" + name + "' was malformed:\n" + msg))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case MalformedRequestContentRejection(msg, _) ⇒
|
case MalformedRequestContentRejection(msg, _) ⇒
|
||||||
complete(BadRequest, "The request content was malformed:\n" + msg)
|
complete((BadRequest, "The request content was malformed:\n" + msg))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case MissingCookieRejection(cookieName) ⇒
|
case MissingCookieRejection(cookieName) ⇒
|
||||||
complete(BadRequest, "Request is missing required cookie '" + cookieName + '\'')
|
complete((BadRequest, "Request is missing required cookie '" + cookieName + '\''))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case MissingFormFieldRejection(fieldName) ⇒
|
case MissingFormFieldRejection(fieldName) ⇒
|
||||||
complete(BadRequest, "Request is missing required form field '" + fieldName + '\'')
|
complete((BadRequest, "Request is missing required form field '" + fieldName + '\''))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case MissingHeaderRejection(headerName) ⇒
|
case MissingHeaderRejection(headerName) ⇒
|
||||||
complete(BadRequest, "Request is missing required HTTP header '" + headerName + '\'')
|
complete((BadRequest, "Request is missing required HTTP header '" + headerName + '\''))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case MissingQueryParamRejection(paramName) ⇒
|
case MissingQueryParamRejection(paramName) ⇒
|
||||||
complete(NotFound, "Request is missing required query parameter '" + paramName + '\'')
|
complete((NotFound, "Request is missing required query parameter '" + paramName + '\''))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case RequestEntityExpectedRejection ⇒
|
case RequestEntityExpectedRejection ⇒
|
||||||
complete(BadRequest, "Request entity expected but not supplied")
|
complete((BadRequest, "Request entity expected but not supplied"))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case TooManyRangesRejection(_) ⇒
|
case TooManyRangesRejection(_) ⇒
|
||||||
complete(RequestedRangeNotSatisfiable, "Request contains too many ranges.")
|
complete((RequestedRangeNotSatisfiable, "Request contains too many ranges."))
|
||||||
}
|
}
|
||||||
.handle {
|
.handle {
|
||||||
case UnsatisfiableRangeRejection(unsatisfiableRanges, actualEntityLength) ⇒
|
case UnsatisfiableRangeRejection(unsatisfiableRanges, actualEntityLength) ⇒
|
||||||
complete(RequestedRangeNotSatisfiable, List(`Content-Range`(ContentRange.Unsatisfiable(actualEntityLength))),
|
complete((RequestedRangeNotSatisfiable, List(`Content-Range`(ContentRange.Unsatisfiable(actualEntityLength))),
|
||||||
unsatisfiableRanges.mkString("None of the following requested Ranges were satisfiable:\n", "\n", ""))
|
unsatisfiableRanges.mkString("None of the following requested Ranges were satisfiable:\n", "\n", "")))
|
||||||
}
|
}
|
||||||
.handleAll[AuthenticationFailedRejection] { rejections ⇒
|
.handleAll[AuthenticationFailedRejection] { rejections ⇒
|
||||||
val rejectionMessage = rejections.head.cause match {
|
val rejectionMessage = rejections.head.cause match {
|
||||||
|
|
@ -184,36 +184,36 @@ object RejectionHandler {
|
||||||
// See https://code.google.com/p/chromium/issues/detail?id=103220
|
// See https://code.google.com/p/chromium/issues/detail?id=103220
|
||||||
// and https://bugzilla.mozilla.org/show_bug.cgi?id=669675
|
// and https://bugzilla.mozilla.org/show_bug.cgi?id=669675
|
||||||
val authenticateHeaders = rejections.map(r ⇒ `WWW-Authenticate`(r.challenge))
|
val authenticateHeaders = rejections.map(r ⇒ `WWW-Authenticate`(r.challenge))
|
||||||
complete(Unauthorized, authenticateHeaders, rejectionMessage)
|
complete((Unauthorized, authenticateHeaders, rejectionMessage))
|
||||||
}
|
}
|
||||||
.handleAll[UnacceptedResponseContentTypeRejection] { rejections ⇒
|
.handleAll[UnacceptedResponseContentTypeRejection] { rejections ⇒
|
||||||
val supported = rejections.flatMap(_.supported)
|
val supported = rejections.flatMap(_.supported)
|
||||||
complete(NotAcceptable, "Resource representation is only available with these Content-Types:\n" +
|
complete((NotAcceptable, "Resource representation is only available with these Content-Types:\n" +
|
||||||
supported.map(_.value).mkString("\n"))
|
supported.map(_.value).mkString("\n")))
|
||||||
}
|
}
|
||||||
.handleAll[UnacceptedResponseEncodingRejection] { rejections ⇒
|
.handleAll[UnacceptedResponseEncodingRejection] { rejections ⇒
|
||||||
val supported = rejections.flatMap(_.supported)
|
val supported = rejections.flatMap(_.supported)
|
||||||
complete(NotAcceptable, "Resource representation is only available with these Content-Encodings:\n" +
|
complete((NotAcceptable, "Resource representation is only available with these Content-Encodings:\n" +
|
||||||
supported.map(_.value).mkString("\n"))
|
supported.map(_.value).mkString("\n")))
|
||||||
}
|
}
|
||||||
.handleAll[UnsupportedRequestContentTypeRejection] { rejections ⇒
|
.handleAll[UnsupportedRequestContentTypeRejection] { rejections ⇒
|
||||||
val supported = rejections.flatMap(_.supported).mkString(" or ")
|
val supported = rejections.flatMap(_.supported).mkString(" or ")
|
||||||
complete(UnsupportedMediaType, "The request's Content-Type is not supported. Expected:\n" + supported)
|
complete((UnsupportedMediaType, "The request's Content-Type is not supported. Expected:\n" + supported))
|
||||||
}
|
}
|
||||||
.handleAll[UnsupportedRequestEncodingRejection] { rejections ⇒
|
.handleAll[UnsupportedRequestEncodingRejection] { rejections ⇒
|
||||||
val supported = rejections.map(_.supported.value).mkString(" or ")
|
val supported = rejections.map(_.supported.value).mkString(" or ")
|
||||||
complete(BadRequest, "The request's Content-Encoding is not supported. Expected:\n" + supported)
|
complete((BadRequest, "The request's Content-Encoding is not supported. Expected:\n" + supported))
|
||||||
}
|
}
|
||||||
.handle { case ExpectedWebsocketRequestRejection ⇒ complete(BadRequest, "Expected Websocket Upgrade request") }
|
.handle { case ExpectedWebsocketRequestRejection ⇒ complete((BadRequest, "Expected Websocket Upgrade request")) }
|
||||||
.handleAll[UnsupportedWebsocketSubprotocolRejection] { rejections ⇒
|
.handleAll[UnsupportedWebsocketSubprotocolRejection] { rejections ⇒
|
||||||
val supported = rejections.map(_.supportedProtocol)
|
val supported = rejections.map(_.supportedProtocol)
|
||||||
complete(HttpResponse(BadRequest,
|
complete(HttpResponse(BadRequest,
|
||||||
entity = s"None of the websocket subprotocols offered in the request are supported. Supported are ${supported.map("'" + _ + "'").mkString(",")}.",
|
entity = s"None of the websocket subprotocols offered in the request are supported. Supported are ${supported.map("'" + _ + "'").mkString(",")}.",
|
||||||
headers = `Sec-WebSocket-Protocol`(supported) :: Nil))
|
headers = `Sec-WebSocket-Protocol`(supported) :: Nil))
|
||||||
}
|
}
|
||||||
.handle { case ValidationRejection(msg, _) ⇒ complete(BadRequest, msg) }
|
.handle { case ValidationRejection(msg, _) ⇒ complete((BadRequest, msg)) }
|
||||||
.handle { case x ⇒ sys.error("Unhandled rejection: " + x) }
|
.handle { case x ⇒ sys.error("Unhandled rejection: " + x) }
|
||||||
.handleNotFound { complete(NotFound, "The requested resource could not be found.") }
|
.handleNotFound { complete((NotFound, "The requested resource could not be found.")) }
|
||||||
.result()
|
.result()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ trait MiscDirectives {
|
||||||
* If the condition fails the route is rejected with a [[ValidationRejection]].
|
* If the condition fails the route is rejected with a [[ValidationRejection]].
|
||||||
*/
|
*/
|
||||||
def validate(check: ⇒ Boolean, errorMsg: String): Directive0 =
|
def validate(check: ⇒ Boolean, errorMsg: String): Directive0 =
|
||||||
Directive { inner ⇒ if (check) inner() else reject(ValidationRejection(errorMsg)) }
|
Directive { inner ⇒ if (check) inner(()) else reject(ValidationRejection(errorMsg)) }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts the client's IP from either the X-Forwarded-For, Remote-Address or X-Real-IP header
|
* Extracts the client's IP from either the X-Forwarded-For, Remote-Address or X-Real-IP header
|
||||||
|
|
|
||||||
|
|
@ -108,7 +108,7 @@ trait RangeDirectives {
|
||||||
case Nil ⇒ ctx.reject(UnsatisfiableRangeRejection(ranges, length))
|
case Nil ⇒ ctx.reject(UnsatisfiableRangeRejection(ranges, length))
|
||||||
case Seq(satisfiableRange) ⇒ ctx.complete(rangeResponse(satisfiableRange, entity, length, headers))
|
case Seq(satisfiableRange) ⇒ ctx.complete(rangeResponse(satisfiableRange, entity, length, headers))
|
||||||
case satisfiableRanges ⇒
|
case satisfiableRanges ⇒
|
||||||
ctx.complete(PartialContent, headers, multipartRanges(satisfiableRanges, entity))
|
ctx.complete((PartialContent, headers, multipartRanges(satisfiableRanges, entity)))
|
||||||
}
|
}
|
||||||
case None ⇒
|
case None ⇒
|
||||||
// Ranges not supported for Chunked or CloseDelimited responses
|
// Ranges not supported for Chunked or CloseDelimited responses
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ trait SecurityDirectives {
|
||||||
* Extracts the potentially present [[HttpCredentials]] provided with the request's [[Authorization]] header.
|
* Extracts the potentially present [[HttpCredentials]] provided with the request's [[Authorization]] header.
|
||||||
*/
|
*/
|
||||||
def extractCredentials: Directive1[Option[HttpCredentials]] =
|
def extractCredentials: Directive1[Option[HttpCredentials]] =
|
||||||
optionalHeaderValueByType[Authorization]().map(_.map(_.credentials))
|
optionalHeaderValueByType[Authorization](()).map(_.map(_.credentials))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wraps the inner route with Http Basic authentication support using a given ``Authenticator[T]``.
|
* Wraps the inner route with Http Basic authentication support using a given ``Authenticator[T]``.
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ trait WebsocketDirectives {
|
||||||
* Extract the [[UpgradeToWebsocket]] header if existent. Rejects with an [[ExpectedWebsocketRequestRejection]], otherwise.
|
* Extract the [[UpgradeToWebsocket]] header if existent. Rejects with an [[ExpectedWebsocketRequestRejection]], otherwise.
|
||||||
*/
|
*/
|
||||||
def extractUpgradeToWebsocket: Directive1[UpgradeToWebsocket] =
|
def extractUpgradeToWebsocket: Directive1[UpgradeToWebsocket] =
|
||||||
optionalHeaderValueByType[UpgradeToWebsocket]().flatMap {
|
optionalHeaderValueByType[UpgradeToWebsocket](()).flatMap {
|
||||||
case Some(upgrade) ⇒ provide(upgrade)
|
case Some(upgrade) ⇒ provide(upgrade)
|
||||||
case None ⇒ reject(ExpectedWebsocketRequestRejection)
|
case None ⇒ reject(ExpectedWebsocketRequestRejection)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,9 +18,10 @@ trait ClassMagnet[T] {
|
||||||
def extractPF: PartialFunction[Any, T]
|
def extractPF: PartialFunction[Any, T]
|
||||||
}
|
}
|
||||||
object ClassMagnet {
|
object ClassMagnet {
|
||||||
implicit def apply[T](c: Class[T]): ClassMagnet[T] = ClassMagnet()(ClassTag(c))
|
implicit def fromClass[T](c: Class[T]): ClassMagnet[T] = ClassMagnet(ClassTag(c))
|
||||||
|
implicit def fromUnit[T](u: Unit)(implicit tag: ClassTag[T]) = ClassMagnet(tag)
|
||||||
|
|
||||||
implicit def apply[T](u: Unit)(implicit tag: ClassTag[T]): ClassMagnet[T] =
|
def apply[T](implicit tag: ClassTag[T]): ClassMagnet[T] =
|
||||||
new ClassMagnet[T] {
|
new ClassMagnet[T] {
|
||||||
val classTag: ClassTag[T] = tag
|
val classTag: ClassTag[T] = tag
|
||||||
val runtimeClass: Class[T] = tag.runtimeClass.asInstanceOf[Class[T]]
|
val runtimeClass: Class[T] = tag.runtimeClass.asInstanceOf[Class[T]]
|
||||||
|
|
|
||||||
|
|
@ -34,35 +34,40 @@ trait ScriptedTest extends Matchers {
|
||||||
jumps ++= Vector.fill(ins.size - 1)(0) ++ Vector(outs.size)
|
jumps ++= Vector.fill(ins.size - 1)(0) ++ Vector(outs.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
Script(providedInputs, expectedOutputs, jumps, inputCursor = 0, outputCursor = 0, outputEndCursor = 0, completed = false)
|
new Script(providedInputs, expectedOutputs, jumps, inputCursor = 0, outputCursor = 0, outputEndCursor = 0, completed = false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final case class Script[In, Out](
|
final class Script[In, Out](
|
||||||
providedInputs: Vector[In],
|
val providedInputs: Vector[In],
|
||||||
expectedOutputs: Vector[Out],
|
val expectedOutputs: Vector[Out],
|
||||||
jumps: Vector[Int],
|
val jumps: Vector[Int],
|
||||||
inputCursor: Int,
|
val inputCursor: Int,
|
||||||
outputCursor: Int,
|
val outputCursor: Int,
|
||||||
outputEndCursor: Int,
|
val outputEndCursor: Int,
|
||||||
completed: Boolean) {
|
val completed: Boolean) {
|
||||||
require(jumps.size == providedInputs.size)
|
require(jumps.size == providedInputs.size)
|
||||||
|
|
||||||
def provideInput: (In, Script[In, Out]) =
|
def provideInput: (In, Script[In, Out]) =
|
||||||
if (noInsPending)
|
if (noInsPending)
|
||||||
throw new ScriptException("Script cannot provide more input.")
|
throw new ScriptException("Script cannot provide more input.")
|
||||||
else
|
else
|
||||||
(providedInputs(inputCursor), this.copy(inputCursor = inputCursor + 1, outputEndCursor = outputEndCursor + jumps(inputCursor)))
|
(providedInputs(inputCursor),
|
||||||
|
new Script(providedInputs, expectedOutputs, jumps, inputCursor = inputCursor + 1,
|
||||||
|
outputCursor, outputEndCursor = outputEndCursor + jumps(inputCursor), completed))
|
||||||
|
|
||||||
def consumeOutput(out: Out): Script[In, Out] = {
|
def consumeOutput(out: Out): Script[In, Out] = {
|
||||||
if (noOutsPending)
|
if (noOutsPending)
|
||||||
throw new ScriptException(s"Tried to produce element ${out} but no elements should be produced right now.")
|
throw new ScriptException(s"Tried to produce element ${out} but no elements should be produced right now.")
|
||||||
out should be(expectedOutputs(outputCursor))
|
out should be(expectedOutputs(outputCursor))
|
||||||
this.copy(outputCursor = outputCursor + 1)
|
new Script(providedInputs, expectedOutputs, jumps, inputCursor,
|
||||||
|
outputCursor = outputCursor + 1, outputEndCursor, completed)
|
||||||
}
|
}
|
||||||
|
|
||||||
def complete(): Script[In, Out] = {
|
def complete(): Script[In, Out] = {
|
||||||
if (finished) copy(completed = true)
|
if (finished)
|
||||||
|
new Script(providedInputs, expectedOutputs, jumps, inputCursor,
|
||||||
|
outputCursor = outputCursor + 1, outputEndCursor, completed = true)
|
||||||
else fail("received onComplete prematurely")
|
else fail("received onComplete prematurely")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,14 +8,24 @@ import java.nio.channels.DatagramChannel
|
||||||
import java.nio.channels.ServerSocketChannel
|
import java.nio.channels.ServerSocketChannel
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.net.SocketAddress
|
import java.net.SocketAddress
|
||||||
|
import java.net.DatagramSocket
|
||||||
|
import java.net.ServerSocket
|
||||||
|
|
||||||
object TestUtils { // FIXME: remove once going back to project dependencies
|
object TestUtils { // FIXME: remove once going back to project dependencies
|
||||||
import scala.language.reflectiveCalls
|
trait GeneralSocket extends Any {
|
||||||
// Structural type needed since DatagramSocket and ServerSocket has no common ancestor apart from Object
|
|
||||||
type GeneralSocket = {
|
|
||||||
def bind(sa: SocketAddress): Unit
|
def bind(sa: SocketAddress): Unit
|
||||||
def close(): Unit
|
def close(): Unit
|
||||||
def getLocalPort(): Int
|
def getLocalPort: Int
|
||||||
|
}
|
||||||
|
implicit class GeneralDatagramSocket(val s: DatagramSocket) extends GeneralSocket {
|
||||||
|
def bind(sa: SocketAddress): Unit = s.bind(sa)
|
||||||
|
def close(): Unit = s.close()
|
||||||
|
def getLocalPort: Int = s.getLocalPort
|
||||||
|
}
|
||||||
|
implicit class GeneralServerSocket(val s: ServerSocket) extends GeneralSocket {
|
||||||
|
def bind(sa: SocketAddress): Unit = s.bind(sa)
|
||||||
|
def close(): Unit = s.close()
|
||||||
|
def getLocalPort: Int = s.getLocalPort
|
||||||
}
|
}
|
||||||
|
|
||||||
def temporaryServerAddress(address: String = "127.0.0.1", udp: Boolean = false): InetSocketAddress =
|
def temporaryServerAddress(address: String = "127.0.0.1", udp: Boolean = false): InetSocketAddress =
|
||||||
|
|
|
||||||
|
|
@ -82,7 +82,7 @@ public class SinkTest extends StreamTest {
|
||||||
final Sink<Integer, ?> sink1 = Sink.actorRef(probe1.getRef(), "done1");
|
final Sink<Integer, ?> sink1 = Sink.actorRef(probe1.getRef(), "done1");
|
||||||
final Sink<Integer, ?> sink2 = Sink.actorRef(probe2.getRef(), "done2");
|
final Sink<Integer, ?> sink2 = Sink.actorRef(probe2.getRef(), "done2");
|
||||||
|
|
||||||
final Sink<Integer, ?> sink = Sink.combine(sink1, sink2, new ArrayList(),
|
final Sink<Integer, ?> sink = Sink.combine(sink1, sink2, new ArrayList<Sink<Integer, ?>>(),
|
||||||
new Function<Integer, Graph<UniformFanOutShape<Integer, Integer>, BoxedUnit>>() {
|
new Function<Integer, Graph<UniformFanOutShape<Integer, Integer>, BoxedUnit>>() {
|
||||||
public Graph<UniformFanOutShape<Integer, Integer>, BoxedUnit> apply(Integer elem) {
|
public Graph<UniformFanOutShape<Integer, Integer>, BoxedUnit> apply(Integer elem) {
|
||||||
return Broadcast.create(elem);
|
return Broadcast.create(elem);
|
||||||
|
|
|
||||||
|
|
@ -566,7 +566,7 @@ public class SourceTest extends StreamTest {
|
||||||
final Source<Integer, ?> source1 = Source.from(Arrays.asList(0, 1));
|
final Source<Integer, ?> source1 = Source.from(Arrays.asList(0, 1));
|
||||||
final Source<Integer, ?> source2 = Source.from(Arrays.asList(2, 3));
|
final Source<Integer, ?> source2 = Source.from(Arrays.asList(2, 3));
|
||||||
|
|
||||||
final Source<Integer, ?> source = Source.combine(source1, source2, new ArrayList(),
|
final Source<Integer, ?> source = Source.combine(source1, source2, new ArrayList<Source<Integer, ?>>(),
|
||||||
new Function<Integer, Graph<UniformFanInShape<Integer, Integer>, BoxedUnit>>() {
|
new Function<Integer, Graph<UniformFanInShape<Integer, Integer>, BoxedUnit>>() {
|
||||||
public Graph<UniformFanInShape<Integer, Integer>, BoxedUnit> apply(Integer elem) {
|
public Graph<UniformFanInShape<Integer, Integer>, BoxedUnit> apply(Integer elem) {
|
||||||
return Merge.create(elem);
|
return Merge.create(elem);
|
||||||
|
|
|
||||||
|
|
@ -183,6 +183,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
|
||||||
def returnTypeString(m: Method): String =
|
def returnTypeString(m: Method): String =
|
||||||
m.returnType.getName.drop("akka.stream.".length)
|
m.returnType.getName.drop("akka.stream.".length)
|
||||||
|
|
||||||
|
import language.existentials
|
||||||
case class Method(name: String, parameterTypes: List[Class[_]], returnType: Class[_], declaringClass: Class[_])
|
case class Method(name: String, parameterTypes: List[Class[_]], returnType: Class[_], declaringClass: Class[_])
|
||||||
|
|
||||||
sealed trait MatchResult {
|
sealed trait MatchResult {
|
||||||
|
|
|
||||||
|
|
@ -505,7 +505,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
|
||||||
val address = temporaryServerAddress()
|
val address = temporaryServerAddress()
|
||||||
val firstClientConnected = Promise[Unit]()
|
val firstClientConnected = Promise[Unit]()
|
||||||
val takeTwoAndDropSecond = Flow[IncomingConnection].map(conn ⇒ {
|
val takeTwoAndDropSecond = Flow[IncomingConnection].map(conn ⇒ {
|
||||||
firstClientConnected.trySuccess()
|
firstClientConnected.trySuccess(())
|
||||||
conn
|
conn
|
||||||
}).grouped(2).take(1).map(_.head)
|
}).grouped(2).take(1).map(_.head)
|
||||||
Tcp().bind(address.getHostName, address.getPort)
|
Tcp().bind(address.getHostName, address.getPort)
|
||||||
|
|
|
||||||
|
|
@ -90,12 +90,12 @@ class GraphStageTimersSpec extends AkkaSpec {
|
||||||
"receive single-shot timer" in {
|
"receive single-shot timer" in {
|
||||||
val driver = setupIsolatedStage
|
val driver = setupIsolatedStage
|
||||||
|
|
||||||
within(2 seconds) {
|
within(2.seconds) {
|
||||||
within(500 millis, 1 second) {
|
within(500.millis, 1.second) {
|
||||||
driver ! TestSingleTimer
|
driver ! TestSingleTimer
|
||||||
expectMsg(Tick(1))
|
expectMsg(Tick(1))
|
||||||
}
|
}
|
||||||
expectNoMsg(1 second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
driver.stopStage()
|
driver.stopStage()
|
||||||
|
|
@ -104,15 +104,15 @@ class GraphStageTimersSpec extends AkkaSpec {
|
||||||
"resubmit single-shot timer" in {
|
"resubmit single-shot timer" in {
|
||||||
val driver = setupIsolatedStage
|
val driver = setupIsolatedStage
|
||||||
|
|
||||||
within(2.5 seconds) {
|
within(2.5.seconds) {
|
||||||
within(500 millis, 1 second) {
|
within(500.millis, 1.second) {
|
||||||
driver ! TestSingleTimerResubmit
|
driver ! TestSingleTimerResubmit
|
||||||
expectMsg(Tick(1))
|
expectMsg(Tick(1))
|
||||||
}
|
}
|
||||||
within(1 second) {
|
within(1.second) {
|
||||||
expectMsg(Tick(2))
|
expectMsg(Tick(2))
|
||||||
}
|
}
|
||||||
expectNoMsg(1 second)
|
expectNoMsg(1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
driver.stopStage()
|
driver.stopStage()
|
||||||
|
|
@ -122,13 +122,13 @@ class GraphStageTimersSpec extends AkkaSpec {
|
||||||
val driver = setupIsolatedStage
|
val driver = setupIsolatedStage
|
||||||
|
|
||||||
driver ! TestCancelTimer
|
driver ! TestCancelTimer
|
||||||
within(500 millis) {
|
within(500.millis) {
|
||||||
expectMsg(TestCancelTimerAck)
|
expectMsg(TestCancelTimerAck)
|
||||||
}
|
}
|
||||||
within(300 millis, 1 second) {
|
within(300.millis, 1.second) {
|
||||||
expectMsg(Tick(1))
|
expectMsg(Tick(1))
|
||||||
}
|
}
|
||||||
expectNoMsg(1 second)
|
expectNoMsg(1.second)
|
||||||
|
|
||||||
driver.stopStage()
|
driver.stopStage()
|
||||||
}
|
}
|
||||||
|
|
@ -137,11 +137,11 @@ class GraphStageTimersSpec extends AkkaSpec {
|
||||||
val driver = setupIsolatedStage
|
val driver = setupIsolatedStage
|
||||||
|
|
||||||
driver ! TestRepeatedTimer
|
driver ! TestRepeatedTimer
|
||||||
val seq = receiveWhile(2 seconds) {
|
val seq = receiveWhile(2.seconds) {
|
||||||
case t: Tick ⇒ t
|
case t: Tick ⇒ t
|
||||||
}
|
}
|
||||||
seq should have length 5
|
seq should have length 5
|
||||||
expectNoMsg(1 second)
|
expectNoMsg(1.second)
|
||||||
|
|
||||||
driver.stopStage()
|
driver.stopStage()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -339,7 +339,6 @@ private[akka] object ActorProcessorFactory {
|
||||||
case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider))
|
case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider))
|
||||||
case Fold(z, f, _) ⇒ interp(fusing.Fold(z, f, settings.supervisionDecider))
|
case Fold(z, f, _) ⇒ interp(fusing.Fold(z, f, settings.supervisionDecider))
|
||||||
case Recover(pf, _) ⇒ interp(fusing.Recover(pf))
|
case Recover(pf, _) ⇒ interp(fusing.Recover(pf))
|
||||||
case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider))
|
|
||||||
case Expand(s, f, _) ⇒ interp(fusing.Expand(s, f))
|
case Expand(s, f, _) ⇒ interp(fusing.Expand(s, f))
|
||||||
case Conflate(s, f, _) ⇒ interp(fusing.Conflate(s, f, settings.supervisionDecider))
|
case Conflate(s, f, _) ⇒ interp(fusing.Conflate(s, f, settings.supervisionDecider))
|
||||||
case Buffer(n, s, _) ⇒ interp(fusing.Buffer(n, s))
|
case Buffer(n, s, _) ⇒ interp(fusing.Buffer(n, s))
|
||||||
|
|
|
||||||
|
|
@ -114,7 +114,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def onSubscribe(subscription: Subscription): Unit = {
|
protected def onSubscribe(subscription: Subscription): Unit = {
|
||||||
assert(subscription != null)
|
require(subscription != null)
|
||||||
if (upstreamCompleted) subscription.cancel()
|
if (upstreamCompleted) subscription.cancel()
|
||||||
else {
|
else {
|
||||||
upstream = subscription
|
upstream = subscription
|
||||||
|
|
|
||||||
|
|
@ -166,7 +166,7 @@ private[akka] object FanIn {
|
||||||
while (!(marked(id) && pending(id))) {
|
while (!(marked(id) && pending(id))) {
|
||||||
id += 1
|
id += 1
|
||||||
if (id == inputCount) id = 0
|
if (id == inputCount) id = 0
|
||||||
assert(id != preferredId, "Tried to dequeue without waiting for any input")
|
require(id != preferredId, "Tried to dequeue without waiting for any input")
|
||||||
}
|
}
|
||||||
id
|
id
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -141,7 +141,7 @@ private[akka] object FanOut {
|
||||||
while (!(marked(id) && pending(id))) {
|
while (!(marked(id) && pending(id))) {
|
||||||
id += 1
|
id += 1
|
||||||
if (id == outputCount) id = 0
|
if (id == outputCount) id = 0
|
||||||
assert(id != preferredId, "Tried to enqueue without waiting for any demand")
|
require(id != preferredId, "Tried to enqueue without waiting for any demand")
|
||||||
}
|
}
|
||||||
id
|
id
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
|
||||||
|
|
||||||
private def dequeue(): Any = {
|
private def dequeue(): Any = {
|
||||||
val elem = inputBuffer(nextInputElementCursor)
|
val elem = inputBuffer(nextInputElementCursor)
|
||||||
assert(elem ne null)
|
require(elem ne null)
|
||||||
inputBuffer(nextInputElementCursor) = null
|
inputBuffer(nextInputElementCursor) = null
|
||||||
|
|
||||||
batchRemaining -= 1
|
batchRemaining -= 1
|
||||||
|
|
@ -110,7 +110,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def onSubscribe(subscription: Subscription): Unit = {
|
private def onSubscribe(subscription: Subscription): Unit = {
|
||||||
assert(subscription != null)
|
require(subscription != null)
|
||||||
if (upstreamCompleted)
|
if (upstreamCompleted)
|
||||||
tryCancel(subscription)
|
tryCancel(subscription)
|
||||||
else if (downstreamCanceled) {
|
else if (downstreamCanceled) {
|
||||||
|
|
|
||||||
|
|
@ -319,7 +319,7 @@ object Flow extends FlowApply {
|
||||||
* Flow with attached input and output, can be executed.
|
* Flow with attached input and output, can be executed.
|
||||||
*/
|
*/
|
||||||
case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] {
|
case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] {
|
||||||
assert(module.isRunnable)
|
require(module.isRunnable)
|
||||||
def shape = ClosedShape
|
def shape = ClosedShape
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue