From 4be757e49236a0c7082cde56f3e7c079eed32e30 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Aug 2014 19:43:31 +0200 Subject: [PATCH] =str #15608 Make sure ExposedPublisher is first --- .../scala/akka/stream/impl/ActorProcessor.scala | 17 +++++++++++++++-- .../stream/impl/MapFutureProcessorImpl.scala | 2 +- .../stream/impl/StreamOfStreamProcessors.scala | 8 +++++--- .../impl/TimerTransformerProcessorsImpl.scala | 2 +- .../akka/stream/io/TcpConnectionStream.scala | 16 +++++++++++++--- .../akka/stream/io/TcpListenStreamActor.scala | 13 +++++++++++-- .../src/test/scala/akka/stream/DuctSpec.scala | 2 +- 7 files changed, 47 insertions(+), 13 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index bd71906e74..bb27554709 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -243,7 +243,8 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin extends Actor with ActorLogging with SoftShutdown - with Pump { + with Pump + with Stash { // FIXME: make pump a member protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) { @@ -258,7 +259,19 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin } } - override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive + /** + * Subclass may override [[#activeReceive]] + */ + final override def receive = { + // FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this + case ep: ExposedPublisher ⇒ + primaryOutputs.subreceive(ep) + context become activeReceive + unstashAll() + case _ ⇒ stash() + } + + def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive protected def onError(e: Throwable): Unit = fail(e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala index 600d54f49b..37e7aaa3e1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala @@ -51,7 +51,7 @@ private[akka] class MapFutureProcessorImpl(_settings: MaterializerSettings, f: A // keep future results arriving too early in a buffer sorted by seqNo var orderedBuffer = TreeSet.empty[FutureElement] - override def receive = futureReceive orElse super.receive + override def activeReceive = futureReceive orElse super.activeReceive def drainBuffer(): List[Any] = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index e87486681e..86f8addbc6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -7,6 +7,7 @@ import akka.stream.MaterializerSettings import akka.actor.{ Actor, Terminated, ActorRef } import org.reactivestreams.{ Publisher, Subscriber, Subscription } import akka.stream.actor.ActorSubscriber.{ OnNext, OnError, OnComplete, OnSubscribe } +import akka.actor.Stash /** * INTERNAL API @@ -111,7 +112,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS } - override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement + override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement } /** @@ -159,7 +160,8 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett } } - override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive + override def activeReceive: Receive = + secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive other.subscribe(new OtherActorSubscriber(self)) @@ -248,5 +250,5 @@ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSe super.shutdownHooks() } - override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement + override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala index 8661cb03cb..d96305098c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala @@ -48,7 +48,7 @@ private[akka] class TimerTransformerProcessorsImpl( override def inputsAvailable: Boolean = !queue.isEmpty } - override def receive = super.receive orElse schedulerInputs.subreceive + override def activeReceive = super.activeReceive orElse schedulerInputs.subreceive object RunningCondition extends TransferState { def isReady = { diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala index 8cd0a56146..dfdd0fca1b 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala @@ -11,6 +11,7 @@ import akka.util.ByteString import akka.io.Tcp._ import akka.stream.MaterializerSettings import org.reactivestreams.Processor +import akka.actor.Stash /** * INTERNAL API @@ -28,7 +29,7 @@ private[akka] object TcpStreamActor { /** * INTERNAL API */ -private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor { +private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor with Stash { import TcpStreamActor._ @@ -166,7 +167,16 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) override protected def pumpContext: ActorRefFactory = context } - override def receive = + final override def receive = { + // FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this + case ep: ExposedPublisher ⇒ + primaryOutputs.subreceive(ep) + context become activeReceive + unstashAll() + case _ ⇒ stash() + } + + def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse tcpInputs.subreceive orElse tcpOutputs.subreceive readPump.nextPhase(readPump.running) @@ -205,7 +215,7 @@ private[akka] class OutboundTcpStreamActor(val connectCmd: Connect, val requeste val initSteps = new SubReceive(waitingExposedProcessor) - override def receive = initSteps orElse super.receive + override def activeReceive = initSteps orElse super.activeReceive def waitingExposedProcessor: Receive = { case StreamTcpManager.ExposedProcessor(processor) ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala index 70e9d858d1..8e7adef27d 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala @@ -28,7 +28,7 @@ private[akka] object TcpListenStreamActor { * INTERNAL API */ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, val settings: MaterializerSettings) extends Actor - with Pump { + with Pump with Stash { import akka.stream.io.TcpListenStreamActor._ import context.system @@ -101,7 +101,16 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, } - override def receive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive + final override def receive = { + // FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this + case ep: ExposedPublisher ⇒ + primaryOutputs.subreceive(ep) + context become activeReceive + unstashAll() + case _ ⇒ stash() + } + + def activeReceive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive def runningPhase = TransferPhase(primaryOutputs.NeedsDemand && incomingConnections.NeedsInput) { () ⇒ val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement() diff --git a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala index 8ed1071f00..7b97dbc21b 100644 --- a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala @@ -208,7 +208,7 @@ class DuctSpec extends AkkaSpec { val d1: Duct[String, Publisher[Fruit]] = Duct[String].map(_ ⇒ new Apple).splitWhen(_ ⇒ true) val d2: Duct[String, (Boolean, Publisher[Fruit])] = Duct[String].map(_ ⇒ new Apple).groupBy(_ ⇒ true) val d3: Duct[String, (immutable.Seq[Apple], Publisher[Fruit])] = Duct[String].map(_ ⇒ new Apple).prefixAndTail(1) - val s1: Subscriber[Fruit] = null + val s1: Subscriber[Fruit] = StreamTestKit.SubscriberProbe[Fruit]() val s2: Subscriber[String] = Duct[String].map(_ ⇒ new Apple).produceTo(s1, materializer) val t: Tuple2[Subscriber[String], Publisher[Fruit]] = Duct[String].map(_ ⇒ new Apple).build(materializer) }