diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntity.java b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntity.java index b58068c86b..0582d29b74 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntity.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntity.java @@ -4,12 +4,12 @@ package akka.http.javadsl.model; +import akka.Done; import akka.http.impl.util.Util; -import akka.http.javadsl.model.headers.EntityTagRanges; -import akka.http.scaladsl.model.HttpEntity$; import akka.stream.Materializer; import akka.stream.javadsl.Source; import akka.util.ByteString; +import scala.concurrent.Future; import java.util.OptionalLong; import java.util.concurrent.CompletionStage; @@ -142,6 +142,43 @@ public interface HttpEntity { */ CompletionStage toStrict(long timeoutMillis, Materializer materializer); + /** + * Discards the entities data bytes by running the {@code dataBytes} Source contained in this entity. + * + * Note: It is crucial that entities are either discarded, or consumed by running the underlying [[Source]] + * as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection + * (as designed), however possibly leading to an idle-timeout that will close the connection, instead of + * just having ignored the data. + * + * Warning: It is not allowed to discard and/or consume the {@code dataBytes} more than once + * as the stream is directly attached to the "live" incoming data source from the underlying TCP connection. + * Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose + * of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an + * "stream can cannot be materialized more than once" exception. + * + * In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716. + */ + HttpMessage.DiscardedEntity discardBytes(Materializer materializer); + + + /** + * Represents the currently being-drained HTTP Entity which triggers completion of the contained + * Future once the entity has been drained for the given HttpMessage completely. + */ + interface DiscardedEntity { + /** + * This future completes successfully once the underlying entity stream has been + * successfully drained (and fails otherwise). + */ + Future future(); + + /** + * This future completes successfully once the underlying entity stream has been + * successfully drained (and fails otherwise). + */ + CompletionStage completionStage(); + } + /** * The entity type which consists of a predefined fixed ByteString of data. */ diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java index 5f1f1ae812..c78808b99e 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java @@ -4,16 +4,13 @@ package akka.http.javadsl.model; -import akka.Done; import akka.stream.Materializer; import akka.http.javadsl.model.headers.HttpCredentials; import akka.util.ByteString; -import scala.concurrent.Future; import java.io.File; import java.nio.file.Path; import java.util.Optional; -import java.util.concurrent.CompletionStage; /** * The base type for an Http message (request or response). @@ -70,7 +67,7 @@ public interface HttpMessage { * (as designed), however possibly leading to an idle-timeout that will close the connection, instead of * just having ignored the data. * - * Warning: It is not allowed to discard and/or consume the the {@code entity.dataBytes} more than once + * Warning: It is not allowed to discard and/or consume the {@code entity.dataBytes} more than once * as the stream is directly attached to the "live" incoming data source from the underlying TCP connection. * Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose * of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an @@ -81,21 +78,10 @@ public interface HttpMessage { DiscardedEntity discardEntityBytes(Materializer materializer); /** - * Represents the the currently being-drained HTTP Entity which triggers completion of the contained + * Represents the currently being-drained HTTP Entity which triggers completion of the contained * Future once the entity has been drained for the given HttpMessage completely. */ - interface DiscardedEntity { - /** - * This future completes successfully once the underlying entity stream has been - * successfully drained (and fails otherwise). - */ - Future future(); - - /** - * This future completes successfully once the underlying entity stream has been - * successfully drained (and fails otherwise). - */ - CompletionStage completionStage(); + interface DiscardedEntity extends HttpEntity.DiscardedEntity { } interface MessageTransformations { diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index 6945c35dd4..453927c4e6 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -10,8 +10,9 @@ import akka.http.impl.model.JavaInitialization import language.implicitConversions import java.io.File -import java.nio.file.{ Path, Files } +import java.nio.file.{ Files, Path } import java.lang.{ Iterable ⇒ JIterable } + import scala.util.control.NonFatal import scala.concurrent.Future import scala.concurrent.duration._ @@ -20,8 +21,8 @@ import akka.util.ByteString import akka.stream.scaladsl._ import akka.stream.stage._ import akka.stream._ -import akka.{ NotUsed, stream } -import akka.http.scaladsl.model.ContentType.{ NonBinary, Binary } +import akka.{ Done, NotUsed, stream } +import akka.http.scaladsl.model.ContentType.{ Binary, NonBinary } import akka.http.scaladsl.util.FastFuture import akka.http.javadsl.{ model ⇒ jm } import akka.http.impl.util.{ JavaMapping, StreamUtils } @@ -31,6 +32,8 @@ import scala.compat.java8.OptionConverters._ import scala.compat.java8.FutureConverters._ import java.util.concurrent.CompletionStage +import scala.compat.java8.FutureConverters + /** * Models the entity (aka "body" or "content) of an HTTP message. */ @@ -72,6 +75,10 @@ sealed trait HttpEntity extends jm.HttpEntity { .via(new akka.http.impl.util.ToStrict(timeout, contentType)) .runWith(Sink.head) + /** Drains entity stream */ + override def discardBytes(mat: Materializer): HttpMessage.DiscardedEntity = + new HttpMessage.DiscardedEntity(dataBytes.runWith(Sink.ignore)(mat)) + /** * Returns a copy of the given entity with the ByteString chunks of this entity transformed by the given transformer. * For a `Chunked` entity, the chunks will be transformed one by one keeping the chunk metadata (but may introduce an @@ -145,6 +152,7 @@ sealed trait HttpEntity extends jm.HttpEntity { /** Java API */ override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.HttpEntity.Strict] = toStrict(timeoutMillis.millis)(materializer).toJava + } /* An entity that can be used for body parts */ @@ -634,4 +642,44 @@ object HttpEntity { val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) x.copy(data = newData).asInstanceOf[T] → whenCompleted } + + /** + * Represents the currently being-drained HTTP Entity which triggers completion of the contained + * Future once the entity has been drained for the given HttpMessage completely. + */ + final class DiscardedEntity(f: Future[Done]) extends akka.http.javadsl.model.HttpMessage.DiscardedEntity { + /** + * This future completes successfully once the underlying entity stream has been + * successfully drained (and fails otherwise). + */ + def future: Future[Done] = f + + /** + * This future completes successfully once the underlying entity stream has been + * successfully drained (and fails otherwise). + */ + def completionStage: CompletionStage[Done] = FutureConverters.toJava(f) + } + + /** Adds Scala DSL idiomatic methods to [[HttpEntity]], e.g. versions of methods with an implicit [[Materializer]]. */ + implicit final class HttpEntityScalaDSLSugar(val httpEntity: HttpEntity) extends AnyVal { + /** + * Discards the entities data bytes by running the `dataBytes` Source contained in this `entity`. + * + * Note: It is crucial that entities are either discarded, or consumed by running the underlying [[akka.stream.scaladsl.Source]] + * as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection + * (as designed), however possibly leading to an idle-timeout that will close the connection, instead of + * just having ignored the data. + * + * Warning: It is not allowed to discard and/or consume the `entity.dataBytes` more than once + * as the stream is directly attached to the "live" incoming data source from the underlying TCP connection. + * Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose + * of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an + * "stream can cannot be materialized more than once" exception. + * + * In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716. + */ + def discardBytes()(implicit mat: Materializer): HttpMessage.DiscardedEntity = + httpEntity.discardBytes(mat) + } } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala index 9cf985f448..85a6b45d3c 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala @@ -23,9 +23,7 @@ import akka.util.{ ByteString, HashCode } import akka.http.impl.util._ import akka.http.javadsl.{ model ⇒ jm } import akka.http.scaladsl.util.FastFuture._ -import akka.stream.scaladsl.Sink import headers._ -import akka.http.impl.util.JavaMapping.Implicits._ /** * Common base class of HttpRequest and HttpResponse. @@ -42,8 +40,7 @@ sealed trait HttpMessage extends jm.HttpMessage { def protocol: HttpProtocol /** Drains entity stream */ - def discardEntityBytes(mat: Materializer): HttpMessage.DiscardedEntity = - new HttpMessage.DiscardedEntity(entity.dataBytes.runWith(Sink.ignore)(mat)) + def discardEntityBytes(mat: Materializer): HttpMessage.DiscardedEntity = entity.discardBytes()(mat) /** Returns a copy of this message with the list of headers set to the given ones. */ def withHeaders(headers: HttpHeader*): Self = withHeaders(headers.toList) @@ -151,7 +148,7 @@ object HttpMessage { } /** - * Represents the the currently being-drained HTTP Entity which triggers completion of the contained + * Represents the currently being-drained HTTP Entity which triggers completion of the contained * Future once the entity has been drained for the given HttpMessage completely. */ final class DiscardedEntity(f: Future[Done]) extends akka.http.javadsl.model.HttpMessage.DiscardedEntity { @@ -178,7 +175,7 @@ object HttpMessage { * (as designed), however possibly leading to an idle-timeout that will close the connection, instead of * just having ignored the data. * - * Warning: It is not allowed to discard and/or consume the the `entity.dataBytes` more than once + * Warning: It is not allowed to discard and/or consume the `entity.dataBytes` more than once * as the stream is directly attached to the "live" incoming data source from the underlying TCP connection. * Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose * of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an