=htc #20921 add discardBytes method to HttpEntity (#20925)

add DiscardedEntity interface inside HttpEntity
This commit is contained in:
Nafer Sanabria 2016-07-26 06:42:56 -05:00 committed by Konrad Malawski
parent 15c77e3392
commit da79af329f
4 changed files with 96 additions and 28 deletions

View file

@ -4,12 +4,12 @@
package akka.http.javadsl.model;
import akka.Done;
import akka.http.impl.util.Util;
import akka.http.javadsl.model.headers.EntityTagRanges;
import akka.http.scaladsl.model.HttpEntity$;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import scala.concurrent.Future;
import java.util.OptionalLong;
import java.util.concurrent.CompletionStage;
@ -142,6 +142,43 @@ public interface HttpEntity {
*/
CompletionStage<HttpEntity.Strict> toStrict(long timeoutMillis, Materializer materializer);
/**
* Discards the entities data bytes by running the {@code dataBytes} Source contained in this entity.
*
* Note: It is crucial that entities are either discarded, or consumed by running the underlying [[Source]]
* as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection
* (as designed), however possibly leading to an idle-timeout that will close the connection, instead of
* just having ignored the data.
*
* Warning: It is not allowed to discard and/or consume the {@code dataBytes} more than once
* as the stream is directly attached to the "live" incoming data source from the underlying TCP connection.
* Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose
* of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an
* "stream can cannot be materialized more than once" exception.
*
* In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716.
*/
HttpMessage.DiscardedEntity discardBytes(Materializer materializer);
/**
* Represents the currently being-drained HTTP Entity which triggers completion of the contained
* Future once the entity has been drained for the given HttpMessage completely.
*/
interface DiscardedEntity {
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
Future<Done> future();
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
CompletionStage<Done> completionStage();
}
/**
* The entity type which consists of a predefined fixed ByteString of data.
*/

View file

@ -4,16 +4,13 @@
package akka.http.javadsl.model;
import akka.Done;
import akka.stream.Materializer;
import akka.http.javadsl.model.headers.HttpCredentials;
import akka.util.ByteString;
import scala.concurrent.Future;
import java.io.File;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
/**
* The base type for an Http message (request or response).
@ -70,7 +67,7 @@ public interface HttpMessage {
* (as designed), however possibly leading to an idle-timeout that will close the connection, instead of
* just having ignored the data.
*
* Warning: It is not allowed to discard and/or consume the the {@code entity.dataBytes} more than once
* Warning: It is not allowed to discard and/or consume the {@code entity.dataBytes} more than once
* as the stream is directly attached to the "live" incoming data source from the underlying TCP connection.
* Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose
* of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an
@ -81,21 +78,10 @@ public interface HttpMessage {
DiscardedEntity discardEntityBytes(Materializer materializer);
/**
* Represents the the currently being-drained HTTP Entity which triggers completion of the contained
* Represents the currently being-drained HTTP Entity which triggers completion of the contained
* Future once the entity has been drained for the given HttpMessage completely.
*/
interface DiscardedEntity {
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
Future<Done> future();
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
CompletionStage<Done> completionStage();
interface DiscardedEntity extends HttpEntity.DiscardedEntity {
}
interface MessageTransformations<Self> {

View file

@ -10,8 +10,9 @@ import akka.http.impl.model.JavaInitialization
import language.implicitConversions
import java.io.File
import java.nio.file.{ Path, Files }
import java.nio.file.{ Files, Path }
import java.lang.{ Iterable JIterable }
import scala.util.control.NonFatal
import scala.concurrent.Future
import scala.concurrent.duration._
@ -20,8 +21,8 @@ import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream._
import akka.{ NotUsed, stream }
import akka.http.scaladsl.model.ContentType.{ NonBinary, Binary }
import akka.{ Done, NotUsed, stream }
import akka.http.scaladsl.model.ContentType.{ Binary, NonBinary }
import akka.http.scaladsl.util.FastFuture
import akka.http.javadsl.{ model jm }
import akka.http.impl.util.{ JavaMapping, StreamUtils }
@ -31,6 +32,8 @@ import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters
/**
* Models the entity (aka "body" or "content) of an HTTP message.
*/
@ -72,6 +75,10 @@ sealed trait HttpEntity extends jm.HttpEntity {
.via(new akka.http.impl.util.ToStrict(timeout, contentType))
.runWith(Sink.head)
/** Drains entity stream */
override def discardBytes(mat: Materializer): HttpMessage.DiscardedEntity =
new HttpMessage.DiscardedEntity(dataBytes.runWith(Sink.ignore)(mat))
/**
* Returns a copy of the given entity with the ByteString chunks of this entity transformed by the given transformer.
* For a `Chunked` entity, the chunks will be transformed one by one keeping the chunk metadata (but may introduce an
@ -145,6 +152,7 @@ sealed trait HttpEntity extends jm.HttpEntity {
/** Java API */
override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.HttpEntity.Strict] =
toStrict(timeoutMillis.millis)(materializer).toJava
}
/* An entity that can be used for body parts */
@ -634,4 +642,44 @@ object HttpEntity {
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData).asInstanceOf[T] whenCompleted
}
/**
* Represents the currently being-drained HTTP Entity which triggers completion of the contained
* Future once the entity has been drained for the given HttpMessage completely.
*/
final class DiscardedEntity(f: Future[Done]) extends akka.http.javadsl.model.HttpMessage.DiscardedEntity {
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
def future: Future[Done] = f
/**
* This future completes successfully once the underlying entity stream has been
* successfully drained (and fails otherwise).
*/
def completionStage: CompletionStage[Done] = FutureConverters.toJava(f)
}
/** Adds Scala DSL idiomatic methods to [[HttpEntity]], e.g. versions of methods with an implicit [[Materializer]]. */
implicit final class HttpEntityScalaDSLSugar(val httpEntity: HttpEntity) extends AnyVal {
/**
* Discards the entities data bytes by running the `dataBytes` Source contained in this `entity`.
*
* Note: It is crucial that entities are either discarded, or consumed by running the underlying [[akka.stream.scaladsl.Source]]
* as otherwise the lack of consuming of the data will trigger back-pressure to the underlying TCP connection
* (as designed), however possibly leading to an idle-timeout that will close the connection, instead of
* just having ignored the data.
*
* Warning: It is not allowed to discard and/or consume the `entity.dataBytes` more than once
* as the stream is directly attached to the "live" incoming data source from the underlying TCP connection.
* Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose
* of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an
* "stream can cannot be materialized more than once" exception.
*
* In future versions, more automatic ways to warn or resolve these situations may be introduced, see issue #18716.
*/
def discardBytes()(implicit mat: Materializer): HttpMessage.DiscardedEntity =
httpEntity.discardBytes(mat)
}
}

View file

@ -23,9 +23,7 @@ import akka.util.{ ByteString, HashCode }
import akka.http.impl.util._
import akka.http.javadsl.{ model jm }
import akka.http.scaladsl.util.FastFuture._
import akka.stream.scaladsl.Sink
import headers._
import akka.http.impl.util.JavaMapping.Implicits._
/**
* Common base class of HttpRequest and HttpResponse.
@ -42,8 +40,7 @@ sealed trait HttpMessage extends jm.HttpMessage {
def protocol: HttpProtocol
/** Drains entity stream */
def discardEntityBytes(mat: Materializer): HttpMessage.DiscardedEntity =
new HttpMessage.DiscardedEntity(entity.dataBytes.runWith(Sink.ignore)(mat))
def discardEntityBytes(mat: Materializer): HttpMessage.DiscardedEntity = entity.discardBytes()(mat)
/** Returns a copy of this message with the list of headers set to the given ones. */
def withHeaders(headers: HttpHeader*): Self = withHeaders(headers.toList)
@ -151,7 +148,7 @@ object HttpMessage {
}
/**
* Represents the the currently being-drained HTTP Entity which triggers completion of the contained
* Represents the currently being-drained HTTP Entity which triggers completion of the contained
* Future once the entity has been drained for the given HttpMessage completely.
*/
final class DiscardedEntity(f: Future[Done]) extends akka.http.javadsl.model.HttpMessage.DiscardedEntity {
@ -178,7 +175,7 @@ object HttpMessage {
* (as designed), however possibly leading to an idle-timeout that will close the connection, instead of
* just having ignored the data.
*
* Warning: It is not allowed to discard and/or consume the the `entity.dataBytes` more than once
* Warning: It is not allowed to discard and/or consume the `entity.dataBytes` more than once
* as the stream is directly attached to the "live" incoming data source from the underlying TCP connection.
* Allowing it to be consumable twice would require buffering the incoming data, thus defeating the purpose
* of its streaming nature. If the dataBytes source is materialized a second time, it will fail with an