=str: TickSource is now a GraphStage

This commit is contained in:
Endre Sándor Varga 2015-09-18 14:30:43 +02:00
parent e713591e5f
commit eec8bd689f
9 changed files with 124 additions and 191 deletions

View file

@ -401,7 +401,8 @@ object GraphInterpreterSpec {
(Vector.fill(upstreams.size)(null) ++ outs).toArray,
(Vector.fill(upstreams.size)(-1) ++ outOwners).toArray)
_interpreter = new GraphInterpreter(assembly, NoMaterializer, (_, _, _) ())
val (inHandlers, outHandlers, logics, _) = assembly.materialize()
_interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ())
for ((upstream, i) upstreams.zipWithIndex) {
_interpreter.attachUpstreamBoundary(i, upstream._1)
@ -415,8 +416,10 @@ object GraphInterpreterSpec {
}
}
def manualInit(assembly: GraphAssembly): Unit =
_interpreter = new GraphInterpreter(assembly, NoMaterializer, (_, _, _) ())
def manualInit(assembly: GraphAssembly): Unit = {
val (inHandlers, outHandlers, logics, _) = assembly.materialize()
_interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ())
}
def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq)

View file

@ -121,7 +121,11 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem,
case graph: GraphModule
val calculatedSettings = effectiveSettings(effectiveAttributes)
val props = ActorGraphInterpreter.props(graph.assembly, graph.shape, calculatedSettings, ActorMaterializerImpl.this)
val (inHandlers, outHandlers, logics, mat) = graph.assembly.materialize()
val props = ActorGraphInterpreter.props(
graph.assembly, inHandlers, outHandlers, logics, graph.shape, calculatedSettings, ActorMaterializerImpl.this)
val impl = actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
for ((inlet, i) graph.shape.inlets.iterator.zipWithIndex) {
val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, i)
@ -132,6 +136,7 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem,
impl ! ActorGraphInterpreter.ExposedPublisher(i, publisher)
assignPort(outlet, publisher)
}
mat
case junction: JunctionModule
materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes))

View file

@ -105,35 +105,6 @@ private[akka] final class LazyEmptySource[Out](val attributes: Attributes, shape
override def withAttributes(attr: Attributes): Module = new LazyEmptySource(attr, amendShape(attr))
}
/**
* INTERNAL API
* Elements are emitted periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
private[akka] final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) {
override def create(context: MaterializationContext) = {
val cancelled = new AtomicBoolean(false)
val actorMaterializer = ActorMaterializer.downcast(context.materializer)
val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes)
val ref = actorMaterializer.actorOf(context,
TickPublisher.props(initialDelay, interval, tick, effectiveSettings, cancelled))
(ActorPublisher[Out](ref), new Cancellable {
override def cancel(): Boolean = {
if (!isCancelled) ref ! PoisonPill
true
}
override def isCancelled: Boolean = cancelled.get()
})
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Cancellable] = new TickSource[Out](initialDelay, interval, tick, attributes, shape)
override def withAttributes(attr: Attributes): Module = new TickSource(initialDelay, interval, tick, attr, amendShape(attr))
}
/**
* INTERNAL API
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,

View file

@ -1,133 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor._
import akka.stream.ActorMaterializerSettings
import org.reactivestreams.{ Subscriber, Subscription }
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import akka.event.Logging
/**
* INTERNAL API
*/
private[akka] object TickPublisher {
def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any,
settings: ActorMaterializerSettings, cancelled: AtomicBoolean): Props =
Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled))
.withDispatcher(settings.dispatcher)
.withDeploy(Deploy.local)
object TickPublisherSubscription {
case object Cancel extends DeadLetterSuppression
final case class RequestMore(elements: Long) extends DeadLetterSuppression
}
class TickPublisherSubscription(ref: ActorRef) extends Subscription {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._
def cancel(): Unit = ref ! Cancel
def request(elements: Long): Unit = ref ! RequestMore(elements)
override def toString = "TickPublisherSubscription"
}
private case object Tick
}
/**
* INTERNAL API
*
* Elements are emitted with the specified interval. Supports only one subscriber.
* The subscriber will receive the tick element if it has requested any elements,
* otherwise the tick element is dropped.
*/
private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: Any,
settings: ActorMaterializerSettings, cancelled: AtomicBoolean) extends Actor {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._
import akka.stream.impl.TickPublisher._
import ReactiveStreamsCompliance._
var exposedPublisher: ActorPublisher[Any] = _
private var subscriber: Subscriber[_ >: Any] = null
private var demand: Long = 0
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
var tickTask: Option[Cancellable] = None
def receive = {
case ExposedPublisher(publisher)
exposedPublisher = publisher
context.become(waitingForFirstSubscriber)
case _ throw new IllegalStateException("The first message must be ExposedPublisher")
}
def waitingForFirstSubscriber: Receive = {
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
import context.dispatcher
tickTask = Some(context.system.scheduler.schedule(initialDelay, interval, self, Tick))
context.become(active)
}
def handleFailure(error: Throwable): Unit = {
try {
if (!error.isInstanceOf[SpecViolation])
tryOnError(subscriber, error)
} finally {
subscriber = null
exposedPublisher.shutdown(Some(new IllegalStateException("TickPublisher " + SupportsOnlyASingleSubscriber)))
context.stop(self)
}
}
def active: Receive = {
case Tick
try {
if (demand > 0) {
demand -= 1
tryOnNext(subscriber, tick)
}
} catch {
case NonFatal(e) handleFailure(e)
}
case RequestMore(elements)
if (elements < 1) {
handleFailure(numberOfElementsInRequestMustBePositiveException)
} else {
demand += elements
if (demand < 0)
demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
}
case Cancel
subscriber = null
context.stop(self)
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
}
def registerSubscriber(s: Subscriber[_ >: Any]): Unit = subscriber match {
case null
val subscription = new TickPublisherSubscription(self)
subscriber = s
tryOnSubscribe(s, subscription)
case _
rejectAdditionalSubscriber(s, s"${Logging.simpleName(this)}")
}
override def postStop(): Unit = {
tickTask.foreach(_.cancel())
cancelled.set(true)
if (exposedPublisher ne null)
exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
if (subscriber ne null)
tryOnComplete(subscriber)
}
}

View file

@ -73,8 +73,14 @@ private[stream] object ActorGraphInterpreter {
}
}
def props(assembly: GraphAssembly, shape: Shape, settings: ActorMaterializerSettings, mat: Materializer): Props =
Props(new ActorGraphInterpreter(assembly, shape, settings, mat)).withDeploy(Deploy.local)
def props(assembly: GraphAssembly,
inHandlers: Array[InHandler],
outHandlers: Array[OutHandler],
logics: Array[GraphStageLogic],
shape: Shape,
settings: ActorMaterializerSettings,
mat: Materializer): Props =
Props(new ActorGraphInterpreter(assembly, inHandlers, outHandlers, logics, shape, settings, mat)).withDeploy(Deploy.local)
class BatchingActorInputBoundary(size: Int, id: Int) extends UpstreamBoundaryStageLogic[Any] {
require(size > 0, "buffer size cannot be zero")
@ -281,6 +287,9 @@ private[stream] object ActorGraphInterpreter {
*/
private[stream] class ActorGraphInterpreter(
assembly: GraphAssembly,
inHandlers: Array[InHandler],
outHandlers: Array[OutHandler],
logics: Array[GraphStageLogic],
shape: Shape,
settings: ActorMaterializerSettings,
mat: Materializer) extends Actor {
@ -289,7 +298,11 @@ private[stream] class ActorGraphInterpreter(
val interpreter = new GraphInterpreter(
assembly,
mat,
inHandlers,
outHandlers,
logics,
(logic, event, handler) self ! AsyncInput(logic, event, handler))
val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _))
val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _))
// Limits the number of events processed by the interpreter before scheduling a self-message for fairness with other

View file

@ -3,7 +3,7 @@
*/
package akka.stream.impl.fusing
import akka.stream.stage.{ OutHandler, InHandler, GraphStage, GraphStageLogic }
import akka.stream.stage._
import akka.stream.{ Materializer, Shape, Inlet, Outlet }
import scala.util.control.NonFatal
@ -76,7 +76,7 @@ private[stream] object GraphInterpreter {
* corresponding segments of these arrays matches the exact same order of the ports in the [[Shape]].
*
*/
final case class GraphAssembly(stages: Array[GraphStage[_]],
final case class GraphAssembly(stages: Array[GraphStageWithMaterializedValue[_, _]],
ins: Array[Inlet[_]],
inOwners: Array[Int],
outs: Array[Outlet[_]],
@ -87,12 +87,24 @@ private[stream] object GraphInterpreter {
/**
* Takes an interpreter and returns three arrays required by the interpreter containing the input, output port
* handlers and the stage logic instances.
*
* Returns a tuple of
* - lookup table for InHandlers
* - lookup table for OutHandlers
* - array of the logics
* - materialized value
*/
def materialize(interpreter: GraphInterpreter): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic]) = {
def materialize(): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic], Any) = {
val logics = Array.ofDim[GraphStageLogic](stages.length)
var finalMat: Any = ()
for (i stages.indices) {
logics(i) = stages(i).createLogic
logics(i).interpreter = interpreter
// FIXME: Support for materialized values in fused islands is not yet figured out!
val (logic, mat) = stages(i).createLogicAndMaterializedValue
// FIXME: Current temporary hack to support non-fused stages. If there is one stage that will be under index 0.
if (i == 0) finalMat = mat
logics(i) = logic
}
val inHandlers = Array.ofDim[InHandler](connectionCount)
@ -109,7 +121,7 @@ private[stream] object GraphInterpreter {
}
}
(inHandlers, outHandlers, logics)
(inHandlers, outHandlers, logics, finalMat)
}
override def toString: String =
@ -183,6 +195,9 @@ private[stream] object GraphInterpreter {
private[stream] final class GraphInterpreter(
private val assembly: GraphInterpreter.GraphAssembly,
val materializer: Materializer,
val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection
val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection
val logics: Array[GraphStageLogic], // Array of stage logics
val onAsyncInput: (GraphStageLogic, Any, (Any) Unit) Unit) {
import GraphInterpreter._
@ -202,10 +217,6 @@ private[stream] final class GraphInterpreter(
// the corresponding event in the queue has been processed
val outAvailable = Array.fill[Boolean](assembly.connectionCount)(false)
// Lookup tables for the InHandler and OutHandler for a given connection ID, and a lookup table for the
// GraphStageLogic instances
val (inHandlers, outHandlers, logics) = assembly.materialize(this)
// The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be
// completed
private var runningStages = assembly.stages.length
@ -259,6 +270,7 @@ private[stream] final class GraphInterpreter(
var i = 0
while (i < logics.length) {
logics(i).stageId = i
logics(i).interpreter = this
logics(i).beforePreStart()
logics(i).preStart()
i += 1

View file

@ -3,8 +3,14 @@
*/
package akka.stream.impl.fusing
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.Cancellable
import akka.stream._
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
import akka.stream.stage._
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
/**
* INTERNAL API
@ -240,4 +246,53 @@ object GraphStages {
override def toString = "Balance"
}
private object TickSource {
class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable {
private val cancelPromise = Promise[Unit]()
def cancelFuture: Future[Unit] = cancelPromise.future
override def cancel(): Boolean = {
if (!isCancelled) cancelPromise.trySuccess(())
true
}
override def isCancelled: Boolean = cancelled.get()
}
}
class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T)
extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {
val out = Outlet[T]("TimerSource.out")
override val shape = SourceShape(out)
override def createLogicAndMaterializedValue: (GraphStageLogic, Cancellable) = {
import TickSource._
val cancelled = new AtomicBoolean(false)
val cancellable = new TickSourceCancellable(cancelled)
val logic = new GraphStageLogic {
override def preStart() = {
schedulePeriodicallyWithInitialDelay("TickTimer", initialDelay, interval)
val callback = getAsyncCallback[Unit]((_) {
completeStage()
cancelled.set(true)
})
cancellable.cancelFuture.onComplete(_ callback.invoke(()))(interpreter.materializer.executionContext)
}
setHandler(out, new OutHandler {
override def onPull() = () // Do nothing
})
override protected def onTimer(timerKey: Any) =
if (isAvailable(out)) push(out, tick)
}
(logic, cancellable)
}
}
}

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.stream.impl.Stages.{ DefaultAttributes, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.GraphStages.TickSource
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ }
import akka.stream.{ Outlet, SourceShape, _ }
import org.reactivestreams.{ Publisher, Subscriber }
@ -240,7 +241,7 @@ object Source extends SourceApply {
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] =
new Source(new TickSource(initialDelay, interval, tick, DefaultAttributes.tickSource, shape("TickSource")))
wrap(new TickSource[T](initialDelay, interval, tick))
/**
* Create a `Source` with one element.

View file

@ -12,14 +12,9 @@ import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
/**
* A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes
* its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing
* logic that ties the ports together.
*/
abstract class GraphStage[S <: Shape] extends Graph[S, Unit] {
abstract class GraphStageWithMaterializedValue[S <: Shape, M] extends Graph[S, M] {
def shape: S
def createLogic: GraphStageLogic
def createLogicAndMaterializedValue: (GraphStageLogic, M)
final override private[stream] lazy val module: Module = {
val connectionCount = shape.inlets.size + shape.outlets.size
@ -47,14 +42,25 @@ abstract class GraphStage[S <: Shape] extends Graph[S, Unit] {
* This method throws an [[UnsupportedOperationException]] by default. The subclass can override this method
* and provide a correct implementation that creates an exact copy of the stage with the provided new attributes.
*/
final override def withAttributes(attr: Attributes): Graph[S, Unit] = new Graph[S, Unit] {
override def shape = GraphStage.this.shape
override private[stream] def module = GraphStage.this.module.withAttributes(attr)
final override def withAttributes(attr: Attributes): Graph[S, M] = new Graph[S, M] {
override def shape = GraphStageWithMaterializedValue.this.shape
override private[stream] def module = GraphStageWithMaterializedValue.this.module.withAttributes(attr)
override def withAttributes(attr: Attributes) = GraphStage.this.withAttributes(attr)
override def withAttributes(attr: Attributes) = GraphStageWithMaterializedValue.this.withAttributes(attr)
}
}
/**
* A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes
* its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing
* logic that ties the ports together.
*/
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, Unit] {
final override def createLogicAndMaterializedValue = (createLogic, Unit)
def createLogic: GraphStageLogic
}
private object TimerMessages {
final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression