diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala index 4dc47c4b37..d749f6b234 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala @@ -70,7 +70,7 @@ case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize: private object PersistentPublisher { def props(processorId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props = - Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings) + Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings).withDispatcher(settings.dispatcher) } private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends ProducerNode[Persistent] { @@ -90,7 +90,8 @@ private class PersistentPublisherImpl(processorId: String, publisherSettings: Pe type S = ActorSubscription[Persistent] - private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self), "publisherBuffer") + private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self). + withDispatcher(context.props.dispatcher), "publisherBuffer") private var pub: ActorPublisher[Persistent] = _ private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 8f4b0050a9..17f6dcc7bd 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -10,6 +10,7 @@ import akka.stream.impl.Ast import org.reactivestreams.api.Producer import scala.concurrent.duration._ import org.reactivestreams.api.Consumer +import akka.actor.Deploy object FlowMaterializer { @@ -102,7 +103,8 @@ case class MaterializerSettings( initialInputBufferSize: Int = 4, maximumInputBufferSize: Int = 16, upstreamSubscriptionTimeout: FiniteDuration = 3.seconds, - downstreamSubscriptionTimeout: FiniteDuration = 3.seconds) { + downstreamSubscriptionTimeout: FiniteDuration = 3.seconds, + dispatcher: String = Deploy.NoDispatcherGiven) { private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0") @@ -131,5 +133,7 @@ case class MaterializerSettings( copy(upstreamSubscriptionTimeout = upstreamSubscriptionTimeout, downstreamSubscriptionTimeout = downstreamSubscriptionTimeout) + def withDispatcher(dispatcher: String): MaterializerSettings = copy(dispatcher = dispatcher) + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala index 1253f04281..6df05543b5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala @@ -43,10 +43,11 @@ private[akka] class ActorConsumer[T]( final val impl: ActorRef) extends ActorCon private[akka] object ActorConsumer { import Ast._ - def props(settings: MaterializerSettings, op: AstNode) = op match { - case t: Transform ⇒ Props(new TransformActorConsumer(settings, t.transformer)) - case r: Recover ⇒ Props(new RecoverActorConsumer(settings, r.recoveryTransformer)) - } + def props(settings: MaterializerSettings, op: AstNode): Props = + (op match { + case t: Transform ⇒ Props(new TransformActorConsumer(settings, t.transformer)) + case r: Recover ⇒ Props(new RecoverActorConsumer(settings, r.recoveryTransformer)) + }).withDispatcher(settings.dispatcher) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 21d2d4215a..1e648c3e37 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -14,16 +14,17 @@ import akka.event.LoggingReceive */ private[akka] object ActorProcessor { import Ast._ - def props(settings: MaterializerSettings, op: AstNode): Props = op match { - case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer)) - case r: Recover ⇒ Props(new RecoverProcessorImpl(settings, r.recoveryTransformer)) - case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) - case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) - case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) - case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) - case c: Concat ⇒ Props(new ConcatImpl(settings, c.next)) - case t: Tee ⇒ Props(new TeeImpl(settings, t.other)) - } + def props(settings: MaterializerSettings, op: AstNode): Props = + (op match { + case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer)) + case r: Recover ⇒ Props(new RecoverProcessorImpl(settings, r.recoveryTransformer)) + case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) + case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) + case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) + case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) + case c: Concat ⇒ Props(new ConcatImpl(settings, c.next)) + case t: Tee ⇒ Props(new TeeImpl(settings, t.other)) + }).withDispatcher(settings.dispatcher) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala index eda89aeaad..9e31f0e39a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala @@ -56,7 +56,7 @@ private[akka] class ActorProducer[T]( final val impl: ActorRef, val equalityValu */ private[akka] object ActorProducer { def props[T](settings: MaterializerSettings, f: () ⇒ T): Props = - Props(new ActorProducerImpl(f, settings)) + Props(new ActorProducerImpl(f, settings)).withDispatcher(settings.dispatcher) def unapply(o: Any): Option[(ActorRef, Option[AnyRef])] = o match { case other: ActorProducer[_] ⇒ Some((other.impl, other.equalityValue)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FutureProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/FutureProducer.scala index 6bbd7e904f..12b7bf3a6f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FutureProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FutureProducer.scala @@ -23,7 +23,7 @@ import akka.stream.MaterializerSettings */ private[akka] object FutureProducer { def props(future: Future[Any], settings: MaterializerSettings): Props = - Props(new FutureProducer(future, settings)) + Props(new FutureProducer(future, settings)).withDispatcher(settings.dispatcher) object FutureSubscription { case class Cancel(subscription: FutureSubscription) diff --git a/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala index 4a14757c3f..0d78f3521d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala @@ -21,7 +21,7 @@ import scala.concurrent.duration.Duration */ private[akka] object IterableProducer { def props(iterable: immutable.Iterable[Any], settings: MaterializerSettings): Props = - Props(new IterableProducer(iterable, settings)) + Props(new IterableProducer(iterable, settings)).withDispatcher(settings.dispatcher) object BasicActorSubscription { case object Cancel @@ -102,7 +102,7 @@ private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings else { val iterator = withCtx(context)(iterable.iterator) val worker = context.watch(context.actorOf(IterableProducerWorker.props(iterator, subscriber, - settings.maximumInputBufferSize))) + settings.maximumInputBufferSize).withDispatcher(context.props.dispatcher))) val subscription = new BasicActorSubscription(worker) subscribers += subscriber workers = workers.updated(worker, subscriber) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 1100af4896..f522ae999a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -37,7 +37,8 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS private var completed: Boolean = false private var demands: Int = 0 - val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings))) + val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings). + withDispatcher(context.props.dispatcher))) val processor = new ActorProcessor[AnyRef, AnyRef](substream) override def isClosed: Boolean = completed diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala index 4c0e9b6f20..3baa900ec7 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala @@ -20,9 +20,9 @@ private[akka] object TcpStreamActor { class TcpStreamException(msg: String) extends RuntimeException(msg) with NoStackTrace def outboundProps(connectCmd: Connect, requester: ActorRef, settings: MaterializerSettings): Props = - Props(new OutboundTcpStreamActor(connectCmd, requester, settings)) + Props(new OutboundTcpStreamActor(connectCmd, requester, settings)).withDispatcher(settings.dispatcher) def inboundProps(connection: ActorRef, settings: MaterializerSettings): Props = - Props(new InboundTcpStreamActor(connection, settings)) + Props(new InboundTcpStreamActor(connection, settings)).withDispatcher(settings.dispatcher) } /** diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala index 21fee7c0ff..dd6871677c 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala @@ -20,7 +20,7 @@ private[akka] object TcpListenStreamActor { class TcpListenStreamException(msg: String) extends RuntimeException(msg) with NoStackTrace def props(bindCmd: Tcp.Bind, requester: ActorRef, settings: MaterializerSettings): Props = - Props(new TcpListenStreamActor(bindCmd, requester, settings)) + Props(new TcpListenStreamActor(bindCmd, requester, settings)).withDispatcher(settings.dispatcher) case class ConnectionProducer(getPublisher: Publisher[StreamTcp.IncomingTcpConnection]) extends Producer[StreamTcp.IncomingTcpConnection] { diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index 95fd32080f..cbc7ddbf24 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -41,7 +41,7 @@ public class FlowTest { final ActorSystem system = actorSystemResource.getSystem(); - final MaterializerSettings settings = MaterializerSettings.create(); + final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher"); final FlowMaterializer materializer = FlowMaterializer.create(settings, system); @Test diff --git a/akka-stream/src/test/resources/reference.conf b/akka-stream/src/test/resources/reference.conf new file mode 100644 index 0000000000..80fc5fb4ab --- /dev/null +++ b/akka-stream/src/test/resources/reference.conf @@ -0,0 +1,18 @@ +# The StreamTestDefaultMailbox verifies that stream actors are using +# the dispatcher defined in MaterializerSettings. All tests should use +# MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") +# or disable this check by defining +# akka.actor.default-mailbox.mailbox-type = "akka.dispatch.UnboundedMailbox" +akka.actor.default-mailbox.mailbox-type = "akka.stream.testkit.StreamTestDefaultMailbox" + +# Dispatcher for stream actors. Specified in tests with +# MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") +akka.test.stream-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 8 + parallelism-max = 8 + } + mailbox-requirement = "akka.dispatch.UnboundedMessageQueueSemantics" +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala index 230ca428bf..39d71dd386 100644 --- a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala @@ -30,7 +30,7 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", val numMessages = 10 val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis)) - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) var processor1: ActorRef = _ var processor2: ActorRef = _ diff --git a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala index 69add49701..306303c974 100644 --- a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala @@ -26,7 +26,7 @@ class ActorProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShu this(ActorSystem(classOf[ActorProducerTest].getSimpleName, AkkaSpec.testConf)) } - private val materializer = FlowMaterializer(MaterializerSettings()) + private val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) private def createProducer(elements: Int): Producer[Int] = { val iter = Iterator from 1000 diff --git a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala index 91f4bc7561..b1383ec885 100644 --- a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala @@ -16,7 +16,7 @@ import scala.util.Failure @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class DuctSpec extends AkkaSpec { - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) "A Duct" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala index 54a0d253eb..80520ac851 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowCollectSpec extends AkkaSpec with ScriptedTest { - val settings = MaterializerSettings() + val settings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") "A Collect" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowDispatcherSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDispatcherSpec.scala new file mode 100644 index 0000000000..9961a9629b --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowDispatcherSpec.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.testkit.TestProbe + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowDispatcherSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + + "Flow with dispatcher setting" must { + "use the specified dispatcher" in { + val probe = TestProbe() + val p = Flow(List(1, 2, 3)).map(i ⇒ + { probe.ref ! Thread.currentThread().getName(); i }). + consume(materializer) + probe.receiveN(3) foreach { + case s: String ⇒ s should startWith(system.name + "-akka.test.stream-dispatcher") + } + } + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala index 220b370296..e792c0c95a 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala @@ -13,7 +13,8 @@ class FlowDropSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher") "A Drop" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala index ab6853e1db..5f4ba6cf2e 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala @@ -15,7 +15,8 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher") "A Filter" must { @@ -25,15 +26,16 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest { } "not blow up with high request counts" in { - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 1, maximumInputBufferSize = 1, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 1)) + maxFanOutBufferSize = 1, + dispatcher = "akka.test.stream-dispatcher")) val probe = StreamTestKit.consumerProbe[Int] Flow(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0). - toProducer(gen).produceTo(probe) + toProducer(materializer).produceTo(probe) val subscription = probe.expectSubscription() for (_ ← 1 to 10000) { diff --git a/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala index b650254814..f239951bf3 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala @@ -13,7 +13,8 @@ class FlowFoldSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher") "A Fold" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala index 62bcbb8935..146f5349c6 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala @@ -13,7 +13,8 @@ class FlowForeachSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher") "A Foreach" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala index aef054d824..ca1302aa89 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala @@ -18,7 +18,7 @@ import scala.concurrent.Promise @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowFromFutureSpec extends AkkaSpec { - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) "A Flow based on a Future" must { "produce one element from already successful Future" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index 0bf58b6bcc..2fd23ceccd 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -16,7 +16,8 @@ class FlowGroupBySpec extends AkkaSpec { initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2)) + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) case class StreamPuppet(p: Producer[Int]) { val probe = StreamTestKit.consumerProbe[Int] diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala index 70bd7c4ead..a9c5a1865d 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala @@ -14,7 +14,8 @@ class FlowGroupedSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher") "A Grouped" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala index ab486f6e70..8987e21e14 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala @@ -17,7 +17,8 @@ import akka.stream.scaladsl.Flow class FlowIterableSpec extends AkkaSpec { val materializer = FlowMaterializer(MaterializerSettings( - maximumInputBufferSize = 512)) + maximumInputBufferSize = 512, + dispatcher = "akka.test.stream-dispatcher")) "A Flow based on an iterable" must { "produce elements" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala index b1f542c709..286b95d5f6 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala @@ -18,7 +18,8 @@ class FlowIteratorSpec extends AkkaSpec { initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 4, - maxFanOutBufferSize = 4)) + maxFanOutBufferSize = 4, + dispatcher = "akka.test.stream-dispatcher")) "A Flow based on an iterator" must { "produce elements" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala index 62b1c9abf1..aac8579af1 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala @@ -12,7 +12,8 @@ class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher") "A MapConcat" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala index ae9f38c34f..3f21a72626 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala @@ -15,7 +15,8 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher") val gen = FlowMaterializer(settings) diff --git a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala index 04bf2814ee..8784f41756 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala @@ -22,7 +22,8 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16)) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher")) "A Flow with onComplete" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala index b3a5127905..1154070819 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala @@ -9,7 +9,7 @@ import akka.stream.testkit.StreamTestKit class FlowProduceToConsumerSpec extends AkkaSpec { - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) "A Flow with toProducer" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala index ce3f15649e..dbf6952aba 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala @@ -20,7 +20,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher") val identity: Flow[Any] ⇒ Flow[Any] = in ⇒ in.map(e ⇒ e) val identity2: Flow[Any] ⇒ Flow[Any] = in ⇒ identity(in) diff --git a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index 6b81190436..e39a3c2bf7 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -16,7 +16,8 @@ class FlowSplitWhenSpec extends AkkaSpec { initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2)) + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) case class StreamPuppet(p: Producer[Int]) { val probe = StreamTestKit.consumerProbe[Int] diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala index f8049b6d3d..0ccd7ffc9e 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala @@ -16,7 +16,8 @@ class FlowTakeSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher") muteDeadLetters(classOf[OnNext], OnComplete.getClass, classOf[RequestMore])() diff --git a/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala index 98269bfc2f..782c12bd9a 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala @@ -15,7 +15,8 @@ class FlowTeeSpec extends AkkaSpec { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16)) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher")) "A Tee" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala index 51e9c082a6..6db9bde8b8 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala @@ -18,7 +18,8 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest { initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16)) + maxFanOutBufferSize = 16, + dispatcher = "akka.test.stream-dispatcher")) "A Flow with toFuture" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index 9d4fe7ffe6..e72de28a99 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -31,7 +31,8 @@ class FlowTransformRecoverSpec extends AkkaSpec { initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2)) + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) "A Flow with transformRecover operations" must { "produce one-to-one transformation as expected" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index 7f69d2e461..f75709d0c9 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -21,7 +21,8 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2)) + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) "A Flow with transform operations" must { "produce one-to-one transformation as expected" in { diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala index 20473e3bc1..b4847c0932 100644 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -44,7 +44,8 @@ class IdentityProcessorTest(_system: ActorSystem, env: TestEnvironment, publishe initialInputBufferSize = inputSize, maximumInputBufferSize = inputSize, initialFanOutBufferSize = fanoutSize, - maxFanOutBufferSize = fanoutSize), + maxFanOutBufferSize = fanoutSize, + dispatcher = "akka.test.stream-dispatcher"), system, system.name) val processor = materializer.processorForNode(Ast.Transform( @@ -57,7 +58,7 @@ class IdentityProcessorTest(_system: ActorSystem, env: TestEnvironment, publishe def createHelperPublisher(elements: Int): Publisher[Int] = { val materializer = FlowMaterializer(MaterializerSettings( - maximumInputBufferSize = 512))(system) + maximumInputBufferSize = 512, dispatcher = "akka.test.stream-dispatcher"))(system) val iter = Iterator from 1000 Flow(if (elements > 0) iter take elements else iter).toProducer(materializer).getPublisher } diff --git a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala index 4e8f62796c..cbd9fad7c9 100644 --- a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala @@ -26,7 +26,7 @@ class IterableProducerTest(_system: ActorSystem, env: TestEnvironment, publisher } val materializer = FlowMaterializer(MaterializerSettings( - maximumInputBufferSize = 512))(system) + maximumInputBufferSize = 512, dispatcher = "akka.test.stream-dispatcher"))(system) def createPublisher(elements: Int): Publisher[Int] = { val iterable: immutable.Iterable[Int] = diff --git a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala index 1bc86dc0d5..d5dba584f2 100644 --- a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala @@ -25,7 +25,7 @@ class IteratorProducerTest(_system: ActorSystem, env: TestEnvironment, publisher } val materializer = FlowMaterializer(MaterializerSettings( - maximumInputBufferSize = 512))(system) + maximumInputBufferSize = 512, dispatcher = "akka.test.stream-dispatcher"))(system) def createPublisher(elements: Int): Publisher[Int] = { val iter: Iterator[Int] = diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala index 254923b466..0f8a9de9b2 100644 --- a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala @@ -15,7 +15,7 @@ import akka.stream.impl.ActorBasedFlowMaterializer class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") { - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) def self = ActorBasedFlowMaterializer.currentActorContext().self diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala index 7b795c5133..b7add3a934 100644 --- a/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala @@ -23,13 +23,13 @@ class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") { "Processors of a flow" must { "have sensible default names for flow with one step" in { - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) Flow(List(1)).map(in ⇒ { testActor ! self; in }).consume(materializer) expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-1-map") } "have sensible default names for flow with several steps" in { - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) Flow(List(1)). map(in ⇒ { testActor ! self; in }). transform(new Transformer[Int, Int] { @@ -44,19 +44,21 @@ class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") { } "use specified flow namePrefix in materializer" in { - val materializer = FlowMaterializer(MaterializerSettings(), namePrefix = Some("myflow")) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"), + namePrefix = Some("myflow")) Flow(List(1)).map(in ⇒ { testActor ! self; in }).consume(materializer) expectMsgType[ActorRef].path.name should be(s"myflow-$flowCount-1-map") } "use specified withNamePrefix in materializer" in { - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) Flow(List(2)).map(in ⇒ { testActor ! self; in }).consume(materializer.withNamePrefix("myotherflow")) expectMsgType[ActorRef].path.name should be(s"myotherflow-$flowCount-1-map") } "create unique name for each materialization" in { - val materializer = FlowMaterializer(MaterializerSettings(), namePrefix = Some("myflow")) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"), + namePrefix = Some("myflow")) val flow = Flow(List(1)).map(in ⇒ { testActor ! self; in }) flow.consume(materializer) val name1 = expectMsgType[ActorRef].path.name diff --git a/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala index 92504a7031..2cabf80be6 100644 --- a/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala +++ b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala @@ -17,7 +17,8 @@ abstract class TwoStreamsSetup extends AkkaSpec { initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2)) + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) case class TE(message: String) extends RuntimeException(message) with NoStackTrace diff --git a/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala b/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala index 476dae640f..a071af5a41 100644 --- a/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala @@ -21,7 +21,7 @@ object LogSpec { class LogSpec extends AkkaSpec("akka.loglevel=INFO") { import LogSpec._ - val materializer = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) def flowCount = FlowNameCounter(system).counter.get def nextFlowCount = flowCount + 1 diff --git a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala index 08b3e950e7..6dd4b3b1ca 100644 --- a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -27,8 +27,10 @@ object TcpFlowSpec { case object WriteAck extends Tcp.Event - def testClientProps(connection: ActorRef): Props = Props(new TestClient(connection)) - def testServerProps(address: InetSocketAddress, probe: ActorRef): Props = Props(new TestServer(address, probe)) + def testClientProps(connection: ActorRef): Props = + Props(new TestClient(connection)).withDispatcher("akka.test.stream-dispatcher") + def testServerProps(address: InetSocketAddress, probe: ActorRef): Props = + Props(new TestServer(address, probe)).withDispatcher("akka.test.stream-dispatcher") class TestClient(connection: ActorRef) extends Actor { connection ! Tcp.Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) @@ -111,7 +113,8 @@ class TcpFlowSpec extends AkkaSpec { initialInputBufferSize = 4, maximumInputBufferSize = 4, initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2) + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher") val materializer = FlowMaterializer(settings) diff --git a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala new file mode 100644 index 0000000000..ccb56a2228 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala @@ -0,0 +1,37 @@ +package akka.stream.testkit + +import akka.dispatch.ProducesMessageQueue +import akka.dispatch.UnboundedMailbox +import akka.dispatch.MessageQueue +import com.typesafe.config.Config +import akka.actor.ActorSystem +import akka.dispatch.MailboxType +import akka.actor.ActorRef +import akka.actor.ActorRefWithCell +import akka.stream.io.StreamTcpManager +import akka.actor.Actor + +/** + * INTERNAL API + * This mailbox is only used in tests to verify that stream actors are using + * the dispatcher defined in MaterializerSettings. + */ +private[akka] case class StreamTestDefaultMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] { + + def this(settings: ActorSystem.Settings, config: Config) = this() + + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { + owner match { + case Some(r: ActorRefWithCell) ⇒ + val actorClass = r.underlying.props.actorClass + assert(actorClass != classOf[Actor], s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]") + // StreamTcpManager is allowed to use another dispatcher + assert(!actorClass.getName.startsWith("akka.stream.") || actorClass == classOf[StreamTcpManager], + s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " + + "Have you forgot to define `props.withDispatcher` when creating the actor? " + + """Or have you forgot to use `MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")` in the test?""") + case _ ⇒ + } + new UnboundedMailbox.MessageQueue + } +} \ No newline at end of file