diff --git a/akka-stream-tests/src/test/java/akka/stream/StreamTest.java b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java new file mode 100644 index 0000000000..b09f2b04bb --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/StreamTest.java @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.stream; + +import akka.actor.ActorSystem; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; + +public abstract class StreamTest { + final protected ActorSystem system; + final protected FlowMaterializer materializer; + + protected StreamTest(AkkaJUnitActorSystemResource actorSystemResource) { + system = actorSystemResource.getSystem(); + MaterializerSettings settings = MaterializerSettings.create(system); + materializer = FlowMaterializer.create(settings, system); + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java index 341f477194..9682d539b9 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java @@ -1,10 +1,8 @@ package akka.stream.actor; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.actor.Props; -import akka.stream.FlowMaterializer; -import akka.stream.MaterializerSettings; +import akka.stream.StreamTest; import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.Source; import akka.stream.testkit.AkkaSpec; @@ -15,12 +13,15 @@ import org.reactivestreams.Publisher; import static akka.stream.actor.ActorPublisherMessage.Request; -public class ActorPublisherTest { +public class ActorPublisherTest extends StreamTest { + public ActorPublisherTest() { + super(actorSystemResource); + } @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ActorPublisherTest", AkkaSpec.testConf()); - public static class TestPublisher extends UntypedActorPublisher { + public static class TestPublisher extends UntypedActorPublisher { @Override public void onReceive(Object msg) { @@ -35,11 +36,6 @@ public class ActorPublisherTest { } } - final ActorSystem system = actorSystemResource.getSystem(); - - final MaterializerSettings settings = MaterializerSettings.create(system); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); - @Test public void mustHaveJavaAPI() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java index 2340385719..de5da1c7ec 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java @@ -1,10 +1,8 @@ package akka.stream.actor; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.actor.Props; -import akka.stream.FlowMaterializer; -import akka.stream.MaterializerSettings; +import akka.stream.StreamTest; import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; @@ -19,7 +17,10 @@ import java.util.Arrays; import static akka.stream.actor.ActorSubscriberMessage.OnError; import static akka.stream.actor.ActorSubscriberMessage.OnNext; -public class ActorSubscriberTest { +public class ActorSubscriberTest extends StreamTest { + public ActorSubscriberTest() { + super(actorSystemResource); + } @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); @@ -55,11 +56,6 @@ public class ActorSubscriberTest { } } - final ActorSystem system = actorSystemResource.getSystem(); - - final MaterializerSettings settings = MaterializerSettings.create(system); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); - @Test public void mustHaveJavaAPI() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index dd01fcee0a..d83c37eace 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -1,15 +1,13 @@ package akka.stream.javadsl; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.dispatch.Foreach; import akka.dispatch.Futures; import akka.dispatch.OnSuccess; import akka.japi.Pair; import akka.japi.Util; -import akka.stream.FlowMaterializer; -import akka.stream.MaterializerSettings; import akka.stream.OverflowStrategy; +import akka.stream.StreamTest; import akka.stream.Transformer; import akka.stream.javadsl.japi.*; import akka.stream.testkit.AkkaSpec; @@ -32,17 +30,15 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; -public class FlowTest { +public class FlowTest extends StreamTest { + public FlowTest() { + super(actorSystemResource); + } - @ClassRule + @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); - final ActorSystem system = actorSystemResource.getSystem(); - - final MaterializerSettings settings = MaterializerSettings.create(system); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); - @Test public void mustBeAbleToUseSimpleOperators() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 4a2666c4e9..25b4e7479d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -3,9 +3,10 @@ */ package akka.stream.javadsl; -import akka.actor.ActorSystem; -import akka.stream.FlowMaterializer; -import akka.stream.MaterializerSettings; +import java.util.ArrayList; +import java.util.List; + +import akka.stream.StreamTest; import akka.stream.javadsl.japi.Function2; import akka.stream.testkit.AkkaSpec; import org.junit.ClassRule; @@ -18,17 +19,15 @@ import scala.concurrent.duration.Duration; import java.util.ArrayList; import java.util.List; -public class SinkTest { +public class SinkTest extends StreamTest { + public SinkTest() { + super(actorSystemResource); + } @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); - final ActorSystem system = actorSystemResource.getSystem(); - - final MaterializerSettings settings = MaterializerSettings.create(system); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); - @Test public void mustBeAbleToUseFanoutPublisher() throws Exception { final KeyedSink> pubSink = Sink.fanoutPublisher(2, 2); diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 49c055c288..c57b627d57 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -40,6 +40,9 @@ akka { timeout = 5s } + # Fully qualified config path which holds the dispatcher configuration + # to be used by FlowMaterialiser when creating Actors for IO operations. + file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher} } } diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 7268e68576..8d069f2ad1 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -184,7 +184,8 @@ object MaterializerSettings { config.getInt("initial-fan-out-buffer-size"), config.getInt("max-fan-out-buffer-size"), config.getString("dispatcher"), - StreamSubscriptionTimeoutSettings(config)) + StreamSubscriptionTimeoutSettings(config), + config.getString("file-io-dispatcher")) /** * Java API @@ -223,7 +224,8 @@ final case class MaterializerSettings( initialFanOutBufferSize: Int, maxFanOutBufferSize: Int, dispatcher: String, - subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings) { + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + fileIODispatcher: String) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")