diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala index dbf4030d9d..adb2bc237f 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala @@ -10,6 +10,7 @@ import akka.actor.ActorRef import akka.actor.ActorRefWithCell import akka.stream.impl.io.StreamTcpManager import akka.actor.Actor +import akka.stream.impl.io.TcpListenStreamActor /** * INTERNAL API @@ -27,7 +28,8 @@ private[akka] final case class StreamTestDefaultMailbox() extends MailboxType wi val actorClass = r.underlying.props.actorClass assert(actorClass != classOf[Actor], s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]") // StreamTcpManager is allowed to use another dispatcher - assert(!actorClass.getName.startsWith("akka.stream.") || actorClass == classOf[StreamTcpManager], + val specialCases: Set[Class[_]] = Set(classOf[StreamTcpManager], classOf[TcpListenStreamActor]) + assert(!actorClass.getName.startsWith("akka.stream.") || specialCases(actorClass), s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " + "Did you forget to define `props.withDispatcher` when creating the actor? " + "Or did you forget to configure the `akka.stream.materializer` setting accordingly or force the " + diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 6f1f5852f7..3d3d033947 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -223,8 +223,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMaterializerSettings) extends Actor with ActorLogging - with Pump - with Stash { + with Pump { // FIXME: make pump a member protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) { @@ -236,13 +235,11 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali /** * Subclass may override [[#activeReceive]] */ - final override def receive = { - // FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this - case ep: ExposedPublisher ⇒ + final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) { + override def receiveExposedPublisher(ep: ExposedPublisher): Unit = { primaryOutputs.subreceive(ep) context become activeReceive - unstashAll() - case _ ⇒ stash() + } } def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive diff --git a/akka-stream/src/main/scala/akka/stream/impl/ExposedPublisherReceive.scala b/akka-stream/src/main/scala/akka/stream/impl/ExposedPublisherReceive.scala new file mode 100644 index 0000000000..d7de0ae7d0 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/ExposedPublisherReceive.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.Actor + +/** + * INTERNAL API + */ +private[akka] abstract class ExposedPublisherReceive(activeReceive: Actor.Receive, unhandled: Any ⇒ Unit) extends Actor.Receive { + private var stash = List.empty[Any] + + def isDefinedAt(o: Any): Boolean = true + + def apply(o: Any): Unit = o match { + case ep: ExposedPublisher ⇒ + receiveExposedPublisher(ep) + if (stash.nonEmpty) { + // we don't use sender() so this is allright + stash.reverse.foreach { msg ⇒ + activeReceive.applyOrElse(msg, unhandled) + } + } + case other ⇒ + stash ::= other + } + + def receiveExposedPublisher(ep: ExposedPublisher): Unit +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index b0894cee24..d86ce289ff 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -13,7 +13,6 @@ import akka.io.Tcp._ import akka.stream.ActorFlowMaterializerSettings import akka.stream.StreamTcpException import org.reactivestreams.Processor -import akka.actor.Stash import akka.stream.impl._ import akka.actor.ActorLogging @@ -37,7 +36,7 @@ private[akka] object TcpStreamActor { /** * INTERNAL API */ -private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerSettings) extends Actor with Stash +private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerSettings) extends Actor with ActorLogging { import TcpStreamActor._ @@ -169,13 +168,11 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS override protected def pumpFailed(e: Throwable): Unit = fail(e) } - final override def receive = { - // FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this - case ep: ExposedPublisher ⇒ + final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) { + override def receiveExposedPublisher(ep: ExposedPublisher): Unit = { primaryOutputs.subreceive(ep) context become activeReceive - unstashAll() - case _ ⇒ stash() + } } def activeReceive = diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index c7742a4814..295a4d2b89 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -8,7 +8,6 @@ import scala.concurrent.{ Future, Promise } import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props -import akka.actor.Stash import akka.io.{ IO, Tcp } import akka.io.Tcp._ import akka.stream.{ FlowMaterializer, ActorFlowMaterializerSettings } @@ -40,7 +39,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket unbindPromise: Promise[() ⇒ Future[Unit]], flowSubscriber: Subscriber[StreamTcp.IncomingConnection], bindCmd: Tcp.Bind, settings: ActorFlowMaterializerSettings) extends Actor - with Pump with Stash with ActorLogging { + with Pump with ActorLogging { import context.system object primaryOutputs extends SimpleOutputs(self, pump = this) { @@ -130,13 +129,11 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket } } - final override def receive = { - // FIXME using Stash mailbox is not the best for performance, we probably want a better solution to this - case ep: ExposedPublisher ⇒ + final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) { + override def receiveExposedPublisher(ep: ExposedPublisher): Unit = { primaryOutputs.subreceive(ep) context become activeReceive - unstashAll() - case _ ⇒ stash() + } } def activeReceive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala index ecd5fb97ca..c399fbc8e3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala @@ -51,9 +51,9 @@ private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Ke } override def to(sink: Sink[Out]): Sink[In] = sink match { - case sp: SinkPipe[Out] ⇒ sp.prependPipe(this) + case sp: SinkPipe[Out] ⇒ sp.prependPipe(this) case gs: GraphBackedSink[Out, _] ⇒ gs.prepend(this) - case d: Sink[Out] ⇒ this.withSink(d) + case d: Sink[Out] ⇒ this.withSink(d) } override def join(flow: Flow[Out, In]): RunnableFlow = flow match { @@ -99,9 +99,9 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As } override def to(sink: Sink[Out]): RunnableFlow = sink match { - case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ::: ops, keys ::: sp.keys) // FIXME raw addition of AstNodes + case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ::: ops, keys ::: sp.keys) // FIXME raw addition of AstNodes case g: GraphBackedSink[Out, _] ⇒ g.prepend(this) - case d: Sink[Out] ⇒ this.withSink(d) + case d: Sink[Out] ⇒ this.withSink(d) } override def withKey(key: Key[_]): SourcePipe[Out] = SourcePipe(input, ops, keys :+ key)