=str #15859 Eliminate usage of Stash mailbox
* As reported in #16522 Stash is slow * Nothing fancy here, but should be more efficient than Stash
This commit is contained in:
parent
83e91f770b
commit
a165de74b3
6 changed files with 49 additions and 26 deletions
|
|
@ -10,6 +10,7 @@ import akka.actor.ActorRef
|
|||
import akka.actor.ActorRefWithCell
|
||||
import akka.stream.impl.io.StreamTcpManager
|
||||
import akka.actor.Actor
|
||||
import akka.stream.impl.io.TcpListenStreamActor
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -27,7 +28,8 @@ private[akka] final case class StreamTestDefaultMailbox() extends MailboxType wi
|
|||
val actorClass = r.underlying.props.actorClass
|
||||
assert(actorClass != classOf[Actor], s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]")
|
||||
// StreamTcpManager is allowed to use another dispatcher
|
||||
assert(!actorClass.getName.startsWith("akka.stream.") || actorClass == classOf[StreamTcpManager],
|
||||
val specialCases: Set[Class[_]] = Set(classOf[StreamTcpManager], classOf[TcpListenStreamActor])
|
||||
assert(!actorClass.getName.startsWith("akka.stream.") || specialCases(actorClass),
|
||||
s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " +
|
||||
"Did you forget to define `props.withDispatcher` when creating the actor? " +
|
||||
"Or did you forget to configure the `akka.stream.materializer` setting accordingly or force the " +
|
||||
|
|
|
|||
|
|
@ -223,8 +223,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
|
|||
private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMaterializerSettings)
|
||||
extends Actor
|
||||
with ActorLogging
|
||||
with Pump
|
||||
with Stash {
|
||||
with Pump {
|
||||
|
||||
// FIXME: make pump a member
|
||||
protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) {
|
||||
|
|
@ -236,13 +235,11 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali
|
|||
/**
|
||||
* Subclass may override [[#activeReceive]]
|
||||
*/
|
||||
final override def receive = {
|
||||
// FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this
|
||||
case ep: ExposedPublisher ⇒
|
||||
final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) {
|
||||
override def receiveExposedPublisher(ep: ExposedPublisher): Unit = {
|
||||
primaryOutputs.subreceive(ep)
|
||||
context become activeReceive
|
||||
unstashAll()
|
||||
case _ ⇒ stash()
|
||||
}
|
||||
}
|
||||
|
||||
def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive
|
||||
|
|
|
|||
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import akka.actor.Actor
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] abstract class ExposedPublisherReceive(activeReceive: Actor.Receive, unhandled: Any ⇒ Unit) extends Actor.Receive {
|
||||
private var stash = List.empty[Any]
|
||||
|
||||
def isDefinedAt(o: Any): Boolean = true
|
||||
|
||||
def apply(o: Any): Unit = o match {
|
||||
case ep: ExposedPublisher ⇒
|
||||
receiveExposedPublisher(ep)
|
||||
if (stash.nonEmpty) {
|
||||
// we don't use sender() so this is allright
|
||||
stash.reverse.foreach { msg ⇒
|
||||
activeReceive.applyOrElse(msg, unhandled)
|
||||
}
|
||||
}
|
||||
case other ⇒
|
||||
stash ::= other
|
||||
}
|
||||
|
||||
def receiveExposedPublisher(ep: ExposedPublisher): Unit
|
||||
}
|
||||
|
|
@ -13,7 +13,6 @@ import akka.io.Tcp._
|
|||
import akka.stream.ActorFlowMaterializerSettings
|
||||
import akka.stream.StreamTcpException
|
||||
import org.reactivestreams.Processor
|
||||
import akka.actor.Stash
|
||||
import akka.stream.impl._
|
||||
import akka.actor.ActorLogging
|
||||
|
||||
|
|
@ -37,7 +36,7 @@ private[akka] object TcpStreamActor {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerSettings) extends Actor with Stash
|
||||
private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerSettings) extends Actor
|
||||
with ActorLogging {
|
||||
|
||||
import TcpStreamActor._
|
||||
|
|
@ -169,13 +168,11 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
|
|||
override protected def pumpFailed(e: Throwable): Unit = fail(e)
|
||||
}
|
||||
|
||||
final override def receive = {
|
||||
// FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this
|
||||
case ep: ExposedPublisher ⇒
|
||||
final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) {
|
||||
override def receiveExposedPublisher(ep: ExposedPublisher): Unit = {
|
||||
primaryOutputs.subreceive(ep)
|
||||
context become activeReceive
|
||||
unstashAll()
|
||||
case _ ⇒ stash()
|
||||
}
|
||||
}
|
||||
|
||||
def activeReceive =
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import scala.concurrent.{ Future, Promise }
|
|||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Props
|
||||
import akka.actor.Stash
|
||||
import akka.io.{ IO, Tcp }
|
||||
import akka.io.Tcp._
|
||||
import akka.stream.{ FlowMaterializer, ActorFlowMaterializerSettings }
|
||||
|
|
@ -40,7 +39,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
|
|||
unbindPromise: Promise[() ⇒ Future[Unit]],
|
||||
flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
|
||||
bindCmd: Tcp.Bind, settings: ActorFlowMaterializerSettings) extends Actor
|
||||
with Pump with Stash with ActorLogging {
|
||||
with Pump with ActorLogging {
|
||||
import context.system
|
||||
|
||||
object primaryOutputs extends SimpleOutputs(self, pump = this) {
|
||||
|
|
@ -130,13 +129,11 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
|
|||
}
|
||||
}
|
||||
|
||||
final override def receive = {
|
||||
// FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this
|
||||
case ep: ExposedPublisher ⇒
|
||||
final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) {
|
||||
override def receiveExposedPublisher(ep: ExposedPublisher): Unit = {
|
||||
primaryOutputs.subreceive(ep)
|
||||
context become activeReceive
|
||||
unstashAll()
|
||||
case _ ⇒ stash()
|
||||
}
|
||||
}
|
||||
|
||||
def activeReceive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive
|
||||
|
|
|
|||
|
|
@ -51,9 +51,9 @@ private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Ke
|
|||
}
|
||||
|
||||
override def to(sink: Sink[Out]): Sink[In] = sink match {
|
||||
case sp: SinkPipe[Out] ⇒ sp.prependPipe(this)
|
||||
case sp: SinkPipe[Out] ⇒ sp.prependPipe(this)
|
||||
case gs: GraphBackedSink[Out, _] ⇒ gs.prepend(this)
|
||||
case d: Sink[Out] ⇒ this.withSink(d)
|
||||
case d: Sink[Out] ⇒ this.withSink(d)
|
||||
}
|
||||
|
||||
override def join(flow: Flow[Out, In]): RunnableFlow = flow match {
|
||||
|
|
@ -99,9 +99,9 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As
|
|||
}
|
||||
|
||||
override def to(sink: Sink[Out]): RunnableFlow = sink match {
|
||||
case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ::: ops, keys ::: sp.keys) // FIXME raw addition of AstNodes
|
||||
case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ::: ops, keys ::: sp.keys) // FIXME raw addition of AstNodes
|
||||
case g: GraphBackedSink[Out, _] ⇒ g.prepend(this)
|
||||
case d: Sink[Out] ⇒ this.withSink(d)
|
||||
case d: Sink[Out] ⇒ this.withSink(d)
|
||||
}
|
||||
|
||||
override def withKey(key: Key[_]): SourcePipe[Out] = SourcePipe(input, ops, keys :+ key)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue