diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java index 545dbd238f..aac8d8d3d8 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java @@ -61,7 +61,21 @@ public interface HttpMessage { ResponseEntity entity(); /** - * Drains entity stream of this message + * Discards the entities data bytes by running the {@code dataBytes} Source contained by the {@code entity} + * of this HTTP message. + * + * Note: It is crucial that entities are either discarded, or consumed by running the underlying [[Source]] + * as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection + * (as designed), however possibly leading to an idle-timeout that will close the connection, instead of + * just having ignored the data. + * + * Warning: It is not allowed to discard and/or consume the the {@code entity.dataBytes} more than once + * as the stream is directly attached to the "live" incoming data source from the underlying TCP connection. + * Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose + * of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an + * "stream can cannot be materialized more than once" exception. + * + * In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716. */ DiscardedEntity discardEntityBytes(Materializer materializer); @@ -69,7 +83,7 @@ public interface HttpMessage { * Represents the the currently being-drained HTTP Entity which triggers completion of the contained * Future once the entity has been drained for the given HttpMessage completely. */ - public interface DiscardedEntity { + interface DiscardedEntity { /** * This future completes successfully once the underlying entity stream has been * successfully drained (and fails otherwise). @@ -83,7 +97,7 @@ public interface HttpMessage { CompletionStage completionStage(); } - public static interface MessageTransformations { + interface MessageTransformations { /** * Returns a copy of this message with a new protocol. */ diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index b2cea22a33..1881ebab24 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -59,7 +59,7 @@ private[http] object StreamUtils { override def onPull(): Unit = pull(in) override def onUpstreamFailure(ex: Throwable): Unit = { - promise.failure(ex) + promise.tryFailure(ex) failStage(ex) } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala index 66808d21bb..d8d87357db 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala @@ -166,8 +166,24 @@ object HttpMessage { def completionStage: CompletionStage[Done] = FutureConverters.toJava(f) } - implicit final class HttpMessageDiscardEntity(val httpMessage: HttpMessage) extends AnyVal { - /** Drains entity stream of this message */ + /** Adds Scala DSL idiomatic methods to [[HttpMessage]], e.g. versions of methods with an implicit [[Materializer]]. */ + implicit final class HttpMessageScalaDSLSugar(val httpMessage: HttpMessage) extends AnyVal { + /** + * Discards the entities data bytes by running the `dataBytes` Source contained by the `entity` of this HTTP message. + * + * Note: It is crucial that entities are either discarded, or consumed by running the underlying [[akka.stream.scaladsl.Source]] + * as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection + * (as designed), however possibly leading to an idle-timeout that will close the connection, instead of + * just having ignored the data. + * + * Warning: It is not allowed to discard and/or consume the the `entity.dataBytes` more than once + * as the stream is directly attached to the "live" incoming data source from the underlying TCP connection. + * Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose + * of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an + * "stream can cannot be materialized more than once" exception. + * + * In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716. + */ def discardEntityBytes()(implicit mat: Materializer): HttpMessage.DiscardedEntity = httpMessage.discardEntityBytes(mat) } diff --git a/akka-http-core/src/test/java/akka/http/javadsl/model/EntityDrainingTestCases.java b/akka-http-core/src/test/java/akka/http/javadsl/model/EntityDiscardingTest.java similarity index 93% rename from akka-http-core/src/test/java/akka/http/javadsl/model/EntityDrainingTestCases.java rename to akka-http-core/src/test/java/akka/http/javadsl/model/EntityDiscardingTest.java index 94478f6fd5..fbdf6add6a 100644 --- a/akka-http-core/src/test/java/akka/http/javadsl/model/EntityDrainingTestCases.java +++ b/akka-http-core/src/test/java/akka/http/javadsl/model/EntityDiscardingTest.java @@ -21,14 +21,14 @@ import java.util.concurrent.CompletableFuture; import static org.junit.Assert.assertEquals; -public class EntityDrainingTestCases extends JUnitSuite { +public class EntityDiscardingTest extends JUnitSuite { private ActorSystem sys = ActorSystem.create("test"); private ActorMaterializer mat = ActorMaterializer.create(sys); private Iterable testData = Arrays.asList(ByteString.fromString("abc"), ByteString.fromString("def")); @Test - public void testHttpRequestDrainEntity() { + public void testHttpRequestDiscardEntity() { CompletableFuture f = new CompletableFuture<>(); Source s = Source.from(testData).alsoTo(Sink.onComplete(completeDone(f))); @@ -43,7 +43,7 @@ public class EntityDrainingTestCases extends JUnitSuite { } @Test - public void testHttpResponseDrainEntity() { + public void testHttpResponseDiscardEntity() { CompletableFuture f = new CompletableFuture<>(); Source s = Source.from(testData).alsoTo(Sink.onComplete(completeDone(f))); diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/model/EntityDiscardingSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/model/EntityDiscardingSpec.scala new file mode 100644 index 0000000000..6730d315c3 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/model/EntityDiscardingSpec.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ + +package akka.http.scaladsl.model + +import akka.Done +import akka.http.scaladsl.model.HttpEntity.Chunked +import akka.http.scaladsl.{ Http, TestUtils } +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.testkit.AkkaSpec +import scala.concurrent.duration._ +import akka.util.ByteString + +import scala.concurrent.{ Await, Promise } + +class EntityDiscardingSpec extends AkkaSpec { + + implicit val mat = ActorMaterializer() + + val testData = Vector.tabulate(200)(i ⇒ ByteString(s"row-$i")) + + "HttpRequest" should { + + "discard entity stream after .discardEntityBytes() call" in { + + val p = Promise[Done]() + val s = Source + .fromIterator[ByteString](() ⇒ testData.iterator) + .alsoTo(Sink.onComplete(t ⇒ p.complete(t))) + + val req = HttpRequest(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s)) + val de = req.discardEntityBytes() + + p.future.futureValue should ===(Done) + de.future.futureValue should ===(Done) + } + } + + "HttpResponse" should { + + "discard entity stream after .discardEntityBytes() call" in { + + val p = Promise[Done]() + val s = Source + .fromIterator[ByteString](() ⇒ testData.iterator) + .alsoTo(Sink.onComplete(t ⇒ p.complete(t))) + + val resp = HttpResponse(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s)) + val de = resp.discardEntityBytes() + + p.future.futureValue should ===(Done) + de.future.futureValue should ===(Done) + } + + // TODO consider improving this by storing a mutable "already materialized" flag somewhere + // TODO likely this is going to inter-op with the auto-draining as described in #18716 + "should not allow draining a second time" in { + val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() + val bound = Http().bindAndHandleSync( + req ⇒ + HttpResponse(entity = HttpEntity( + ContentTypes.`text/csv(UTF-8)`, Source.fromIterator[ByteString](() ⇒ testData.iterator))), + host, port).futureValue + + try { + + val response = Http().singleRequest(HttpRequest(uri = s"http://$host:$port/")).futureValue + + val de = response.discardEntityBytes() + de.future.futureValue should ===(Done) + + val de2 = response.discardEntityBytes() + val secondRunException = intercept[IllegalStateException] { Await.result(de2.future, 3.seconds) } + secondRunException.getMessage should include("Source cannot be materialized more than once") + } finally bound.unbind().futureValue + } + } + +} diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/model/EntityDrainingSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/model/EntityDrainingSpec.scala deleted file mode 100644 index 47d5cf6af0..0000000000 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/model/EntityDrainingSpec.scala +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Copyright (C) 2009-2016 Lightbend Inc. - */ - -package akka.http.scaladsl.model - -import java.util.concurrent.CompletableFuture - -import akka.Done -import akka.actor.ActorSystem -import akka.japi.function -import akka.stream.ActorMaterializer -import akka.stream.scaladsl._ -import akka.util.ByteString -import org.scalatest.concurrent.ScalaFutures._ -import org.scalatest.{ Matchers, WordSpec } - -import scala.concurrent.Promise -import scala.util.{ Failure, Success, Try } - -class EntityDrainingSpec extends WordSpec with Matchers { - - implicit val sys = ActorSystem("test") - implicit val mat = ActorMaterializer() - - val testData = Vector.tabulate(200)(i ⇒ ByteString(s"row-$i")) - - "HttpRequest" should { - - "drain entity stream after .discardEntityBytes() call" in { - - val p = Promise[Done]() - val s = Source - .fromIterator[ByteString](() ⇒ testData.iterator) - .alsoTo(Sink.onComplete(t ⇒ p.complete(t))) - - val req = HttpRequest(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s)) - val de = req.discardEntityBytes() - - p.future.futureValue should ===(Done) - de.future.futureValue should ===(Done) - } - } - - "HttpResponse" should { - - "drain entity stream after .discardEntityBytes() call" in { - - val p = Promise[Done]() - val s = Source - .fromIterator[ByteString](() ⇒ testData.iterator) - .alsoTo(Sink.onComplete(t ⇒ p.complete(t))) - - val resp = HttpResponse(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, s)) - val de = resp.discardEntityBytes() - - p.future.futureValue should ===(Done) - de.future.futureValue should ===(Done) - } - } - -}