+str #15174 Add dispatcher setting to FlowMaterializer

* Verify dispatcher with custom default mailbox
Patrik Nordwall 2014-05-13 17:17:33 +02:00
parent a1588e2e39
commit e9a2585050
46 changed files with 183 additions and 68 deletions
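For context, a minimal sketch of how the new setting is meant to be used from application code (Scala; it relies only on APIs visible in this diff, and the dispatcher name "my-stream-dispatcher" is a placeholder for a dispatcher defined in the application's configuration):

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow

object DispatcherSettingExample extends App {
  // The materializer picks up the implicit ActorSystem.
  implicit val system = ActorSystem("example")

  // Stream actors created by this materializer get Props.withDispatcher(settings.dispatcher)
  // applied, so they run on "my-stream-dispatcher" instead of the default dispatcher.
  val settings = MaterializerSettings().withDispatcher("my-stream-dispatcher")
  val materializer = FlowMaterializer(settings)

  Flow(List(1, 2, 3)).map { i => println(i); i }.consume(materializer)
}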

View file

@@ -70,7 +70,7 @@ case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize:
 private object PersistentPublisher {
   def props(processorId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props =
-    Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings)
+    Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings).withDispatcher(settings.dispatcher)
 }
 private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends ProducerNode[Persistent] {
@@ -90,7 +90,8 @@ private class PersistentPublisherImpl(processorId: String, publisherSettings: Pe
   type S = ActorSubscription[Persistent]
-  private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self), "publisherBuffer")
+  private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self).
+    withDispatcher(context.props.dispatcher), "publisherBuffer")
   private var pub: ActorPublisher[Persistent] = _
   private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason

View file

@@ -10,6 +10,7 @@ import akka.stream.impl.Ast
 import org.reactivestreams.api.Producer
 import scala.concurrent.duration._
 import org.reactivestreams.api.Consumer
+import akka.actor.Deploy
 object FlowMaterializer {
@@ -102,7 +103,8 @@ case class MaterializerSettings(
   initialInputBufferSize: Int = 4,
   maximumInputBufferSize: Int = 16,
   upstreamSubscriptionTimeout: FiniteDuration = 3.seconds,
-  downstreamSubscriptionTimeout: FiniteDuration = 3.seconds) {
+  downstreamSubscriptionTimeout: FiniteDuration = 3.seconds,
+  dispatcher: String = Deploy.NoDispatcherGiven) {
   private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0
   require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0")
@@ -131,5 +133,7 @@ case class MaterializerSettings(
     copy(upstreamSubscriptionTimeout = upstreamSubscriptionTimeout,
       downstreamSubscriptionTimeout = downstreamSubscriptionTimeout)
+  def withDispatcher(dispatcher: String): MaterializerSettings = copy(dispatcher = dispatcher)
 }

View file

@@ -43,10 +43,11 @@ private[akka] class ActorConsumer[T]( final val impl: ActorRef) extends ActorCon
 private[akka] object ActorConsumer {
   import Ast._
-  def props(settings: MaterializerSettings, op: AstNode) = op match {
+  def props(settings: MaterializerSettings, op: AstNode): Props =
+    (op match {
       case t: Transform ⇒ Props(new TransformActorConsumer(settings, t.transformer))
       case r: Recover ⇒ Props(new RecoverActorConsumer(settings, r.recoveryTransformer))
-  }
+    }).withDispatcher(settings.dispatcher)
 }
/**

View file

@@ -14,7 +14,8 @@ import akka.event.LoggingReceive
  */
 private[akka] object ActorProcessor {
   import Ast._
-  def props(settings: MaterializerSettings, op: AstNode): Props = op match {
+  def props(settings: MaterializerSettings, op: AstNode): Props =
+    (op match {
       case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer))
       case r: Recover ⇒ Props(new RecoverProcessorImpl(settings, r.recoveryTransformer))
       case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p))
@@ -23,7 +24,7 @@ private[akka] object ActorProcessor {
       case z: Zip ⇒ Props(new ZipImpl(settings, z.other))
      case c: Concat ⇒ Props(new ConcatImpl(settings, c.next))
       case t: Tee ⇒ Props(new TeeImpl(settings, t.other))
-  }
+    }).withDispatcher(settings.dispatcher)
 }
/**

View file

@@ -56,7 +56,7 @@ private[akka] class ActorProducer[T]( final val impl: ActorRef, val equalityValu
  */
 private[akka] object ActorProducer {
   def props[T](settings: MaterializerSettings, f: () ⇒ T): Props =
-    Props(new ActorProducerImpl(f, settings))
+    Props(new ActorProducerImpl(f, settings)).withDispatcher(settings.dispatcher)
   def unapply(o: Any): Option[(ActorRef, Option[AnyRef])] = o match {
     case other: ActorProducer[_] ⇒ Some((other.impl, other.equalityValue))

View file

@@ -23,7 +23,7 @@ import akka.stream.MaterializerSettings
  */
 private[akka] object FutureProducer {
   def props(future: Future[Any], settings: MaterializerSettings): Props =
-    Props(new FutureProducer(future, settings))
+    Props(new FutureProducer(future, settings)).withDispatcher(settings.dispatcher)
 object FutureSubscription {
   case class Cancel(subscription: FutureSubscription)

View file

@@ -21,7 +21,7 @@ import scala.concurrent.duration.Duration
  */
 private[akka] object IterableProducer {
   def props(iterable: immutable.Iterable[Any], settings: MaterializerSettings): Props =
-    Props(new IterableProducer(iterable, settings))
+    Props(new IterableProducer(iterable, settings)).withDispatcher(settings.dispatcher)
 object BasicActorSubscription {
   case object Cancel
@@ -102,7 +102,7 @@ private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings
     else {
       val iterator = withCtx(context)(iterable.iterator)
       val worker = context.watch(context.actorOf(IterableProducerWorker.props(iterator, subscriber,
-        settings.maximumInputBufferSize)))
+        settings.maximumInputBufferSize).withDispatcher(context.props.dispatcher)))
       val subscription = new BasicActorSubscription(worker)
       subscribers += subscriber
       workers = workers.updated(worker, subscriber)

View file

@@ -37,7 +37,8 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
     private var completed: Boolean = false
     private var demands: Int = 0
-    val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings)))
+    val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings).
+      withDispatcher(context.props.dispatcher)))
     val processor = new ActorProcessor[AnyRef, AnyRef](substream)
     override def isClosed: Boolean = completed

View file

@@ -20,9 +20,9 @@ private[akka] object TcpStreamActor {
   class TcpStreamException(msg: String) extends RuntimeException(msg) with NoStackTrace
   def outboundProps(connectCmd: Connect, requester: ActorRef, settings: MaterializerSettings): Props =
-    Props(new OutboundTcpStreamActor(connectCmd, requester, settings))
+    Props(new OutboundTcpStreamActor(connectCmd, requester, settings)).withDispatcher(settings.dispatcher)
   def inboundProps(connection: ActorRef, settings: MaterializerSettings): Props =
-    Props(new InboundTcpStreamActor(connection, settings))
+    Props(new InboundTcpStreamActor(connection, settings)).withDispatcher(settings.dispatcher)
 }
/**

View file

@@ -20,7 +20,7 @@ private[akka] object TcpListenStreamActor {
   class TcpListenStreamException(msg: String) extends RuntimeException(msg) with NoStackTrace
   def props(bindCmd: Tcp.Bind, requester: ActorRef, settings: MaterializerSettings): Props =
-    Props(new TcpListenStreamActor(bindCmd, requester, settings))
+    Props(new TcpListenStreamActor(bindCmd, requester, settings)).withDispatcher(settings.dispatcher)
   case class ConnectionProducer(getPublisher: Publisher[StreamTcp.IncomingTcpConnection])
     extends Producer[StreamTcp.IncomingTcpConnection] {

View file

@@ -41,7 +41,7 @@ public class FlowTest {
   final ActorSystem system = actorSystemResource.getSystem();
-  final MaterializerSettings settings = MaterializerSettings.create();
+  final MaterializerSettings settings = MaterializerSettings.create().withDispatcher("akka.test.stream-dispatcher");
   final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
   @Test

View file

@@ -0,0 +1,18 @@
+# The StreamTestDefaultMailbox verifies that stream actors are using
+# the dispatcher defined in MaterializerSettings. All tests should use
+# MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")
+# or disable this check by defining
+# akka.actor.default-mailbox.mailbox-type = "akka.dispatch.UnboundedMailbox"
+akka.actor.default-mailbox.mailbox-type = "akka.stream.testkit.StreamTestDefaultMailbox"
+
+# Dispatcher for stream actors. Specified in tests with
+# MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")
+akka.test.stream-dispatcher {
+  type = Dispatcher
+  executor = "fork-join-executor"
+  fork-join-executor {
+    parallelism-min = 8
+    parallelism-max = 8
+  }
+  mailbox-requirement = "akka.dispatch.UnboundedMessageQueueSemantics"
+}
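For reference, the test-side usage this configuration pairs with looks roughly like the sketch below (inside an AkkaSpec, which supplies the implicit actor system; this mirrors the pattern applied to the specs later in this commit):

import akka.stream.{ FlowMaterializer, MaterializerSettings }

// Uses the dispatcher defined in the reference.conf above. If a stream actor
// ends up on the default dispatcher instead, StreamTestDefaultMailbox triggers
// an assertion when the actor's mailbox is created.
val settings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")
val materializer = FlowMaterializer(settings)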

View file

@@ -30,7 +30,7 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb",
   val numMessages = 10
   val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis))
-  val materializer = FlowMaterializer(MaterializerSettings())
+  val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
   var processor1: ActorRef = _
   var processor2: ActorRef = _

View file

@@ -26,7 +26,7 @@ class ActorProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShu
     this(ActorSystem(classOf[ActorProducerTest].getSimpleName, AkkaSpec.testConf))
   }
-  private val materializer = FlowMaterializer(MaterializerSettings())
+  private val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
   private def createProducer(elements: Int): Producer[Int] = {
     val iter = Iterator from 1000

View file

@@ -16,7 +16,7 @@ import scala.util.Failure
 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
 class DuctSpec extends AkkaSpec {
-  val materializer = FlowMaterializer(MaterializerSettings())
+  val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
   "A Duct" must {

View file

@@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
 class FlowCollectSpec extends AkkaSpec with ScriptedTest {
-  val settings = MaterializerSettings()
+  val settings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")
   "A Collect" must {

View file

@@ -0,0 +1,26 @@
+/**
+ * Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
+ */
+package akka.stream
+
+import akka.stream.scaladsl.Flow
+import akka.stream.testkit.AkkaSpec
+import akka.testkit.TestProbe
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class FlowDispatcherSpec extends AkkaSpec {
+
+  val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
+
+  "Flow with dispatcher setting" must {
+    "use the specified dispatcher" in {
+      val probe = TestProbe()
+      val p = Flow(List(1, 2, 3)).map(i ⇒
+        { probe.ref ! Thread.currentThread().getName(); i }).
+        consume(materializer)
+      probe.receiveN(3) foreach {
+        case s: String ⇒ s should startWith(system.name + "-akka.test.stream-dispatcher")
+      }
+    }
+  }
+}

View file

@@ -13,7 +13,8 @@ class FlowDropSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16)
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher")
   "A Drop" must {

View file

@@ -15,7 +15,8 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16)
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher")
   "A Filter" must {
@@ -25,15 +26,16 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest {
     }
     "not blow up with high request counts" in {
-      val gen = FlowMaterializer(MaterializerSettings(
+      val materializer = FlowMaterializer(MaterializerSettings(
        initialInputBufferSize = 1,
        maximumInputBufferSize = 1,
        initialFanOutBufferSize = 1,
-        maxFanOutBufferSize = 1))
+        maxFanOutBufferSize = 1,
+        dispatcher = "akka.test.stream-dispatcher"))
       val probe = StreamTestKit.consumerProbe[Int]
       Flow(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0).
-        toProducer(gen).produceTo(probe)
+        toProducer(materializer).produceTo(probe)
       val subscription = probe.expectSubscription()
       for (_ ← 1 to 10000) {

View file

@@ -13,7 +13,8 @@ class FlowFoldSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16)
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher")
   "A Fold" must {

View file

@@ -13,7 +13,8 @@ class FlowForeachSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16)
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher")
   "A Foreach" must {

View file

@@ -18,7 +18,7 @@ import scala.concurrent.Promise
 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
 class FlowFromFutureSpec extends AkkaSpec {
-  val materializer = FlowMaterializer(MaterializerSettings())
+  val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
   "A Flow based on a Future" must {
     "produce one element from already successful Future" in {

View file

@@ -16,7 +16,8 @@ class FlowGroupBySpec extends AkkaSpec {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 2,
     initialFanOutBufferSize = 2,
-    maxFanOutBufferSize = 2))
+    maxFanOutBufferSize = 2,
+    dispatcher = "akka.test.stream-dispatcher"))
   case class StreamPuppet(p: Producer[Int]) {
     val probe = StreamTestKit.consumerProbe[Int]

View file

@@ -14,7 +14,8 @@ class FlowGroupedSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16)
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher")
   "A Grouped" must {

View file

@@ -17,7 +17,8 @@ import akka.stream.scaladsl.Flow
 class FlowIterableSpec extends AkkaSpec {
   val materializer = FlowMaterializer(MaterializerSettings(
-    maximumInputBufferSize = 512))
+    maximumInputBufferSize = 512,
+    dispatcher = "akka.test.stream-dispatcher"))
   "A Flow based on an iterable" must {
     "produce elements" in {

View file

@@ -18,7 +18,8 @@ class FlowIteratorSpec extends AkkaSpec {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 2,
     initialFanOutBufferSize = 4,
-    maxFanOutBufferSize = 4))
+    maxFanOutBufferSize = 4,
+    dispatcher = "akka.test.stream-dispatcher"))
   "A Flow based on an iterator" must {
     "produce elements" in {

View file

@@ -12,7 +12,8 @@ class FlowMapConcatSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16)
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher")
   "A MapConcat" must {

View file

@@ -15,7 +15,8 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16)
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher")
   val gen = FlowMaterializer(settings)

View file

@@ -22,7 +22,8 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16))
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher"))
   "A Flow with onComplete" must {

View file

@@ -9,7 +9,7 @@ import akka.stream.testkit.StreamTestKit
 class FlowProduceToConsumerSpec extends AkkaSpec {
-  val materializer = FlowMaterializer(MaterializerSettings())
+  val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
   "A Flow with toProducer" must {

View file

@@ -20,7 +20,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16)
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher")
   val identity: Flow[Any] ⇒ Flow[Any] = in ⇒ in.map(e ⇒ e)
   val identity2: Flow[Any] ⇒ Flow[Any] = in ⇒ identity(in)

View file

@@ -16,7 +16,8 @@ class FlowSplitWhenSpec extends AkkaSpec {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 2,
     initialFanOutBufferSize = 2,
-    maxFanOutBufferSize = 2))
+    maxFanOutBufferSize = 2,
+    dispatcher = "akka.test.stream-dispatcher"))
   case class StreamPuppet(p: Producer[Int]) {
     val probe = StreamTestKit.consumerProbe[Int]

View file

@@ -16,7 +16,8 @@ class FlowTakeSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16)
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher")
   muteDeadLetters(classOf[OnNext], OnComplete.getClass, classOf[RequestMore])()

View file

@@ -15,7 +15,8 @@ class FlowTeeSpec extends AkkaSpec {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16))
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher"))
   "A Tee" must {

View file

@@ -18,7 +18,8 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 16,
     initialFanOutBufferSize = 1,
-    maxFanOutBufferSize = 16))
+    maxFanOutBufferSize = 16,
+    dispatcher = "akka.test.stream-dispatcher"))
   "A Flow with toFuture" must {

View file

@@ -31,7 +31,8 @@ class FlowTransformRecoverSpec extends AkkaSpec {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 2,
     initialFanOutBufferSize = 2,
-    maxFanOutBufferSize = 2))
+    maxFanOutBufferSize = 2,
+    dispatcher = "akka.test.stream-dispatcher"))
   "A Flow with transformRecover operations" must {
     "produce one-to-one transformation as expected" in {

View file

@@ -21,7 +21,8 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
     initialInputBufferSize = 2,
     maximumInputBufferSize = 2,
     initialFanOutBufferSize = 2,
-    maxFanOutBufferSize = 2))
+    maxFanOutBufferSize = 2,
+    dispatcher = "akka.test.stream-dispatcher"))
   "A Flow with transform operations" must {
     "produce one-to-one transformation as expected" in {

View file

@@ -44,7 +44,8 @@ class IdentityProcessorTest(_system: ActorSystem, env: TestEnvironment, publishe
     initialInputBufferSize = inputSize,
     maximumInputBufferSize = inputSize,
     initialFanOutBufferSize = fanoutSize,
-    maxFanOutBufferSize = fanoutSize),
+    maxFanOutBufferSize = fanoutSize,
+    dispatcher = "akka.test.stream-dispatcher"),
     system, system.name)
   val processor = materializer.processorForNode(Ast.Transform(
@@ -57,7 +58,7 @@ class IdentityProcessorTest(_system: ActorSystem, env: TestEnvironment, publishe
   def createHelperPublisher(elements: Int): Publisher[Int] = {
     val materializer = FlowMaterializer(MaterializerSettings(
-      maximumInputBufferSize = 512))(system)
+      maximumInputBufferSize = 512, dispatcher = "akka.test.stream-dispatcher"))(system)
     val iter = Iterator from 1000
     Flow(if (elements > 0) iter take elements else iter).toProducer(materializer).getPublisher
   }

View file

@@ -26,7 +26,7 @@ class IterableProducerTest(_system: ActorSystem, env: TestEnvironment, publisher
   }
   val materializer = FlowMaterializer(MaterializerSettings(
-    maximumInputBufferSize = 512))(system)
+    maximumInputBufferSize = 512, dispatcher = "akka.test.stream-dispatcher"))(system)
   def createPublisher(elements: Int): Publisher[Int] = {
     val iterable: immutable.Iterable[Int] =

View file

@@ -25,7 +25,7 @@ class IteratorProducerTest(_system: ActorSystem, env: TestEnvironment, publisher
   }
   val materializer = FlowMaterializer(MaterializerSettings(
-    maximumInputBufferSize = 512))(system)
+    maximumInputBufferSize = 512, dispatcher = "akka.test.stream-dispatcher"))(system)
   def createPublisher(elements: Int): Publisher[Int] = {
     val iter: Iterator[Int] =

View file

@@ -15,7 +15,7 @@ import akka.stream.impl.ActorBasedFlowMaterializer
 class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") {
-  val materializer = FlowMaterializer(MaterializerSettings())
+  val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
   def self = ActorBasedFlowMaterializer.currentActorContext().self

View file

@@ -23,13 +23,13 @@ class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") {
   "Processors of a flow" must {
     "have sensible default names for flow with one step" in {
-      val materializer = FlowMaterializer(MaterializerSettings())
+      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
       Flow(List(1)).map(in ⇒ { testActor ! self; in }).consume(materializer)
       expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-1-map")
     }
     "have sensible default names for flow with several steps" in {
-      val materializer = FlowMaterializer(MaterializerSettings())
+      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
       Flow(List(1)).
         map(in ⇒ { testActor ! self; in }).
         transform(new Transformer[Int, Int] {
@@ -44,19 +44,21 @@ class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") {
     }
     "use specified flow namePrefix in materializer" in {
-      val materializer = FlowMaterializer(MaterializerSettings(), namePrefix = Some("myflow"))
+      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"),
+        namePrefix = Some("myflow"))
       Flow(List(1)).map(in ⇒ { testActor ! self; in }).consume(materializer)
       expectMsgType[ActorRef].path.name should be(s"myflow-$flowCount-1-map")
     }
     "use specified withNamePrefix in materializer" in {
-      val materializer = FlowMaterializer(MaterializerSettings())
+      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
       Flow(List(2)).map(in ⇒ { testActor ! self; in }).consume(materializer.withNamePrefix("myotherflow"))
       expectMsgType[ActorRef].path.name should be(s"myotherflow-$flowCount-1-map")
     }
     "create unique name for each materialization" in {
-      val materializer = FlowMaterializer(MaterializerSettings(), namePrefix = Some("myflow"))
+      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"),
+        namePrefix = Some("myflow"))
       val flow = Flow(List(1)).map(in ⇒ { testActor ! self; in })
       flow.consume(materializer)
       val name1 = expectMsgType[ActorRef].path.name

View file

@@ -17,7 +17,8 @@ abstract class TwoStreamsSetup extends AkkaSpec {
     initialInputBufferSize = 2,
     maximumInputBufferSize = 2,
     initialFanOutBufferSize = 2,
-    maxFanOutBufferSize = 2))
+    maxFanOutBufferSize = 2,
+    dispatcher = "akka.test.stream-dispatcher"))
   case class TE(message: String) extends RuntimeException(message) with NoStackTrace

View file

@@ -21,7 +21,7 @@ object LogSpec {
 class LogSpec extends AkkaSpec("akka.loglevel=INFO") {
   import LogSpec._
-  val materializer = FlowMaterializer(MaterializerSettings())
+  val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
   def flowCount = FlowNameCounter(system).counter.get
   def nextFlowCount = flowCount + 1

View file

@@ -27,8 +27,10 @@ object TcpFlowSpec {
   case object WriteAck extends Tcp.Event
-  def testClientProps(connection: ActorRef): Props = Props(new TestClient(connection))
-  def testServerProps(address: InetSocketAddress, probe: ActorRef): Props = Props(new TestServer(address, probe))
+  def testClientProps(connection: ActorRef): Props =
+    Props(new TestClient(connection)).withDispatcher("akka.test.stream-dispatcher")
+  def testServerProps(address: InetSocketAddress, probe: ActorRef): Props =
+    Props(new TestServer(address, probe)).withDispatcher("akka.test.stream-dispatcher")
   class TestClient(connection: ActorRef) extends Actor {
     connection ! Tcp.Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false)
@@ -111,7 +113,8 @@ class TcpFlowSpec extends AkkaSpec {
     initialInputBufferSize = 4,
     maximumInputBufferSize = 4,
     initialFanOutBufferSize = 2,
-    maxFanOutBufferSize = 2)
+    maxFanOutBufferSize = 2,
+    dispatcher = "akka.test.stream-dispatcher")
   val materializer = FlowMaterializer(settings)

View file

@@ -0,0 +1,37 @@
+package akka.stream.testkit
+
+import akka.dispatch.ProducesMessageQueue
+import akka.dispatch.UnboundedMailbox
+import akka.dispatch.MessageQueue
+import com.typesafe.config.Config
+import akka.actor.ActorSystem
+import akka.dispatch.MailboxType
+import akka.actor.ActorRef
+import akka.actor.ActorRefWithCell
+import akka.stream.io.StreamTcpManager
+import akka.actor.Actor
+
+/**
+ * INTERNAL API
+ * This mailbox is only used in tests to verify that stream actors are using
+ * the dispatcher defined in MaterializerSettings.
+ */
+private[akka] case class StreamTestDefaultMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {
+  def this(settings: ActorSystem.Settings, config: Config) = this()
+
+  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
+    owner match {
+      case Some(r: ActorRefWithCell) ⇒
+        val actorClass = r.underlying.props.actorClass
+        assert(actorClass != classOf[Actor], s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]")
+        // StreamTcpManager is allowed to use another dispatcher
+        assert(!actorClass.getName.startsWith("akka.stream.") || actorClass == classOf[StreamTcpManager],
+          s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " +
+            "Have you forgot to define `props.withDispatcher` when creating the actor? " +
+            """Or have you forgot to use `MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")` in the test?""")
+      case _ ⇒
+    }
+    new UnboundedMailbox.MessageQueue
+  }
+}