diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 30d8cda9aa..7e0885e5c8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -3,15 +3,22 @@ */ package akka.stream.scaladsl +import java.util + import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.Attributes._ +import akka.stream.impl.SinkModule +import akka.stream.impl.StreamLayout.Module +import akka.util.ByteString +import scala.annotation.tailrec +import scala.concurrent.{ Promise, Await } import scala.concurrent.duration._ -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings +import akka.stream._ import akka.stream.Supervision.resumingDecider import akka.stream.testkit._ import akka.stream.testkit.Utils._ import org.reactivestreams.Publisher -import akka.stream.ActorAttributes import org.scalatest.concurrent.ScalaFutures import org.scalactic.ConversionCheckedTripleEquals import org.scalatest.concurrent.PatienceConfiguration.Timeout @@ -19,6 +26,8 @@ import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec +import scala.concurrent.forkjoin.ThreadLocalRandom + object FlowGroupBySpec { implicit class Lift[M](val f: SubFlow[Int, M, Source[Int, M]#Repr, RunnableGraph[M]]) extends AnyVal { @@ -70,6 +79,12 @@ class FlowGroupBySpec extends AkkaSpec { } + def randomByteString(size: Int): ByteString = { + val a = new Array[Byte](size) + ThreadLocalRandom.current().nextBytes(a) + ByteString(a) + } + "groupBy" must { "work in the happy case" in assertAllStagesStopped { new SubstreamsSupport(groupCount = 2) { @@ -120,6 +135,18 @@ class FlowGroupBySpec extends AkkaSpec { .sortBy(_.head) should ===(List(List("Aaa", "Abb"), List("Bcc"), List("Cdd", "Cee"))) } + "fail when key function returns null" in { + val down = Source(List("Aaa", "Abb", "Bcc", "Cdd", "Cee")) + .groupBy(3, e ⇒ if (e.startsWith("A")) null else e.substring(0, 1)) + .grouped(10) + .mergeSubstreams + .runWith(TestSink.probe[Seq[String]]) + down.request(1) + val ex = down.expectError() + ex.getMessage.indexOf("Key cannot be null") should not be (-1) + ex.isInstanceOf[IllegalArgumentException] should be(true) + } + "accept cancellation of substreams" in assertAllStagesStopped { new SubstreamsSupport(groupCount = 2) { StreamPuppet(getSubFlow(1).runWith(Sink.asPublisher(false))).cancel() @@ -309,6 +336,137 @@ class FlowGroupBySpec extends AkkaSpec { s1.expectError(ex) } + "emit subscribe before completed" in assertAllStagesStopped { + val futureGroupSource = + Source.single(0) + .groupBy(1, elem ⇒ "all") + .prefixAndTail(0) + .map(_._2) + .concatSubstreams + .runWith(Sink.head) + val pub: Publisher[Int] = Await.result(futureGroupSource, 3.seconds).runWith(Sink.asPublisher(false)) + val probe = TestSubscriber.manualProbe[Int]() + pub.subscribe(probe) + val sub = probe.expectSubscription() + sub.request(1) + probe.expectNext(0) + probe.expectComplete() + + } + + "work under fuzzing stress test" in assertAllStagesStopped { + val publisherProbe = TestPublisher.manualProbe[ByteString]() + val subscriber = TestSubscriber.manualProbe[ByteString]() + + val publisher = Source.fromPublisher[ByteString](publisherProbe) + .groupBy(256, elem ⇒ elem.head).map(_.reverse).mergeSubstreams + .groupBy(256, elem ⇒ elem.head).map(_.reverse).mergeSubstreams + 
.runWith(Sink.asPublisher(false)) + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbe.expectSubscription() + val downstreamSubscription = subscriber.expectSubscription() + + downstreamSubscription.request(300) + for (i ← 1 to 300) { + val byteString = randomByteString(10) + upstreamSubscription.expectRequest() + upstreamSubscription.sendNext(byteString) + subscriber.expectNext() should ===(byteString) + } + upstreamSubscription.sendComplete() + } + + "work with random demand" in assertAllStagesStopped { + val mat = ActorMaterializer(ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1)) + + var blockingNextElement: ByteString = null.asInstanceOf[ByteString] + + val probes = new java.util.ArrayList[Promise[TestSubscriber.Probe[ByteString]]](100) + (0 to 99).foreach(_ ⇒ probes.add(Promise[TestSubscriber.Probe[ByteString]]())) + + var probesWriterTop = 0 + var probesReaderTop = 0 + + case class SubFlowState(probe: TestSubscriber.Probe[ByteString], hasDemand: Boolean, firstElement: ByteString) + val map = new util.HashMap[Int, SubFlowState]() + + final class ProbeSink(val attributes: Attributes, shape: SinkShape[ByteString])(implicit system: ActorSystem) extends SinkModule[ByteString, TestSubscriber.Probe[ByteString]](shape) { + override def create(context: MaterializationContext) = { + val promise = probes.get(probesWriterTop) + val probe = TestSubscriber.probe[ByteString]() + promise.success(probe) + probesWriterTop += 1 + (probe, probe) + } + override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, TestSubscriber.Probe[ByteString]] = new ProbeSink(attributes, shape) + override def withAttributes(attr: Attributes): Module = new ProbeSink(attr, amendShape(attr)) + } + + @tailrec + def randomDemand(): Unit = { + val nextIndex = ThreadLocalRandom.current().nextInt(0, map.size()) + val key = new util.ArrayList(map.keySet()).get(nextIndex) + if (!map.get(key).hasDemand) { + val state = map.get(key) + map.put(key, SubFlowState(state.probe, true, state.firstElement)) + + state.probe.request(1) + + //need to verify elements that are first element in subFlow or is in nextElement buffer before + // pushing next element from upstream + if (state.firstElement != null) { + state.probe.expectNext() should ===(state.firstElement) + map.put(key, SubFlowState(state.probe, false, null)) + randomDemand() + } else if (blockingNextElement != null && Math.abs(blockingNextElement.head % 100) == key) { + state.probe.expectNext() should ===(blockingNextElement) + blockingNextElement = null + map.put(key, SubFlowState(state.probe, false, null)) + randomDemand() + } else if (blockingNextElement != null) randomDemand() + } else randomDemand() + } + + val publisherProbe = TestPublisher.manualProbe[ByteString]() + Source.fromPublisher[ByteString](publisherProbe) + .groupBy(100, elem ⇒ Math.abs(elem.head % 100)).to(new Sink(new ProbeSink(none, SinkShape(Inlet("ProbeSink.in"))))).run()(mat) + + val upstreamSubscription = publisherProbe.expectSubscription() + + for (i ← 1 to 400) { + val byteString = randomByteString(10) + val index = Math.abs(byteString.head % 100) + + upstreamSubscription.expectRequest() + upstreamSubscription.sendNext(byteString) + + if (map.get(index) == null) { + val probe: TestSubscriber.Probe[ByteString] = Await.result(probes.get(probesReaderTop).future, 300.millis) + probesReaderTop += 1 + map.put(index, SubFlowState(probe, false, byteString)) + //stream automatically requests next element + } else { + val state 
= map.get(index) + if (state.firstElement != null) { //first element in subFlow + if (!state.hasDemand) blockingNextElement = byteString + randomDemand() + } else if (state.hasDemand) { + if (blockingNextElement == null) { + state.probe.expectNext() should ===(byteString) + map.put(index, SubFlowState(state.probe, false, null)) + randomDemand() + } else fail("INVALID CASE") + } else { + blockingNextElement = byteString + randomDemand() + } + } + } + upstreamSubscription.sendComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index e4f3bcdd0a..ebb2454597 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -112,11 +112,10 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]]) matVal.put(atomic, mat) - // FIXME: Remove this, only stream-of-stream ops need it - case stage: StageModule ⇒ - val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes)) + case stage: ProcessorModule[_, _, _] ⇒ + val (processor, mat) = stage.createProcessor() assignPort(stage.inPort, processor) - assignPort(stage.outPort, processor) + assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]]) matVal.put(atomic, mat) case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here @@ -175,16 +174,6 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, } } - // FIXME: Remove this, only stream-of-stream ops need it - private def processorFor(op: StageModule, - effectiveAttributes: Attributes, - effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match { - case DirectProcessor(processorFactory, _) ⇒ processorFactory() - case _ ⇒ - val (opprops, mat) = ActorProcessorFactory.props(ActorMaterializerImpl.this, op, effectiveAttributes) - ActorProcessorFactory[Any, Any]( - actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat - } } session.materialize().asInstanceOf[Mat] @@ -294,27 +283,3 @@ private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveSh override def postStop(): Unit = haveShutDown.set(true) } -/** - * INTERNAL API - */ -private[akka] object ActorProcessorFactory { - import akka.stream.impl.Stages._ - - def props(materializer: ActorMaterializer, op: StageModule, parentAttributes: Attributes): (Props, Any) = { - val att = parentAttributes and op.attributes - // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW - // Also, otherwise the attributes will not affect the settings properly! - val settings = materializer.effectiveSettings(att) - op match { - case GroupBy(maxSubstreams, f, _) ⇒ (GroupByProcessorImpl.props(settings, maxSubstreams, f), ()) - case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory") - } - } - - def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { - val p = new ActorProcessor[I, O](impl) - // Resolve cyclic dependency with actor. This MUST be the first message no matter what. - impl ! 
ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) - p - } -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala deleted file mode 100644 index 0188d83898..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Copyright (C) 2015-2016 Lightbend Inc. - */ -package akka.stream.impl - -import akka.stream._ -import akka.stream.impl.StreamLayout.Module -import akka.event.Logging - -/** - * INTERNAL API - */ -private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.AtomicModule { - override def replaceShape(s: Shape) = - if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") - else this - - val inPort = Inlet[In]("Flow.in") - val outPort = Outlet[Out]("Flow.out") - override val shape = new FlowShape(inPort, outPort) - - protected def label: String = Logging.simpleName(this) - final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]" -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala deleted file mode 100644 index 95e3cf14ce..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Copyright (C) 2009-2016 Lightbend Inc. - */ -package akka.stream.impl - -import scala.util.control.NonFatal -import akka.actor.{ Deploy, Props } -import akka.stream.ActorMaterializerSettings -import akka.stream.Supervision -import akka.stream.scaladsl.Source - -/** - * INTERNAL API - */ -private[akka] object GroupByProcessorImpl { - def props(settings: ActorMaterializerSettings, maxSubstreams: Int, keyFor: Any ⇒ Any): Props = - Props(new GroupByProcessorImpl(settings, maxSubstreams, keyFor)).withDeploy(Deploy.local) - - private case object Drop -} - -/** - * INTERNAL API - */ -private[akka] class GroupByProcessorImpl(settings: ActorMaterializerSettings, val maxSubstreams: Int, val keyFor: Any ⇒ Any) - extends MultiStreamOutputProcessor(settings) { - - import MultiStreamOutputProcessor._ - import GroupByProcessorImpl.Drop - - val decider = settings.supervisionDecider - val keyToSubstreamOutput = collection.mutable.Map.empty[Any, SubstreamOutput] - - var pendingSubstreamOutput: SubstreamOutput = _ - - // No substream is open yet. If downstream cancels now, we are complete - val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - tryKeyFor(elem) match { - case Drop ⇒ - case key ⇒ nextPhase(openSubstream(elem, key)) - } - } - - // some substreams are open now. 
If downstream cancels, we still continue until the substreams are closed - val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - tryKeyFor(elem) match { - case Drop ⇒ - case key ⇒ - keyToSubstreamOutput.get(key) match { - case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutput(key))) - case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) - case _ ⇒ // stay - } - } - } - - private def tryKeyFor(elem: Any): Any = - try keyFor(elem) catch { - case NonFatal(e) if decider(e) != Supervision.Stop ⇒ - if (settings.debugLogging) - log.debug("Dropped element [{}] due to exception from groupBy function: {}", elem, e.getMessage) - Drop - } - - def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemandOrCancel) { () ⇒ - if (primaryOutputs.isClosed) { - // Just drop, we do not open any more substreams - nextPhase(waitNext) - } else { - if (keyToSubstreamOutput.size == maxSubstreams) - throw new IllegalStateException(s"cannot open substream for key '$key': too many substreams open") - val substreamOutput = createSubstreamOutput() - val substreamFlow = Source.fromPublisher[Any](substreamOutput) - primaryOutputs.enqueueOutputElement(substreamFlow) - keyToSubstreamOutput(key) = substreamOutput - nextPhase(dispatchToSubstream(elem, substreamOutput)) - } - } - - def dispatchToSubstream(elem: Any, substream: SubstreamOutput): TransferPhase = { - pendingSubstreamOutput = substream - TransferPhase(substream.NeedsDemand) { () ⇒ - substream.enqueueOutputElement(elem) - pendingSubstreamOutput = null - nextPhase(waitNext) - } - } - - initialPhase(1, waitFirst) - - override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { - if ((pendingSubstreamOutput ne null) && substream == pendingSubstreamOutput.key) { - pendingSubstreamOutput = null - nextPhase(waitNext) - } - super.invalidateSubstreamOutput(substream) - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 3929c9f39c..3fb9bbe8ab 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -5,7 +5,7 @@ package akka.stream.impl import akka.stream.impl.QueueSink.{ Output, Pull } import akka.{ Done, NotUsed } -import akka.actor.{ ActorRef, Props } +import akka.actor.{ ActorRef, Actor, Props } import akka.stream.Attributes.InputBuffer import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes @@ -100,10 +100,12 @@ private[akka] final class FanoutPublisherSink[In]( override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { val actorMaterializer = ActorMaterializer.downcast(context.materializer) - val fanoutProcessor = ActorProcessorFactory[In, In]( - actorMaterializer.actorOf( - context, - FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes)))) + val impl = actorMaterializer.actorOf( + context, + FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes))) + val fanoutProcessor = new ActorProcessor[In, In](impl) + impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]]) + // Resolve cyclic dependency with actor. This MUST be the first message no matter what. 
(fanoutProcessor, fanoutProcessor) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index fad770ed7c..b943026fe7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -134,12 +134,6 @@ private[stream] object Stages { import DefaultAttributes._ - // FIXME: To be deprecated as soon as stream-of-stream operations are stages - sealed trait StageModule extends FlowModule[Any, Any, Any] { - def withAttributes(attributes: Attributes): StageModule - override def carbonCopy: Module = withAttributes(attributes) - } - /* * Stage that is backed by a GraphStage but can be symbolically introspected */ @@ -189,14 +183,4 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[T, T] = fusing.Buffer(size, overflowStrategy) } - // FIXME: These are not yet proper stages, therefore they use the deprecated StageModule infrastructure - - final case class GroupBy(maxSubstreams: Int, f: Any ⇒ Any, attributes: Attributes = groupBy) extends StageModule { - override def withAttributes(attributes: Attributes) = copy(attributes = attributes) - override protected def label: String = s"GroupBy($maxSubstreams)" - } - - final case class DirectProcessor(p: () ⇒ (Processor[Any, Any], Any), attributes: Attributes = processor) extends StageModule { - override def withAttributes(attributes: Attributes) = copy(attributes = attributes) - } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 0fa9bb9047..d91a783f4a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.{ AtomicReference } import java.{ util ⇒ ju } import akka.NotUsed import akka.stream.impl.MaterializerSession.MaterializationPanic +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.Keep import akka.stream._ @@ -935,3 +936,20 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo } } + +/** + * INTERNAL API + */ +private[akka] final case class ProcessorModule[In, Out, Mat](val createProcessor: () ⇒ (Processor[In, Out], Mat), + attributes: Attributes = DefaultAttributes.processor) extends StreamLayout.AtomicModule { + val inPort = Inlet[In]("ProcessorModule.in") + val outPort = Outlet[Out]("ProcessorModule.out") + override val shape = new FlowShape(inPort, outPort) + + override def replaceShape(s: Shape) = if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + else this + + override def withAttributes(attributes: Attributes) = copy(attributes = attributes) + override def carbonCopy: Module = withAttributes(attributes) + override def toString: String = f"ProcessorModule [${System.identityHashCode(this)}%08x]" +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala deleted file mode 100644 index e038b398e6..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Copyright (C) 2009-2016 Lightbend Inc. 
- */ -package akka.stream.impl - -import java.util.concurrent.atomic.AtomicReference -import akka.actor._ -import akka.stream.ActorMaterializerSettings -import org.reactivestreams.{ Publisher, Subscriber, Subscription } -import scala.collection.mutable - -/** - * INTERNAL API - */ -private[akka] object MultiStreamOutputProcessor { - final case class SubstreamKey(id: Long) - final case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded - final case class SubstreamCancel(substream: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded - final case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) extends DeadLetterSuppression with NoSerializationVerificationNeeded - final case class SubstreamSubscriptionTimeout(substream: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded - - class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { - override def request(elements: Long): Unit = parent ! SubstreamRequestMore(substreamKey, elements) - override def cancel(): Unit = parent ! SubstreamCancel(substreamKey) - override def toString = "SubstreamSubscription" + System.identityHashCode(this) - } - - object SubstreamOutput { - sealed trait PublisherState - sealed trait CompletedState extends PublisherState - case object Open extends PublisherState - final case class Attached(sub: Subscriber[Any]) extends PublisherState - case object Completed extends CompletedState - case object Cancelled extends CompletedState - final case class Failed(e: Throwable) extends CompletedState - } - - class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: Cancellable) - extends SimpleOutputs(actor, pump) with Publisher[Any] { - import ReactiveStreamsCompliance._ - - import SubstreamOutput._ - - private val subscription = new SubstreamSubscription(actor, key) - private val state = new AtomicReference[PublisherState](Open) - - override def subreceive: SubReceive = - throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block") - - def isAttached = state.get().isInstanceOf[Attached] - - def enqueueOutputDemand(demand: Long): Unit = { - downstreamDemand += demand - pump.pump() - } - - override def error(e: Throwable): Unit = { - if (!downstreamCompleted) { - closePublisher(Failed(e)) - downstreamCompleted = true - } - } - - override def cancel(): Unit = { - if (!downstreamCompleted) { - closePublisher(Cancelled) - downstreamCompleted = true - } - } - - override def complete(): Unit = { - if (!downstreamCompleted) { - closePublisher(Completed) - downstreamCompleted = true - } - } - - private def closePublisher(withState: CompletedState): Unit = { - subscriptionTimeout.cancel() - state.getAndSet(withState) match { - case _: CompletedState ⇒ throw new IllegalStateException("Attempted to double shutdown publisher") - case Attached(sub) ⇒ - if (subscriber eq null) tryOnSubscribe(sub, CancelledSubscription) - closeSubscriber(sub, withState) - case Open ⇒ // No action needed - } - } - - private def closeSubscriber(s: Subscriber[Any], withState: CompletedState): Unit = withState match { - case Completed ⇒ tryOnComplete(s) - case Cancelled ⇒ // nothing to do - case Failed(e: SpecViolation) ⇒ // nothing to do - case Failed(e) ⇒ tryOnError(s, e) - } - - override def subscribe(s: Subscriber[_ >: Any]): Unit = { - requireNonNullSubscriber(s) - 
subscriptionTimeout.cancel() - if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) - else { - state.get() match { - case _: Attached | Cancelled ⇒ - rejectAdditionalSubscriber(s, "Substream publisher") - case c: CompletedState ⇒ - tryOnSubscribe(s, CancelledSubscription) - closeSubscriber(s, c) - case Open ⇒ - throw new IllegalStateException("Publisher cannot become open after being used before") - } - } - } - - def attachSubscriber(s: Subscriber[Any]): Unit = - if (subscriber eq null) { - subscriber = s - tryOnSubscribe(subscriber, subscription) - } else - rejectAdditionalSubscriber(s, "Substream publisher") - } -} - -/** - * INTERNAL API - */ -private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubscriptionTimeoutSupport { - this: Actor with ActorLogging ⇒ - - import MultiStreamOutputProcessor._ - import StreamSubscriptionTimeoutSupport._ - - protected def nextId(): Long - - // stream keys will be removed from this map on cancellation/subscription-timeout, never assume a key is present - private val substreamOutputs = mutable.Map.empty[SubstreamKey, SubstreamOutput] - - protected def createSubstreamOutput(): SubstreamOutput = { - val id = SubstreamKey(nextId()) - val cancellable = scheduleSubscriptionTimeout(self, SubstreamSubscriptionTimeout(id)) - val output = new SubstreamOutput(id, self, this, cancellable) - substreamOutputs(output.key) = output - output - } - - protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { - cancelSubstreamOutput(substream) - pump() - } - - protected def cancelSubstreamOutput(substream: SubstreamKey): Unit = { - substreamOutputs.get(substream) match { - case Some(sub) ⇒ - sub.cancel() - substreamOutputs -= substream - case _ ⇒ // ignore, already completed... - } - } - - protected def completeSubstreamOutput(substream: SubstreamKey): Unit = { - substreamOutputs.get(substream) match { - case Some(sub) ⇒ - sub.complete() - substreamOutputs -= substream - case _ ⇒ // ignore, already completed... - } - } - - protected def failOutputs(e: Throwable): Unit = { - substreamOutputs.values foreach (_.error(e)) - } - - protected def finishOutputs(): Unit = { - substreamOutputs.values foreach (_.complete()) - } - - val outputSubstreamManagement: Receive = { - case SubstreamRequestMore(key, demand) ⇒ - substreamOutputs.get(key) match { - case Some(sub) ⇒ - if (demand < 1) // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError - sub.error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) - else - sub.enqueueOutputDemand(demand) - case _ ⇒ // ignore... - } - case SubstreamSubscribe(key, subscriber) ⇒ substreamOutputs.get(key) match { - case Some(sub) ⇒ sub.attachSubscriber(subscriber) - case _ ⇒ // ignore... - } - case SubstreamSubscriptionTimeout(key) ⇒ substreamOutputs.get(key) match { - case Some(sub) if !sub.isAttached ⇒ subscriptionTimedOut(sub) - case _ ⇒ // ignore... 
- } - case SubstreamCancel(key) ⇒ - invalidateSubstreamOutput(key) - } - - override protected def handleSubscriptionTimeout(target: Publisher[_], cause: Exception) = target match { - case s: SubstreamOutput ⇒ - s.error(cause) - s.attachSubscriber(CancelingSubscriber) - case _ ⇒ // ignore - } -} - -/** - * INTERNAL API - */ -private[akka] abstract class MultiStreamOutputProcessor(_settings: ActorMaterializerSettings) extends ActorProcessorImpl(_settings) with MultiStreamOutputProcessorLike { - private var _nextId = 0L - protected def nextId(): Long = { _nextId += 1; _nextId } - - override val subscriptionTimeoutSettings = _settings.subscriptionTimeoutSettings - - override protected def fail(e: Throwable): Unit = { - failOutputs(e) - super.fail(e) - } - - override def pumpFinished(): Unit = { - finishOutputs() - super.pumpFinished() - } - - override def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement -} - diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala index 61de42a8dd..4cb2c28f89 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala @@ -19,9 +19,6 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, NotUsed], finishFunction: Sink[In, NotUsed] ⇒ C) extends SubFlow[Out, Mat, F, C] { - override def deprecatedAndThen[U](op: Stages.StageModule): SubFlow[U, Mat, F, C] = - new SubFlowImpl[In, U, Mat, F, C](subFlow.deprecatedAndThen(op), mergeBackFunction, finishFunction) - override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = new SubFlowImpl[In, T, Mat, F, C](subFlow.via(flow), mergeBackFunction, finishFunction) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 13627664e8..5e7b3cec69 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -5,19 +5,21 @@ package akka.stream.impl.fusing import java.util.concurrent.atomic.AtomicReference import akka.NotUsed +import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.SubscriptionTimeoutException import akka.stream.stage._ import akka.stream.scaladsl._ import akka.stream.actor.ActorSubscriberMessage -import scala.collection.immutable +import scala.collection.{ mutable, immutable } import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import scala.annotation.tailrec import akka.stream.impl.PublisherSource import akka.stream.impl.CancellingSubscriber import akka.stream.impl.{ Buffer ⇒ BufferImpl } +import scala.collection.JavaConversions._ /** * INTERNAL API @@ -198,8 +200,7 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable. // Otherwise substream is open, ignore } - setHandler(in, this) - setHandler(out, this) + setHandlers(in, out, this) } override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new PrefixAndTailLogic(shape) @@ -207,6 +208,179 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable. 
override def toString: String = s"PrefixAndTail($n)" } +/** + * INTERNAL API + */ +final class GroupBy[T, K](maxSubstreams: Int, keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { + val in: Inlet[T] = Inlet("GroupBy.in") + val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out") + override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out) + override def initialAttributes = DefaultAttributes.groupBy + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler with InHandler { + parent ⇒ + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]() + private val closedSubstreams = new java.util.HashSet[Any]() + private var timeout: FiniteDuration = _ + private var substreamWaitingToBePushed: Option[SubstreamSource] = None + private var nextElementKey: K = null.asInstanceOf[K] + private var nextElementValue: T = null.asInstanceOf[T] + private var _nextId = 0 + private val substreamsJustStarted = new java.util.HashSet[Any]() + private var firstPushCounter: Int = 0 + + private def nextId(): Long = { _nextId += 1; _nextId } + + private def hasNextElement = nextElementKey != null + + private def clearNextElement(): Unit = { + nextElementKey = null.asInstanceOf[K] + nextElementValue = null.asInstanceOf[T] + } + + private def tryCompleteAll(): Boolean = + if (activeSubstreamsMap.isEmpty || (!hasNextElement && firstPushCounter == 0)) { + for (value ← activeSubstreamsMap.values()) value.complete() + completeStage() + true + } else false + + private def fail(ex: Throwable): Unit = { + for (value ← activeSubstreamsMap.values()) value.fail(ex) + failStage(ex) + } + + private def needToPull: Boolean = !(hasBeenPulled(in) || isClosed(in) || hasNextElement) + + override def preStart(): Unit = + timeout = ActorMaterializer.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout + + override def onPull(): Unit = { + substreamWaitingToBePushed match { + case Some(substreamSource) ⇒ + push(out, Source.fromGraph(substreamSource.source)) + scheduleOnce(substreamSource.key, timeout) + substreamWaitingToBePushed = None + case None ⇒ + if (hasNextElement) { + val subSubstreamSource = activeSubstreamsMap.get(nextElementKey) + if (subSubstreamSource.isAvailable) { + subSubstreamSource.push(nextElementValue) + clearNextElement() + } + } else tryPull(in) + } + } + + override def onUpstreamFailure(ex: Throwable): Unit = fail(ex) + + override def onDownstreamFinish(): Unit = + if (activeSubstreamsMap.isEmpty) completeStage() else setKeepGoing(true) + + override def onPush(): Unit = try { + val elem = grab(in) + val key = keyFor(elem) + require(key != null, "Key cannot be null") + val substreamSource = activeSubstreamsMap.get(key) + if (substreamSource != null) { + if (substreamSource.isAvailable) substreamSource.push(elem) + else { + nextElementKey = key + nextElementValue = elem + } + } else { + if (activeSubstreamsMap.size == maxSubstreams) + fail(new IllegalStateException(s"Cannot open substream for key '$key': too many substreams open")) + else if (closedSubstreams.contains(key) && !hasBeenPulled(in)) + pull(in) + else runSubstream(key, elem) + } + } catch { + case NonFatal(ex) ⇒ + decider(ex) match { + case Supervision.Stop ⇒ fail(ex) + case Supervision.Resume | Supervision.Restart ⇒ if (!hasBeenPulled(in)) pull(in) + } + } + + override def onUpstreamFinish(): Unit = { + if (!tryCompleteAll()) setKeepGoing(true) + } + + private def runSubstream(key: K, value: T): Unit = { + val substreamSource = new SubstreamSource("GroupBySource " + nextId, key, value) + activeSubstreamsMap.put(key, substreamSource) + firstPushCounter += 1 + if (isAvailable(out)) { + push(out, Source.fromGraph(substreamSource.source)) + scheduleOnce(key, timeout) + substreamWaitingToBePushed = None + } else { + setKeepGoing(true) + substreamsJustStarted.add(substreamSource) + substreamWaitingToBePushed = Some(substreamSource) + } + } + + override protected def onTimer(timerKey: Any): Unit = { + val substreamSource = activeSubstreamsMap.get(timerKey) + if (substreamSource != null) { + substreamSource.timeout(timeout) + closedSubstreams.add(timerKey) + activeSubstreamsMap.remove(timerKey) + if (isClosed(in)) tryCompleteAll() + } + } + + setHandlers(in, out, this) + + private class SubstreamSource(name: String, val key: K, var firstElement: T) extends SubSourceOutlet[T](name) with OutHandler { + def firstPush(): Boolean = firstElement != null + def hasNextForSubSource = hasNextElement && nextElementKey == key + private def completeSubStream(): Unit = { + complete() + activeSubstreamsMap.remove(key) + closedSubstreams.add(key) + } + + private def tryCompleteHandler(): Unit = { + if (parent.isClosed(in) && !hasNextForSubSource) { + completeSubStream() + tryCompleteAll() + } + } + + override def onPull(): Unit = { + cancelTimer(key) + if (firstPush) { + firstPushCounter -= 1 + push(firstElement) + firstElement = null.asInstanceOf[T] + substreamsJustStarted.remove(this) + if (substreamsJustStarted.isEmpty) setKeepGoing(false) + } else if (hasNextForSubSource) { + push(nextElementValue) + clearNextElement() + } else if (needToPull) pull(in) + + tryCompleteHandler() + } + + override def onDownstreamFinish(): Unit = { + if (hasNextElement && nextElementKey == key) clearNextElement() + if (firstPush()) firstPushCounter -= 1 + completeSubStream() + if (parent.isClosed(in)) tryCompleteAll() else if (needToPull) pull(in) + } + + setHandler(this) + } + } + + override def toString: String = "GroupBy" + +} /** * INTERNAL API */ @@ -246,7 +420,7 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamC private var timeout: FiniteDuration = _ private var substreamSource: SubSourceOutlet[T] = null - private var substreamPushed = false + private var substreamWaitingToBePushed = false private var substreamCancelled = false override def preStart(): Unit = { @@ -256,16 +430,16 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamC setHandler(out, new OutHandler { override def onPull(): Unit = { if (substreamSource eq null) pull(in) - else if (!substreamPushed) { + else if (!substreamWaitingToBePushed) { push(out, Source.fromGraph(substreamSource.source)) scheduleOnce(SubscriptionTimer, timeout) - substreamPushed = true + substreamWaitingToBePushed = true } } override def onDownstreamFinish(): Unit = { // If the substream is already cancelled or it has not been handed out, we can go away - if (!substreamPushed || substreamCancelled) completeStage() + if (!substreamWaitingToBePushed || substreamCancelled) completeStage() } }) @@ -300,8 +474,8 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamC if (isAvailable(out)) { push(out, Source.fromGraph(substreamSource.source)) scheduleOnce(SubscriptionTimer, timeout) - substreamPushed = true - } else substreamPushed = false + substreamWaitingToBePushed = true + } 
else substreamWaitingToBePushed = false } } @@ -381,6 +555,8 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamC } } + override def toString: String = "Split" + } /** @@ -406,11 +582,13 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage private val status = new AtomicReference[AnyRef] - def pullSubstream(): Unit = status.get match { - case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(RequestOne) - case null ⇒ - if (!status.compareAndSet(null, RequestOne)) - status.get.asInstanceOf[Command ⇒ Unit](RequestOne) + def pullSubstream(): Unit = { + status.get match { + case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(RequestOne) + case null ⇒ + if (!status.compareAndSet(null, RequestOne)) + status.get.asInstanceOf[Command ⇒ Unit](RequestOne) + } } def cancelSubstream(): Unit = status.get match { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index bf896f55e6..f546754483 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1088,6 +1088,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]] * the element is dropped and the stream and substreams continue. * + * Function `f` MUST NOT return `null`. Doing so will throw an `IllegalArgumentException` and trigger the supervision decision mechanism. + * * '''Emits when''' an element for which the grouping function returns a group that has not yet been created. * Emits the new group * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 2551439d63..1a7bd530cf 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3,10 +3,10 @@ */ package akka.stream.scaladsl -import akka.event.LoggingAdapter +import akka.event.{Logging, LoggingAdapter} import akka.stream._ import akka.Done -import akka.stream.impl.Stages.{ DirectProcessor, StageModule } +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ import akka.stream.impl.fusing._ @@ -205,21 +205,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) .replaceShape(FlowShape(ins(1), outs.head))) } - /** INTERNAL API */ - // FIXME: Only exists to keep old stuff alive - private[stream] override def deprecatedAndThen[U](op: StageModule): Repr[U] = { - //No need to copy here, op is a fresh instance - if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U]] - else new Flow(module.fuse(op, shape.out, op.inPort).replaceShape(FlowShape(shape.in, op.outPort))) - } - - // FIXME: Only exists to keep old stuff alive - private[akka] def deprecatedAndThenMat[U, Mat2, O >: Out](processorFactory: () ⇒ (Processor[O, U], Mat2)): ReprMat[U, Mat2] = { - val op = DirectProcessor(processorFactory.asInstanceOf[() ⇒ (Processor[Any, Any], Any)]) - if (this.isIdentity) new Flow(op).asInstanceOf[ReprMat[U, Mat2]] - else new Flow[In, U, Mat2](module.fuse(op, shape.out, op.inPort, Keep.right).replaceShape(FlowShape(shape.in, op.outPort))) - } - /** * Change the attributes of this [[Flow]] to the given ones and seal the list * of attributes. 
This means that further calls will not be able to remove these @@ -293,9 +278,8 @@ object Flow { /** * Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]] and returns a materialized value. */ - def fromProcessorMat[I, O, Mat](processorFactory: () ⇒ (Processor[I, O], Mat)): Flow[I, O, Mat] = { - Flow[I].deprecatedAndThenMat(processorFactory) - } + def fromProcessorMat[I, O, M](processorFactory: () ⇒ (Processor[I, O], M)): Flow[I, O, M] = + new Flow(ProcessorModule(processorFactory)) /** * Returns a `Flow` which outputs all its inputs. @@ -1210,6 +1194,8 @@ trait FlowOps[+Out, +Mat] { * is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] * the element is dropped and the stream and substreams continue. * + * Function `f` MUST NOT return `null`. Doing so will throw an `IllegalArgumentException` and trigger the supervision decision mechanism. + * + * '''Emits when''' an element for which the grouping function returns a group that has not yet been created. * Emits the new group * @@ -1223,16 +1209,15 @@ trait FlowOps[+Out, +Mat] { * that are supported; if more distinct keys are encountered then the stream fails */ def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = { - implicit def mat = GraphInterpreter.currentInterpreter.materializer val merge = new SubFlowImpl.MergeBack[Out, Repr] { override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] = - deprecatedAndThen[Source[Out, NotUsed]](GroupBy(maxSubstreams, f.asInstanceOf[Any ⇒ Any])) + via(new GroupBy(maxSubstreams, f)) .map(_.via(flow)) .via(new FlattenMerge(breadth)) } val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒ - deprecatedAndThen[Source[Out, NotUsed]](GroupBy(maxSubstreams, f.asInstanceOf[Any ⇒ Any])) - .to(Sink.foreach(_.runWith(s))) + via(new GroupBy(maxSubstreams, f)) + .to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer))) new SubFlowImpl(Flow[Out], merge, finish) } @@ -1849,8 +1834,6 @@ trait FlowOps[+Out, +Mat] { /** INTERNAL API */ private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] = via(SymbolicGraphStage(op)) - - private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U] } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index edd6419571..e7c67f23b0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -8,7 +8,7 @@ import akka.stream._ import akka.stream.impl._ import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages.MaterializedValueSource -import akka.stream.impl.Stages.{ DefaultAttributes, StageModule } +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout._ import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } @@ -977,13 +977,6 @@ object GraphDSL extends GraphApply { source.out } - private[stream] def deprecatedAndThen(port: OutPort, op: StageModule): Unit = { - moduleInProgress = - moduleInProgress - .compose(op) - .wire(port, op.inPort) - } - private[stream] def module: Module = moduleInProgress /** Converts this Scala DSL element to its Java DSL counterpart. 
*/ @@ -1118,11 +1111,6 @@ object GraphDSL extends GraphApply { override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = super.~>(flow)(b) - override private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U] = { - b.deprecatedAndThen(outlet, op) - new PortOpsImpl(op.shape.out.asInstanceOf[Outlet[U]], b) - } - def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed = { super.~>(sink)(b) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5d2c4b8c14..62cba7bba1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -3,10 +3,10 @@ */ package akka.stream.scaladsl +import akka.stream.impl.Stages.DefaultAttributes import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Cancellable, Props } import akka.stream.actor.ActorPublisher -import akka.stream.impl.Stages.{ DefaultAttributes, StageModule } import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages._ @@ -74,15 +74,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] = new Source[Out, Mat2](module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) - /** INTERNAL API */ - override private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U] = { - // No need to copy here, op is a fresh instance - new Source( - module - .fuse(op, shape.out, op.inPort) - .replaceShape(SourceShape(op.outPort))) - } - /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]]. diff --git a/project/MiMa.scala b/project/MiMa.scala index b9c2b17b04..6b8f0b7314 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -747,6 +747,14 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.DefaultSSLContextCreation.validateAndWarnAboutLooseSettings") ), "2.4.4" -> Seq( + + //#20229 migrate GroupBy to GraphStage + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.GraphDSL#Builder.deprecatedAndThen"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.deprecatedAndThen"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.deprecatedAndThenMat"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.deprecatedAndThen"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.deprecatedAndThen"), + // #20080, #20081 remove race condition on HTTP client ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.Http#HostConnectionPool.gatewayFuture"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.Http#HostConnectionPool.copy"),
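
The user-visible contract that the new `GroupBy` stage enforces through `require(key != null, "Key cannot be null")` in `onPush` is exercised by the "fail when key function returns null" test above. A minimal sketch of the same behaviour outside the testkit, assuming an Akka 2.4.x application with akka-stream on the classpath (object and system names here are illustrative, not part of the patch):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

object GroupByNullKeySketch extends App {
  implicit val system = ActorSystem("groupby-null-key")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  Source(List("Aaa", "Abb", "Bcc"))
    .groupBy(maxSubstreams = 2, e ⇒ if (e.startsWith("A")) null else e.substring(0, 1))
    .mergeSubstreams
    .runWith(Sink.ignore)
    .onComplete { result ⇒
      // Expected: Failure(java.lang.IllegalArgumentException: requirement failed: Key cannot be null)
      println(result)
      system.terminate()
    }
}

With a resuming or restarting decider (`ActorAttributes.supervisionStrategy(resumingDecider)`) the offending elements are dropped instead and the stream continues with the remaining groups, matching the supervision behaviour documented in the updated `groupBy` scaladoc.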
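`Flow.fromProcessorMat` keeps its public signature while switching from the removed `DirectProcessor`/`deprecatedAndThenMat` path to the new `ProcessorModule`. A sketch under the same setup as the previous example; the doubling processor is illustrative, and because this particular factory always returns the one pre-materialized `Processor`, the resulting `Flow` is only good for a single materialization:

import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.reactivestreams.Processor

// Materialize a plain stage into a raw Reactive Streams Processor
// (uses the implicit materializer from the surrounding setup).
val doubling: Processor[Int, Int] = Flow[Int].map(_ * 2).toProcessor.run()

// Lift it back into a Flow; the factory passed here is what the new
// ProcessorModule invokes on each materialization.
val doublerFlow: Flow[Int, Int, NotUsed] = Flow.fromProcessorMat(() ⇒ (doubling, NotUsed))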