From 840feb42ea61193eb580aefff4575987c9a1cdd0 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 7 Nov 2014 14:39:08 +0100 Subject: [PATCH] =htp #16238 fix RangeDirectives not to assume reusable entity data stream --- .../scala/akka/http/util/StreamUtils.scala | 48 +++++++++++++++++-- .../directives/RangeDirectivesSpec.scala | 29 ++++++++--- .../server/directives/RangeDirectives.scala | 13 ++++- 3 files changed, 78 insertions(+), 12 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index 432d13740b..adb5bf8231 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -4,17 +4,18 @@ package akka.http.util +import java.util.concurrent.atomic.AtomicBoolean +import org.reactivestreams.Subscriber + import akka.http.model.RequestEntity import akka.stream.impl.ErrorPublisher -import akka.stream.Transformer -import akka.stream.FlowMaterializer -import akka.stream.scaladsl.Source +import akka.stream.{ impl, Transformer, FlowMaterializer } +import akka.stream.scaladsl._ import akka.util.ByteString import org.reactivestreams.Publisher import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } -import scala.util.control.NonFatal /** * INTERNAL API @@ -98,8 +99,47 @@ private[http] object StreamUtils { override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = currentState.onTermination(e) } + /** + * Applies a sequence of transformers on one source and returns a sequence of sources with the result. The input source + * will only be traversed once. 
+ */ + def transformMultiple[T, U](input: Source[T], transformers: immutable.Seq[() ⇒ Transformer[T, U]])(implicit materializer: FlowMaterializer): immutable.Seq[Source[U]] = + transformers match { + case Nil ⇒ Nil + case Seq(one) ⇒ Vector(input.transform("transformMultipleElement", one)) + case multiple ⇒ + val results = Vector.fill(multiple.size)(Sink.publisher[U]) + val mat = + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + + val broadcast = Broadcast[T]("transformMultipleInputBroadcast") + input ~> broadcast + (multiple, results).zipped.foreach { (trans, sink) ⇒ + broadcast ~> Flow[T].transform("transformMultipleElement", trans) ~> sink + } + }.run() + results.map(s ⇒ Source(mat.get(s))) + } + def mapEntityError(f: Throwable ⇒ Throwable): RequestEntity ⇒ RequestEntity = _.transformDataBytes(() ⇒ mapErrorTransformer(f)) + + /** + * Returns a source that can only be instantiated once. Intended for testing purposes. + */ + def oneTimeSource[T](other: Source[T]): Source[T] = { + import akka.stream.impl._ + val original = other.asInstanceOf[ActorFlowSource[T]] + new AtomicBoolean(false) with SimpleActorFlowSource[T] { + override def attach(flowSubscriber: Subscriber[T], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName)._1.subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[T], Unit) = + if (!getAndSet(true)) (original.create(materializer, flowName)._1, ()) + else (ErrorPublisher(new IllegalStateException("One time source can only be instantiated once")).asInstanceOf[Publisher[T]], ()) + } + } } /** diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala index 809133ddf9..643fb23fd0 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala +++
b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala @@ -9,6 +9,8 @@ import akka.http.model.StatusCodes._ import akka.http.model._ import akka.http.model.headers._ import akka.http.util._ +import akka.stream.scaladsl.Source +import akka.util.ByteString import org.scalatest.{ Inside, Inspectors } import scala.concurrent.Await @@ -93,7 +95,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { } } - "return a 'multipart/byteranges' for a ranged request with multiple coalesced ranges with preserved order" in { + "return a 'multipart/byteranges' for a ranged request with multiple coalesced ranges and expect ranges in ascending order" in { Get() ~> addHeader(Range(ByteRange(5, 10), ByteRange(0, 1), ByteRange(1, 2))) ~> { wrs { complete("Some random and not super short entity.") } } ~> check { @@ -101,17 +103,30 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second) parts.size shouldEqual 2 inside(parts(0)) { - case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒ - range shouldEqual ContentRange.Default(5, 10, Some(39)) - unit shouldEqual RangeUnits.Bytes - Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "random" - } - inside(parts(1)) { case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒ range shouldEqual ContentRange.Default(0, 2, Some(39)) unit shouldEqual RangeUnits.Bytes Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "Som" } + inside(parts(1)) { + case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒ + range shouldEqual ContentRange.Default(5, 10, Some(39)) + unit shouldEqual RangeUnits.Bytes + Await.result(entity.dataBytes.utf8String, 100.millis) shouldEqual "random" + } + } + } + + "return a 'multipart/byteranges' for a ranged request with multiple ranges if entity data source isn't reusable" in { + val content = "Some 
random and not super short entity." + def entityData() = StreamUtils.oneTimeSource(Source.singleton(ByteString(content))) + + Get() ~> addHeader(Range(ByteRange(5, 10), ByteRange(0, 1), ByteRange(1, 2))) ~> { + wrs { complete(HttpEntity.Default(MediaTypes.`text/plain`, content.length, entityData())) } + } ~> check { + header[`Content-Range`] should be(None) + val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second) + parts.size shouldEqual 2 } } diff --git a/akka-http/src/main/scala/akka/http/server/directives/RangeDirectives.scala b/akka-http/src/main/scala/akka/http/server/directives/RangeDirectives.scala index 4990f0f083..e38c584122 100644 --- a/akka-http/src/main/scala/akka/http/server/directives/RangeDirectives.scala +++ b/akka-http/src/main/scala/akka/http/server/directives/RangeDirectives.scala @@ -34,6 +34,7 @@ trait RangeDirectives { */ def withRangeSupport: Directive0 = extractRequestContext.flatMap { ctx ⇒ + import ctx.flowMaterializer val settings = ctx.settings implicit val log = ctx.log import settings.{ rangeCountLimit, rangeCoalescingThreshold } @@ -66,7 +67,17 @@ trait RangeDirectives { def multipartRanges(ranges: Seq[ByteRange], entity: UniversalEntity): Multipart.ByteRanges = { val length = entity.contentLength val iRanges: Seq[IndexRange] = ranges.map(indexRange(length)) - val bodyParts = coalesceRanges(iRanges).map(ir ⇒ Multipart.ByteRanges.BodyPart(ir.contentRange(length), ir(entity))) + + // The input entity data stream can only be traversed once because it is not known whether the + // source is reusable. + // Therefore, ranges need to be sorted so that no selected range starts to accumulate data + // that cannot be sent out because another range is blocking the queue. 
+ val coalescedRanges = coalesceRanges(iRanges).sortBy(_.start) + val bodyPartTransformers = coalescedRanges.map(ir ⇒ () ⇒ StreamUtils.sliceBytesTransformer(ir.start, ir.length)).toVector + val bodyPartByteStreams = StreamUtils.transformMultiple(entity.dataBytes, bodyPartTransformers) + val bodyParts = (coalescedRanges, bodyPartByteStreams).zipped.map { (range, bytes) ⇒ + Multipart.ByteRanges.BodyPart(range.contentRange(length), HttpEntity(entity.contentType, range.length, bytes)) + } Multipart.ByteRanges(Source(bodyParts.toVector)) }