diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 342a8274e0..98c48eee78 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -35,10 +35,8 @@ class FlowGraphDocSpec extends AkkaSpec { val f1, f2, f3, f4 = Flow[Int].map(_ + 10) - in ~> f1 ~> bcast.in - bcast.out(0) ~> f2 ~> merge.in(0) - bcast.out(1) ~> f4 ~> merge.in(1) - merge.out ~> f3 ~> out + in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out + bcast ~> f4 ~> merge } //#simple-flow-graph //format: ON diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 7277bb29db..917b00ef8f 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -5,16 +5,15 @@ package akka.stream import java.util.Locale import java.util.concurrent.TimeUnit - import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } import akka.stream.impl._ import akka.stream.scaladsl.RunnableFlow import com.typesafe.config.Config - import scala.concurrent.duration._ import akka.actor.Props import akka.actor.ActorRef import akka.stream.javadsl.japi +import scala.concurrent.ExecutionContextExecutor object ActorFlowMaterializer { @@ -152,13 +151,20 @@ abstract class FlowMaterializer { */ def withNamePrefix(name: String): FlowMaterializer - // FIXME this is scaladsl specific /** * This method interprets the given Flow description and creates the running * stream. The result can be highly implementation specific, ranging from * local actor chains to remote-deployed processing networks. */ - def materialize[Mat](runnable: RunnableFlow[Mat]): Mat + def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat + + /** + * Running a flow graph will require execution resources, as will computations + * within Sources, Sinks, etc. This [[scala.concurrent.ExecutionContextExecutor]] + * can be used by parts of the flow to submit processing jobs for execution, + * run Future callbacks, etc. + */ + def executionContext: ExecutionContextExecutor } diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index 664a9f819f..6e82fb4f1e 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -12,6 +12,12 @@ sealed abstract class OutPort final class Inlet[-T](override val toString: String) extends InPort final class Outlet[+T](override val toString: String) extends OutPort +/** + * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the + * philosophy that a Graph is a freely reusable blueprint, everything that + * matters from the outside are the connections that can be made with it, + * otherwise it is just a black box. + */ abstract class Shape { /** * Scala API: get a list of all input ports @@ -55,7 +61,7 @@ abstract class Shape { /** * Compare this to another shape and determine whether the arrangement of ports is the same (including their ordering). */ - def hasSameShapeAs(s: Shape): Boolean = + def hasSamePortsAndShapeAs(s: Shape): Boolean = inlets == s.inlets && outlets == s.outlets /** @@ -66,17 +72,23 @@ abstract class Shape { /** * Asserting version of [[#hasSameShapeAs]]. */ - def requireSameShapeAs(s: Shape): Unit = require(hasSameShapeAs(s), nonCorrespondingMessage(s)) + def requireSamePortsAndShapeAs(s: Shape): Unit = require(hasSamePortsAndShapeAs(s), nonCorrespondingMessage(s)) private def nonCorrespondingMessage(s: Shape) = s"The inlets [${s.inlets.mkString(", ")}] and outlets [${s.outlets.mkString(", ")}] must correspond to the inlets [${inlets.mkString(", ")}] and outlets [${outlets.mkString(", ")}]" } /** - * Java API for creating custom Shape types. + * Java API for creating custom [[Shape]] types. */ abstract class AbstractShape extends Shape { + /** + * Provide the list of all input ports of this shape. + */ def allInlets: java.util.List[Inlet[_]] + /** + * Provide the list of all output ports of this shape. + */ def allOutlets: java.util.List[Outlet[_]] final override lazy val inlets: immutable.Seq[Inlet[_]] = allInlets.asScala.toList @@ -86,22 +98,33 @@ abstract class AbstractShape extends Shape { final override def getOutlets = allOutlets } -object EmptyShape extends Shape { +/** + * This [[Shape]] is used for graphs that have neither open inputs nor open + * outputs. Only such a [[Graph]] can be materialized by a [[FlowMaterializer]]. + */ +sealed abstract class ClosedShape extends Shape +object ClosedShape extends ClosedShape { override val inlets: immutable.Seq[Inlet[_]] = Nil override val outlets: immutable.Seq[Outlet[_]] = Nil override def deepCopy() = this override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { - require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit EmptyShape") - require(outlets.isEmpty, s"proposed outlets [${outlets.mkString(", ")}] do not fit EmptyShape") + require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit ClosedShape") + require(outlets.isEmpty, s"proposed outlets [${outlets.mkString(", ")}] do not fit ClosedShape") this } /** - * Java API: obtain EmptyShape instance + * Java API: obtain ClosedShape instance */ def getInstance: Shape = this } +/** + * This type of [[Shape]] can express any number of inputs and outputs at the + * expense of forgetting about their specific types. It is used mainly in the + * implementation of the [[Graph]] builders and typically replaced by a more + * meaningful type of Shape when the building is finished. + */ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]) extends Shape { override def deepCopy() = AmorphousShape( inlets.map(i ⇒ new Inlet[Any](i.toString)), @@ -109,6 +132,10 @@ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Se override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = AmorphousShape(inlets, outlets) } +/** + * A Source [[Shape]] has exactly one output and no inputs, it models a source + * of data. + */ final case class SourceShape[+T](outlet: Outlet[T]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = Nil override val outlets: immutable.Seq[Outlet[_]] = List(outlet) @@ -121,6 +148,11 @@ final case class SourceShape[+T](outlet: Outlet[T]) extends Shape { } } +/** + * A Flow [[Shape]] has exactly one input and one output, it looks from the + * outside like a pipe (but it can be a complex topology of streams within of + * course). + */ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = List(outlet) @@ -133,6 +165,9 @@ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends S } } +/** + * A Sink [[Shape]] has exactly one input and no outputs, it models a data sink. + */ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = Nil @@ -146,8 +181,13 @@ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape { } /** + * A bidirectional flow of elements that consequently has two inputs and two + * outputs, arranged like this: + * + * {{{ * In1 => Out1 * Out2 <= In2 + * }}} */ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], out1: Outlet[Out1], in2: Inlet[In2], out2: Outlet[Out2]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(in1, in2) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index c0332831fc..436e3f4779 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -17,13 +17,13 @@ import akka.stream.scaladsl._ import akka.stream._ import org.reactivestreams._ -import scala.concurrent.{ Await, ExecutionContext } +import scala.concurrent.{ Await, ExecutionContextExecutor } /** * INTERNAL API */ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterializerSettings, - dispatchers: Dispatchers, // FIXME is this the right choice for loading an EC? + dispatchers: Dispatchers, supervisor: ActorRef, flowNameCounter: AtomicLong, namePrefix: String, @@ -37,7 +37,7 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" - override def materialize[Mat](runnableFlow: RunnableFlow[Mat]): Mat = { + override def materialize[Mat](runnableFlow: Graph[ClosedShape, Mat]): Mat = { runnableFlow.module.validate() val session = new MaterializerSession(runnableFlow.module) { @@ -84,13 +84,13 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize op match { case fanin: FanInModule ⇒ val (props, inputs, output) = fanin match { + case MergeModule(shape, _) ⇒ (FairMerge.props(effectiveAttributes.settings(settings), shape.inArray.size), shape.inArray.toSeq, shape.out) case f: FlexiMergeModule[t, p] ⇒ val flexi = f.flexi(f.shape) (FlexiMerge.props(effectiveAttributes.settings(settings), f.shape, flexi), f.shape.inlets, f.shape.outlets.head) - // TODO each materialization needs its own logic case MergePreferredModule(shape, _) ⇒ (UnfairMerge.props(effectiveAttributes.settings(settings), shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out) @@ -112,20 +112,28 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize case fanout: FanOutModule ⇒ val (props, in, outs) = fanout match { + case r: FlexiRouteModule[t, p] ⇒ val flexi = r.flexi(r.shape) (FlexiRoute.props(effectiveAttributes.settings(settings), r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets) + case BroadcastModule(shape, _) ⇒ (Broadcast.props(effectiveAttributes.settings(settings), shape.outArray.size), shape.in, shape.outArray.toSeq) + case BalanceModule(shape, waitForDownstreams, _) ⇒ (Balance.props(effectiveAttributes.settings(settings), shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) + case UnzipModule(shape, _) ⇒ (Unzip.props(effectiveAttributes.settings(settings)), shape.in, shape.outlets) } val impl = actorOf(props, stageName(effectiveAttributes), effectiveAttributes.settings(settings).dispatcher) - val publishers = Vector.tabulate(outs.size)(id ⇒ new ActorPublisher[Any](impl) { // FIXME switch to List.tabulate for inputCount < 8? + val size = outs.size + def factory(id: Int) = new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) - }) + } + val publishers = + if (outs.size < 8) Vector.tabulate(size)(factory) + else List.tabulate(size)(factory) impl ! FanOut.ExposedPublishers(publishers) publishers.zip(outs).foreach { case (pub, out) ⇒ assignPort(out, pub) } @@ -140,7 +148,7 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize session.materialize().asInstanceOf[Mat] } - def executionContext: ExecutionContext = dispatchers.lookup(settings.dispatcher match { + lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { case Deploy.NoDispatcherGiven ⇒ Dispatchers.DefaultDispatcherId case other ⇒ other }) @@ -238,7 +246,7 @@ private[akka] object ActorProcessorFactory { case MaterializingStageFactory(mkStageAndMat, _) ⇒ val (stage, mat) = mkStageAndMat() (ActorInterpreter.props(settings, List(stage)), mat) - + case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 654c9d2fe7..490fcdf69f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -106,10 +106,13 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) protected def onSubscribe(subscription: Subscription): Unit = { assert(subscription != null) - upstream = subscription - // Prefetch - upstream.request(inputBuffer.length) - subreceive.become(upstreamRunning) + if (upstreamCompleted) subscription.cancel() + else { + upstream = subscription + // Prefetch + upstream.request(inputBuffer.length) + subreceive.become(upstreamRunning) + } } protected def onError(e: Throwable): Unit = { @@ -132,7 +135,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) } protected def completed: Actor.Receive = { - case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") // FIXME "shutdown subscriber"?! + case OnSubscribe(subscription) ⇒ throw new IllegalStateException("onSubscribe called after onError or onComplete") } protected def inputOnError(e: Throwable): Unit = { @@ -247,7 +250,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali } } - def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive + def activeReceive: Receive = primaryInputs.subreceive.orElse[Any, Unit](primaryOutputs.subreceive) protected def onError(e: Throwable): Unit = fail(e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 6bf5367ba4..6c25283368 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -12,18 +12,21 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] { import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = try tryOnComplete(subscriber) catch { - case _: SpecViolation ⇒ // nothing to do + case _: SpecViolation ⇒ // nothing we can do } def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] - override def toString: String = "empty-publisher" // FIXME is this a good name? + override def toString: String = "already-completed-publisher" } /** * INTERNAL API */ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { + import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = - ReactiveStreamsCompliance.tryOnError(subscriber, t) // FIXME how to deal with spec violations here? + try tryOnError(subscriber, t) catch { + case _: SpecViolation ⇒ // nothing we can do + } def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] override def toString: String = name } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala index b1bd9b50e5..ac8b9fd3a9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala @@ -133,7 +133,6 @@ private[akka] class FlexiMergeImpl[T, S <: Shape]( triggerCompletionAfterRead(inputHandle) case Read(input) ⇒ val elem = inputBunch.dequeue(indexOf(input)) - // FIXME: callOnInput callOnInput(input, elem) triggerCompletionAfterRead(input) case read: ReadAll[t] ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala index e3e40df3b0..e33936d22f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala @@ -41,16 +41,15 @@ private[stream] object ReactiveStreamsCompliance { final def rejectDueToNonPositiveDemand[T](subscriber: Subscriber[T]): Unit = tryOnError(subscriber, numberOfElementsInRequestMustBePositiveException) - sealed trait SpecViolation { - self: Throwable ⇒ - def violation: Throwable = self // this method is needed because Scalac is not smart enough to handle it otherwise - } - //FIXME serialVersionUid? + @SerialVersionUID(1L) + sealed trait SpecViolation extends Throwable + + @SerialVersionUID(1L) final class SignalThrewException(message: String, cause: Throwable) extends IllegalStateException(message, cause) with SpecViolation final def tryOnError[T](subscriber: Subscriber[T], error: Throwable): Unit = error match { - case sv: SpecViolation ⇒ throw new IllegalStateException("It is not legal to try to signal onError with a SpecViolation", sv.violation) + case sv: SpecViolation ⇒ throw new IllegalStateException("It is not legal to try to signal onError with a SpecViolation", sv) case other ⇒ try subscriber.onError(other) catch { case NonFatal(t) ⇒ throw new SignalThrewException(subscriber + ".onError", t) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 158d106093..5b75df29be 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -100,6 +100,7 @@ final class FutureSource[Out](future: Future[Out], val attributes: OperationAttr } final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Unit]](shape) { + import ReactiveStreamsCompliance._ override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { val p = Promise[Unit]() @@ -109,15 +110,15 @@ final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: Sou // so we can enable it then, though it will require external completing of the promise val pub = new Publisher[Unit] { override def subscribe(s: Subscriber[_ >: Unit]) = { - s.onSubscribe(new Subscription { + tryOnSubscribe(s, new Subscription { override def request(n: Long): Unit = () override def cancel(): Unit = p.success(()) }) p.future.onComplete { - case Success(_) ⇒ s.onComplete() - case Failure(ex) ⇒ s.onError(ex) // due to external signal - }(materializer.asInstanceOf[ActorFlowMaterializerImpl].executionContext) // TODO: Should it use this EC or something else? + case Success(_) ⇒ tryOnComplete(s) + case Failure(ex) ⇒ tryOnError(s, ex) // due to external signal + }(materializer.executionContext) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index e8b7ec60f9..6602e67e6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -19,8 +19,7 @@ import scala.concurrent.Future */ private[stream] object Stages { - // FIXME Fix the name `Defaults` is waaaay too opaque. How about "Names"? - object Defaults { + object DefaultAttributes { val timerTransform = name("timerTransform") val stageFactory = name("stageFactory") val fused = name("fused") @@ -57,7 +56,7 @@ private[stream] object Stages { val identityJunction = name("identityJunction") } - import Defaults._ + import DefaultAttributes._ sealed trait StageModule extends FlowModule[Any, Any, Any] { @@ -85,16 +84,16 @@ private[stream] object Stages { override protected def newInstance: StageModule = this.copy() } - object Fused { - def apply(ops: immutable.Seq[Stage[_, _]]): Fused = - Fused(ops, name(ops.map(x ⇒ Logging.simpleName(x).toLowerCase).mkString("+"))) //FIXME change to something more performant for name - } - final case class Identity(attributes: OperationAttributes = OperationAttributes.name("identity")) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() } + object Fused { + def apply(ops: immutable.Seq[Stage[_, _]]): Fused = + Fused(ops, name(ops.iterator.map(x ⇒ Logging.simpleName(x).toLowerCase).mkString("+"))) + } + final case class Fused(ops: immutable.Seq[Stage[_, _]], attributes: OperationAttributes = fused) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 7a54f74782..769d482e52 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -204,9 +204,9 @@ private[akka] object StreamLayout { } object EmptyModule extends Module { - override def shape = EmptyShape + override def shape = ClosedShape override def replaceShape(s: Shape) = - if (s == EmptyShape) this + if (s == ClosedShape) this else throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule") override def grow(that: Module): Module = that diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index d6acf34cf9..56bede5603 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -126,8 +126,16 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff // if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore` if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0 } else if (buffer.count(subscription) > 0) { - subscription.dispatch(buffer.read(subscription)) // FIXME this does not gracefully handle the case if onNext throws - dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) + val goOn = try { + subscription.dispatch(buffer.read(subscription)) + true + } catch { + case _: SpecViolation ⇒ + unregisterSubscriptionInternal(subscription) + false + } + if (goOn) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) + else Long.MinValue } else if (eos ne NotReached) Long.MinValue else requested @@ -225,11 +233,15 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff case eos ⇒ eos(subscriber) } - protected def addSubscription(subscriber: Subscriber[_ >: T]): Unit = { + private def addSubscription(subscriber: Subscriber[_ >: T]): Unit = { + import ReactiveStreamsCompliance._ val newSubscription = createSubscription(subscriber) subscriptions ::= newSubscription buffer.initCursor(newSubscription) - ReactiveStreamsCompliance.tryOnSubscribe(subscriber, newSubscription) // FIXME what if this throws? + try tryOnSubscribe(subscriber, newSubscription) + catch { + case _: SpecViolation ⇒ unregisterSubscriptionInternal(newSubscription) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala index 22ac209e13..c6931268f2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala @@ -41,7 +41,7 @@ private[akka] object SynchronousIterablePublisher { } catch { case sv: SpecViolation ⇒ cancel() - throw sv.violation // I think it is prudent to "escalate" the spec violation + throw sv // I think it is prudent to "escalate" the spec violation case NonFatal(e) ⇒ cancel() tryOnError(subscriber, e) @@ -80,7 +80,7 @@ private[akka] object SynchronousIterablePublisher { } catch { case sv: SpecViolation ⇒ cancel() - throw sv.violation // I think it is prudent to "escalate" the spec violation + throw sv // I think it is prudent to "escalate" the spec violation case NonFatal(e) ⇒ cancel() tryOnError(subscriber, e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index a318c3e318..267334c6b6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -77,7 +77,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite tryOnError(subscriber, error) } finally { subscriber = null - exposedPublisher.shutdown(Some(error)) // FIXME should this not be SupportsOnlyASingleSubscriber? + exposedPublisher.shutdown(Some(new IllegalStateException("TickPublisher " + SupportsOnlyASingleSubscriber))) context.stop(self) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index c51fbde3a8..5bef7b043e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -382,7 +382,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * * Flow with attached input and output, can be executed. */ -trait RunnableFlow[+Mat] { +trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] { /** * Run this flow and return the [[MaterializedMap]] containing the values for the [[KeyedMaterializable]] of the flow. */ @@ -395,6 +395,8 @@ trait RunnableFlow[+Mat] { /** INTERNAL API */ private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat]) extends RunnableFlow[Mat] { + def shape = ClosedShape + def module = runnable.module override def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2] = new RunnableFlowAdapter(runnable.mapMaterialized(f.apply _)) override def run(materializer: ActorFlowMaterializer): Mat = runnable.run()(materializer) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index ba02acefc0..c58d068114 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -116,7 +116,6 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ /** * Connect this `Sink` to a `Source` and run it. */ - // TODO shouldn’t this return M? def runWith[M](source: javadsl.Source[In, M], materializer: ActorFlowMaterializer): M = asScala.runWith(source.asScala)(materializer) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index bde9718eb0..fb36669700 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -175,8 +175,9 @@ object Flow extends FlowApply { /** * Flow with attached input and output, can be executed. */ -case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) { +case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] { assert(module.isRunnable) + def shape = ClosedShape /** * Transform only the materialized value of this RunnableFlow, leaving all other properties as they were.