diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index dc8f453c56..49b5af2e3a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -609,7 +609,7 @@ private[http] object HttpServerBluePrint { }) private var activeTimers = 0 - private def timeout = ActorMaterializer.downcast(materializer).settings.subscriptionTimeoutSettings.timeout + private def timeout = ActorMaterializerHelper.downcast(materializer).settings.subscriptionTimeoutSettings.timeout private def addTimeout(s: SubscriptionTimeout): Unit = { if (activeTimers == 0) setKeepGoing(true) activeTimers += 1 diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/RequestContextImpl.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/RequestContextImpl.scala index 2e17c3cbd0..b477b3858d 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/RequestContextImpl.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/RequestContextImpl.scala @@ -4,11 +4,11 @@ package akka.http.scaladsl.server -import scala.concurrent.{ Future, ExecutionContextExecutor } -import akka.stream.{ ActorMaterializer, Materializer } +import scala.concurrent.{ExecutionContextExecutor, Future} +import akka.stream.{ActorMaterializer, ActorMaterializerHelper, Materializer} import akka.event.LoggingAdapter -import akka.http.scaladsl.settings.{ RoutingSettings, ParserSettings } -import akka.http.scaladsl.marshalling.{ Marshal, ToResponseMarshallable } +import akka.http.scaladsl.settings.{ParserSettings, RoutingSettings} +import akka.http.scaladsl.marshalling.{Marshal, ToResponseMarshallable} import akka.http.scaladsl.model._ import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.util.FastFuture._ @@ -29,7 +29,7 @@ private[http] class RequestContextImpl( this(request, request.uri.path, ec, materializer, log, settings, parserSettings) def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContextExecutor, materializer: Materializer) = - this(request, request.uri.path, ec, materializer, log, settings, ParserSettings(ActorMaterializer.downcast(materializer).system)) + this(request, request.uri.path, ec, materializer, log, settings, ParserSettings(ActorMaterializerHelper.downcast(materializer).system)) def reconfigure(executionContext: ExecutionContextExecutor, materializer: Materializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext = copy(executionContext = executionContext, materializer = materializer, log = log, routingSettings = settings) diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/Route.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/Route.scala index 41f6c8bf49..d1de8d5a44 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/Route.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/Route.scala @@ -5,12 +5,12 @@ package akka.http.scaladsl.server import akka.NotUsed -import akka.http.scaladsl.settings.{ RoutingSettings, ParserSettings } -import akka.stream.{ ActorMaterializer, Materializer } +import akka.http.scaladsl.settings.{ParserSettings, RoutingSettings} +import akka.stream.{ActorMaterializer, ActorMaterializerHelper, Materializer} -import scala.concurrent.{ ExecutionContextExecutor, Future } +import scala.concurrent.{ExecutionContextExecutor, Future} import akka.stream.scaladsl.Flow -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.util.FastFuture._ object Route { @@ -66,7 +66,7 @@ object Route { { implicit val executionContext = effectiveEC // overrides parameter - val effectiveParserSettings = if (parserSettings ne null) parserSettings else ParserSettings(ActorMaterializer.downcast(materializer).system) + val effectiveParserSettings = if (parserSettings ne null) parserSettings else ParserSettings(ActorMaterializerHelper.downcast(materializer).system) val sealedRoute = seal(route) request ⇒ diff --git a/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/MultipartUnmarshallers.scala index 38aa5efe3b..8bf35e5656 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/MultipartUnmarshallers.scala @@ -9,8 +9,8 @@ import akka.http.scaladsl.settings.ParserSettings import scala.collection.immutable import scala.collection.immutable.VectorBuilder import akka.util.ByteString -import akka.event.{ NoLogging, LoggingAdapter } -import akka.stream.ActorMaterializer +import akka.event.{LoggingAdapter, NoLogging} +import akka.stream.{ActorMaterializer, ActorMaterializerHelper} import akka.stream.impl.fusing.IteratorInterpreter import akka.stream.scaladsl._ import akka.http.impl.engine.parsing.BodyPartParser @@ -75,7 +75,7 @@ trait MultipartUnmarshallers { FastFuture.failed(new RuntimeException("Content-Type with a multipart media type must have a 'boundary' parameter")) case Some(boundary) ⇒ import BodyPartParser._ - val effectiveParserSettings = Option(parserSettings).getOrElse(ParserSettings(ActorMaterializer.downcast(mat).system)) + val effectiveParserSettings = Option(parserSettings).getOrElse(ParserSettings(ActorMaterializerHelper.downcast(mat).system)) val parser = new BodyPartParser(defaultContentType, boundary, log, effectiveParserSettings) FastFuture.successful { entity match { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala index 63b25ec15b..7f829cc215 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala @@ -3,10 +3,10 @@ */ package akka.stream.impl +import akka.stream._ import akka.stream.scaladsl._ import akka.testkit.AkkaSpec -import org.reactivestreams.{ Subscription, Subscriber, Publisher } -import akka.stream._ +import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.concurrent.duration._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala index a86294da6f..3f8521f030 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala @@ -9,9 +9,9 @@ import akka.stream.ActorAttributes._ import akka.stream.Attributes.LogLevels import akka.stream.Supervision._ import akka.stream.testkit.ScriptedTest -import akka.stream.javadsl -import akka.stream.{ ActorMaterializer, Materializer, Attributes } +import akka.stream._ import akka.testkit.TestProbe + import scala.concurrent.duration._ import scala.concurrent.Await import scala.util.control.NoStackTrace @@ -29,7 +29,7 @@ class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest { "A Log" must { - val supervisorPath = ActorMaterializer.downcast(mat).supervisor.path + val supervisorPath = ActorMaterializerHelper.downcast(mat).supervisor.path val LogSrc = s"akka.stream.Log($supervisorPath)" val LogClazz = classOf[Materializer] diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 1e7ede7b60..848de90e74 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -5,7 +5,7 @@ package akka.stream import java.util.Locale import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.{ AtomicBoolean } +import java.util.concurrent.atomic.AtomicBoolean import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } import akka.event.LoggingAdapter @@ -16,6 +16,7 @@ import com.typesafe.config.Config import scala.concurrent.duration._ import akka.japi.function +import akka.stream.impl.fusing.GraphInterpreterShell import scala.util.control.NoStackTrace @@ -126,6 +127,12 @@ object ActorMaterializer { system } +} + +/** + * INTERNAL API + */ +private[akka] object ActorMaterializerHelper { /** * INTERNAL API */ @@ -163,21 +170,23 @@ abstract class ActorMaterializer extends Materializer { def isShutdown: Boolean /** - * INTERNAL API: this might become public later + * INTERNAL API */ private[akka] def actorOf(context: MaterializationContext, props: Props): ActorRef /** * INTERNAL API */ - private[akka] def system: ActorSystem + def system: ActorSystem /** * INTERNAL API */ private[akka] def logger: LoggingAdapter - /** INTERNAL API */ + /** + * INTERNAL API + */ private[akka] def supervisor: ActorRef } diff --git a/akka-stream/src/main/scala/akka/stream/Materializer.scala b/akka-stream/src/main/scala/akka/stream/Materializer.scala index 5b0a6b3729..4467c21944 100644 --- a/akka-stream/src/main/scala/akka/stream/Materializer.scala +++ b/akka-stream/src/main/scala/akka/stream/Materializer.scala @@ -73,11 +73,10 @@ private[akka] object NoMaterializer extends Materializer { } /** - * INTERNAL API: this might become public later * * Context parameter to the `create` methods of sources and sinks. */ -private[akka] case class MaterializationContext( +case class MaterializationContext( materializer: Materializer, effectiveAttributes: Attributes, stageName: String) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index a8077aa412..a3eb77d2f1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -3,19 +3,21 @@ */ package akka.stream.impl -import java.util.concurrent.atomic.{ AtomicBoolean } +import java.util.concurrent.atomic.AtomicBoolean import java.{ util ⇒ ju } + import akka.NotUsed import akka.actor._ -import akka.event.Logging +import akka.event.{ Logging, LoggingAdapter } import akka.dispatch.Dispatchers import akka.pattern.ask import akka.stream._ -import akka.stream.impl.StreamLayout.{ Module, AtomicModule } +import akka.stream.impl.StreamLayout.{ AtomicModule, Module } import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule } import akka.stream.impl.io.TLSActor import akka.stream.impl.io.TlsModule import org.reactivestreams._ + import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Await, ExecutionContextExecutor } import akka.stream.impl.fusing.GraphStageModule @@ -23,6 +25,37 @@ import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly import akka.stream.impl.fusing.Fusing import akka.stream.impl.fusing.GraphInterpreterShell +/** + * ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell + */ +abstract class ExtendedActorMaterializer extends ActorMaterializer { + + override def withNamePrefix(name: String): ExtendedActorMaterializer + + /** + * INTERNAL API + */ + def materialize[Mat]( + _runnableGraph: Graph[ClosedShape, Mat], + subflowFuser: GraphInterpreterShell ⇒ ActorRef): Mat + + /** + * INTERNAL API + */ + def actorOf(context: MaterializationContext, props: Props): ActorRef + + /** + * INTERNAL API + */ + override def logger: LoggingAdapter + + /** + * INTERNAL API + */ + override def supervisor: ActorRef + +} + /** * INTERNAL API */ @@ -32,7 +65,7 @@ private[akka] case class ActorMaterializerImpl( dispatchers: Dispatchers, supervisor: ActorRef, haveShutDown: AtomicBoolean, - flowNames: SeqActorName) extends ActorMaterializer { + flowNames: SeqActorName) extends ExtendedActorMaterializer { import akka.stream.impl.Stages._ private val _logger = Logging.getLogger(system, this) override def logger = _logger @@ -79,7 +112,7 @@ private[akka] case class ActorMaterializerImpl( override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = materialize(_runnableGraph, null) - private[stream] def materialize[Mat]( + override def materialize[Mat]( _runnableGraph: Graph[ClosedShape, Mat], subflowFuser: GraphInterpreterShell ⇒ ActorRef): Mat = { val runnableGraph = @@ -213,7 +246,7 @@ private[akka] case class ActorMaterializerImpl( } -private[akka] class SubFusingActorMaterializerImpl(val delegate: ActorMaterializerImpl, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer { +private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer { override def executionContext: ExecutionContextExecutor = delegate.executionContext override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate.materialize(runnable, registerShell) @@ -223,7 +256,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ActorMaterializ override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable = delegate.schedulePeriodically(initialDelay, interval, task) - def withNamePrefix(name: String): SubFusingActorMaterializerImpl = + override def withNamePrefix(name: String): SubFusingActorMaterializerImpl = new SubFusingActorMaterializerImpl(delegate.withNamePrefix(name), registerShell) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index ae5771f15d..21afc53aa4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -94,7 +94,7 @@ private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: So private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { override def create(context: MaterializationContext) = { - val publisherRef = ActorMaterializer.downcast(context.materializer).actorOf(context, props) + val publisherRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props) (akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef) } @@ -113,7 +113,7 @@ private[akka] final class ActorRefSource[Out]( override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)" override def create(context: MaterializationContext) = { - val mat = ActorMaterializer.downcast(context.materializer) + val mat = ActorMaterializerHelper.downcast(context.materializer) val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings)) (akka.stream.actor.ActorPublisher[Out](ref), ref) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 0c4295627a..840f6a5d80 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -99,7 +99,7 @@ private[akka] final class FanoutPublisherSink[In]( extends SinkModule[In, Publisher[In]](shape) { override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { - val actorMaterializer = ActorMaterializer.downcast(context.materializer) + val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer) val impl = actorMaterializer.actorOf( context, FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes))) @@ -124,7 +124,7 @@ private[akka] final class FanoutPublisherSink[In]( private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Done]](shape) { override def create(context: MaterializationContext) = { - val effectiveSettings = ActorMaterializer.downcast(context.materializer).effectiveSettings(context.effectiveAttributes) + val effectiveSettings = ActorMaterializerHelper.downcast(context.materializer).effectiveSettings(context.effectiveAttributes) val p = Promise[Done]() (new SinkholeSubscriber[Any](p), p.future) } @@ -163,7 +163,7 @@ private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShap private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { override def create(context: MaterializationContext) = { - val subscriberRef = ActorMaterializer.downcast(context.materializer).actorOf(context, props) + val subscriberRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props) (akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef) } @@ -179,7 +179,7 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { override def create(context: MaterializationContext) = { - val actorMaterializer = ActorMaterializer.downcast(context.materializer) + val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer) val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes) val subscriberRef = actorMaterializer.actorOf( context, diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index a646eadf02..41128e3ad5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -801,7 +801,7 @@ private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Pub /** * INERNAL API */ -private[stream] object MaterializerSession { +object MaterializerSession { class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace final val Debug = false @@ -810,7 +810,7 @@ private[stream] object MaterializerSession { /** * INTERNAL API */ -private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initialAttributes: Attributes) { +abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initialAttributes: Attributes) { import StreamLayout._ // the contained maps store either Subscriber[Any] or VirtualPublisher, but the type system cannot express that @@ -839,7 +839,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo // Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies // of the same module. // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter - private def enterScope(enclosing: CopiedModule): Unit = { + protected def enterScope(enclosing: CopiedModule): Unit = { if (MaterializerSession.Debug) println(f"entering scope [${System.identityHashCode(enclosing)}%08x]") subscribersStack ::= new ju.HashMap publishersStack ::= new ju.HashMap @@ -851,7 +851,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo // them to the copied ports instead of the original ones (since there might be multiple copies of the same module // leading to port identity collisions) // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter - private def exitScope(enclosing: CopiedModule): Unit = { + protected def exitScope(enclosing: CopiedModule): Unit = { if (MaterializerSession.Debug) println(f"exiting scope [${System.identityHashCode(enclosing)}%08x]") val scopeSubscribers = subscribers val scopePublishers = publishers @@ -969,7 +969,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo ret } - final protected def assignPort(in: InPort, subscriberOrVirtual: AnyRef): Unit = { + protected def assignPort(in: InPort, subscriberOrVirtual: AnyRef): Unit = { subscribers.put(in, subscriberOrVirtual) currentLayout.upstreams.get(in) match { @@ -981,7 +981,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo } } - final protected def assignPort(out: OutPort, publisher: Publisher[Any]): Unit = { + protected def assignPort(out: OutPort, publisher: Publisher[Any]): Unit = { publishers.put(out, publisher) currentLayout.downstreams.get(out) match { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 3f6c9a0de8..d50078b098 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -5,20 +5,19 @@ package akka.stream.impl.fusing import java.util import java.util.concurrent.TimeoutException + import akka.actor._ import akka.event.Logging import akka.stream._ -import akka.stream.impl._ import akka.stream.impl.ReactiveStreamsCompliance._ -import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module, AtomicModule } -import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly } +import akka.stream.impl.StreamLayout.{ AtomicModule, CompositeModule, CopiedModule, Module } +import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ } +import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, GraphAssembly, UpstreamBoundaryStageLogic } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import org.reactivestreams.{ Subscriber, Subscription } -import scala.concurrent.forkjoin.ThreadLocalRandom -import scala.util.control.NonFatal -import akka.stream.impl.ActorMaterializerImpl -import akka.stream.impl.SubFusingActorMaterializerImpl + import scala.annotation.tailrec +import scala.util.control.NonFatal /** * INTERNAL API @@ -307,14 +306,14 @@ private[stream] object ActorGraphInterpreter { /** * INTERNAL API */ -private[stream] final class GraphInterpreterShell( +final class GraphInterpreterShell( assembly: GraphAssembly, inHandlers: Array[InHandler], outHandlers: Array[OutHandler], logics: Array[GraphStageLogic], shape: Shape, settings: ActorMaterializerSettings, - val mat: ActorMaterializerImpl) { + val mat: ExtendedActorMaterializer) { import ActorGraphInterpreter._ @@ -643,4 +642,4 @@ private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) ext activeInterpreters = Set.empty[GraphInterpreterShell] newShells.foreach(s ⇒ if (tryInit(s)) s.tryAbort(ex)) } -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 7b53a33510..aa5d0bd6f4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -267,7 +267,7 @@ object GraphStages { * * This source is not reusable, it is only created internally. */ - private[stream] final class MaterializedValueSource[T](val computation: MaterializedValueNode, val out: Outlet[T]) extends GraphStage[SourceShape[T]] { + final class MaterializedValueSource[T](val computation: MaterializedValueNode, val out: Outlet[T]) extends GraphStage[SourceShape[T]] { def this(computation: MaterializedValueNode) = this(computation, Outlet[T]("matValue")) override def initialAttributes: Attributes = DefaultAttributes.materializedValueSource override val shape = SourceShape(out) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 9e50f57ea2..4781dc1422 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -922,7 +922,7 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, log = logAdapter match { case Some(l) ⇒ l case _ ⇒ - val mat = try ActorMaterializer.downcast(ctx.materializer) + val mat = try ActorMaterializerHelper.downcast(ctx.materializer) catch { case ex: Exception ⇒ throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorMaterializer! " + @@ -984,7 +984,7 @@ private[akka] object Log { override def getClazz(t: LifecycleContext): Class[_] = classOf[Materializer] override def genString(t: LifecycleContext): String = { - try s"$DefaultLoggerName(${ActorMaterializer.downcast(t.materializer).supervisor.path})" + try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t.materializer).supervisor.path})" catch { case ex: Exception ⇒ LogSource.fromString.genString(DefaultLoggerName) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 5e7b3cec69..20a586be75 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -121,7 +121,7 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable. private val SubscriptionTimer = "SubstreamSubscriptionTimer" override protected def onTimer(timerKey: Any): Unit = { - val materializer = ActorMaterializer.downcast(interpreter.materializer) + val materializer = ActorMaterializerHelper.downcast(interpreter.materializer) val timeoutSettings = materializer.settings.subscriptionTimeoutSettings val timeout = timeoutSettings.timeout @@ -150,7 +150,7 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable. } private def openSubstream(): Source[T, NotUsed] = { - val timeout = ActorMaterializer.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout + val timeout = ActorMaterializerHelper.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout tailSource = new SubSourceOutlet[T]("TailSource") tailSource.setHandler(subHandler) setKeepGoing(true) @@ -254,7 +254,7 @@ final class GroupBy[T, K](maxSubstreams: Int, keyFor: T ⇒ K) extends GraphStag private def needToPull: Boolean = !(hasBeenPulled(in) || isClosed(in) || hasNextElement) override def preStart(): Unit = - timeout = ActorMaterializer.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout + timeout = ActorMaterializerHelper.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout override def onPull(): Unit = { substreamWaitingToBePushed match { @@ -424,7 +424,7 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamC private var substreamCancelled = false override def preStart(): Unit = { - timeout = ActorMaterializer.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout + timeout = ActorMaterializerHelper.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout } setHandler(out, new OutHandler { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index d0f8db5a46..4e14ca8e6e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -5,13 +5,14 @@ package akka.stream.impl.io import java.io.OutputStream import java.nio.file.{ Path, StandardOpenOption } -import akka.stream.IOResult + +import akka.stream._ import akka.stream.impl.SinkModule import akka.stream.impl.StreamLayout.Module import akka.stream.impl.Stages.DefaultAttributes.IODispatcher -import akka.stream.{ ActorMaterializer, MaterializationContext, Attributes, SinkShape } import akka.stream.ActorAttributes.Dispatcher import akka.util.ByteString + import scala.concurrent.{ Future, Promise } /** @@ -25,7 +26,7 @@ private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], va override protected def label: String = s"FileSink($f, $options)" override def create(context: MaterializationContext) = { - val materializer = ActorMaterializer.downcast(context.materializer) + val materializer = ActorMaterializerHelper.downcast(context.materializer) val settings = materializer.effectiveSettings(context.effectiveAttributes) val ioResultPromise = Promise[IOResult]() @@ -51,7 +52,7 @@ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, va extends SinkModule[ByteString, Future[IOResult]](shape) { override def create(context: MaterializationContext) = { - val materializer = ActorMaterializer.downcast(context.materializer) + val materializer = ActorMaterializerHelper.downcast(context.materializer) val settings = materializer.effectiveSettings(context.effectiveAttributes) val ioResultPromise = Promise[IOResult]() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 7fec482d18..082287f5f2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -25,7 +25,7 @@ private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: At require(chunkSize > 0, "chunkSize must be greater than 0") override def create(context: MaterializationContext) = { // FIXME rewrite to be based on GraphStage rather than dangerous downcasts - val materializer = ActorMaterializer.downcast(context.materializer) + val materializer = ActorMaterializerHelper.downcast(context.materializer) val settings = materializer.effectiveSettings(context.effectiveAttributes) val ioResultPromise = Promise[IOResult]() @@ -53,7 +53,7 @@ private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: At private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStream, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) extends SourceModule[ByteString, Future[IOResult]](shape) { override def create(context: MaterializationContext) = { - val materializer = ActorMaterializer.downcast(context.materializer) + val materializer = ActorMaterializerHelper.downcast(context.materializer) val ioResultPromise = Promise[IOResult]() val pub = try { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala index 441444af95..21832b4832 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -21,6 +21,7 @@ import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.ActorAttributes.Dispatcher import scala.concurrent.ExecutionContext import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerHelper private[stream] object OutputStreamSourceStage { sealed trait AdapterToStageMessage @@ -112,7 +113,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration } override def preStart(): Unit = { - dispatcher = ActorMaterializer.downcast(materializer).system.dispatchers.lookup(dispatcherId) + dispatcher = ActorMaterializerHelper.downcast(materializer).system.dispatchers.lookup(dispatcherId) super.preStart() } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index c6bf767138..cab6d14a26 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -900,7 +900,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: final protected def getStageActor(receive: ((ActorRef, Any)) ⇒ Unit): StageActor = { _stageActor match { case null ⇒ - val actorMaterializer = ActorMaterializer.downcast(interpreter.materializer) + val actorMaterializer = ActorMaterializerHelper.downcast(interpreter.materializer) _stageActor = new StageActor(actorMaterializer, getAsyncCallback, receive) _stageActor case existing ⇒ diff --git a/project/MiMa.scala b/project/MiMa.scala index 9b09a7c6da..a695fb63c2 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -873,6 +873,9 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.onUpstreamFinish"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.onPull"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.postStop"), + + // #20414 Allow different ActorMaterializer subtypes + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorMaterializer.downcast"), // #20531 adding refuseUid to Gated FilterAnyProblem("akka.remote.EndpointManager$Gated")