diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/MediaTypes.java b/akka-http-core/src/main/java/akka/http/javadsl/model/MediaTypes.java index 18b68730bb..8c3324eceb 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/MediaTypes.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/MediaTypes.java @@ -190,7 +190,8 @@ public abstract class MediaTypes { public static MediaType custom(String value, boolean binary, boolean compressible) { akka.http.scaladsl.model.MediaType.Compressibility comp = compressible ? akka.http.scaladsl.model.MediaType.Compressible$.MODULE$ : akka.http.scaladsl.model.MediaType.NotCompressible$.MODULE$; - return akka.http.scaladsl.model.MediaType.custom(value, binary, comp , List.empty()); + return akka.http.scaladsl.model.MediaType.custom(value, binary, comp , + akka.http.scaladsl.model.MediaType.custom$default$4()); } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index be1df51daf..91abc5fb8d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -11,8 +11,9 @@ import akka.stream.actor.ActorSubscriberMessage._ import akka.stream.actor.ActorPublisherMessage import akka.stream.actor.ActorPublisherMessage._ import scala.concurrent.forkjoin.ThreadLocalRandom - import java.{ util ⇒ ju } +import scala.concurrent._ +import akka.dispatch.ExecutionContexts.sameThreadExecutionContext final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source[T, M], T]] { private val in = Inlet[Source[T, M]]("flatten.in") @@ -70,7 +71,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source val src = q.dequeue() push(out, src.elem) src.elem = null.asInstanceOf[T] - if (src.sub != null) src.sub.pull() + if (src.isActive) src.pull() else removeSource(src) } @@ -98,23 +99,23 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source def addSource(source: Source[T, M]): Unit = { val localSource = new LocalSource[T]() sources += localSource - val sub = source.runWith(new LocalSink(getAsyncCallback[ActorSubscriberMessage] { + val subF = source.runWith(new LocalSink(getAsyncCallback[ActorSubscriberMessage] { case OnNext(elem) ⇒ val elemT = elem.asInstanceOf[T] if (isAvailable(out)) { push(out, elemT) - localSource.sub.pull() + localSource.pull() } else { localSource.elem = elemT q.enqueue(localSource) } case OnComplete ⇒ - localSource.sub = null + localSource.deactivate() if (localSource.elem == null) removeSource(localSource) case OnError(ex) ⇒ failStage(ex) }.invoke))(interpreter.materializer) - localSource.sub = sub + localSource.activate(subF) } def removeSource(src: LocalSource[T]): Unit = { @@ -125,9 +126,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source } override def postStop(): Unit = { - sources.foreach { src ⇒ - if (src.sub != null) src.sub.cancel() - } + sources.foreach(_.cancel()) } } } @@ -145,29 +144,70 @@ private[fusing] final class LocalSinkSubscription[T](sub: ActorPublisherMessage /** * INTERNAL API */ -private[fusing] final class LocalSource[T](var sub: LocalSinkSubscription[T] = null, var elem: T = null.asInstanceOf[T]) +private[fusing] final class LocalSource[T] { + private var subF: Future[LocalSinkSubscription[T]] = _ + private var sub: LocalSinkSubscription[T] = _ + + var elem: T = null.asInstanceOf[T] + + def isActive: Boolean = sub ne null + def deactivate(): Unit = { + sub = null + subF = null + } + def activate(f: Future[LocalSinkSubscription[T]]): Unit = { + subF = f + /* + * The subscription is communicated to the FlattenMerge stage by way of completing + * the future. Encoding it like this means that the `sub` field will be written + * either by us (if the future has already been completed) or by the LocalSink (when + * it eventually completes the future in its `preStart`). The important part is that + * either way the `sub` field is populated before we get the first `OnNext` message + * and the value is safely published in either case as well (since AsyncCallback is + * based on an Actor message send). + */ + f.foreach(s ⇒ sub = s)(sameThreadExecutionContext) + } + + def pull(): Unit = { + if (sub eq null) + throw new IllegalStateException("not yet initialized, subscription future has " + subF.value) + sub.pull() + } + + def cancel(): Unit = + if (subF ne null) + subF.foreach(_.cancel())(sameThreadExecutionContext) +} /** * INTERNAL API */ -private[fusing] final class LocalSink[T](notifier: ActorSubscriberMessage ⇒ Unit) extends GraphStageWithMaterializedValue[SinkShape[T], LocalSinkSubscription[T]] { +private[fusing] final class LocalSink[T](notifier: ActorSubscriberMessage ⇒ Unit) + extends GraphStageWithMaterializedValue[SinkShape[T], Future[LocalSinkSubscription[T]]] { + private val in = Inlet[T]("LocalSink.in") override val shape = SinkShape(in) - override def createLogicAndMaterializedValue(attr: Attributes) = { - class Logic extends GraphStageLogic(shape) { + + override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[LocalSinkSubscription[T]]) = { + val sub = Promise[LocalSinkSubscription[T]] + val logic = new GraphStageLogic(shape) { setHandler(in, new InHandler { override def onPush(): Unit = notifier(OnNext(grab(in))) override def onUpstreamFinish(): Unit = notifier(OnComplete) override def onUpstreamFailure(ex: Throwable): Unit = notifier(OnError(ex)) }) - val sub = new LocalSinkSubscription[T](getAsyncCallback[ActorPublisherMessage] { - case Request(1) ⇒ tryPull(in) - case Cancel ⇒ completeStage() - case _ ⇒ - }.invoke) - override def preStart(): Unit = pull(in) + + override def preStart(): Unit = { + pull(in) + sub.success( + new LocalSinkSubscription[T](getAsyncCallback[ActorPublisherMessage] { + case Request(1) ⇒ tryPull(in) + case Cancel ⇒ completeStage() + case _ ⇒ + }.invoke)) + } } - val logic = new Logic - logic -> logic.sub + logic -> sub.future } }