=str more robust dispatcher checking in FileSource specs

This commit is contained in:
Konrad Malawski 2015-04-24 11:07:26 +02:00
parent 1a5d114290
commit 0aecf21c7a
6 changed files with 52 additions and 24 deletions

View file

@ -8,9 +8,9 @@ import akka.actor.ActorSystem
import akka.actor.DeadLetterSuppression import akka.actor.DeadLetterSuppression
import akka.stream._ import akka.stream._
import akka.stream.impl._ import akka.stream.impl._
import akka.testkit.TestProbe
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.testkit.TestProbe
import org.reactivestreams.{ Publisher, Subscriber, Subscription } import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -3,14 +3,12 @@
*/ */
package akka.stream.testkit.scaladsl package akka.stream.testkit.scaladsl
import akka.stream._ import akka.actor.ActorSystem
import akka.stream.impl._
import akka.stream.OperationAttributes.none import akka.stream.OperationAttributes.none
import akka.stream._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.actor.ActorSystem;
/** /**
* Factory methods for test sinks. * Factory methods for test sinks.
*/ */

View file

@ -1,6 +1,7 @@
package akka.stream.testkit package akka.stream.testkit
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.ActorRefWithCell
import akka.stream.FlowMaterializer import akka.stream.FlowMaterializer
import akka.stream.impl._ import akka.stream.impl._
import akka.testkit.TestProbe import akka.testkit.TestProbe
@ -33,4 +34,11 @@ object Utils {
case _ block case _ block
} }
def assertDispatcher(ref: ActorRef, dispatcher: String): Unit = ref match {
case r: ActorRefWithCell
if (r.underlying.props.dispatcher != dispatcher)
throw new AssertionError(s"Expected $ref to use dispatcher [$dispatcher], yet used: [${r.underlying.props.dispatcher}]")
case _
throw new Exception(s"Unable to determine dispatcher of $ref")
}
} }

View file

@ -5,12 +5,19 @@ package akka.stream.io
import java.io.File import java.io.File
import akka.actor.{ ActorCell, ActorSystem, RepointableActorRef } import akka.actor.ActorSystem
import akka.stream.impl.ActorFlowMaterializerImpl
import akka.stream.impl.StreamSupervisor
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings, ActorOperationAttributes } import akka.stream.testkit.StreamTestKit
import akka.util.{ ByteString, Timeout } import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.ActorOperationAttributes
import akka.util.ByteString
import akka.util.Timeout
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer
import scala.concurrent.Await import scala.concurrent.Await
@ -93,8 +100,9 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
try { try {
Source(() Iterator.continually(TestByteStrings.head)).runWith(SynchronousFileSink(f))(mat) Source(() Iterator.continually(TestByteStrings.head)).runWith(SynchronousFileSink(f))(mat)
val ref = Await.result(sys.actorSelection("/user/$a/flow-1-2*").resolveOne(), timeout.duration) mat.asInstanceOf[ActorFlowMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.stream.default-file-io-dispatcher") val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get
assertDispatcher(ref, "akka.stream.default-file-io-dispatcher")
} finally shutdown(sys) } finally shutdown(sys)
} }
} }
@ -111,8 +119,9 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
.withAttributes(ActorOperationAttributes.dispatcher("akka.actor.default-dispatcher")) .withAttributes(ActorOperationAttributes.dispatcher("akka.actor.default-dispatcher"))
.run()(mat) .run()(mat)
val ref = Await.result(sys.actorSelection("/user/$a/flow-1-2*").resolveOne(), timeout.duration) mat.asInstanceOf[ActorFlowMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.actor.default-dispatcher") val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get
assertDispatcher(ref, "akka.actor.default-dispatcher")
} finally shutdown(sys) } finally shutdown(sys)
} }
} }
@ -132,3 +141,4 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
} }
} }

View file

@ -3,16 +3,25 @@
*/ */
package akka.stream.io package akka.stream.io
import java.io.{ File, FileWriter } import java.io.File
import java.io.FileWriter
import java.util.Random import java.util.Random
import akka.actor.{ ActorCell, RepointableActorRef, ActorSystem } import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.ActorOperationAttributes
import akka.stream.OperationAttributes
import akka.stream.impl.ActorFlowMaterializerImpl
import akka.stream.impl.StreamSupervisor
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.io.SynchronousFileSourceSpec.Settings import akka.stream.io.SynchronousFileSourceSpec.Settings
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Sink
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.{ ActorOperationAttributes, ActorFlowMaterializer, ActorFlowMaterializerSettings, OperationAttributes } import akka.stream.testkit.scaladsl.TestSink
import akka.util.{ Timeout, ByteString } import akka.util.ByteString
import akka.util.Timeout
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -157,16 +166,17 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
} }
} }
"use dedicated file-io-dispatcher by default" in { "use dedicated file-io-dispatcher by default" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val mat = ActorFlowMaterializer()(sys) val mat = ActorFlowMaterializer()(sys)
implicit val timeout = Timeout(500.millis) implicit val timeout = Timeout(500.millis)
try { try {
SynchronousFileSource(manyLines).runWith(Sink.ignore)(mat) val p = SynchronousFileSource(manyLines).runWith(TestSink.probe())(mat)
val ref = Await.result(sys.actorSelection("/user/$a/flow-*").resolveOne(), timeout.duration) mat.asInstanceOf[ActorFlowMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.stream.default-file-io-dispatcher") val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get
try assertDispatcher(ref, "akka.stream.default-file-io-dispatcher") finally p.cancel()
} finally shutdown(sys) } finally shutdown(sys)
} }
@ -176,12 +186,13 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
implicit val timeout = Timeout(500.millis) implicit val timeout = Timeout(500.millis)
try { try {
SynchronousFileSource(manyLines) val p = SynchronousFileSource(manyLines)
.withAttributes(ActorOperationAttributes.dispatcher("akka.actor.default-dispatcher")) .withAttributes(ActorOperationAttributes.dispatcher("akka.actor.default-dispatcher"))
.runWith(Sink.ignore)(mat) .runWith(TestSink.probe())(mat)
val ref = Await.result(sys.actorSelection("/user/$a/flow-*").resolveOne(), timeout.duration) mat.asInstanceOf[ActorFlowMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.actor.default-dispatcher") val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get
try assertDispatcher(ref, "akka.actor.default-dispatcher") finally p.cancel()
} finally shutdown(sys) } finally shutdown(sys)
} }
} }

View file

@ -29,6 +29,7 @@ object SynchronousFileSource {
*/ */
def apply(f: File, chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[Long]] = def apply(f: File, chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[Long]] =
new Source(new SynchronousFileSource(f, chunkSize, DefaultAttributes, Source.shape("SynchronousFileSource"))) new Source(new SynchronousFileSource(f, chunkSize, DefaultAttributes, Source.shape("SynchronousFileSource")))
.named(DefaultAttributes.nameOption.get)
/** /**
* Creates a synchronous (Java 6 compatible) Source from a Files contents. * Creates a synchronous (Java 6 compatible) Source from a Files contents.