diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 039f8c9bfc..b37d0a78b2 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -8,9 +8,9 @@ import akka.actor.ActorSystem import akka.actor.DeadLetterSuppression import akka.stream._ import akka.stream.impl._ +import akka.testkit.TestProbe import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl._ -import akka.testkit.TestProbe import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.collection.immutable import scala.concurrent.duration._ diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala index 0ae3a86e03..dc62a462f7 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala @@ -3,14 +3,12 @@ */ package akka.stream.testkit.scaladsl -import akka.stream._ -import akka.stream.impl._ +import akka.actor.ActorSystem import akka.stream.OperationAttributes.none +import akka.stream._ import akka.stream.scaladsl._ import akka.stream.testkit._ -import akka.actor.ActorSystem; - /** * Factory methods for test sinks. */ diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala index 29f439fca9..ae3666a875 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala @@ -1,6 +1,7 @@ package akka.stream.testkit import akka.actor.ActorRef +import akka.actor.ActorRefWithCell import akka.stream.FlowMaterializer import akka.stream.impl._ import akka.testkit.TestProbe @@ -33,4 +34,11 @@ object Utils { case _ ⇒ block } + def assertDispatcher(ref: ActorRef, dispatcher: String): Unit = ref match { + case r: ActorRefWithCell ⇒ + if (r.underlying.props.dispatcher != dispatcher) + throw new AssertionError(s"Expected $ref to use dispatcher [$dispatcher], yet used: [${r.underlying.props.dispatcher}]") + case _ ⇒ + throw new Exception(s"Unable to determine dispatcher of $ref") + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala index 6263fc18f4..aebdaf8bd1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala @@ -5,12 +5,19 @@ package akka.stream.io import java.io.File -import akka.actor.{ ActorCell, ActorSystem, RepointableActorRef } +import akka.actor.ActorSystem +import akka.stream.impl.ActorFlowMaterializerImpl +import akka.stream.impl.StreamSupervisor +import akka.stream.impl.StreamSupervisor.Children import akka.stream.scaladsl.Source import akka.stream.testkit._ import akka.stream.testkit.Utils._ -import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings, ActorOperationAttributes } -import akka.util.{ ByteString, Timeout } +import akka.stream.testkit.StreamTestKit +import akka.stream.ActorFlowMaterializer +import akka.stream.ActorFlowMaterializerSettings +import akka.stream.ActorOperationAttributes +import akka.util.ByteString +import akka.util.Timeout import scala.collection.mutable.ListBuffer import scala.concurrent.Await @@ -93,8 +100,9 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { try { Source(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(SynchronousFileSink(f))(mat) - val ref = Await.result(sys.actorSelection("/user/$a/flow-1-2*").resolveOne(), timeout.duration) - ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.stream.default-file-io-dispatcher") + mat.asInstanceOf[ActorFlowMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get + assertDispatcher(ref, "akka.stream.default-file-io-dispatcher") } finally shutdown(sys) } } @@ -111,8 +119,9 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { .withAttributes(ActorOperationAttributes.dispatcher("akka.actor.default-dispatcher")) .run()(mat) - val ref = Await.result(sys.actorSelection("/user/$a/flow-1-2*").resolveOne(), timeout.duration) - ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.actor.default-dispatcher") + mat.asInstanceOf[ActorFlowMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get + assertDispatcher(ref, "akka.actor.default-dispatcher") } finally shutdown(sys) } } @@ -132,3 +141,4 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } } + diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala index 19cefa99ed..bffbb5afbd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala @@ -3,16 +3,25 @@ */ package akka.stream.io -import java.io.{ File, FileWriter } +import java.io.File +import java.io.FileWriter import java.util.Random -import akka.actor.{ ActorCell, RepointableActorRef, ActorSystem } +import akka.actor.ActorSystem +import akka.stream.ActorFlowMaterializer +import akka.stream.ActorFlowMaterializerSettings +import akka.stream.ActorOperationAttributes +import akka.stream.OperationAttributes +import akka.stream.impl.ActorFlowMaterializerImpl +import akka.stream.impl.StreamSupervisor +import akka.stream.impl.StreamSupervisor.Children import akka.stream.io.SynchronousFileSourceSpec.Settings import akka.stream.scaladsl.Sink import akka.stream.testkit._ import akka.stream.testkit.Utils._ -import akka.stream.{ ActorOperationAttributes, ActorFlowMaterializer, ActorFlowMaterializerSettings, OperationAttributes } -import akka.util.{ Timeout, ByteString } +import akka.stream.testkit.scaladsl.TestSink +import akka.util.ByteString +import akka.util.Timeout import scala.concurrent.Await import scala.concurrent.duration._ @@ -157,16 +166,17 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } } - "use dedicated file-io-dispatcher by default" in { + "use dedicated file-io-dispatcher by default" in assertAllStagesStopped { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val mat = ActorFlowMaterializer()(sys) implicit val timeout = Timeout(500.millis) try { - SynchronousFileSource(manyLines).runWith(Sink.ignore)(mat) + val p = SynchronousFileSource(manyLines).runWith(TestSink.probe())(mat) - val ref = Await.result(sys.actorSelection("/user/$a/flow-*").resolveOne(), timeout.duration) - ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.stream.default-file-io-dispatcher") + mat.asInstanceOf[ActorFlowMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get + try assertDispatcher(ref, "akka.stream.default-file-io-dispatcher") finally p.cancel() } finally shutdown(sys) } @@ -176,12 +186,13 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val timeout = Timeout(500.millis) try { - SynchronousFileSource(manyLines) + val p = SynchronousFileSource(manyLines) .withAttributes(ActorOperationAttributes.dispatcher("akka.actor.default-dispatcher")) - .runWith(Sink.ignore)(mat) + .runWith(TestSink.probe())(mat) - val ref = Await.result(sys.actorSelection("/user/$a/flow-*").resolveOne(), timeout.duration) - ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.actor.default-dispatcher") + mat.asInstanceOf[ActorFlowMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get + try assertDispatcher(ref, "akka.actor.default-dispatcher") finally p.cancel() } finally shutdown(sys) } } diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala index fc6dd64ab9..a48131b155 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala @@ -29,6 +29,7 @@ object SynchronousFileSource { */ def apply(f: File, chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[Long]] = new Source(new SynchronousFileSource(f, chunkSize, DefaultAttributes, Source.shape("SynchronousFileSource"))) + .named(DefaultAttributes.nameOption.get) /** * Creates a synchronous (Java 6 compatible) Source from a Files contents.