+str add akka.stream.materializer.file-io-dispatcher setting

This commit is contained in:
Johannes Rudolph 2014-11-07 15:00:50 +01:00
parent 840feb42ea
commit 4631e052f2
7 changed files with 51 additions and 40 deletions

View file

@ -0,0 +1,19 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream;
import akka.actor.ActorSystem;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
public abstract class StreamTest {
final protected ActorSystem system;
final protected FlowMaterializer materializer;
protected StreamTest(AkkaJUnitActorSystemResource actorSystemResource) {
system = actorSystemResource.getSystem();
MaterializerSettings settings = MaterializerSettings.create(system);
materializer = FlowMaterializer.create(settings, system);
}
}

View file

@ -1,10 +1,8 @@
package akka.stream.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Source;
import akka.stream.testkit.AkkaSpec;
@ -15,12 +13,15 @@ import org.reactivestreams.Publisher;
import static akka.stream.actor.ActorPublisherMessage.Request;
public class ActorPublisherTest {
public class ActorPublisherTest extends StreamTest {
public ActorPublisherTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ActorPublisherTest", AkkaSpec.testConf());
public static class TestPublisher extends UntypedActorPublisher<Integer> {
public static class TestPublisher extends UntypedActorPublisher<Integer> {
@Override
public void onReceive(Object msg) {
@ -35,11 +36,6 @@ public class ActorPublisherTest {
}
}
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test
public void mustHaveJavaAPI() {
final JavaTestKit probe = new JavaTestKit(system);

View file

@ -1,10 +1,8 @@
package akka.stream.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
@ -19,7 +17,10 @@ import java.util.Arrays;
import static akka.stream.actor.ActorSubscriberMessage.OnError;
import static akka.stream.actor.ActorSubscriberMessage.OnNext;
public class ActorSubscriberTest {
public class ActorSubscriberTest extends StreamTest {
public ActorSubscriberTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf());
@ -55,11 +56,6 @@ public class ActorSubscriberTest {
}
}
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test
public void mustHaveJavaAPI() {
final JavaTestKit probe = new JavaTestKit(system);

View file

@ -1,15 +1,13 @@
package akka.stream.javadsl;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Foreach;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
import akka.japi.Pair;
import akka.japi.Util;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import akka.stream.OverflowStrategy;
import akka.stream.StreamTest;
import akka.stream.Transformer;
import akka.stream.javadsl.japi.*;
import akka.stream.testkit.AkkaSpec;
@ -32,17 +30,15 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class FlowTest {
public class FlowTest extends StreamTest {
public FlowTest() {
super(actorSystemResource);
}
@ClassRule
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest",
AkkaSpec.testConf());
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test
public void mustBeAbleToUseSimpleOperators() {
final JavaTestKit probe = new JavaTestKit(system);

View file

@ -3,9 +3,10 @@
*/
package akka.stream.javadsl;
import akka.actor.ActorSystem;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import java.util.ArrayList;
import java.util.List;
import akka.stream.StreamTest;
import akka.stream.javadsl.japi.Function2;
import akka.stream.testkit.AkkaSpec;
import org.junit.ClassRule;
@ -18,17 +19,15 @@ import scala.concurrent.duration.Duration;
import java.util.ArrayList;
import java.util.List;
public class SinkTest {
public class SinkTest extends StreamTest {
public SinkTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest",
AkkaSpec.testConf());
final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test
public void mustBeAbleToUseFanoutPublisher() throws Exception {
final KeyedSink<Object, Publisher<Object>> pubSink = Sink.fanoutPublisher(2, 2);

View file

@ -40,6 +40,9 @@ akka {
timeout = 5s
}
# Fully qualified config path which holds the dispatcher configuration
# to be used by FlowMaterialiser when creating Actors for IO operations.
file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher}
}
}

View file

@ -184,7 +184,8 @@ object MaterializerSettings {
config.getInt("initial-fan-out-buffer-size"),
config.getInt("max-fan-out-buffer-size"),
config.getString("dispatcher"),
StreamSubscriptionTimeoutSettings(config))
StreamSubscriptionTimeoutSettings(config),
config.getString("file-io-dispatcher"))
/**
* Java API
@ -223,7 +224,8 @@ final case class MaterializerSettings(
initialFanOutBufferSize: Int,
maxFanOutBufferSize: Int,
dispatcher: String,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings) {
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
fileIODispatcher: String) {
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")