=str #15608 Make sure ExposedPublisher is first

This commit is contained in:
Patrik Nordwall 2014-08-20 19:43:31 +02:00
parent fedde06bec
commit 4be757e492
7 changed files with 47 additions and 13 deletions

View file

@ -243,7 +243,8 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin
extends Actor
with ActorLogging
with SoftShutdown
with Pump {
with Pump
with Stash {
// FIXME: make pump a member
protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) {
@ -258,7 +259,19 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin
}
}
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive
/**
* Subclass may override [[#activeReceive]]
*/
final override def receive = {
// FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this
case ep: ExposedPublisher
primaryOutputs.subreceive(ep)
context become activeReceive
unstashAll()
case _ stash()
}
def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive
protected def onError(e: Throwable): Unit = fail(e)

View file

@ -51,7 +51,7 @@ private[akka] class MapFutureProcessorImpl(_settings: MaterializerSettings, f: A
// keep future results arriving too early in a buffer sorted by seqNo
var orderedBuffer = TreeSet.empty[FutureElement]
override def receive = futureReceive orElse super.receive
override def activeReceive = futureReceive orElse super.activeReceive
def drainBuffer(): List[Any] = {

View file

@ -7,6 +7,7 @@ import akka.stream.MaterializerSettings
import akka.actor.{ Actor, Terminated, ActorRef }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import akka.stream.actor.ActorSubscriber.{ OnNext, OnError, OnComplete, OnSubscribe }
import akka.actor.Stash
/**
* INTERNAL API
@ -111,7 +112,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
}
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
}
/**
@ -159,7 +160,8 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
}
}
override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive
override def activeReceive: Receive =
secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive
other.subscribe(new OtherActorSubscriber(self))
@ -248,5 +250,5 @@ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSe
super.shutdownHooks()
}
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
}

View file

@ -48,7 +48,7 @@ private[akka] class TimerTransformerProcessorsImpl(
override def inputsAvailable: Boolean = !queue.isEmpty
}
override def receive = super.receive orElse schedulerInputs.subreceive
override def activeReceive = super.activeReceive orElse schedulerInputs.subreceive
object RunningCondition extends TransferState {
def isReady = {

View file

@ -11,6 +11,7 @@ import akka.util.ByteString
import akka.io.Tcp._
import akka.stream.MaterializerSettings
import org.reactivestreams.Processor
import akka.actor.Stash
/**
* INTERNAL API
@ -28,7 +29,7 @@ private[akka] object TcpStreamActor {
/**
* INTERNAL API
*/
private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor {
private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor with Stash {
import TcpStreamActor._
@ -166,7 +167,16 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings)
override protected def pumpContext: ActorRefFactory = context
}
override def receive =
final override def receive = {
// FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this
case ep: ExposedPublisher
primaryOutputs.subreceive(ep)
context become activeReceive
unstashAll()
case _ stash()
}
def activeReceive =
primaryInputs.subreceive orElse primaryOutputs.subreceive orElse tcpInputs.subreceive orElse tcpOutputs.subreceive
readPump.nextPhase(readPump.running)
@ -205,7 +215,7 @@ private[akka] class OutboundTcpStreamActor(val connectCmd: Connect, val requeste
val initSteps = new SubReceive(waitingExposedProcessor)
override def receive = initSteps orElse super.receive
override def activeReceive = initSteps orElse super.activeReceive
def waitingExposedProcessor: Receive = {
case StreamTcpManager.ExposedProcessor(processor)

View file

@ -28,7 +28,7 @@ private[akka] object TcpListenStreamActor {
* INTERNAL API
*/
private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, val settings: MaterializerSettings) extends Actor
with Pump {
with Pump with Stash {
import akka.stream.io.TcpListenStreamActor._
import context.system
@ -101,7 +101,16 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef,
}
override def receive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive
final override def receive = {
// FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this
case ep: ExposedPublisher
primaryOutputs.subreceive(ep)
context become activeReceive
unstashAll()
case _ stash()
}
def activeReceive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive
def runningPhase = TransferPhase(primaryOutputs.NeedsDemand && incomingConnections.NeedsInput) { ()
val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement()

View file

@ -208,7 +208,7 @@ class DuctSpec extends AkkaSpec {
val d1: Duct[String, Publisher[Fruit]] = Duct[String].map(_ new Apple).splitWhen(_ true)
val d2: Duct[String, (Boolean, Publisher[Fruit])] = Duct[String].map(_ new Apple).groupBy(_ true)
val d3: Duct[String, (immutable.Seq[Apple], Publisher[Fruit])] = Duct[String].map(_ new Apple).prefixAndTail(1)
val s1: Subscriber[Fruit] = null
val s1: Subscriber[Fruit] = StreamTestKit.SubscriberProbe[Fruit]()
val s2: Subscriber[String] = Duct[String].map(_ new Apple).produceTo(s1, materializer)
val t: Tuple2[Subscriber[String], Publisher[Fruit]] = Duct[String].map(_ new Apple).build(materializer)
}