parent
74b8d749b6
commit
03395d5739
15 changed files with 402 additions and 479 deletions
|
|
@ -3,15 +3,22 @@
|
|||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import java.util
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.Attributes._
|
||||
import akka.stream.impl.SinkModule
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.util.ByteString
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.{ Promise, Await }
|
||||
import scala.concurrent.duration._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
import akka.stream._
|
||||
import akka.stream.Supervision.resumingDecider
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.testkit.Utils._
|
||||
import org.reactivestreams.Publisher
|
||||
import akka.stream.ActorAttributes
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalactic.ConversionCheckedTripleEquals
|
||||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
|
|
@ -19,6 +26,8 @@ import akka.stream.testkit.scaladsl.TestSource
|
|||
import akka.stream.testkit.scaladsl.TestSink
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
|
||||
object FlowGroupBySpec {
|
||||
|
||||
implicit class Lift[M](val f: SubFlow[Int, M, Source[Int, M]#Repr, RunnableGraph[M]]) extends AnyVal {
|
||||
|
|
@ -70,6 +79,12 @@ class FlowGroupBySpec extends AkkaSpec {
|
|||
|
||||
}
|
||||
|
||||
def randomByteString(size: Int): ByteString = {
|
||||
val a = new Array[Byte](size)
|
||||
ThreadLocalRandom.current().nextBytes(a)
|
||||
ByteString(a)
|
||||
}
|
||||
|
||||
"groupBy" must {
|
||||
"work in the happy case" in assertAllStagesStopped {
|
||||
new SubstreamsSupport(groupCount = 2) {
|
||||
|
|
@ -120,6 +135,18 @@ class FlowGroupBySpec extends AkkaSpec {
|
|||
.sortBy(_.head) should ===(List(List("Aaa", "Abb"), List("Bcc"), List("Cdd", "Cee")))
|
||||
}
|
||||
|
||||
"fail when key function return null" in {
|
||||
val down = Source(List("Aaa", "Abb", "Bcc", "Cdd", "Cee"))
|
||||
.groupBy(3, e ⇒ if (e.startsWith("A")) null else e.substring(0, 1))
|
||||
.grouped(10)
|
||||
.mergeSubstreams
|
||||
.runWith(TestSink.probe[Seq[String]])
|
||||
down.request(1)
|
||||
val ex = down.expectError()
|
||||
ex.getMessage.indexOf("Key cannot be null") should not be (-1)
|
||||
ex.isInstanceOf[IllegalArgumentException] should be(true)
|
||||
}
|
||||
|
||||
"accept cancellation of substreams" in assertAllStagesStopped {
|
||||
new SubstreamsSupport(groupCount = 2) {
|
||||
StreamPuppet(getSubFlow(1).runWith(Sink.asPublisher(false))).cancel()
|
||||
|
|
@ -309,6 +336,137 @@ class FlowGroupBySpec extends AkkaSpec {
|
|||
s1.expectError(ex)
|
||||
}
|
||||
|
||||
"emit subscribe before completed" in assertAllStagesStopped {
|
||||
val futureGroupSource =
|
||||
Source.single(0)
|
||||
.groupBy(1, elem ⇒ "all")
|
||||
.prefixAndTail(0)
|
||||
.map(_._2)
|
||||
.concatSubstreams
|
||||
.runWith(Sink.head)
|
||||
val pub: Publisher[Int] = Await.result(futureGroupSource, 3.seconds).runWith(Sink.asPublisher(false))
|
||||
val probe = TestSubscriber.manualProbe[Int]()
|
||||
pub.subscribe(probe)
|
||||
val sub = probe.expectSubscription()
|
||||
sub.request(1)
|
||||
probe.expectNext(0)
|
||||
probe.expectComplete()
|
||||
|
||||
}
|
||||
|
||||
"work under fuzzing stress test" in assertAllStagesStopped {
|
||||
val publisherProbe = TestPublisher.manualProbe[ByteString]()
|
||||
val subscriber = TestSubscriber.manualProbe[ByteString]()
|
||||
|
||||
val publisher = Source.fromPublisher[ByteString](publisherProbe)
|
||||
.groupBy(256, elem ⇒ elem.head).map(_.reverse).mergeSubstreams
|
||||
.groupBy(256, elem ⇒ elem.head).map(_.reverse).mergeSubstreams
|
||||
.runWith(Sink.asPublisher(false))
|
||||
publisher.subscribe(subscriber)
|
||||
|
||||
val upstreamSubscription = publisherProbe.expectSubscription()
|
||||
val downstreamSubscription = subscriber.expectSubscription()
|
||||
|
||||
downstreamSubscription.request(300)
|
||||
for (i ← 1 to 300) {
|
||||
val byteString = randomByteString(10)
|
||||
upstreamSubscription.expectRequest()
|
||||
upstreamSubscription.sendNext(byteString)
|
||||
subscriber.expectNext() should ===(byteString)
|
||||
}
|
||||
upstreamSubscription.sendComplete()
|
||||
}
|
||||
|
||||
"work with random demand" in assertAllStagesStopped {
|
||||
val mat = ActorMaterializer(ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialSize = 1, maxSize = 1))
|
||||
|
||||
var blockingNextElement: ByteString = null.asInstanceOf[ByteString]
|
||||
|
||||
val probes = new java.util.ArrayList[Promise[TestSubscriber.Probe[ByteString]]](100)
|
||||
(0 to 99).foreach(_ ⇒ probes.add(Promise[TestSubscriber.Probe[ByteString]]()))
|
||||
|
||||
var probesWriterTop = 0
|
||||
var probesReaderTop = 0
|
||||
|
||||
case class SubFlowState(probe: TestSubscriber.Probe[ByteString], hasDemand: Boolean, firstElement: ByteString)
|
||||
val map = new util.HashMap[Int, SubFlowState]()
|
||||
|
||||
final class ProbeSink(val attributes: Attributes, shape: SinkShape[ByteString])(implicit system: ActorSystem) extends SinkModule[ByteString, TestSubscriber.Probe[ByteString]](shape) {
|
||||
override def create(context: MaterializationContext) = {
|
||||
val promise = probes.get(probesWriterTop)
|
||||
val probe = TestSubscriber.probe[ByteString]()
|
||||
promise.success(probe)
|
||||
probesWriterTop += 1
|
||||
(probe, probe)
|
||||
}
|
||||
override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, TestSubscriber.Probe[ByteString]] = new ProbeSink(attributes, shape)
|
||||
override def withAttributes(attr: Attributes): Module = new ProbeSink(attr, amendShape(attr))
|
||||
}
|
||||
|
||||
@tailrec
|
||||
def randomDemand(): Unit = {
|
||||
val nextIndex = ThreadLocalRandom.current().nextInt(0, map.size())
|
||||
val key = new util.ArrayList(map.keySet()).get(nextIndex)
|
||||
if (!map.get(key).hasDemand) {
|
||||
val state = map.get(key)
|
||||
map.put(key, SubFlowState(state.probe, true, state.firstElement))
|
||||
|
||||
state.probe.request(1)
|
||||
|
||||
//need to verify elements that are first element in subFlow or is in nextElement buffer before
|
||||
// pushing next element from upstream
|
||||
if (state.firstElement != null) {
|
||||
state.probe.expectNext() should ===(state.firstElement)
|
||||
map.put(key, SubFlowState(state.probe, false, null))
|
||||
randomDemand()
|
||||
} else if (blockingNextElement != null && Math.abs(blockingNextElement.head % 100) == key) {
|
||||
state.probe.expectNext() should ===(blockingNextElement)
|
||||
blockingNextElement = null
|
||||
map.put(key, SubFlowState(state.probe, false, null))
|
||||
randomDemand()
|
||||
} else if (blockingNextElement != null) randomDemand()
|
||||
} else randomDemand()
|
||||
}
|
||||
|
||||
val publisherProbe = TestPublisher.manualProbe[ByteString]()
|
||||
Source.fromPublisher[ByteString](publisherProbe)
|
||||
.groupBy(100, elem ⇒ Math.abs(elem.head % 100)).to(new Sink(new ProbeSink(none, SinkShape(Inlet("ProbeSink.in"))))).run()(mat)
|
||||
|
||||
val upstreamSubscription = publisherProbe.expectSubscription()
|
||||
|
||||
for (i ← 1 to 400) {
|
||||
val byteString = randomByteString(10)
|
||||
val index = Math.abs(byteString.head % 100)
|
||||
|
||||
upstreamSubscription.expectRequest()
|
||||
upstreamSubscription.sendNext(byteString)
|
||||
|
||||
if (map.get(index) == null) {
|
||||
val probe: TestSubscriber.Probe[ByteString] = Await.result(probes.get(probesReaderTop).future, 300.millis)
|
||||
probesReaderTop += 1
|
||||
map.put(index, SubFlowState(probe, false, byteString))
|
||||
//stream automatically requests next element
|
||||
} else {
|
||||
val state = map.get(index)
|
||||
if (state.firstElement != null) { //first element in subFlow
|
||||
if (!state.hasDemand) blockingNextElement = byteString
|
||||
randomDemand()
|
||||
} else if (state.hasDemand) {
|
||||
if (blockingNextElement == null) {
|
||||
state.probe.expectNext() should ===(byteString)
|
||||
map.put(index, SubFlowState(state.probe, false, null))
|
||||
randomDemand()
|
||||
} else fail("INVALID CASE")
|
||||
} else {
|
||||
blockingNextElement = byteString
|
||||
randomDemand()
|
||||
}
|
||||
}
|
||||
}
|
||||
upstreamSubscription.sendComplete()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -112,11 +112,10 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
|
|||
assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
|
||||
matVal.put(atomic, mat)
|
||||
|
||||
// FIXME: Remove this, only stream-of-stream ops need it
|
||||
case stage: StageModule ⇒
|
||||
val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
|
||||
case stage: ProcessorModule[_, _, _] ⇒
|
||||
val (processor, mat) = stage.createProcessor()
|
||||
assignPort(stage.inPort, processor)
|
||||
assignPort(stage.outPort, processor)
|
||||
assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
|
||||
matVal.put(atomic, mat)
|
||||
|
||||
case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here
|
||||
|
|
@ -175,16 +174,6 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
|
|||
}
|
||||
}
|
||||
|
||||
// FIXME: Remove this, only stream-of-stream ops need it
|
||||
private def processorFor(op: StageModule,
|
||||
effectiveAttributes: Attributes,
|
||||
effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match {
|
||||
case DirectProcessor(processorFactory, _) ⇒ processorFactory()
|
||||
case _ ⇒
|
||||
val (opprops, mat) = ActorProcessorFactory.props(ActorMaterializerImpl.this, op, effectiveAttributes)
|
||||
ActorProcessorFactory[Any, Any](
|
||||
actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat
|
||||
}
|
||||
}
|
||||
|
||||
session.materialize().asInstanceOf[Mat]
|
||||
|
|
@ -294,27 +283,3 @@ private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveSh
|
|||
override def postStop(): Unit = haveShutDown.set(true)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object ActorProcessorFactory {
|
||||
import akka.stream.impl.Stages._
|
||||
|
||||
def props(materializer: ActorMaterializer, op: StageModule, parentAttributes: Attributes): (Props, Any) = {
|
||||
val att = parentAttributes and op.attributes
|
||||
// USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
|
||||
// Also, otherwise the attributes will not affect the settings properly!
|
||||
val settings = materializer.effectiveSettings(att)
|
||||
op match {
|
||||
case GroupBy(maxSubstreams, f, _) ⇒ (GroupByProcessorImpl.props(settings, maxSubstreams, f), ())
|
||||
case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory")
|
||||
}
|
||||
}
|
||||
|
||||
def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
|
||||
val p = new ActorProcessor[I, O](impl)
|
||||
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
|
||||
impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
|
||||
p
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,24 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import akka.stream._
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.event.Logging
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.AtomicModule {
|
||||
override def replaceShape(s: Shape) =
|
||||
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
|
||||
else this
|
||||
|
||||
val inPort = Inlet[In]("Flow.in")
|
||||
val outPort = Outlet[Out]("Flow.out")
|
||||
override val shape = new FlowShape(inPort, outPort)
|
||||
|
||||
protected def label: String = Logging.simpleName(this)
|
||||
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
|
||||
}
|
||||
|
|
@ -1,101 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.{ Deploy, Props }
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
import akka.stream.Supervision
|
||||
import akka.stream.scaladsl.Source
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object GroupByProcessorImpl {
|
||||
def props(settings: ActorMaterializerSettings, maxSubstreams: Int, keyFor: Any ⇒ Any): Props =
|
||||
Props(new GroupByProcessorImpl(settings, maxSubstreams, keyFor)).withDeploy(Deploy.local)
|
||||
|
||||
private case object Drop
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class GroupByProcessorImpl(settings: ActorMaterializerSettings, val maxSubstreams: Int, val keyFor: Any ⇒ Any)
|
||||
extends MultiStreamOutputProcessor(settings) {
|
||||
|
||||
import MultiStreamOutputProcessor._
|
||||
import GroupByProcessorImpl.Drop
|
||||
|
||||
val decider = settings.supervisionDecider
|
||||
val keyToSubstreamOutput = collection.mutable.Map.empty[Any, SubstreamOutput]
|
||||
|
||||
var pendingSubstreamOutput: SubstreamOutput = _
|
||||
|
||||
// No substream is open yet. If downstream cancels now, we are complete
|
||||
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒
|
||||
val elem = primaryInputs.dequeueInputElement()
|
||||
tryKeyFor(elem) match {
|
||||
case Drop ⇒
|
||||
case key ⇒ nextPhase(openSubstream(elem, key))
|
||||
}
|
||||
}
|
||||
|
||||
// some substreams are open now. If downstream cancels, we still continue until the substreams are closed
|
||||
val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒
|
||||
val elem = primaryInputs.dequeueInputElement()
|
||||
tryKeyFor(elem) match {
|
||||
case Drop ⇒
|
||||
case key ⇒
|
||||
keyToSubstreamOutput.get(key) match {
|
||||
case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutput(key)))
|
||||
case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key))
|
||||
case _ ⇒ // stay
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def tryKeyFor(elem: Any): Any =
|
||||
try keyFor(elem) catch {
|
||||
case NonFatal(e) if decider(e) != Supervision.Stop ⇒
|
||||
if (settings.debugLogging)
|
||||
log.debug("Dropped element [{}] due to exception from groupBy function: {}", elem, e.getMessage)
|
||||
Drop
|
||||
}
|
||||
|
||||
def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemandOrCancel) { () ⇒
|
||||
if (primaryOutputs.isClosed) {
|
||||
// Just drop, we do not open any more substreams
|
||||
nextPhase(waitNext)
|
||||
} else {
|
||||
if (keyToSubstreamOutput.size == maxSubstreams)
|
||||
throw new IllegalStateException(s"cannot open substream for key '$key': too many substreams open")
|
||||
val substreamOutput = createSubstreamOutput()
|
||||
val substreamFlow = Source.fromPublisher[Any](substreamOutput)
|
||||
primaryOutputs.enqueueOutputElement(substreamFlow)
|
||||
keyToSubstreamOutput(key) = substreamOutput
|
||||
nextPhase(dispatchToSubstream(elem, substreamOutput))
|
||||
}
|
||||
}
|
||||
|
||||
def dispatchToSubstream(elem: Any, substream: SubstreamOutput): TransferPhase = {
|
||||
pendingSubstreamOutput = substream
|
||||
TransferPhase(substream.NeedsDemand) { () ⇒
|
||||
substream.enqueueOutputElement(elem)
|
||||
pendingSubstreamOutput = null
|
||||
nextPhase(waitNext)
|
||||
}
|
||||
}
|
||||
|
||||
initialPhase(1, waitFirst)
|
||||
|
||||
override def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
|
||||
if ((pendingSubstreamOutput ne null) && substream == pendingSubstreamOutput.key) {
|
||||
pendingSubstreamOutput = null
|
||||
nextPhase(waitNext)
|
||||
}
|
||||
super.invalidateSubstreamOutput(substream)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -5,7 +5,7 @@ package akka.stream.impl
|
|||
|
||||
import akka.stream.impl.QueueSink.{ Output, Pull }
|
||||
import akka.{ Done, NotUsed }
|
||||
import akka.actor.{ ActorRef, Props }
|
||||
import akka.actor.{ ActorRef, Actor, Props }
|
||||
import akka.stream.Attributes.InputBuffer
|
||||
import akka.stream._
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
|
|
@ -100,10 +100,12 @@ private[akka] final class FanoutPublisherSink[In](
|
|||
|
||||
override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
|
||||
val actorMaterializer = ActorMaterializer.downcast(context.materializer)
|
||||
val fanoutProcessor = ActorProcessorFactory[In, In](
|
||||
actorMaterializer.actorOf(
|
||||
val impl = actorMaterializer.actorOf(
|
||||
context,
|
||||
FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes))))
|
||||
FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes)))
|
||||
val fanoutProcessor = new ActorProcessor[In, In](impl)
|
||||
impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
|
||||
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
|
||||
(fanoutProcessor, fanoutProcessor)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -134,12 +134,6 @@ private[stream] object Stages {
|
|||
|
||||
import DefaultAttributes._
|
||||
|
||||
// FIXME: To be deprecated as soon as stream-of-stream operations are stages
|
||||
sealed trait StageModule extends FlowModule[Any, Any, Any] {
|
||||
def withAttributes(attributes: Attributes): StageModule
|
||||
override def carbonCopy: Module = withAttributes(attributes)
|
||||
}
|
||||
|
||||
/*
|
||||
* Stage that is backed by a GraphStage but can be symbolically introspected
|
||||
*/
|
||||
|
|
@ -189,14 +183,4 @@ private[stream] object Stages {
|
|||
override def create(attr: Attributes): Stage[T, T] = fusing.Buffer(size, overflowStrategy)
|
||||
}
|
||||
|
||||
// FIXME: These are not yet proper stages, therefore they use the deprecated StageModule infrastructure
|
||||
|
||||
final case class GroupBy(maxSubstreams: Int, f: Any ⇒ Any, attributes: Attributes = groupBy) extends StageModule {
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def label: String = s"GroupBy($maxSubstreams)"
|
||||
}
|
||||
|
||||
final case class DirectProcessor(p: () ⇒ (Processor[Any, Any], Any), attributes: Attributes = processor) extends StageModule {
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import java.util.concurrent.atomic.{ AtomicReference }
|
|||
import java.{ util ⇒ ju }
|
||||
import akka.NotUsed
|
||||
import akka.stream.impl.MaterializerSession.MaterializationPanic
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.scaladsl.Keep
|
||||
import akka.stream._
|
||||
|
|
@ -935,3 +936,20 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class ProcessorModule[In, Out, Mat](val createProcessor: () ⇒ (Processor[In, Out], Mat),
|
||||
attributes: Attributes = DefaultAttributes.processor) extends StreamLayout.AtomicModule {
|
||||
val inPort = Inlet[In]("ProcessorModule.in")
|
||||
val outPort = Outlet[Out]("ProcessorModule.out")
|
||||
override val shape = new FlowShape(inPort, outPort)
|
||||
|
||||
override def replaceShape(s: Shape) = if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
|
||||
else this
|
||||
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override def carbonCopy: Module = withAttributes(attributes)
|
||||
override def toString: String = f"ProcessorModule [${System.identityHashCode(this)}%08x]"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,226 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import akka.actor._
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
||||
import scala.collection.mutable
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object MultiStreamOutputProcessor {
|
||||
final case class SubstreamKey(id: Long)
|
||||
final case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
final case class SubstreamCancel(substream: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
final case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
final case class SubstreamSubscriptionTimeout(substream: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
|
||||
class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription {
|
||||
override def request(elements: Long): Unit = parent ! SubstreamRequestMore(substreamKey, elements)
|
||||
override def cancel(): Unit = parent ! SubstreamCancel(substreamKey)
|
||||
override def toString = "SubstreamSubscription" + System.identityHashCode(this)
|
||||
}
|
||||
|
||||
object SubstreamOutput {
|
||||
sealed trait PublisherState
|
||||
sealed trait CompletedState extends PublisherState
|
||||
case object Open extends PublisherState
|
||||
final case class Attached(sub: Subscriber[Any]) extends PublisherState
|
||||
case object Completed extends CompletedState
|
||||
case object Cancelled extends CompletedState
|
||||
final case class Failed(e: Throwable) extends CompletedState
|
||||
}
|
||||
|
||||
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: Cancellable)
|
||||
extends SimpleOutputs(actor, pump) with Publisher[Any] {
|
||||
import ReactiveStreamsCompliance._
|
||||
|
||||
import SubstreamOutput._
|
||||
|
||||
private val subscription = new SubstreamSubscription(actor, key)
|
||||
private val state = new AtomicReference[PublisherState](Open)
|
||||
|
||||
override def subreceive: SubReceive =
|
||||
throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block")
|
||||
|
||||
def isAttached = state.get().isInstanceOf[Attached]
|
||||
|
||||
def enqueueOutputDemand(demand: Long): Unit = {
|
||||
downstreamDemand += demand
|
||||
pump.pump()
|
||||
}
|
||||
|
||||
override def error(e: Throwable): Unit = {
|
||||
if (!downstreamCompleted) {
|
||||
closePublisher(Failed(e))
|
||||
downstreamCompleted = true
|
||||
}
|
||||
}
|
||||
|
||||
override def cancel(): Unit = {
|
||||
if (!downstreamCompleted) {
|
||||
closePublisher(Cancelled)
|
||||
downstreamCompleted = true
|
||||
}
|
||||
}
|
||||
|
||||
override def complete(): Unit = {
|
||||
if (!downstreamCompleted) {
|
||||
closePublisher(Completed)
|
||||
downstreamCompleted = true
|
||||
}
|
||||
}
|
||||
|
||||
private def closePublisher(withState: CompletedState): Unit = {
|
||||
subscriptionTimeout.cancel()
|
||||
state.getAndSet(withState) match {
|
||||
case _: CompletedState ⇒ throw new IllegalStateException("Attempted to double shutdown publisher")
|
||||
case Attached(sub) ⇒
|
||||
if (subscriber eq null) tryOnSubscribe(sub, CancelledSubscription)
|
||||
closeSubscriber(sub, withState)
|
||||
case Open ⇒ // No action needed
|
||||
}
|
||||
}
|
||||
|
||||
private def closeSubscriber(s: Subscriber[Any], withState: CompletedState): Unit = withState match {
|
||||
case Completed ⇒ tryOnComplete(s)
|
||||
case Cancelled ⇒ // nothing to do
|
||||
case Failed(e: SpecViolation) ⇒ // nothing to do
|
||||
case Failed(e) ⇒ tryOnError(s, e)
|
||||
}
|
||||
|
||||
override def subscribe(s: Subscriber[_ >: Any]): Unit = {
|
||||
requireNonNullSubscriber(s)
|
||||
subscriptionTimeout.cancel()
|
||||
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
|
||||
else {
|
||||
state.get() match {
|
||||
case _: Attached | Cancelled ⇒
|
||||
rejectAdditionalSubscriber(s, "Substream publisher")
|
||||
case c: CompletedState ⇒
|
||||
tryOnSubscribe(s, CancelledSubscription)
|
||||
closeSubscriber(s, c)
|
||||
case Open ⇒
|
||||
throw new IllegalStateException("Publisher cannot become open after being used before")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def attachSubscriber(s: Subscriber[Any]): Unit =
|
||||
if (subscriber eq null) {
|
||||
subscriber = s
|
||||
tryOnSubscribe(subscriber, subscription)
|
||||
} else
|
||||
rejectAdditionalSubscriber(s, "Substream publisher")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubscriptionTimeoutSupport {
|
||||
this: Actor with ActorLogging ⇒
|
||||
|
||||
import MultiStreamOutputProcessor._
|
||||
import StreamSubscriptionTimeoutSupport._
|
||||
|
||||
protected def nextId(): Long
|
||||
|
||||
// stream keys will be removed from this map on cancellation/subscription-timeout, never assume a key is present
|
||||
private val substreamOutputs = mutable.Map.empty[SubstreamKey, SubstreamOutput]
|
||||
|
||||
protected def createSubstreamOutput(): SubstreamOutput = {
|
||||
val id = SubstreamKey(nextId())
|
||||
val cancellable = scheduleSubscriptionTimeout(self, SubstreamSubscriptionTimeout(id))
|
||||
val output = new SubstreamOutput(id, self, this, cancellable)
|
||||
substreamOutputs(output.key) = output
|
||||
output
|
||||
}
|
||||
|
||||
protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
|
||||
cancelSubstreamOutput(substream)
|
||||
pump()
|
||||
}
|
||||
|
||||
protected def cancelSubstreamOutput(substream: SubstreamKey): Unit = {
|
||||
substreamOutputs.get(substream) match {
|
||||
case Some(sub) ⇒
|
||||
sub.cancel()
|
||||
substreamOutputs -= substream
|
||||
case _ ⇒ // ignore, already completed...
|
||||
}
|
||||
}
|
||||
|
||||
protected def completeSubstreamOutput(substream: SubstreamKey): Unit = {
|
||||
substreamOutputs.get(substream) match {
|
||||
case Some(sub) ⇒
|
||||
sub.complete()
|
||||
substreamOutputs -= substream
|
||||
case _ ⇒ // ignore, already completed...
|
||||
}
|
||||
}
|
||||
|
||||
protected def failOutputs(e: Throwable): Unit = {
|
||||
substreamOutputs.values foreach (_.error(e))
|
||||
}
|
||||
|
||||
protected def finishOutputs(): Unit = {
|
||||
substreamOutputs.values foreach (_.complete())
|
||||
}
|
||||
|
||||
val outputSubstreamManagement: Receive = {
|
||||
case SubstreamRequestMore(key, demand) ⇒
|
||||
substreamOutputs.get(key) match {
|
||||
case Some(sub) ⇒
|
||||
if (demand < 1) // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError
|
||||
sub.error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
|
||||
else
|
||||
sub.enqueueOutputDemand(demand)
|
||||
case _ ⇒ // ignore...
|
||||
}
|
||||
case SubstreamSubscribe(key, subscriber) ⇒ substreamOutputs.get(key) match {
|
||||
case Some(sub) ⇒ sub.attachSubscriber(subscriber)
|
||||
case _ ⇒ // ignore...
|
||||
}
|
||||
case SubstreamSubscriptionTimeout(key) ⇒ substreamOutputs.get(key) match {
|
||||
case Some(sub) if !sub.isAttached ⇒ subscriptionTimedOut(sub)
|
||||
case _ ⇒ // ignore...
|
||||
}
|
||||
case SubstreamCancel(key) ⇒
|
||||
invalidateSubstreamOutput(key)
|
||||
}
|
||||
|
||||
override protected def handleSubscriptionTimeout(target: Publisher[_], cause: Exception) = target match {
|
||||
case s: SubstreamOutput ⇒
|
||||
s.error(cause)
|
||||
s.attachSubscriber(CancelingSubscriber)
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] abstract class MultiStreamOutputProcessor(_settings: ActorMaterializerSettings) extends ActorProcessorImpl(_settings) with MultiStreamOutputProcessorLike {
|
||||
private var _nextId = 0L
|
||||
protected def nextId(): Long = { _nextId += 1; _nextId }
|
||||
|
||||
override val subscriptionTimeoutSettings = _settings.subscriptionTimeoutSettings
|
||||
|
||||
override protected def fail(e: Throwable): Unit = {
|
||||
failOutputs(e)
|
||||
super.fail(e)
|
||||
}
|
||||
|
||||
override def pumpFinished(): Unit = {
|
||||
finishOutputs()
|
||||
super.pumpFinished()
|
||||
}
|
||||
|
||||
override def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement
|
||||
}
|
||||
|
||||
|
|
@ -19,9 +19,6 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, NotUsed],
|
|||
finishFunction: Sink[In, NotUsed] ⇒ C)
|
||||
extends SubFlow[Out, Mat, F, C] {
|
||||
|
||||
override def deprecatedAndThen[U](op: Stages.StageModule): SubFlow[U, Mat, F, C] =
|
||||
new SubFlowImpl[In, U, Mat, F, C](subFlow.deprecatedAndThen(op), mergeBackFunction, finishFunction)
|
||||
|
||||
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =
|
||||
new SubFlowImpl[In, T, Mat, F, C](subFlow.via(flow), mergeBackFunction, finishFunction)
|
||||
|
||||
|
|
|
|||
|
|
@ -5,19 +5,21 @@ package akka.stream.impl.fusing
|
|||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import akka.NotUsed
|
||||
import akka.stream.ActorAttributes.SupervisionStrategy
|
||||
import akka.stream._
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl.SubscriptionTimeoutException
|
||||
import akka.stream.stage._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.actor.ActorSubscriberMessage
|
||||
import scala.collection.immutable
|
||||
import scala.collection.{ mutable, immutable }
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.util.control.NonFatal
|
||||
import scala.annotation.tailrec
|
||||
import akka.stream.impl.PublisherSource
|
||||
import akka.stream.impl.CancellingSubscriber
|
||||
import akka.stream.impl.{ Buffer ⇒ BufferImpl }
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -198,8 +200,7 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable.
|
|||
// Otherwise substream is open, ignore
|
||||
}
|
||||
|
||||
setHandler(in, this)
|
||||
setHandler(out, this)
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new PrefixAndTailLogic(shape)
|
||||
|
|
@ -207,6 +208,179 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable.
|
|||
override def toString: String = s"PrefixAndTail($n)"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
final class GroupBy[T, K](maxSubstreams: Int, keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
|
||||
val in: Inlet[T] = Inlet("GroupBy.in")
|
||||
val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out")
|
||||
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
|
||||
override def initialAttributes = DefaultAttributes.groupBy
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler with InHandler {
|
||||
parent ⇒
|
||||
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
|
||||
private val closedSubstreams = new java.util.HashSet[Any]()
|
||||
private var timeout: FiniteDuration = _
|
||||
private var substreamWaitingToBePushed: Option[SubstreamSource] = None
|
||||
private var nextElementKey: K = null.asInstanceOf[K]
|
||||
private var nextElementValue: T = null.asInstanceOf[T]
|
||||
private var _nextId = 0
|
||||
private val substreamsJustStared = new java.util.HashSet[Any]()
|
||||
private var firstPushCounter: Int = 0
|
||||
|
||||
private def nextId(): Long = { _nextId += 1; _nextId }
|
||||
|
||||
private def hasNextElement = nextElementKey != null
|
||||
|
||||
private def clearNextElement(): Unit = {
|
||||
nextElementKey = null.asInstanceOf[K]
|
||||
nextElementValue = null.asInstanceOf[T]
|
||||
}
|
||||
|
||||
private def tryCompleteAll(): Boolean =
|
||||
if (activeSubstreamsMap.isEmpty || (!hasNextElement && firstPushCounter == 0)) {
|
||||
for (value ← activeSubstreamsMap.values()) value.complete()
|
||||
completeStage()
|
||||
true
|
||||
} else false
|
||||
|
||||
private def fail(ex: Throwable): Unit = {
|
||||
for (value ← activeSubstreamsMap.values()) value.fail(ex)
|
||||
failStage(ex)
|
||||
}
|
||||
|
||||
private def needToPull: Boolean = !(hasBeenPulled(in) || isClosed(in) || hasNextElement)
|
||||
|
||||
override def preStart(): Unit =
|
||||
timeout = ActorMaterializer.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout
|
||||
|
||||
override def onPull(): Unit = {
|
||||
substreamWaitingToBePushed match {
|
||||
case Some(substreamSource) ⇒
|
||||
push(out, Source.fromGraph(substreamSource.source))
|
||||
scheduleOnce(substreamSource.key, timeout)
|
||||
substreamWaitingToBePushed = None
|
||||
case None ⇒
|
||||
if (hasNextElement) {
|
||||
val subSubstreamSource = activeSubstreamsMap.get(nextElementKey)
|
||||
if (subSubstreamSource.isAvailable) {
|
||||
subSubstreamSource.push(nextElementValue)
|
||||
clearNextElement()
|
||||
}
|
||||
} else tryPull(in)
|
||||
}
|
||||
}
|
||||
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = fail(ex)
|
||||
|
||||
override def onDownstreamFinish(): Unit =
|
||||
if (activeSubstreamsMap.isEmpty) completeStage() else setKeepGoing(true)
|
||||
|
||||
override def onPush(): Unit = try {
|
||||
val elem = grab(in)
|
||||
val key = keyFor(elem)
|
||||
require(key != null, "Key cannot be null")
|
||||
val substreamSource = activeSubstreamsMap.get(key)
|
||||
if (substreamSource != null) {
|
||||
if (substreamSource.isAvailable) substreamSource.push(elem)
|
||||
else {
|
||||
nextElementKey = key
|
||||
nextElementValue = elem
|
||||
}
|
||||
} else {
|
||||
if (activeSubstreamsMap.size == maxSubstreams)
|
||||
fail(new IllegalStateException(s"Cannot open substream for key '$key': too many substreams open"))
|
||||
else if (closedSubstreams.contains(key) && !hasBeenPulled(in))
|
||||
pull(in)
|
||||
else runSubstream(key, elem)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
decider(ex) match {
|
||||
case Supervision.Stop ⇒ fail(ex)
|
||||
case Supervision.Resume | Supervision.Restart ⇒ if (!hasBeenPulled(in)) pull(in)
|
||||
}
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (!tryCompleteAll()) setKeepGoing(true)
|
||||
}
|
||||
|
||||
private def runSubstream(key: K, value: T): Unit = {
|
||||
val substreamSource = new SubstreamSource("GroupBySource " + nextId, key, value)
|
||||
activeSubstreamsMap.put(key, substreamSource)
|
||||
firstPushCounter += 1
|
||||
if (isAvailable(out)) {
|
||||
push(out, Source.fromGraph(substreamSource.source))
|
||||
scheduleOnce(key, timeout)
|
||||
substreamWaitingToBePushed = None
|
||||
} else {
|
||||
setKeepGoing(true)
|
||||
substreamsJustStared.add(substreamSource)
|
||||
substreamWaitingToBePushed = Some(substreamSource)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def onTimer(timerKey: Any): Unit = {
|
||||
val substreamSource = activeSubstreamsMap.get(timerKey)
|
||||
if (substreamSource != null) {
|
||||
substreamSource.timeout(timeout)
|
||||
closedSubstreams.add(timerKey)
|
||||
activeSubstreamsMap.remove(timerKey)
|
||||
if (isClosed(in)) tryCompleteAll()
|
||||
}
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
|
||||
private class SubstreamSource(name: String, val key: K, var firstElement: T) extends SubSourceOutlet[T](name) with OutHandler {
|
||||
def firstPush(): Boolean = firstElement != null
|
||||
def hasNextForSubSource = hasNextElement && nextElementKey == key
|
||||
private def completeSubStream(): Unit = {
|
||||
complete()
|
||||
activeSubstreamsMap.remove(key)
|
||||
closedSubstreams.add(key)
|
||||
}
|
||||
|
||||
private def tryCompleteHandler(): Unit = {
|
||||
if (parent.isClosed(in) && !hasNextForSubSource) {
|
||||
completeSubStream()
|
||||
tryCompleteAll()
|
||||
}
|
||||
}
|
||||
|
||||
override def onPull(): Unit = {
|
||||
cancelTimer(key)
|
||||
if (firstPush) {
|
||||
firstPushCounter -= 1
|
||||
push(firstElement)
|
||||
firstElement = null.asInstanceOf[T]
|
||||
substreamsJustStared.remove(this)
|
||||
if (substreamsJustStared.isEmpty) setKeepGoing(false)
|
||||
} else if (hasNextForSubSource) {
|
||||
push(nextElementValue)
|
||||
clearNextElement()
|
||||
} else if (needToPull) pull(in)
|
||||
|
||||
tryCompleteHandler()
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(): Unit = {
|
||||
if (hasNextElement && nextElementKey == key) clearNextElement()
|
||||
if (firstPush()) firstPushCounter -= 1
|
||||
completeSubStream()
|
||||
if (parent.isClosed(in)) tryCompleteAll() else if (needToPull) pull(in)
|
||||
}
|
||||
|
||||
setHandler(this)
|
||||
}
|
||||
}
|
||||
|
||||
override def toString: String = "GroupBy"
|
||||
|
||||
}
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -246,7 +420,7 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamC
|
|||
|
||||
private var timeout: FiniteDuration = _
|
||||
private var substreamSource: SubSourceOutlet[T] = null
|
||||
private var substreamPushed = false
|
||||
private var substreamWaitingToBePushed = false
|
||||
private var substreamCancelled = false
|
||||
|
||||
override def preStart(): Unit = {
|
||||
|
|
@ -256,16 +430,16 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamC
|
|||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
if (substreamSource eq null) pull(in)
|
||||
else if (!substreamPushed) {
|
||||
else if (!substreamWaitingToBePushed) {
|
||||
push(out, Source.fromGraph(substreamSource.source))
|
||||
scheduleOnce(SubscriptionTimer, timeout)
|
||||
substreamPushed = true
|
||||
substreamWaitingToBePushed = true
|
||||
}
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(): Unit = {
|
||||
// If the substream is already cancelled or it has not been handed out, we can go away
|
||||
if (!substreamPushed || substreamCancelled) completeStage()
|
||||
if (!substreamWaitingToBePushed || substreamCancelled) completeStage()
|
||||
}
|
||||
})
|
||||
|
||||
|
|
@ -300,8 +474,8 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamC
|
|||
if (isAvailable(out)) {
|
||||
push(out, Source.fromGraph(substreamSource.source))
|
||||
scheduleOnce(SubscriptionTimer, timeout)
|
||||
substreamPushed = true
|
||||
} else substreamPushed = false
|
||||
substreamWaitingToBePushed = true
|
||||
} else substreamWaitingToBePushed = false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -381,6 +555,8 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamC
|
|||
|
||||
}
|
||||
}
|
||||
override def toString: String = "Split"
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -406,12 +582,14 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage
|
|||
|
||||
private val status = new AtomicReference[AnyRef]
|
||||
|
||||
def pullSubstream(): Unit = status.get match {
|
||||
def pullSubstream(): Unit = {
|
||||
status.get match {
|
||||
case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(RequestOne)
|
||||
case null ⇒
|
||||
if (!status.compareAndSet(null, RequestOne))
|
||||
status.get.asInstanceOf[Command ⇒ Unit](RequestOne)
|
||||
}
|
||||
}
|
||||
|
||||
def cancelSubstream(): Unit = status.get match {
|
||||
case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(Cancel)
|
||||
|
|
|
|||
|
|
@ -1088,6 +1088,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
|
||||
* the element is dropped and the stream and substreams continue.
|
||||
*
|
||||
* Function `f` MUST NOT return `null`. This will throw exception and trigger supervision decision mechanism.
|
||||
*
|
||||
* '''Emits when''' an element for which the grouping function returns a group that has not yet been created.
|
||||
* Emits the new group
|
||||
*
|
||||
|
|
|
|||
|
|
@ -3,10 +3,10 @@
|
|||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.event.{Logging, LoggingAdapter}
|
||||
import akka.stream._
|
||||
import akka.Done
|
||||
import akka.stream.impl.Stages.{ DirectProcessor, StageModule }
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.impl._
|
||||
import akka.stream.impl.fusing._
|
||||
|
|
@ -205,21 +205,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
|
|||
.replaceShape(FlowShape(ins(1), outs.head)))
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
// FIXME: Only exists to keep old stuff alive
|
||||
private[stream] override def deprecatedAndThen[U](op: StageModule): Repr[U] = {
|
||||
//No need to copy here, op is a fresh instance
|
||||
if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U]]
|
||||
else new Flow(module.fuse(op, shape.out, op.inPort).replaceShape(FlowShape(shape.in, op.outPort)))
|
||||
}
|
||||
|
||||
// FIXME: Only exists to keep old stuff alive
|
||||
private[akka] def deprecatedAndThenMat[U, Mat2, O >: Out](processorFactory: () ⇒ (Processor[O, U], Mat2)): ReprMat[U, Mat2] = {
|
||||
val op = DirectProcessor(processorFactory.asInstanceOf[() ⇒ (Processor[Any, Any], Any)])
|
||||
if (this.isIdentity) new Flow(op).asInstanceOf[ReprMat[U, Mat2]]
|
||||
else new Flow[In, U, Mat2](module.fuse(op, shape.out, op.inPort, Keep.right).replaceShape(FlowShape(shape.in, op.outPort)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Change the attributes of this [[Flow]] to the given ones and seal the list
|
||||
* of attributes. This means that further calls will not be able to remove these
|
||||
|
|
@ -293,9 +278,8 @@ object Flow {
|
|||
/**
|
||||
* Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]] and returns a materialized value.
|
||||
*/
|
||||
def fromProcessorMat[I, O, Mat](processorFactory: () ⇒ (Processor[I, O], Mat)): Flow[I, O, Mat] = {
|
||||
Flow[I].deprecatedAndThenMat(processorFactory)
|
||||
}
|
||||
def fromProcessorMat[I, O, M](processorFactory: () ⇒ (Processor[I, O], M)): Flow[I, O, M] =
|
||||
new Flow(ProcessorModule(processorFactory))
|
||||
|
||||
/**
|
||||
* Returns a `Flow` which outputs all its inputs.
|
||||
|
|
@ -1210,6 +1194,8 @@ trait FlowOps[+Out, +Mat] {
|
|||
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
|
||||
* the element is dropped and the stream and substreams continue.
|
||||
*
|
||||
* Function `f` MUST NOT return `null`. This will throw exception and trigger supervision decision mechanism.
|
||||
*
|
||||
* '''Emits when''' an element for which the grouping function returns a group that has not yet been created.
|
||||
* Emits the new group
|
||||
*
|
||||
|
|
@ -1223,16 +1209,15 @@ trait FlowOps[+Out, +Mat] {
|
|||
* that are supported; if more distinct keys are encountered then the stream fails
|
||||
*/
|
||||
def groupBy[K](maxSubstreams: Int, f: Out ⇒ K): SubFlow[Out, Mat, Repr, Closed] = {
|
||||
implicit def mat = GraphInterpreter.currentInterpreter.materializer
|
||||
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
|
||||
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
|
||||
deprecatedAndThen[Source[Out, NotUsed]](GroupBy(maxSubstreams, f.asInstanceOf[Any ⇒ Any]))
|
||||
via(new GroupBy(maxSubstreams, f))
|
||||
.map(_.via(flow))
|
||||
.via(new FlattenMerge(breadth))
|
||||
}
|
||||
val finish: (Sink[Out, NotUsed]) ⇒ Closed = s ⇒
|
||||
deprecatedAndThen[Source[Out, NotUsed]](GroupBy(maxSubstreams, f.asInstanceOf[Any ⇒ Any]))
|
||||
.to(Sink.foreach(_.runWith(s)))
|
||||
via(new GroupBy(maxSubstreams, f))
|
||||
.to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
|
||||
new SubFlowImpl(Flow[Out], merge, finish)
|
||||
}
|
||||
|
||||
|
|
@ -1849,8 +1834,6 @@ trait FlowOps[+Out, +Mat] {
|
|||
/** INTERNAL API */
|
||||
private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] =
|
||||
via(SymbolicGraphStage(op))
|
||||
|
||||
private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U]
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import akka.stream._
|
|||
import akka.stream.impl._
|
||||
import akka.stream.impl.fusing.GraphStages
|
||||
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
|
||||
import akka.stream.impl.Stages.{ DefaultAttributes, StageModule }
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl.StreamLayout._
|
||||
import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException
|
||||
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
|
||||
|
|
@ -977,13 +977,6 @@ object GraphDSL extends GraphApply {
|
|||
source.out
|
||||
}
|
||||
|
||||
private[stream] def deprecatedAndThen(port: OutPort, op: StageModule): Unit = {
|
||||
moduleInProgress =
|
||||
moduleInProgress
|
||||
.compose(op)
|
||||
.wire(port, op.inPort)
|
||||
}
|
||||
|
||||
private[stream] def module: Module = moduleInProgress
|
||||
|
||||
/** Converts this Scala DSL element to it's Java DSL counterpart. */
|
||||
|
|
@ -1118,11 +1111,6 @@ object GraphDSL extends GraphApply {
|
|||
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =
|
||||
super.~>(flow)(b)
|
||||
|
||||
override private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U] = {
|
||||
b.deprecatedAndThen(outlet, op)
|
||||
new PortOpsImpl(op.shape.out.asInstanceOf[Outlet[U]], b)
|
||||
}
|
||||
|
||||
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed = {
|
||||
super.~>(sink)(b)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,10 +3,10 @@
|
|||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.{ Done, NotUsed }
|
||||
import akka.actor.{ ActorRef, Cancellable, Props }
|
||||
import akka.stream.actor.ActorPublisher
|
||||
import akka.stream.impl.Stages.{ DefaultAttributes, StageModule }
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.impl.fusing.GraphStages
|
||||
import akka.stream.impl.fusing.GraphStages._
|
||||
|
|
@ -74,15 +74,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
|
|||
override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] =
|
||||
new Source[Out, Mat2](module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any]))
|
||||
|
||||
/** INTERNAL API */
|
||||
override private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U] = {
|
||||
// No need to copy here, op is a fresh instance
|
||||
new Source(
|
||||
module
|
||||
.fuse(op, shape.out, op.inPort)
|
||||
.replaceShape(SourceShape(op.outPort)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
|
||||
* of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
|
||||
|
|
|
|||
|
|
@ -747,6 +747,14 @@ object MiMa extends AutoPlugin {
|
|||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.DefaultSSLContextCreation.validateAndWarnAboutLooseSettings")
|
||||
),
|
||||
"2.4.4" -> Seq(
|
||||
|
||||
//#20229 migrate GroupBy to GraphStage
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.GraphDSL#Builder.deprecatedAndThen"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.deprecatedAndThen"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Flow.deprecatedAndThenMat"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.deprecatedAndThen"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowOps.deprecatedAndThen"),
|
||||
|
||||
// #20080, #20081 remove race condition on HTTP client
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.Http#HostConnectionPool.gatewayFuture"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.Http#HostConnectionPool.copy"),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue