fix a good swath of FIXMEs

Roland Kuhn 2015-02-26 11:58:29 +01:00
parent ac9c61a3a5
commit aeb31d2a1f
17 changed files with 133 additions and 63 deletions

View file

@ -35,10 +35,8 @@ class FlowGraphDocSpec extends AkkaSpec {
val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
in ~> f1 ~> bcast.in
bcast.out(0) ~> f2 ~> merge.in(0)
bcast.out(1) ~> f4 ~> merge.in(1)
merge.out ~> f3 ~> out
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
}
//#simple-flow-graph
//format: ON

View file

@ -5,16 +5,15 @@ package akka.stream
import java.util.Locale
import java.util.concurrent.TimeUnit
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props }
import akka.stream.impl._
import akka.stream.scaladsl.RunnableFlow
import com.typesafe.config.Config
import scala.concurrent.duration._
import akka.actor.Props
import akka.actor.ActorRef
import akka.stream.javadsl.japi
import scala.concurrent.ExecutionContextExecutor
object ActorFlowMaterializer {
@ -152,13 +151,20 @@ abstract class FlowMaterializer {
*/
def withNamePrefix(name: String): FlowMaterializer
// FIXME this is scaladsl specific
/**
* This method interprets the given Flow description and creates the running
* stream. The result can be highly implementation specific, ranging from
* local actor chains to remote-deployed processing networks.
*/
def materialize[Mat](runnable: RunnableFlow[Mat]): Mat
def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
/**
* Running a flow graph will require execution resources, as will computations
* within Sources, Sinks, etc. This [[scala.concurrent.ExecutionContextExecutor]]
* can be used by parts of the flow to submit processing jobs for execution,
* run Future callbacks, etc.
*/
def executionContext: ExecutionContextExecutor
}
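Note: the signature change above is the heart of the removed FIXME — `materialize` no longer takes the scaladsl `RunnableFlow` but any closed graph, and both the scaladsl and javadsl `RunnableFlow` types are made to extend `Graph[ClosedShape, Mat]` further down in this commit. A minimal sketch of what that enables, assuming `Graph` and `ClosedShape` live in `akka.stream` next to `FlowMaterializer` as the code here suggests:

```scala
// Sketch only, not part of the diff: any closed graph can now be handed to any materializer.
import akka.stream.{ ClosedShape, FlowMaterializer, Graph }

def runClosed[Mat](g: Graph[ClosedShape, Mat], mat: FlowMaterializer): Mat =
  mat.materialize(g) // works for scaladsl.RunnableFlow and javadsl.RunnableFlow alike
```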

View file

@ -12,6 +12,12 @@ sealed abstract class OutPort
final class Inlet[-T](override val toString: String) extends InPort
final class Outlet[+T](override val toString: String) extends OutPort
/**
* A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the
* philosophy that a Graph is a freely reusable blueprint, all that matters
* from the outside is the set of connections that can be made with it;
* otherwise it is just a black box.
*/
abstract class Shape {
/**
* Scala API: get a list of all input ports
@ -55,7 +61,7 @@ abstract class Shape {
/**
* Compare this to another shape and determine whether the arrangement of ports is the same (including their ordering).
*/
def hasSameShapeAs(s: Shape): Boolean =
def hasSamePortsAndShapeAs(s: Shape): Boolean =
inlets == s.inlets && outlets == s.outlets
/**
@ -66,17 +72,23 @@ abstract class Shape {
/**
* Asserting version of [[#hasSameShapeAs]].
*/
def requireSameShapeAs(s: Shape): Unit = require(hasSameShapeAs(s), nonCorrespondingMessage(s))
def requireSamePortsAndShapeAs(s: Shape): Unit = require(hasSamePortsAndShapeAs(s), nonCorrespondingMessage(s))
private def nonCorrespondingMessage(s: Shape) =
s"The inlets [${s.inlets.mkString(", ")}] and outlets [${s.outlets.mkString(", ")}] must correspond to the inlets [${inlets.mkString(", ")}] and outlets [${outlets.mkString(", ")}]"
}
/**
* Java API for creating custom Shape types.
* Java API for creating custom [[Shape]] types.
*/
abstract class AbstractShape extends Shape {
/**
* Provide the list of all input ports of this shape.
*/
def allInlets: java.util.List[Inlet[_]]
/**
* Provide the list of all output ports of this shape.
*/
def allOutlets: java.util.List[Outlet[_]]
final override lazy val inlets: immutable.Seq[Inlet[_]] = allInlets.asScala.toList
@ -86,22 +98,33 @@ abstract class AbstractShape extends Shape {
final override def getOutlets = allOutlets
}
object EmptyShape extends Shape {
/**
* This [[Shape]] is used for graphs that have neither open inputs nor open
* outputs. Only such a [[Graph]] can be materialized by a [[FlowMaterializer]].
*/
sealed abstract class ClosedShape extends Shape
object ClosedShape extends ClosedShape {
override val inlets: immutable.Seq[Inlet[_]] = Nil
override val outlets: immutable.Seq[Outlet[_]] = Nil
override def deepCopy() = this
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit EmptyShape")
require(outlets.isEmpty, s"proposed outlets [${outlets.mkString(", ")}] do not fit EmptyShape")
require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit ClosedShape")
require(outlets.isEmpty, s"proposed outlets [${outlets.mkString(", ")}] do not fit ClosedShape")
this
}
/**
* Java API: obtain EmptyShape instance
* Java API: obtain ClosedShape instance
*/
def getInstance: Shape = this
}
/**
* This type of [[Shape]] can express any number of inputs and outputs at the
* expense of forgetting about their specific types. It is used mainly in the
* implementation of the [[Graph]] builders and typically replaced by a more
* meaningful type of Shape when the building is finished.
*/
case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]) extends Shape {
override def deepCopy() = AmorphousShape(
inlets.map(i ⇒ new Inlet[Any](i.toString)),
@ -109,6 +132,10 @@ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Se
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = AmorphousShape(inlets, outlets)
}
/**
* A Source [[Shape]] has exactly one output and no inputs; it models a source
* of data.
*/
final case class SourceShape[+T](outlet: Outlet[T]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = Nil
override val outlets: immutable.Seq[Outlet[_]] = List(outlet)
@ -121,6 +148,11 @@ final case class SourceShape[+T](outlet: Outlet[T]) extends Shape {
}
}
/**
* A Flow [[Shape]] has exactly one input and one output; from the outside it
* looks like a pipe (though it can of course contain a complex topology of
* streams within).
*/
final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = List(inlet)
override val outlets: immutable.Seq[Outlet[_]] = List(outlet)
@ -133,6 +165,9 @@ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends S
}
}
/**
* A Sink [[Shape]] has exactly one input and no outputs; it models a data sink.
*/
final case class SinkShape[-T](inlet: Inlet[T]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = List(inlet)
override val outlets: immutable.Seq[Outlet[_]] = Nil
@ -146,8 +181,13 @@ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape {
}
/**
* A bidirectional flow of elements that consequently has two inputs and two
* outputs, arranged like this:
*
* {{{
* In1 => Out1
* Out2 <= In2
* }}}
*/
final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], out1: Outlet[Out1], in2: Inlet[In2], out2: Outlet[Out2]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = List(in1, in2)
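Since `hasSameShapeAs`/`requireSameShapeAs` are renamed here to spell out that the concrete port instances are compared (including their ordering), a small sketch of the renamed method's behaviour, using only constructors visible in this file and assuming these types live in `akka.stream`:

```scala
// Sketch only: ports compare by instance, so equal arity is not enough.
import akka.stream.{ FlowShape, Inlet, Outlet }

val a = FlowShape(new Inlet[Int]("A.in"), new Outlet[Int]("A.out"))
val b = FlowShape(new Inlet[Int]("B.in"), new Outlet[Int]("B.out"))

a.hasSamePortsAndShapeAs(a) // true: identical ports in identical order
a.hasSamePortsAndShapeAs(b) // false: same arrangement, but different port instances
```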

View file

@ -17,13 +17,13 @@ import akka.stream.scaladsl._
import akka.stream._
import org.reactivestreams._
import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.{ Await, ExecutionContextExecutor }
/**
* INTERNAL API
*/
case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterializerSettings,
dispatchers: Dispatchers, // FIXME is this the right choice for loading an EC?
dispatchers: Dispatchers,
supervisor: ActorRef,
flowNameCounter: AtomicLong,
namePrefix: String,
@ -37,7 +37,7 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize
private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
override def materialize[Mat](runnableFlow: RunnableFlow[Mat]): Mat = {
override def materialize[Mat](runnableFlow: Graph[ClosedShape, Mat]): Mat = {
runnableFlow.module.validate()
val session = new MaterializerSession(runnableFlow.module) {
@ -84,13 +84,13 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize
op match {
case fanin: FanInModule ⇒
val (props, inputs, output) = fanin match {
case MergeModule(shape, _) ⇒
(FairMerge.props(effectiveAttributes.settings(settings), shape.inArray.size), shape.inArray.toSeq, shape.out)
case f: FlexiMergeModule[t, p] ⇒
val flexi = f.flexi(f.shape)
(FlexiMerge.props(effectiveAttributes.settings(settings), f.shape, flexi), f.shape.inlets, f.shape.outlets.head)
// TODO each materialization needs its own logic
case MergePreferredModule(shape, _) ⇒
(UnfairMerge.props(effectiveAttributes.settings(settings), shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out)
@ -112,20 +112,28 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize
case fanout: FanOutModule ⇒
val (props, in, outs) = fanout match {
case r: FlexiRouteModule[t, p] ⇒
val flexi = r.flexi(r.shape)
(FlexiRoute.props(effectiveAttributes.settings(settings), r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets)
case BroadcastModule(shape, _) ⇒
(Broadcast.props(effectiveAttributes.settings(settings), shape.outArray.size), shape.in, shape.outArray.toSeq)
case BalanceModule(shape, waitForDownstreams, _) ⇒
(Balance.props(effectiveAttributes.settings(settings), shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
case UnzipModule(shape, _) ⇒
(Unzip.props(effectiveAttributes.settings(settings)), shape.in, shape.outlets)
}
val impl = actorOf(props, stageName(effectiveAttributes), effectiveAttributes.settings(settings).dispatcher)
val publishers = Vector.tabulate(outs.size)(id ⇒ new ActorPublisher[Any](impl) { // FIXME switch to List.tabulate for inputCount < 8?
val size = outs.size
def factory(id: Int) = new ActorPublisher[Any](impl) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
})
}
val publishers =
if (outs.size < 8) Vector.tabulate(size)(factory)
else List.tabulate(size)(factory)
impl ! FanOut.ExposedPublishers(publishers)
publishers.zip(outs).foreach { case (pub, out) ⇒ assignPort(out, pub) }
@ -140,7 +148,7 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize
session.materialize().asInstanceOf[Mat]
}
def executionContext: ExecutionContext = dispatchers.lookup(settings.dispatcher match {
lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match {
case Deploy.NoDispatcherGiven ⇒ Dispatchers.DefaultDispatcherId
case other ⇒ other
})
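With `executionContext` promoted to the `FlowMaterializer` API (and made a lazy `ExecutionContextExecutor` here), code that previously cast the materializer can use it directly for `Future` callbacks, as `LazyEmptySource` does later in this commit. A hypothetical sketch:

```scala
// Sketch only: run Future callbacks on the materializer's dispatcher.
import scala.concurrent.Future
import akka.stream.FlowMaterializer

def doubled(mat: FlowMaterializer): Future[Int] = {
  implicit val ec = mat.executionContext // the executor exposed above
  Future(21).map(_ * 2)
}
```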
@ -238,7 +246,7 @@ private[akka] object ActorProcessorFactory {
case MaterializingStageFactory(mkStageAndMat, _) ⇒
val (stage, mat) = mkStageAndMat()
(ActorInterpreter.props(settings, List(stage)), mat)
case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory")
}
}

View file

@ -106,10 +106,13 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
protected def onSubscribe(subscription: Subscription): Unit = {
assert(subscription != null)
upstream = subscription
// Prefetch
upstream.request(inputBuffer.length)
subreceive.become(upstreamRunning)
if (upstreamCompleted) subscription.cancel()
else {
upstream = subscription
// Prefetch
upstream.request(inputBuffer.length)
subreceive.become(upstreamRunning)
}
}
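The new guard implements the Reactive Streams rule that a `Subscription` arriving after the input side has already completed must be cancelled rather than used. A stripped-down sketch of that rule (hypothetical helper, not the actual class):

```scala
// Sketch: cancel late subscriptions, otherwise prefetch as before.
import org.reactivestreams.Subscription

def onSubscribeSafely(alreadyCompleted: Boolean, s: Subscription, prefetch: Int): Unit =
  if (alreadyCompleted) s.cancel()  // upstream already done: do not use the subscription
  else s.request(prefetch)          // normal case: prefetch into the input buffer
```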
protected def onError(e: Throwable): Unit = {
@ -132,7 +135,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
}
protected def completed: Actor.Receive = {
case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") // FIXME "shutdown subscriber"?!
case OnSubscribe(subscription) ⇒ throw new IllegalStateException("onSubscribe called after onError or onComplete")
}
protected def inputOnError(e: Throwable): Unit = {
@ -247,7 +250,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali
}
}
def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive
def activeReceive: Receive = primaryInputs.subreceive.orElse[Any, Unit](primaryOutputs.subreceive)
protected def onError(e: Throwable): Unit = fail(e)

View file

@ -12,18 +12,21 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] {
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
try tryOnComplete(subscriber) catch {
case _: SpecViolation ⇒ // nothing to do
case _: SpecViolation ⇒ // nothing we can do
}
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
override def toString: String = "empty-publisher" // FIXME is this a good name?
override def toString: String = "already-completed-publisher"
}
/**
* INTERNAL API
*/
private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] {
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
ReactiveStreamsCompliance.tryOnError(subscriber, t) // FIXME how to deal with spec violations here?
try tryOnError(subscriber, t) catch {
case _: SpecViolation ⇒ // nothing we can do
}
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
override def toString: String = name
}

View file

@ -133,7 +133,6 @@ private[akka] class FlexiMergeImpl[T, S <: Shape](
triggerCompletionAfterRead(inputHandle)
case Read(input) ⇒
val elem = inputBunch.dequeue(indexOf(input))
// FIXME: callOnInput
callOnInput(input, elem)
triggerCompletionAfterRead(input)
case read: ReadAll[t] ⇒

View file

@ -41,16 +41,15 @@ private[stream] object ReactiveStreamsCompliance {
final def rejectDueToNonPositiveDemand[T](subscriber: Subscriber[T]): Unit =
tryOnError(subscriber, numberOfElementsInRequestMustBePositiveException)
sealed trait SpecViolation {
self: Throwable ⇒
def violation: Throwable = self // this method is needed because Scalac is not smart enough to handle it otherwise
}
//FIXME serialVersionUid?
@SerialVersionUID(1L)
sealed trait SpecViolation extends Throwable
@SerialVersionUID(1L)
final class SignalThrewException(message: String, cause: Throwable) extends IllegalStateException(message, cause) with SpecViolation
final def tryOnError[T](subscriber: Subscriber[T], error: Throwable): Unit =
error match {
case sv: SpecViolation ⇒ throw new IllegalStateException("It is not legal to try to signal onError with a SpecViolation", sv.violation)
case sv: SpecViolation ⇒ throw new IllegalStateException("It is not legal to try to signal onError with a SpecViolation", sv)
case other ⇒
try subscriber.onError(other) catch {
case NonFatal(t) ⇒ throw new SignalThrewException(subscriber + ".onError", t)
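The self-typed trait is replaced by a trait that simply extends `Throwable`, which is why the `sv.violation` calls elsewhere in this commit become plain `sv`. A plain-Scala illustration (hypothetical names, not Akka code) of what the old accessor was working around:

```scala
// With a self-type, a value typed as the trait is not statically a Throwable,
// so it needed the extra `violation` accessor; the new trait simply IS one.
trait OldSpecViolation { self: Throwable =>
  def violation: Throwable = self // the old workaround
}
trait NewSpecViolation extends Throwable

def rethrowNew(e: NewSpecViolation): Nothing = throw e           // compiles directly
def rethrowOld(e: OldSpecViolation): Nothing = throw e.violation // `throw e` would not compile
```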

View file

@ -100,6 +100,7 @@ final class FutureSource[Out](future: Future[Out], val attributes: OperationAttr
}
final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Unit]](shape) {
import ReactiveStreamsCompliance._
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val p = Promise[Unit]()
@ -109,15 +110,15 @@ final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: Sou
// so we can enable it then, though it will require external completing of the promise
val pub = new Publisher[Unit] {
override def subscribe(s: Subscriber[_ >: Unit]) = {
s.onSubscribe(new Subscription {
tryOnSubscribe(s, new Subscription {
override def request(n: Long): Unit = ()
override def cancel(): Unit = p.success(())
})
p.future.onComplete {
case Success(_) ⇒ s.onComplete()
case Failure(ex) ⇒ s.onError(ex) // due to external signal
}(materializer.asInstanceOf[ActorFlowMaterializerImpl].executionContext) // TODO: Should it use this EC or something else?
case Success(_) ⇒ tryOnComplete(s)
case Failure(ex) ⇒ tryOnError(s, ex) // due to external signal
}(materializer.executionContext)
}
}

View file

@ -19,8 +19,7 @@ import scala.concurrent.Future
*/
private[stream] object Stages {
// FIXME Fix the name `Defaults` is waaaay too opaque. How about "Names"?
object Defaults {
object DefaultAttributes {
val timerTransform = name("timerTransform")
val stageFactory = name("stageFactory")
val fused = name("fused")
@ -57,7 +56,7 @@ private[stream] object Stages {
val identityJunction = name("identityJunction")
}
import Defaults._
import DefaultAttributes._
sealed trait StageModule extends FlowModule[Any, Any, Any] {
@ -85,16 +84,16 @@ private[stream] object Stages {
override protected def newInstance: StageModule = this.copy()
}
object Fused {
def apply(ops: immutable.Seq[Stage[_, _]]): Fused =
Fused(ops, name(ops.map(x ⇒ Logging.simpleName(x).toLowerCase).mkString("+"))) //FIXME change to something more performant for name
}
final case class Identity(attributes: OperationAttributes = OperationAttributes.name("identity")) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}
object Fused {
def apply(ops: immutable.Seq[Stage[_, _]]): Fused =
Fused(ops, name(ops.iterator.map(x ⇒ Logging.simpleName(x).toLowerCase).mkString("+")))
}
final case class Fused(ops: immutable.Seq[Stage[_, _]], attributes: OperationAttributes = fused) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
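The `Fused` name computation now iterates instead of mapping the `Seq`, which resolves the old performance FIXME by avoiding an intermediate collection before `mkString`. A tiny sketch of the same computation with stand-in names:

```scala
// Stand-ins for Logging.simpleName(stage) of the fused stages.
val ops = List("Map", "Filter", "Take")
val fusedName = ops.iterator.map(_.toLowerCase).mkString("+") // "map+filter+take"
```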

View file

@ -204,9 +204,9 @@ private[akka] object StreamLayout {
}
object EmptyModule extends Module {
override def shape = EmptyShape
override def shape = ClosedShape
override def replaceShape(s: Shape) =
if (s == EmptyShape) this
if (s == ClosedShape) this
else throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule")
override def grow(that: Module): Module = that

View file

@ -126,8 +126,16 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
// if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore`
if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0
} else if (buffer.count(subscription) > 0) {
subscription.dispatch(buffer.read(subscription)) // FIXME this does not gracefully handle the case if onNext throws
dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos)
val goOn = try {
subscription.dispatch(buffer.read(subscription))
true
} catch {
case _: SpecViolation ⇒
unregisterSubscriptionInternal(subscription)
false
}
if (goOn) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos)
else Long.MinValue
} else if (eos ne NotReached) Long.MinValue
else requested
@ -225,11 +233,15 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
case eos ⇒ eos(subscriber)
}
protected def addSubscription(subscriber: Subscriber[_ >: T]): Unit = {
private def addSubscription(subscriber: Subscriber[_ >: T]): Unit = {
import ReactiveStreamsCompliance._
val newSubscription = createSubscription(subscriber)
subscriptions ::= newSubscription
buffer.initCursor(newSubscription)
ReactiveStreamsCompliance.tryOnSubscribe(subscriber, newSubscription) // FIXME what if this throws?
try tryOnSubscribe(subscriber, newSubscription)
catch {
case _: SpecViolation ⇒ unregisterSubscriptionInternal(newSubscription)
}
}
/**

View file

@ -41,7 +41,7 @@ private[akka] object SynchronousIterablePublisher {
} catch {
case sv: SpecViolation ⇒
cancel()
throw sv.violation // I think it is prudent to "escalate" the spec violation
throw sv // I think it is prudent to "escalate" the spec violation
case NonFatal(e) ⇒
cancel()
tryOnError(subscriber, e)
@ -80,7 +80,7 @@ private[akka] object SynchronousIterablePublisher {
} catch {
case sv: SpecViolation ⇒
cancel()
throw sv.violation // I think it is prudent to "escalate" the spec violation
throw sv // I think it is prudent to "escalate" the spec violation
case NonFatal(e) ⇒
cancel()
tryOnError(subscriber, e)

View file

@ -77,7 +77,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
tryOnError(subscriber, error)
} finally {
subscriber = null
exposedPublisher.shutdown(Some(error)) // FIXME should this not be SupportsOnlyASingleSubscriber?
exposedPublisher.shutdown(Some(new IllegalStateException("TickPublisher " + SupportsOnlyASingleSubscriber)))
context.stop(self)
}
}

View file

@ -382,7 +382,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
*
* Flow with attached input and output, can be executed.
*/
trait RunnableFlow[+Mat] {
trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] {
/**
* Run this flow and return the [[MaterializedMap]] containing the values for the [[KeyedMaterializable]] of the flow.
*/
@ -395,6 +395,8 @@ trait RunnableFlow[+Mat] {
/** INTERNAL API */
private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat]) extends RunnableFlow[Mat] {
def shape = ClosedShape
def module = runnable.module
override def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2] =
new RunnableFlowAdapter(runnable.mapMaterialized(f.apply _))
override def run(materializer: ActorFlowMaterializer): Mat = runnable.run()(materializer)

View file

@ -116,7 +116,6 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[
/**
* Connect this `Sink` to a `Source` and run it.
*/
// TODO shouldnt this return M?
def runWith[M](source: javadsl.Source[In, M], materializer: ActorFlowMaterializer): M =
asScala.runWith(source.asScala)(materializer)

View file

@ -175,8 +175,9 @@ object Flow extends FlowApply {
/**
* Flow with attached input and output, can be executed.
*/
case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) {
case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] {
assert(module.isRunnable)
def shape = ClosedShape
/**
* Transform only the materialized value of this RunnableFlow, leaving all other properties as they were.