diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index d9c7ac6d9d..99256fa069 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -401,7 +401,8 @@ object GraphInterpreterSpec { (Vector.fill(upstreams.size)(null) ++ outs).toArray, (Vector.fill(upstreams.size)(-1) ++ outOwners).toArray) - _interpreter = new GraphInterpreter(assembly, NoMaterializer, (_, _, _) ⇒ ()) + val (inHandlers, outHandlers, logics, _) = assembly.materialize() + _interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) for ((upstream, i) ← upstreams.zipWithIndex) { _interpreter.attachUpstreamBoundary(i, upstream._1) @@ -415,8 +416,10 @@ object GraphInterpreterSpec { } } - def manualInit(assembly: GraphAssembly): Unit = - _interpreter = new GraphInterpreter(assembly, NoMaterializer, (_, _, _) ⇒ ()) + def manualInit(assembly: GraphAssembly): Unit = { + val (inHandlers, outHandlers, logics, _) = assembly.materialize() + _interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) + } def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 5d3206256e..8e2596d891 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -121,7 +121,11 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem, case graph: GraphModule ⇒ val calculatedSettings = effectiveSettings(effectiveAttributes) - val props = ActorGraphInterpreter.props(graph.assembly, graph.shape, calculatedSettings, ActorMaterializerImpl.this) + val (inHandlers, outHandlers, logics, mat) = graph.assembly.materialize() + + val props = ActorGraphInterpreter.props( + graph.assembly, inHandlers, outHandlers, logics, graph.shape, calculatedSettings, ActorMaterializerImpl.this) + val impl = actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher) for ((inlet, i) ← graph.shape.inlets.iterator.zipWithIndex) { val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, i) @@ -132,6 +136,7 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem, impl ! ActorGraphInterpreter.ExposedPublisher(i, publisher) assignPort(outlet, publisher) } + mat case junction: JunctionModule ⇒ materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 8e48cb7f0c..a1c25c33ad 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -105,35 +105,6 @@ private[akka] final class LazyEmptySource[Out](val attributes: Attributes, shape override def withAttributes(attr: Attributes): Module = new LazyEmptySource(attr, amendShape(attr)) } -/** - * INTERNAL API - * Elements are emitted periodically with the specified interval. - * The tick element will be delivered to downstream consumers that has requested any elements. - * If a consumer has not requested any elements at the point in time when the tick - * element is produced it will not receive that tick element later. It will - * receive new tick elements as soon as it has requested more elements. - */ -private[akka] final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) { - - override def create(context: MaterializationContext) = { - val cancelled = new AtomicBoolean(false) - val actorMaterializer = ActorMaterializer.downcast(context.materializer) - val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes) - val ref = actorMaterializer.actorOf(context, - TickPublisher.props(initialDelay, interval, tick, effectiveSettings, cancelled)) - (ActorPublisher[Out](ref), new Cancellable { - override def cancel(): Boolean = { - if (!isCancelled) ref ! PoisonPill - true - } - override def isCancelled: Boolean = cancelled.get() - }) - } - - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Cancellable] = new TickSource[Out](initialDelay, interval, tick, attributes, shape) - override def withAttributes(attr: Attributes): Module = new TickSource(initialDelay, interval, tick, attr, amendShape(attr)) -} - /** * INTERNAL API * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala deleted file mode 100644 index b022c3a39b..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import java.util.concurrent.atomic.AtomicBoolean -import akka.actor._ -import akka.stream.ActorMaterializerSettings -import org.reactivestreams.{ Subscriber, Subscription } -import scala.collection.mutable -import scala.concurrent.duration.FiniteDuration -import scala.util.control.NonFatal -import akka.event.Logging - -/** - * INTERNAL API - */ -private[akka] object TickPublisher { - def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any, - settings: ActorMaterializerSettings, cancelled: AtomicBoolean): Props = - Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)) - .withDispatcher(settings.dispatcher) - .withDeploy(Deploy.local) - - object TickPublisherSubscription { - case object Cancel extends DeadLetterSuppression - final case class RequestMore(elements: Long) extends DeadLetterSuppression - } - - class TickPublisherSubscription(ref: ActorRef) extends Subscription { - import akka.stream.impl.TickPublisher.TickPublisherSubscription._ - def cancel(): Unit = ref ! Cancel - def request(elements: Long): Unit = ref ! RequestMore(elements) - override def toString = "TickPublisherSubscription" - } - - private case object Tick -} - -/** - * INTERNAL API - * - * Elements are emitted with the specified interval. Supports only one subscriber. - * The subscriber will receive the tick element if it has requested any elements, - * otherwise the tick element is dropped. - */ -private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any, - settings: ActorMaterializerSettings, cancelled: AtomicBoolean) extends Actor { - import akka.stream.impl.TickPublisher.TickPublisherSubscription._ - import akka.stream.impl.TickPublisher._ - import ReactiveStreamsCompliance._ - - var exposedPublisher: ActorPublisher[Any] = _ - private var subscriber: Subscriber[_ >: Any] = null - private var demand: Long = 0 - - override val supervisorStrategy = SupervisorStrategy.stoppingStrategy - - var tickTask: Option[Cancellable] = None - - def receive = { - case ExposedPublisher(publisher) ⇒ - exposedPublisher = publisher - context.become(waitingForFirstSubscriber) - case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") - } - - def waitingForFirstSubscriber: Receive = { - case SubscribePending ⇒ - exposedPublisher.takePendingSubscribers() foreach registerSubscriber - import context.dispatcher - tickTask = Some(context.system.scheduler.schedule(initialDelay, interval, self, Tick)) - context.become(active) - } - - def handleFailure(error: Throwable): Unit = { - try { - if (!error.isInstanceOf[SpecViolation]) - tryOnError(subscriber, error) - } finally { - subscriber = null - exposedPublisher.shutdown(Some(new IllegalStateException("TickPublisher " + SupportsOnlyASingleSubscriber))) - context.stop(self) - } - } - - def active: Receive = { - case Tick ⇒ - try { - if (demand > 0) { - demand -= 1 - tryOnNext(subscriber, tick) - } - } catch { - case NonFatal(e) ⇒ handleFailure(e) - } - - case RequestMore(elements) ⇒ - if (elements < 1) { - handleFailure(numberOfElementsInRequestMustBePositiveException) - } else { - demand += elements - if (demand < 0) - demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded - } - - case Cancel ⇒ - subscriber = null - context.stop(self) - - case SubscribePending ⇒ - exposedPublisher.takePendingSubscribers() foreach registerSubscriber - } - - def registerSubscriber(s: Subscriber[_ >: Any]): Unit = subscriber match { - case null ⇒ - val subscription = new TickPublisherSubscription(self) - subscriber = s - tryOnSubscribe(s, subscription) - case _ ⇒ - rejectAdditionalSubscriber(s, s"${Logging.simpleName(this)}") - } - - override def postStop(): Unit = { - tickTask.foreach(_.cancel()) - cancelled.set(true) - if (exposedPublisher ne null) - exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason) - if (subscriber ne null) - tryOnComplete(subscriber) - } -} - diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index f2924f2e08..bcd7652c44 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -73,8 +73,14 @@ private[stream] object ActorGraphInterpreter { } } - def props(assembly: GraphAssembly, shape: Shape, settings: ActorMaterializerSettings, mat: Materializer): Props = - Props(new ActorGraphInterpreter(assembly, shape, settings, mat)).withDeploy(Deploy.local) + def props(assembly: GraphAssembly, + inHandlers: Array[InHandler], + outHandlers: Array[OutHandler], + logics: Array[GraphStageLogic], + shape: Shape, + settings: ActorMaterializerSettings, + mat: Materializer): Props = + Props(new ActorGraphInterpreter(assembly, inHandlers, outHandlers, logics, shape, settings, mat)).withDeploy(Deploy.local) class BatchingActorInputBoundary(size: Int, id: Int) extends UpstreamBoundaryStageLogic[Any] { require(size > 0, "buffer size cannot be zero") @@ -281,6 +287,9 @@ private[stream] object ActorGraphInterpreter { */ private[stream] class ActorGraphInterpreter( assembly: GraphAssembly, + inHandlers: Array[InHandler], + outHandlers: Array[OutHandler], + logics: Array[GraphStageLogic], shape: Shape, settings: ActorMaterializerSettings, mat: Materializer) extends Actor { @@ -289,7 +298,11 @@ private[stream] class ActorGraphInterpreter( val interpreter = new GraphInterpreter( assembly, mat, + inHandlers, + outHandlers, + logics, (logic, event, handler) ⇒ self ! AsyncInput(logic, event, handler)) + val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _)) val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _)) // Limits the number of events processed by the interpreter before scheduling a self-message for fairness with other diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 00a7d01ef7..ab89994eac 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl.fusing -import akka.stream.stage.{ OutHandler, InHandler, GraphStage, GraphStageLogic } +import akka.stream.stage._ import akka.stream.{ Materializer, Shape, Inlet, Outlet } import scala.util.control.NonFatal @@ -76,7 +76,7 @@ private[stream] object GraphInterpreter { * corresponding segments of these arrays matches the exact same order of the ports in the [[Shape]]. * */ - final case class GraphAssembly(stages: Array[GraphStage[_]], + final case class GraphAssembly(stages: Array[GraphStageWithMaterializedValue[_, _]], ins: Array[Inlet[_]], inOwners: Array[Int], outs: Array[Outlet[_]], @@ -87,12 +87,24 @@ private[stream] object GraphInterpreter { /** * Takes an interpreter and returns three arrays required by the interpreter containing the input, output port * handlers and the stage logic instances. + * + * Returns a tuple of + * - lookup table for InHandlers + * - lookup table for OutHandlers + * - array of the logics + * - materialized value */ - def materialize(interpreter: GraphInterpreter): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic]) = { + def materialize(): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic], Any) = { val logics = Array.ofDim[GraphStageLogic](stages.length) + var finalMat: Any = () + for (i ← stages.indices) { - logics(i) = stages(i).createLogic - logics(i).interpreter = interpreter + // FIXME: Support for materialized values in fused islands is not yet figured out! + val (logic, mat) = stages(i).createLogicAndMaterializedValue + // FIXME: Current temporary hack to support non-fused stages. If there is one stage that will be under index 0. + if (i == 0) finalMat = mat + + logics(i) = logic } val inHandlers = Array.ofDim[InHandler](connectionCount) @@ -109,7 +121,7 @@ private[stream] object GraphInterpreter { } } - (inHandlers, outHandlers, logics) + (inHandlers, outHandlers, logics, finalMat) } override def toString: String = @@ -183,6 +195,9 @@ private[stream] object GraphInterpreter { private[stream] final class GraphInterpreter( private val assembly: GraphInterpreter.GraphAssembly, val materializer: Materializer, + val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection + val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection + val logics: Array[GraphStageLogic], // Array of stage logics val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit) { import GraphInterpreter._ @@ -202,10 +217,6 @@ private[stream] final class GraphInterpreter( // the corresponding event in the queue has been processed val outAvailable = Array.fill[Boolean](assembly.connectionCount)(false) - // Lookup tables for the InHandler and OutHandler for a given connection ID, and a lookup table for the - // GraphStageLogic instances - val (inHandlers, outHandlers, logics) = assembly.materialize(this) - // The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be // completed private var runningStages = assembly.stages.length @@ -259,6 +270,7 @@ private[stream] final class GraphInterpreter( var i = 0 while (i < logics.length) { logics(i).stageId = i + logics(i).interpreter = this logics(i).beforePreStart() logics(i).preStart() i += 1 diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 00e544f0e7..5eb0961520 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -3,8 +3,14 @@ */ package akka.stream.impl.fusing +import java.util.concurrent.atomic.AtomicBoolean + +import akka.actor.Cancellable import akka.stream._ -import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } +import akka.stream.stage._ + +import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration.FiniteDuration /** * INTERNAL API @@ -240,4 +246,53 @@ object GraphStages { override def toString = "Balance" } + private object TickSource { + class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable { + private val cancelPromise = Promise[Unit]() + + def cancelFuture: Future[Unit] = cancelPromise.future + + override def cancel(): Boolean = { + if (!isCancelled) cancelPromise.trySuccess(()) + true + } + + override def isCancelled: Boolean = cancelled.get() + } + } + + class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) + extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { + + val out = Outlet[T]("TimerSource.out") + override val shape = SourceShape(out) + + override def createLogicAndMaterializedValue: (GraphStageLogic, Cancellable) = { + import TickSource._ + + val cancelled = new AtomicBoolean(false) + val cancellable = new TickSourceCancellable(cancelled) + + val logic = new GraphStageLogic { + override def preStart() = { + schedulePeriodicallyWithInitialDelay("TickTimer", initialDelay, interval) + val callback = getAsyncCallback[Unit]((_) ⇒ { + completeStage() + cancelled.set(true) + }) + + cancellable.cancelFuture.onComplete(_ ⇒ callback.invoke(()))(interpreter.materializer.executionContext) + } + + setHandler(out, new OutHandler { + override def onPull() = () // Do nothing + }) + + override protected def onTimer(timerKey: Any) = + if (isAvailable(out)) push(out, tick) + } + + (logic, cancellable) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 3c80dd0a1d..ba76e92e91 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.actor.{ ActorRef, Cancellable, Props } import akka.stream.impl.Stages.{ DefaultAttributes, MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.fusing.GraphStages.TickSource import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ } import akka.stream.{ Outlet, SourceShape, _ } import org.reactivestreams.{ Publisher, Subscriber } @@ -240,7 +241,7 @@ object Source extends SourceApply { * receive new tick elements as soon as it has requested more elements. */ def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = - new Source(new TickSource(initialDelay, interval, tick, DefaultAttributes.tickSource, shape("TickSource"))) + wrap(new TickSource[T](initialDelay, interval, tick)) /** * Create a `Source` with one element. diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index e1a9b6a9f4..02c6092791 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -12,14 +12,9 @@ import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly import scala.collection.mutable import scala.concurrent.duration.FiniteDuration -/** - * A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes - * its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing - * logic that ties the ports together. - */ -abstract class GraphStage[S <: Shape] extends Graph[S, Unit] { +abstract class GraphStageWithMaterializedValue[S <: Shape, M] extends Graph[S, M] { def shape: S - def createLogic: GraphStageLogic + def createLogicAndMaterializedValue: (GraphStageLogic, M) final override private[stream] lazy val module: Module = { val connectionCount = shape.inlets.size + shape.outlets.size @@ -47,14 +42,25 @@ abstract class GraphStage[S <: Shape] extends Graph[S, Unit] { * This method throws an [[UnsupportedOperationException]] by default. The subclass can override this method * and provide a correct implementation that creates an exact copy of the stage with the provided new attributes. */ - final override def withAttributes(attr: Attributes): Graph[S, Unit] = new Graph[S, Unit] { - override def shape = GraphStage.this.shape - override private[stream] def module = GraphStage.this.module.withAttributes(attr) + final override def withAttributes(attr: Attributes): Graph[S, M] = new Graph[S, M] { + override def shape = GraphStageWithMaterializedValue.this.shape + override private[stream] def module = GraphStageWithMaterializedValue.this.module.withAttributes(attr) - override def withAttributes(attr: Attributes) = GraphStage.this.withAttributes(attr) + override def withAttributes(attr: Attributes) = GraphStageWithMaterializedValue.this.withAttributes(attr) } } +/** + * A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes + * its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing + * logic that ties the ports together. + */ +abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, Unit] { + final override def createLogicAndMaterializedValue = (createLogic, Unit) + + def createLogic: GraphStageLogic +} + private object TimerMessages { final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression