Merge remote-tracking branch 'pr/19150' into release-2.3-dev
This commit is contained in:
commit
27353d4952
57 changed files with 99 additions and 95 deletions
|
|
@ -76,7 +76,7 @@ class MaterializationBenchmark {
|
|||
import MaterializationBenchmark._
|
||||
|
||||
implicit val system = ActorSystem("MaterializationBenchmark")
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
var flowWithMap: RunnableGraph[Unit] = _
|
||||
var graphWithJunctions: RunnableGraph[Unit] = _
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import scala.concurrent.{Promise, Await, Future}
|
|||
class FileSourcesBenchmark {
|
||||
|
||||
implicit val system = ActorSystem("file-sources-benchmark")
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
val file: File = {
|
||||
val line = ByteString("x" * 2048 + "\n")
|
||||
|
|
|
|||
|
|
@ -77,11 +77,11 @@ class BasicDirectivesExamplesSpec extends RoutingSpec {
|
|||
"extractMaterializer-0" in {
|
||||
val route =
|
||||
path("sample") {
|
||||
extractMaterializer { mat =>
|
||||
extractMaterializer { materializer =>
|
||||
complete {
|
||||
// explicitly use the `mat` materializer:
|
||||
Source.single(s"Materialized by ${mat.##}!")
|
||||
.runWith(Sink.head)(mat)
|
||||
// explicitly use the `materializer`:
|
||||
Source.single(s"Materialized by ${materializer.##}!")
|
||||
.runWith(Sink.head)(materializer)
|
||||
}
|
||||
}
|
||||
} // default materializer will be used
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ class FileUploadDirectivesExamplesSpec extends RoutingSpec {
|
|||
// adding integers as a service ;)
|
||||
val route =
|
||||
extractRequestContext { ctx =>
|
||||
implicit val mat = ctx.materializer
|
||||
implicit val materializer = ctx.materializer
|
||||
implicit val ec = ctx.executionContext
|
||||
|
||||
fileUpload("csv") {
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ object ActorPublisherDocSpec {
|
|||
class ActorPublisherDocSpec extends AkkaSpec {
|
||||
import ActorPublisherDocSpec._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"illustrate usage of ActorPublisher" in {
|
||||
def println(s: String): Unit =
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ object ActorSubscriberDocSpec {
|
|||
class ActorSubscriberDocSpec extends AkkaSpec {
|
||||
import ActorSubscriberDocSpec._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"illustrate usage of ActorSubscriber" in {
|
||||
val replyTo = testActor
|
||||
|
|
|
|||
|
|
@ -130,7 +130,7 @@ object BidiFlowDocSpec {
|
|||
class BidiFlowDocSpec extends AkkaSpec with ConversionCheckedTripleEquals {
|
||||
import BidiFlowDocSpec._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A BidiFlow" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import scala.concurrent.{ Future, Promise }
|
|||
class CompositionDocSpec extends AkkaSpec {
|
||||
|
||||
implicit val ec = system.dispatcher
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"nonnested flow" in {
|
||||
//#non-nested-flow
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ class FlowDocSpec extends AkkaSpec {
|
|||
import akka.stream.ActorMaterializer
|
||||
//#imports
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"source is immutable" in {
|
||||
//#source-immutable
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ class FlowErrorDocSpec extends AkkaSpec {
|
|||
|
||||
"demonstrate fail stream" in {
|
||||
//#stop
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
val source = Source(0 to 5).map(100 / _)
|
||||
val result = source.runWith(Sink.fold(0)(_ + _))
|
||||
// division by zero will fail the stream and the
|
||||
|
|
@ -35,7 +35,7 @@ class FlowErrorDocSpec extends AkkaSpec {
|
|||
case _: ArithmeticException => Supervision.Resume
|
||||
case _ => Supervision.Stop
|
||||
}
|
||||
implicit val mat = ActorMaterializer(
|
||||
implicit val materializer = ActorMaterializer(
|
||||
ActorMaterializerSettings(system).withSupervisionStrategy(decider))
|
||||
val source = Source(0 to 5).map(100 / _)
|
||||
val result = source.runWith(Sink.fold(0)(_ + _))
|
||||
|
|
@ -48,7 +48,7 @@ class FlowErrorDocSpec extends AkkaSpec {
|
|||
|
||||
"demonstrate resume section" in {
|
||||
//#resume-section
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
val decider: Supervision.Decider = {
|
||||
case _: ArithmeticException => Supervision.Resume
|
||||
case _ => Supervision.Stop
|
||||
|
|
@ -68,7 +68,7 @@ class FlowErrorDocSpec extends AkkaSpec {
|
|||
|
||||
"demonstrate restart section" in {
|
||||
//#restart-section
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
val decider: Supervision.Decider = {
|
||||
case _: IllegalArgumentException => Supervision.Restart
|
||||
case _ => Supervision.Stop
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ class FlowGraphDocSpec extends AkkaSpec {
|
|||
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"build simple graph" in {
|
||||
//format: OFF
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
|
|||
import akka.stream.stage._
|
||||
//#import-stage
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"stages demo" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ import akka.stream.testkit.AkkaSpec
|
|||
|
||||
class GraphCyclesSpec extends AkkaSpec {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"Cycle demonstration" must {
|
||||
val source = Source(() => Iterator.from(0))
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import scala.concurrent.duration._
|
|||
|
||||
class GraphStageDocSpec extends AkkaSpec {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"Demonstrate creation of GraphStage boilerplate" in {
|
||||
//#boilerplate-example
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|||
import TwitterStreamQuickstartDocSpec._
|
||||
import IntegrationDocSpec._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"calling external service with mapAsync" in {
|
||||
val probe = TestProbe()
|
||||
|
|
@ -322,7 +322,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|||
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
|
||||
val service = new SometimesSlowService
|
||||
|
||||
implicit val mat = ActorMaterializer(
|
||||
implicit val materializer = ActorMaterializer(
|
||||
ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
|
||||
|
||||
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
|
||||
|
|
@ -354,7 +354,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|||
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
|
||||
val service = new SometimesSlowService
|
||||
|
||||
implicit val mat = ActorMaterializer(
|
||||
implicit val materializer = ActorMaterializer(
|
||||
ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
|
||||
|
||||
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ class RateTransformationDocSpec extends AkkaSpec {
|
|||
type Seq[+A] = immutable.Seq[A]
|
||||
val Seq = immutable.Seq
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"conflate should summarize" in {
|
||||
//#conflate-summarize
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import org.reactivestreams.Processor
|
|||
class ReactiveStreamsDocSpec extends AkkaSpec {
|
||||
import TwitterStreamQuickstartDocSpec._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
//#imports
|
||||
import org.reactivestreams.Publisher
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import akka.stream.scaladsl._
|
|||
import akka.stream.testkit.AkkaSpec
|
||||
|
||||
class StreamBuffersRateSpec extends AkkaSpec {
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"Demonstrate pipelining" in {
|
||||
def println(s: Any) = ()
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
|
|||
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"build with open ports" in {
|
||||
//#simple-partial-flow-graph
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import akka.pattern
|
|||
|
||||
class StreamTestKitDocSpec extends AkkaSpec {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"strict collection" in {
|
||||
//#strict-collection
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
|
|||
//#first-sample
|
||||
}
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"filter and map" in {
|
||||
//#first-sample
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import scala.concurrent.Future
|
|||
class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
||||
|
||||
implicit val ec = system.dispatcher
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
// silence sysout
|
||||
def println(s: String) = ()
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import scala.concurrent.Future
|
|||
class StreamTcpDocSpec extends AkkaSpec {
|
||||
|
||||
implicit val ec = system.dispatcher
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
// silence sysout
|
||||
def println(s: String) = ()
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import scala.util.{ Failure, Success }
|
|||
object EchoTestClientApp extends App {
|
||||
implicit val system = ActorSystem()
|
||||
import system.dispatcher
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
def delayedCompletion(delay: FiniteDuration): Source[Nothing, Unit] =
|
||||
Source.single(1)
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import akka.http.scaladsl.model.ws._
|
|||
object WSClientAutobahnTest extends App {
|
||||
implicit val system = ActorSystem()
|
||||
import system.dispatcher
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
val Agent = "akka-http"
|
||||
val Parallelism = 4
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ class DontLeakActorsOnFailingConnectionSpecs extends WordSpecLike with Matchers
|
|||
}""").withFallback(ConfigFactory.load())
|
||||
implicit val system = ActorSystem("DontLeakActorsOnFailingConnectionSpecs", config)
|
||||
import system.dispatcher
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
val log = Logging(system, getClass)
|
||||
|
||||
|
|
|
|||
|
|
@ -18,8 +18,8 @@ trait HttpServiceBase {
|
|||
*/
|
||||
def bindRoute(interface: String, port: Int, route: Route, system: ActorSystem): Future[ServerBinding] = {
|
||||
implicit val sys = system
|
||||
implicit val mat = ActorMaterializer()
|
||||
handleConnectionsWithRoute(interface, port, route, system, mat)
|
||||
implicit val materializer = ActorMaterializer()
|
||||
handleConnectionsWithRoute(interface, port, route, system, materializer)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -32,8 +32,8 @@ trait HttpServiceBase {
|
|||
* Uses the route to handle incoming connections and requests for the ServerBinding.
|
||||
*/
|
||||
def handleConnectionsWithRoute(interface: String, port: Int, route: Route, system: ActorSystem, materializer: Materializer): Future[ServerBinding] = {
|
||||
implicit val sys = system
|
||||
implicit val mat = materializer
|
||||
implicit val s = system
|
||||
implicit val m = materializer
|
||||
|
||||
import system.dispatcher
|
||||
val r: server.Route = RouteImplementation(route)
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import scala.concurrent.duration._
|
|||
|
||||
class StreamTestKitSpec extends AkkaSpec {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
val ex = new Exception("Boom!")
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import scala.concurrent.duration.Duration
|
|||
|
||||
class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with ConversionCheckedTripleEquals {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
object emit1234 extends GraphStage[FlowShape[Int, Int]] {
|
||||
val in = Inlet[Int]("in")
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import scala.concurrent.duration._
|
|||
import scala.concurrent.{ Await, Future }
|
||||
|
||||
class TimeoutsSpec extends AkkaSpec {
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"InitialTimeout" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import scala.concurrent.Await
|
|||
import scala.concurrent.duration._
|
||||
|
||||
class ActorGraphInterpreterSpec extends AkkaSpec {
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"ActorGraphInterpreter" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import scala.concurrent.duration._
|
|||
|
||||
class KeepGoingStageSpec extends AkkaSpec {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
trait PingCmd extends NoSerializationVerificationNeeded
|
||||
case class Register(probe: ActorRef) extends PingCmd
|
||||
|
|
|
|||
|
|
@ -94,13 +94,13 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
|
||||
targetFile { f ⇒
|
||||
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
|
||||
val mat = ActorMaterializer()(sys)
|
||||
val materializer = ActorMaterializer()(sys)
|
||||
implicit val timeout = Timeout(3.seconds)
|
||||
|
||||
try {
|
||||
Source(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(Sink.file(f))(mat)
|
||||
Source(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(Sink.file(f))(materializer)
|
||||
|
||||
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
|
||||
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
|
||||
} finally shutdown(sys)
|
||||
|
|
@ -112,16 +112,16 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
pending
|
||||
targetFile { f ⇒
|
||||
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
|
||||
val mat = ActorMaterializer()(sys)
|
||||
val materializer = ActorMaterializer()(sys)
|
||||
implicit val timeout = Timeout(3.seconds)
|
||||
|
||||
try {
|
||||
Source(() ⇒ Iterator.continually(TestByteStrings.head))
|
||||
.to(Sink.file(f))
|
||||
.withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher"))
|
||||
.run()(mat)
|
||||
.run()(materializer)
|
||||
|
||||
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get
|
||||
assertDispatcher(ref, "akka.actor.default-dispatcher")
|
||||
} finally shutdown(sys)
|
||||
|
|
|
|||
|
|
@ -168,13 +168,13 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
|
||||
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
|
||||
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
|
||||
val mat = ActorMaterializer()(sys)
|
||||
val materializer = ActorMaterializer()(sys)
|
||||
implicit val timeout = Timeout(500.millis)
|
||||
|
||||
try {
|
||||
val p = Source.file(manyLines).runWith(TestSink.probe)(mat)
|
||||
val p = Source.file(manyLines).runWith(TestSink.probe)(materializer)
|
||||
|
||||
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
|
||||
try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") finally p.cancel()
|
||||
} finally shutdown(sys)
|
||||
|
|
@ -184,15 +184,15 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
"allow overriding the dispatcher using Attributes" in {
|
||||
pending
|
||||
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
|
||||
val mat = ActorMaterializer()(sys)
|
||||
val materializer = ActorMaterializer()(sys)
|
||||
implicit val timeout = Timeout(500.millis)
|
||||
|
||||
try {
|
||||
val p = Source.file(manyLines)
|
||||
.withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher"))
|
||||
.runWith(TestSink.probe)(mat)
|
||||
.runWith(TestSink.probe)(materializer)
|
||||
|
||||
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get
|
||||
try assertDispatcher(ref, "akka.actor.default-dispatcher") finally p.cancel()
|
||||
} finally shutdown(sys)
|
||||
|
|
|
|||
|
|
@ -226,11 +226,11 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
|
||||
"use dedicated default-blocking-io-dispatcher by default" in assertAllStagesStopped {
|
||||
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
|
||||
val mat = ActorMaterializer()(sys)
|
||||
val materializer = ActorMaterializer()(sys)
|
||||
|
||||
try {
|
||||
TestSource.probe[ByteString].runWith(Sink.inputStream())(mat)
|
||||
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
TestSource.probe[ByteString].runWith(Sink.inputStream())(materializer)
|
||||
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
val ref = expectMsgType[Children].children.find(_.path.toString contains "inputStreamSink").get
|
||||
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
|
||||
} finally shutdown(sys)
|
||||
|
|
|
|||
|
|
@ -148,11 +148,11 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
|
||||
"use dedicated default-blocking-io-dispatcher by default" in assertAllStagesStopped {
|
||||
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
|
||||
val mat = ActorMaterializer()(sys)
|
||||
val materializer = ActorMaterializer()(sys)
|
||||
|
||||
try {
|
||||
Source.outputStream().runWith(TestSink.probe[ByteString])(mat)
|
||||
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
Source.outputStream().runWith(TestSink.probe[ByteString])(materializer)
|
||||
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
val ref = expectMsgType[Children].children.find(_.path.toString contains "outputStreamSource").get
|
||||
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
|
||||
} finally shutdown(sys)
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import scala.util.control.NoStackTrace
|
|||
|
||||
class AcknowledgeSinkSpec extends AkkaSpec {
|
||||
implicit val ec = system.dispatcher
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
val ex = new RuntimeException("ex") with NoStackTrace
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import scala.concurrent._
|
|||
import akka.pattern.pipe
|
||||
|
||||
class AcknowledgeSourceSpec extends AkkaSpec {
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
def assertSuccess(b: Boolean, fb: Future[Boolean]): Unit =
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ object ActorRefSinkSpec {
|
|||
|
||||
class ActorRefSinkSpec extends AkkaSpec {
|
||||
import ActorRefSinkSpec._
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A ActorRefSink" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import akka.actor.PoisonPill
|
|||
import akka.actor.Status
|
||||
|
||||
class ActorRefSourceSpec extends AkkaSpec {
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A ActorRefSource" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
|
|||
import Attributes._
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
val bidi = BidiFlow.fromFlows(
|
||||
Flow[Int].map(x ⇒ x.toLong + 2).withAttributes(name("top")),
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import akka.stream.testkit.scaladsl.TestSink
|
|||
import akka.testkit.TestLatch
|
||||
|
||||
class FlowFlattenMergeSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTripleEquals {
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
import system.dispatcher
|
||||
|
||||
def src10(i: Int) = Source(i until (i + 10))
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import akka.stream.testkit.Utils._
|
|||
import scala.concurrent.duration._
|
||||
|
||||
class FlowFoldSpec extends AkkaSpec {
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A Fold" must {
|
||||
val input = 1 to 100
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import scala.concurrent.duration._
|
|||
|
||||
class FlowForeachSpec extends AkkaSpec {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
import system.dispatcher
|
||||
|
||||
"A Foreach" must {
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ object FlowGraphCompileSpec {
|
|||
class FlowGraphCompileSpec extends AkkaSpec {
|
||||
import FlowGraphCompileSpec._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
def op[In, Out]: () ⇒ PushStage[In, Out] = { () ⇒
|
||||
new PushStage[In, Out] {
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ class FlowJoinSpec extends AkkaSpec(ConfigFactory.parseString("akka.loglevel=INF
|
|||
val settings = ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||
|
||||
implicit val mat = ActorMaterializer(settings)
|
||||
implicit val materializer = ActorMaterializer(settings)
|
||||
|
||||
"A Flow using join" must {
|
||||
"allow for cycles" in {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ object FlowSectionSpec {
|
|||
|
||||
class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A flow" can {
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,11 @@ import scala.concurrent.Await
|
|||
|
||||
class FlowSlidingSpec extends AkkaSpec with GeneratorDrivenPropertyChecks {
|
||||
import system.dispatcher
|
||||
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system))
|
||||
val settings = ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||
|
||||
implicit val materializer = ActorMaterializer(settings)
|
||||
|
||||
"Sliding" must {
|
||||
import org.scalacheck.Shrink.shrinkAny
|
||||
def check(gen: Gen[(Int, Int, Int)]): Unit =
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
|||
val settings = ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialSize = 2, maxSize = 2)
|
||||
|
||||
implicit val mat = ActorMaterializer(settings)
|
||||
implicit val materializer = ActorMaterializer(settings)
|
||||
|
||||
val identity: Flow[Any, Any, Unit] ⇒ Flow[Any, Any, Unit] = in ⇒ in.map(e ⇒ e)
|
||||
val identity2: Flow[Any, Any, Unit] ⇒ Flow[Any, Any, Unit] = in ⇒ identity(in)
|
||||
|
|
@ -48,9 +48,9 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
|||
_logics: Array[GraphStageLogic],
|
||||
_shape: Shape,
|
||||
_settings: ActorMaterializerSettings,
|
||||
_mat: Materializer,
|
||||
_materializer: Materializer,
|
||||
brokenMessage: Any)
|
||||
extends ActorGraphInterpreter(_assembly, _inHandlers, _outHandlers, _logics, _shape, _settings, _mat) {
|
||||
extends ActorGraphInterpreter(_assembly, _inHandlers, _outHandlers, _logics, _shape, _settings, _materializer) {
|
||||
|
||||
import akka.stream.actor.ActorSubscriberMessage._
|
||||
|
||||
|
|
@ -76,7 +76,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
|||
|
||||
val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none)
|
||||
|
||||
val props = Props(new BrokenActorInterpreter(assembly, inHandlers, outHandlers, logics, stage.shape, settings, mat, "a3"))
|
||||
val props = Props(new BrokenActorInterpreter(assembly, inHandlers, outHandlers, logics, stage.shape, settings, materializer, "a3"))
|
||||
.withDispatcher("akka.test.stream-dispatcher").withDeploy(Deploy.local)
|
||||
val impl = system.actorOf(props, "borken-stage-actor")
|
||||
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ object GraphStageTimersSpec {
|
|||
class GraphStageTimersSpec extends AkkaSpec {
|
||||
import GraphStageTimersSpec._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
class TestStage(probe: ActorRef, sideChannel: SideChannel) extends SimpleLinearGraphStage[Int] {
|
||||
override def createLogic(inheritedAttributes: Attributes) = new TimerGraphStageLogic(shape) {
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import akka.stream.testkit.AkkaSpec
|
|||
import akka.stream.testkit._
|
||||
|
||||
class One2OneBidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A One2OneBidiFlow" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import org.scalactic.ConversionCheckedTripleEquals
|
|||
class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
val source = Source(List(1, 2, 3))
|
||||
val sink = Flow[Int].grouped(10).toMat(Sink.head)(Keep.right)
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import scala.util.control.NoStackTrace
|
|||
|
||||
class SinkForeachParallelSpec extends AkkaSpec {
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A ForeachParallel" must {
|
||||
"produce elements in the order they are ready" in assertAllStagesStopped {
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ class SinkSpec extends AkkaSpec {
|
|||
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A Sink" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import scala.concurrent.{ Future, Promise }
|
|||
import scala.concurrent.duration._
|
||||
|
||||
class StageActorRefSpec extends AkkaSpec with ImplicitSender with ScalaFutures {
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
import StageActorRefSpec._
|
||||
import ControlProtocol._
|
||||
|
|
|
|||
|
|
@ -21,14 +21,14 @@ private[akka] final class FileSink(f: File, append: Boolean, val attributes: Att
|
|||
extends SinkModule[ByteString, Future[Long]](shape) {
|
||||
|
||||
override def create(context: MaterializationContext) = {
|
||||
val mat = ActorMaterializer.downcast(context.materializer)
|
||||
val settings = mat.effectiveSettings(context.effectiveAttributes)
|
||||
val materializer = ActorMaterializer.downcast(context.materializer)
|
||||
val settings = materializer.effectiveSettings(context.effectiveAttributes)
|
||||
|
||||
val bytesWrittenPromise = Promise[Long]()
|
||||
val props = FileSubscriber.props(f, bytesWrittenPromise, settings.maxInputBufferSize, append)
|
||||
val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher
|
||||
|
||||
val ref = mat.actorOf(context, props.withDispatcher(dispatcher))
|
||||
val ref = materializer.actorOf(context, props.withDispatcher(dispatcher))
|
||||
(akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future)
|
||||
}
|
||||
|
||||
|
|
@ -48,15 +48,15 @@ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, va
|
|||
extends SinkModule[ByteString, Future[Long]](shape) {
|
||||
|
||||
override def create(context: MaterializationContext) = {
|
||||
val mat = ActorMaterializer.downcast(context.materializer)
|
||||
val settings = mat.effectiveSettings(context.effectiveAttributes)
|
||||
val materializer = ActorMaterializer.downcast(context.materializer)
|
||||
val settings = materializer.effectiveSettings(context.effectiveAttributes)
|
||||
val bytesWrittenPromise = Promise[Long]()
|
||||
|
||||
val os = createOutput() // if it fails, we fail the materialization
|
||||
|
||||
val props = OutputStreamSubscriber.props(os, bytesWrittenPromise, settings.maxInputBufferSize)
|
||||
|
||||
val ref = mat.actorOf(context, props)
|
||||
val ref = materializer.actorOf(context, props)
|
||||
(akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,14 +23,14 @@ private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: At
|
|||
require(chunkSize > 0, "chunkSize must be greater than 0")
|
||||
override def create(context: MaterializationContext) = {
|
||||
// FIXME rewrite to be based on GraphStage rather than dangerous downcasts
|
||||
val mat = ActorMaterializer.downcast(context.materializer)
|
||||
val settings = mat.effectiveSettings(context.effectiveAttributes)
|
||||
val materializer = ActorMaterializer.downcast(context.materializer)
|
||||
val settings = materializer.effectiveSettings(context.effectiveAttributes)
|
||||
|
||||
val bytesReadPromise = Promise[Long]()
|
||||
val props = FilePublisher.props(f, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize)
|
||||
val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher
|
||||
|
||||
val ref = mat.actorOf(context, props.withDispatcher(dispatcher))
|
||||
val ref = materializer.actorOf(context, props.withDispatcher(dispatcher))
|
||||
|
||||
(akka.stream.actor.ActorPublisher[ByteString](ref), bytesReadPromise.future)
|
||||
}
|
||||
|
|
@ -49,8 +49,8 @@ private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: At
|
|||
private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStream, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
|
||||
extends SourceModule[ByteString, Future[Long]](shape) {
|
||||
override def create(context: MaterializationContext) = {
|
||||
val mat = ActorMaterializer.downcast(context.materializer)
|
||||
val settings = mat.effectiveSettings(context.effectiveAttributes)
|
||||
val materializer = ActorMaterializer.downcast(context.materializer)
|
||||
val settings = materializer.effectiveSettings(context.effectiveAttributes)
|
||||
val bytesReadPromise = Promise[Long]()
|
||||
|
||||
val pub = try {
|
||||
|
|
@ -58,7 +58,7 @@ private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStrea
|
|||
|
||||
val props = InputStreamPublisher.props(is, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize)
|
||||
|
||||
val ref = mat.actorOf(context, props)
|
||||
val ref = materializer.actorOf(context, props)
|
||||
akka.stream.actor.ActorPublisher[ByteString](ref)
|
||||
} catch {
|
||||
case ex: Exception ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue