This commit is contained in:
parent
a7451fd888
commit
374a852553
6 changed files with 120 additions and 71 deletions
|
|
@ -61,7 +61,21 @@ public interface HttpMessage {
|
||||||
ResponseEntity entity();
|
ResponseEntity entity();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Drains entity stream of this message
|
* Discards the entities data bytes by running the {@code dataBytes} Source contained by the {@code entity}
|
||||||
|
* of this HTTP message.
|
||||||
|
*
|
||||||
|
* Note: It is crucial that entities are either discarded, or consumed by running the underlying [[Source]]
|
||||||
|
* as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection
|
||||||
|
* (as designed), however possibly leading to an idle-timeout that will close the connection, instead of
|
||||||
|
* just having ignored the data.
|
||||||
|
*
|
||||||
|
* Warning: It is not allowed to discard and/or consume the the {@code entity.dataBytes} more than once
|
||||||
|
* as the stream is directly attached to the "live" incoming data source from the underlying TCP connection.
|
||||||
|
* Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose
|
||||||
|
* of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an
|
||||||
|
* "stream can cannot be materialized more than once" exception.
|
||||||
|
*
|
||||||
|
* In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716.
|
||||||
*/
|
*/
|
||||||
DiscardedEntity discardEntityBytes(Materializer materializer);
|
DiscardedEntity discardEntityBytes(Materializer materializer);
|
||||||
|
|
||||||
|
|
@ -69,7 +83,7 @@ public interface HttpMessage {
|
||||||
* Represents the the currently being-drained HTTP Entity which triggers completion of the contained
|
* Represents the the currently being-drained HTTP Entity which triggers completion of the contained
|
||||||
* Future once the entity has been drained for the given HttpMessage completely.
|
* Future once the entity has been drained for the given HttpMessage completely.
|
||||||
*/
|
*/
|
||||||
public interface DiscardedEntity {
|
interface DiscardedEntity {
|
||||||
/**
|
/**
|
||||||
* This future completes successfully once the underlying entity stream has been
|
* This future completes successfully once the underlying entity stream has been
|
||||||
* successfully drained (and fails otherwise).
|
* successfully drained (and fails otherwise).
|
||||||
|
|
@ -83,7 +97,7 @@ public interface HttpMessage {
|
||||||
CompletionStage<Done> completionStage();
|
CompletionStage<Done> completionStage();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static interface MessageTransformations<Self> {
|
interface MessageTransformations<Self> {
|
||||||
/**
|
/**
|
||||||
* Returns a copy of this message with a new protocol.
|
* Returns a copy of this message with a new protocol.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,7 @@ private[http] object StreamUtils {
|
||||||
override def onPull(): Unit = pull(in)
|
override def onPull(): Unit = pull(in)
|
||||||
|
|
||||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||||
promise.failure(ex)
|
promise.tryFailure(ex)
|
||||||
failStage(ex)
|
failStage(ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -166,8 +166,24 @@ object HttpMessage {
|
||||||
def completionStage: CompletionStage[Done] = FutureConverters.toJava(f)
|
def completionStage: CompletionStage[Done] = FutureConverters.toJava(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
implicit final class HttpMessageDiscardEntity(val httpMessage: HttpMessage) extends AnyVal {
|
/** Adds Scala DSL idiomatic methods to [[HttpMessage]], e.g. versions of methods with an implicit [[Materializer]]. */
|
||||||
/** Drains entity stream of this message */
|
implicit final class HttpMessageScalaDSLSugar(val httpMessage: HttpMessage) extends AnyVal {
|
||||||
|
/**
|
||||||
|
* Discards the entities data bytes by running the `dataBytes` Source contained by the `entity` of this HTTP message.
|
||||||
|
*
|
||||||
|
* Note: It is crucial that entities are either discarded, or consumed by running the underlying [[akka.stream.scaladsl.Source]]
|
||||||
|
* as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection
|
||||||
|
* (as designed), however possibly leading to an idle-timeout that will close the connection, instead of
|
||||||
|
* just having ignored the data.
|
||||||
|
*
|
||||||
|
* Warning: It is not allowed to discard and/or consume the the `entity.dataBytes` more than once
|
||||||
|
* as the stream is directly attached to the "live" incoming data source from the underlying TCP connection.
|
||||||
|
* Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose
|
||||||
|
* of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an
|
||||||
|
* "stream can cannot be materialized more than once" exception.
|
||||||
|
*
|
||||||
|
* In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716.
|
||||||
|
*/
|
||||||
def discardEntityBytes()(implicit mat: Materializer): HttpMessage.DiscardedEntity =
|
def discardEntityBytes()(implicit mat: Materializer): HttpMessage.DiscardedEntity =
|
||||||
httpMessage.discardEntityBytes(mat)
|
httpMessage.discardEntityBytes(mat)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,14 +21,14 @@ import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class EntityDrainingTestCases extends JUnitSuite {
|
public class EntityDiscardingTest extends JUnitSuite {
|
||||||
|
|
||||||
private ActorSystem sys = ActorSystem.create("test");
|
private ActorSystem sys = ActorSystem.create("test");
|
||||||
private ActorMaterializer mat = ActorMaterializer.create(sys);
|
private ActorMaterializer mat = ActorMaterializer.create(sys);
|
||||||
private Iterable<ByteString> testData = Arrays.asList(ByteString.fromString("abc"), ByteString.fromString("def"));
|
private Iterable<ByteString> testData = Arrays.asList(ByteString.fromString("abc"), ByteString.fromString("def"));
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHttpRequestDrainEntity() {
|
public void testHttpRequestDiscardEntity() {
|
||||||
|
|
||||||
CompletableFuture<Done> f = new CompletableFuture<>();
|
CompletableFuture<Done> f = new CompletableFuture<>();
|
||||||
Source<ByteString, ?> s = Source.from(testData).alsoTo(Sink.onComplete(completeDone(f)));
|
Source<ByteString, ?> s = Source.from(testData).alsoTo(Sink.onComplete(completeDone(f)));
|
||||||
|
|
@ -43,7 +43,7 @@ public class EntityDrainingTestCases extends JUnitSuite {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHttpResponseDrainEntity() {
|
public void testHttpResponseDiscardEntity() {
|
||||||
|
|
||||||
CompletableFuture<Done> f = new CompletableFuture<>();
|
CompletableFuture<Done> f = new CompletableFuture<>();
|
||||||
Source<ByteString, ?> s = Source.from(testData).alsoTo(Sink.onComplete(completeDone(f)));
|
Source<ByteString, ?> s = Source.from(testData).alsoTo(Sink.onComplete(completeDone(f)));
|
||||||
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.http.scaladsl.model
|
||||||
|
|
||||||
|
import akka.Done
|
||||||
|
import akka.http.scaladsl.model.HttpEntity.Chunked
|
||||||
|
import akka.http.scaladsl.{ Http, TestUtils }
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.stream.scaladsl._
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import akka.util.ByteString
|
||||||
|
|
||||||
|
import scala.concurrent.{ Await, Promise }
|
||||||
|
|
||||||
|
class EntityDiscardingSpec extends AkkaSpec {
|
||||||
|
|
||||||
|
implicit val mat = ActorMaterializer()
|
||||||
|
|
||||||
|
val testData = Vector.tabulate(200)(i ⇒ ByteString(s"row-$i"))
|
||||||
|
|
||||||
|
"HttpRequest" should {
|
||||||
|
|
||||||
|
"discard entity stream after .discardEntityBytes() call" in {
|
||||||
|
|
||||||
|
val p = Promise[Done]()
|
||||||
|
val s = Source
|
||||||
|
.fromIterator[ByteString](() ⇒ testData.iterator)
|
||||||
|
.alsoTo(Sink.onComplete(t ⇒ p.complete(t)))
|
||||||
|
|
||||||
|
val req = HttpRequest(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s))
|
||||||
|
val de = req.discardEntityBytes()
|
||||||
|
|
||||||
|
p.future.futureValue should ===(Done)
|
||||||
|
de.future.futureValue should ===(Done)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"HttpResponse" should {
|
||||||
|
|
||||||
|
"discard entity stream after .discardEntityBytes() call" in {
|
||||||
|
|
||||||
|
val p = Promise[Done]()
|
||||||
|
val s = Source
|
||||||
|
.fromIterator[ByteString](() ⇒ testData.iterator)
|
||||||
|
.alsoTo(Sink.onComplete(t ⇒ p.complete(t)))
|
||||||
|
|
||||||
|
val resp = HttpResponse(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s))
|
||||||
|
val de = resp.discardEntityBytes()
|
||||||
|
|
||||||
|
p.future.futureValue should ===(Done)
|
||||||
|
de.future.futureValue should ===(Done)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO consider improving this by storing a mutable "already materialized" flag somewhere
|
||||||
|
// TODO likely this is going to inter-op with the auto-draining as described in #18716
|
||||||
|
"should not allow draining a second time" in {
|
||||||
|
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
|
||||||
|
val bound = Http().bindAndHandleSync(
|
||||||
|
req ⇒
|
||||||
|
HttpResponse(entity = HttpEntity(
|
||||||
|
ContentTypes.`text/csv(UTF-8)`, Source.fromIterator[ByteString](() ⇒ testData.iterator))),
|
||||||
|
host, port).futureValue
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
val response = Http().singleRequest(HttpRequest(uri = s"http://$host:$port/")).futureValue
|
||||||
|
|
||||||
|
val de = response.discardEntityBytes()
|
||||||
|
de.future.futureValue should ===(Done)
|
||||||
|
|
||||||
|
val de2 = response.discardEntityBytes()
|
||||||
|
val secondRunException = intercept[IllegalStateException] { Await.result(de2.future, 3.seconds) }
|
||||||
|
secondRunException.getMessage should include("Source cannot be materialized more than once")
|
||||||
|
} finally bound.unbind().futureValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,62 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
|
|
||||||
package akka.http.scaladsl.model
|
|
||||||
|
|
||||||
import java.util.concurrent.CompletableFuture
|
|
||||||
|
|
||||||
import akka.Done
|
|
||||||
import akka.actor.ActorSystem
|
|
||||||
import akka.japi.function
|
|
||||||
import akka.stream.ActorMaterializer
|
|
||||||
import akka.stream.scaladsl._
|
|
||||||
import akka.util.ByteString
|
|
||||||
import org.scalatest.concurrent.ScalaFutures._
|
|
||||||
import org.scalatest.{ Matchers, WordSpec }
|
|
||||||
|
|
||||||
import scala.concurrent.Promise
|
|
||||||
import scala.util.{ Failure, Success, Try }
|
|
||||||
|
|
||||||
class EntityDrainingSpec extends WordSpec with Matchers {
|
|
||||||
|
|
||||||
implicit val sys = ActorSystem("test")
|
|
||||||
implicit val mat = ActorMaterializer()
|
|
||||||
|
|
||||||
val testData = Vector.tabulate(200)(i ⇒ ByteString(s"row-$i"))
|
|
||||||
|
|
||||||
"HttpRequest" should {
|
|
||||||
|
|
||||||
"drain entity stream after .discardEntityBytes() call" in {
|
|
||||||
|
|
||||||
val p = Promise[Done]()
|
|
||||||
val s = Source
|
|
||||||
.fromIterator[ByteString](() ⇒ testData.iterator)
|
|
||||||
.alsoTo(Sink.onComplete(t ⇒ p.complete(t)))
|
|
||||||
|
|
||||||
val req = HttpRequest(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s))
|
|
||||||
val de = req.discardEntityBytes()
|
|
||||||
|
|
||||||
p.future.futureValue should ===(Done)
|
|
||||||
de.future.futureValue should ===(Done)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"HttpResponse" should {
|
|
||||||
|
|
||||||
"drain entity stream after .discardEntityBytes() call" in {
|
|
||||||
|
|
||||||
val p = Promise[Done]()
|
|
||||||
val s = Source
|
|
||||||
.fromIterator[ByteString](() ⇒ testData.iterator)
|
|
||||||
.alsoTo(Sink.onComplete(t ⇒ p.complete(t)))
|
|
||||||
|
|
||||||
val resp = HttpResponse(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s))
|
|
||||||
val de = resp.discardEntityBytes()
|
|
||||||
|
|
||||||
p.future.futureValue should ===(Done)
|
|
||||||
de.future.futureValue should ===(Done)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue