From 01646d10ff2fd4b3010df08c4bc7fc80a3915cbd Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Fri, 30 Jan 2015 10:30:56 +0100
Subject: [PATCH] !str #16448 Change error to failure

* one API change OverflowStrategy.error -> OverflowStrategy.fail
* error is still kept in the internals where we are at the reactive streams level
---
 akka-docs-dev/rst/java/stream-graphs.rst      |  2 +-
 akka-docs-dev/rst/java/stream-quickstart.rst  |  2 +-
 .../docs/stream/StreamBuffersRateSpec.scala   |  6 ++---
 .../stream/cookbook/RecipeReduceByKey.scala   |  4 ++--
 akka-docs-dev/rst/scala/stream-cookbook.rst   |  2 +-
 akka-docs-dev/rst/scala/stream-customize.rst  | 22 +++++++++----------
 akka-docs-dev/rst/scala/stream-graphs.rst     |  2 +-
 akka-docs-dev/rst/scala/stream-rate.rst       |  2 +-
 .../akka/stream/scaladsl/FlowBufferSpec.scala |  6 ++---
 .../scala/akka/stream/FlowMaterializer.scala  |  2 +-
 .../scala/akka/stream/OverflowStrategy.scala  |  7 +++---
 .../main/scala/akka/stream/Transformer.scala  |  4 ++--
 .../akka/stream/impl/ActorPublisher.scala     |  6 ++---
 .../stream/impl/MapAsyncProcessorImpl.scala   |  6 ++---
 .../impl/MapAsyncUnorderedProcessorImpl.scala |  4 ++--
 .../akka/stream/impl/TickPublisher.scala      |  8 +++----
 .../akka/stream/impl/fusing/Interpreter.scala |  2 +-
 .../impl/fusing/IteratorInterpreter.scala     | 12 +++++-----
 .../scala/akka/stream/impl/fusing/Ops.scala   |  4 ++--
 .../akka/stream/javadsl/FlexiMerge.scala      |  4 ++--
 .../akka/stream/javadsl/FlexiRoute.scala      |  4 ++--
 .../main/scala/akka/stream/javadsl/Sink.scala |  6 ++---
 .../scala/akka/stream/javadsl/Source.scala    |  8 +++----
 .../akka/stream/scaladsl/ActorFlowSink.scala  |  6 ++---
 .../stream/scaladsl/ActorFlowSource.scala     |  4 ++--
 .../akka/stream/scaladsl/FlexiMerge.scala     |  4 ++--
 .../akka/stream/scaladsl/FlexiRoute.scala     |  4 ++--
 .../scala/akka/stream/scaladsl/Flow.scala     |  2 +-
 .../scala/akka/stream/scaladsl/Pipe.scala     |  8 +++----
 .../scala/akka/stream/scaladsl/Sink.scala     |  6 ++---
 .../scala/akka/stream/scaladsl/Source.scala   |  8 +++----
 .../main/scala/akka/stream/stage/Stage.scala  |  8 +++----
 32 files changed, 87 insertions(+), 88 deletions(-)

diff --git a/akka-docs-dev/rst/java/stream-graphs.rst b/akka-docs-dev/rst/java/stream-graphs.rst
index a2ab83a929..b248650ecd 100644
--- a/akka-docs-dev/rst/java/stream-graphs.rst
+++ b/akka-docs-dev/rst/java/stream-graphs.rst
@@ -184,7 +184,7 @@ If we run this example we see that
 
 This example highlights that one solution to avoid deadlocks in the presence of potentially unbalanced cycles
 (cycles where the number of circulating elements are unbounded) is to drop elements. An alternative would be to
-define a larger buffer with ``OverflowStrategy.error`` which would fail the stream instead of deadlocking it after
+define a larger buffer with ``OverflowStrategy.fail`` which would fail the stream instead of deadlocking it after
 all buffer space has been consumed.
 
 As we discovered in the previous examples, the core problem was the unbalanced nature of the feedback loop. We
diff --git a/akka-docs-dev/rst/java/stream-quickstart.rst b/akka-docs-dev/rst/java/stream-quickstart.rst
index 5cb094d886..32f6d42c62 100644
--- a/akka-docs-dev/rst/java/stream-quickstart.rst
+++ b/akka-docs-dev/rst/java/stream-quickstart.rst
@@ -120,7 +120,7 @@ elements*" this can be expressed using the ``buffer`` element:
 
 The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
 when it receives another element while it is full.
Strategies provided include dropping the oldest element (``dropHead``),
-dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best.
+dropping the entire buffer, signalling failures etc. Be sure to pick and choose the strategy that fits your use case best.
 
 Materialized values
 -------------------
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala
index 3e21feb995..e1084e5688 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala
@@ -79,9 +79,9 @@ class StreamBuffersRateSpec extends AkkaSpec {
     jobs.buffer(1000, OverflowStrategy.dropBuffer)
     //#explicit-buffers-dropbuffer
 
-    //#explicit-buffers-error
-    jobs.buffer(1000, OverflowStrategy.error)
-    //#explicit-buffers-error
+    //#explicit-buffers-fail
+    jobs.buffer(1000, OverflowStrategy.fail)
+    //#explicit-buffers-fail
   }
 
 }
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala
index 30f654b6c7..1a2e87b7b9 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala
@@ -31,7 +31,7 @@ class RecipeReduceByKey extends RecipeSpec {
 
       // get a stream of word counts
       val counts: Source[(String, Int)] = countedWords
-        .buffer(MaximumDistinctWords, OverflowStrategy.error)
+        .buffer(MaximumDistinctWords, OverflowStrategy.fail)
         .mapAsync(identity)
       //#word-count
 
@@ -62,7 +62,7 @@ class RecipeReduceByKey extends RecipeSpec {
         }
       }
 
-      reducedValues.buffer(maximumGroupSize, OverflowStrategy.error).mapAsync(identity)
+      reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(identity)
     }
 
     val wordCounts = words.via(reduceByKey(
diff --git a/akka-docs-dev/rst/scala/stream-cookbook.rst b/akka-docs-dev/rst/scala/stream-cookbook.rst
index 9d3ac7993e..5832ba976b 100644
--- a/akka-docs-dev/rst/scala/stream-cookbook.rst
+++ b/akka-docs-dev/rst/scala/stream-cookbook.rst
@@ -134,7 +134,7 @@ that the substreams produced by ``groupBy()`` can only complete when the origina
 that ``mapAsync()`` cannot pull for more substreams because it still waits on folding futures to finish, but these
 futures never finish if the additional group streams are not consumed. This typical deadlock situation is resolved by
 this buffer which is either able to contain all the group streams (which ensures that they are already running and folding)
-or fails with an explicit error instead of a silent deadlock.
+or fails with an explicit exception instead of a silent deadlock.
 
 .. includecode:: code/docs/stream/cookbook/RecipeReduceByKey.scala#word-count
 
diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst
index 82ff1571f7..27eeeaedd3 100644
--- a/akka-docs-dev/rst/scala/stream-customize.rst
+++ b/akka-docs-dev/rst/scala/stream-customize.rst
@@ -198,7 +198,7 @@ In our case we use the :class:`ReadPreferred` read condition which has the exact
 our preferring merge – it pulls elements from the preferred input port if there are any available, otherwise reverting
 to pulling from the secondary inputs.
The context object passed into the state function allows us to interact with the connected streams, for example
 by emitting an ``element``, which was just pulled from the given ``input``, or signalling
-completion or errors to the merges downstream stage.
+completion or failure to the merge's downstream stage.
 
 The state function must always return the next behaviour to be used when an element should be pulled from its upstreams;
 we use the special :class:`SameState` object which signals :class:`FlexiMerge` that no state transition is needed.
 
@@ -233,21 +233,21 @@ Connecting your custom junction is as simple as creating an instance and connect
 Completion handling
 ^^^^^^^^^^^^^^^^^^^
 Completion handling in :class:`FlexiMerge` is defined by a :class:`CompletionHandling` object which can react on
-completion and error signals from its upstream input ports. The default strategy is to remain running while at-least-one
+completion and failure signals from its upstream input ports. The default strategy is to remain running while at least one
 upstream input port that is declared to be consumed in the current state is still running (i.e. has not signalled
-completion or error).
+completion or failure).
 
 Customising completion can be done via overriding the ``MergeLogic#initialCompletionHandling`` method, or from within
 a :class:`State` by calling ``ctx.changeCompletionHandling(handling)``. Other than the default completion handling (as
-late as possible) :class:`FlexiMerge` also provides an ``eagerClose`` completion handling which completes (or errors) its
-downstream as soon as at least one of its upstream inputs completes (or errors).
+late as possible) :class:`FlexiMerge` also provides an ``eagerClose`` completion handling which completes (or fails) its
+downstream as soon as at least one of its upstream inputs completes (or fails).
 
 In the example below we implement an ``ImportantWithBackups`` fan-in stage which can only keep operating while the
 ``important`` and at least one of the ``replica`` inputs are active. Therefore in our custom completion strategy we
-have to investigate which input has completed or errored and act accordingly. If the important input completed or errored
+have to investigate which input has completed or failed and act accordingly. If the important input completed or failed
 we propagate this downstream, completing the stream; on the other hand, if the first replicated input fails, we log the
-exception and instead of erroring the downstream swallow this exception (as one failed replica is still acceptable).
-Then we change the completion strategy to ``eagerClose`` which will propagate any future completion or error event right
+exception and, instead of failing the downstream, swallow this exception (as one failed replica is still acceptable).
+Then we change the completion strategy to ``eagerClose``, which will propagate any future completion or failure event right
 to this stage's downstream, effectively shutting down the stream.
 
 .. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-completion
 
@@ -294,10 +294,10 @@ we use the special :class:`SameState` object which signals :class:`FlexiRoute` t
 Completion handling
 ^^^^^^^^^^^^^^^^^^^
 Completion handling in :class:`FlexiRoute` is handled similarly to :class:`FlexiMerge` (which is explained in depth in
-:ref:`flexi-merge-completion-handling-scala`), however in addition to reacting to its upstreams *completion* or *error*
+:ref:`flexi-merge-completion-handling-scala`), however in addition to reacting to its upstream's *completion* or *failure*
 it can also react to its downstream stages *cancelling* their subscriptions. The default completion handling for
 :class:`FlexiRoute` (defined in ``RouteLogic#defaultCompletionHandling``) is to continue running until all of its
-downstreams have cancelled their subscriptions, or the upstream has completed / errored.
+downstreams have cancelled their subscriptions, or the upstream has completed / failed.
 
 In order to customise completion handling we can override the ``RouteLogic#initialCompletionHandling``
 method, or call ``ctx.changeCompletionHandling(handling)`` from within a :class:`State`. Other than the default completion handling
@@ -309,7 +309,7 @@ downstream cancels, otherwise (if any other downstream cancels their subscriptio
 
 .. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-completion
 
-Notice that State changes are only allowed in reaction to downstream cancellations, and not in the upstream completion/error
+Notice that State changes are only allowed in reaction to downstream cancellations, and not in the upstream completion/failure
 cases. This is because, since there is only one upstream, there is nothing else to do than possibly flush buffered
 elements and continue with shutting down the entire stream.
 
diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst
index bb203b86ec..3634843626 100644
--- a/akka-docs-dev/rst/scala/stream-graphs.rst
+++ b/akka-docs-dev/rst/scala/stream-graphs.rst
@@ -190,7 +190,7 @@ If we run this example we see that
 
 This example highlights that one solution to avoid deadlocks in the presence of potentially unbalanced cycles
 (cycles where the number of circulating elements are unbounded) is to drop elements. An alternative would be to
-define a larger buffer with ``OverflowStrategy.error`` which would fail the stream instead of deadlocking it after
+define a larger buffer with ``OverflowStrategy.fail`` which would fail the stream instead of deadlocking it after
 all buffer space has been consumed.
 
 As we discovered in the previous examples, the core problem was the unbalanced nature of the feedback loop. We
diff --git a/akka-docs-dev/rst/scala/stream-rate.rst b/akka-docs-dev/rst/scala/stream-rate.rst
index d59cb82c68..1c400d319d 100644
--- a/akka-docs-dev/rst/scala/stream-rate.rst
+++ b/akka-docs-dev/rst/scala/stream-rate.rst
@@ -127,7 +127,7 @@
 otherwise we consider it flooding and terminate the connection. This is easily achievable by the fail strategy which
 simply fails the stream once the buffer gets full.
 
-.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-error
+.. 
includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-fail Rate transformation =================== diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index de91daf699..46423199d6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.duration._ import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.OverflowStrategy -import akka.stream.OverflowStrategy.Error.BufferOverflowException +import akka.stream.OverflowStrategy.Fail.BufferOverflowException import akka.stream.testkit.{ AkkaSpec, StreamTestKit } class FlowBufferSpec extends AkkaSpec { @@ -155,11 +155,11 @@ class FlowBufferSpec extends AkkaSpec { sub.cancel() } - "error upstream if buffer is full and configured so" in { + "fail upstream if buffer is full and configured so" in { val publisher = StreamTestKit.PublisherProbe[Int] val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).runWith(Sink(subscriber)) + Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.fail).runWith(Sink(subscriber)) val autoPublisher = new StreamTestKit.AutoPublisher(publisher) val sub = subscriber.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index da6479b043..8386af3497 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -149,7 +149,7 @@ abstract class FlowMaterializer { /** * The `namePrefix` shall be used for deriving the names of processing * entities that are created during materialization. This is meant to aid - * logging and error reporting both during materialization and while the + * logging and failure reporting both during materialization and while the * stream is running. */ def withNamePrefix(name: String): FlowMaterializer diff --git a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala index 359edf7049..4a2e7d205d 100644 --- a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala @@ -33,7 +33,7 @@ object OverflowStrategy { /** * INTERNAL API */ - private[akka] final case object Error extends OverflowStrategy { + private[akka] final case object Fail extends OverflowStrategy { final case class BufferOverflowException(msg: String) extends RuntimeException(msg) } @@ -61,8 +61,7 @@ object OverflowStrategy { def backpressure: OverflowStrategy = Backpressure /** - * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until - * space becomes available in the buffer. + * If the buffer is full when a new element is available this strategy completes the stream with failure. 
*/ - def error: OverflowStrategy = Error + def fail: OverflowStrategy = Fail } diff --git a/akka-stream/src/main/scala/akka/stream/Transformer.scala b/akka-stream/src/main/scala/akka/stream/Transformer.scala index fbb2a17af8..ea2a46e887 100644 --- a/akka-stream/src/main/scala/akka/stream/Transformer.scala +++ b/akka-stream/src/main/scala/akka/stream/Transformer.scala @@ -25,7 +25,7 @@ private[akka] abstract class TransformerLike[-T, +U] { * end-of-stream event. * * This method is only called if [[#onError]] does not throw an exception. The default implementation - * of [[#onError]] throws the received cause forcing the error to propagate downstream immediately. + * of [[#onError]] throws the received cause forcing the failure to propagate downstream immediately. * * @param e Contains a non-empty option with the error causing the termination or an empty option * if the Transformer was completed normally @@ -40,7 +40,7 @@ private[akka] abstract class TransformerLike[-T, +U] { def onError(cause: Throwable): Unit = throw cause /** - * Invoked after normal completion or error. + * Invoked after normal completion or failure. */ def cleanup(): Unit = () diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index fb06c53dfe..99158eba32 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -49,7 +49,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { @tailrec def doSubscribe(subscriber: Subscriber[_ >: T]): Unit = { val current = pendingSubscribers.get if (current eq null) - reportSubscribeError(subscriber) + reportSubscribeFailure(subscriber) else { if (pendingSubscribers.compareAndSet(current, subscriber +: current)) impl ! 
wakeUpMsg @@ -71,13 +71,13 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { shutdownReason = reason pendingSubscribers.getAndSet(null) match { case null ⇒ // already called earlier - case pending ⇒ pending foreach reportSubscribeError + case pending ⇒ pending foreach reportSubscribeFailure } } @volatile private var shutdownReason: Option[Throwable] = None - private def reportSubscribeError(subscriber: Subscriber[_ >: T]): Unit = + private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit = shutdownReason match { case Some(e) ⇒ subscriber.onError(e) case None ⇒ subscriber.onComplete() diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala index 3925c158fe..f85fda2674 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala @@ -61,7 +61,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin // this is mutable for speed var n = 0 var elements = mutable.ListBuffer.empty[Any] - var error: Option[Throwable] = None + var failure: Option[Throwable] = None val iter = orderedBuffer.iterator @tailrec def split(): Unit = if (iter.hasNext) { @@ -111,7 +111,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin } override def onError(e: Throwable): Unit = { - // propagate upstream error immediately + // propagate upstream failure immediately fail(e) } @@ -134,7 +134,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin }.pipeTo(self) } catch { case NonFatal(err) ⇒ - // f threw, propagate error immediately + // f threw, propagate failure immediately fail(err) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala index f9cf698c4d..b0a56be874 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala @@ -51,7 +51,7 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali } override def onError(e: Throwable): Unit = { - // propagate upstream error immediately + // propagate upstream failure immediately fail(e) } @@ -73,7 +73,7 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali }.pipeTo(self) } catch { case NonFatal(err) ⇒ - // f threw, propagate error immediately + // f threw, propagate failure immediately fail(err) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 96f1bb5eec..15e8518395 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -71,7 +71,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite context.become(active) } - def handleError(error: Throwable): Unit = { + def handleFailure(error: Throwable): Unit = { try { if (!error.isInstanceOf[SpecViolation]) tryOnError(subscriber, error) @@ -90,16 +90,16 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite tryOnNext(subscriber, tick) } } catch { - case NonFatal(e) ⇒ handleError(e) + case NonFatal(e) ⇒ handleFailure(e) } case 
RequestMore(elements) ⇒ if (elements < 1) { - handleError(numberOfElementsInRequestMustBePositiveException) + handleFailure(numberOfElementsInRequestMustBePositiveException) } else { demand += elements if (demand < 0) // Long has overflown, reactive-streams specification rule 3.17 - handleError(totalPendingDemandMustNotExceedLongMaxValueException) + handleFailure(totalPendingDemandMustNotExceedLongMaxValueException) } case Cancel ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index cb267d4f53..1557a3f6af 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -133,7 +133,7 @@ private[akka] object OneBoundedInterpreter { * - ctx.pushAndFinish() which is different from the forking ops above because the execution of push and finish happens on * the same execution region and they are order dependent, too. * The interpreter tracks the depth of recursive forking and allows various strategies of dealing with the situation - * when this depth reaches a certain limit. In the simplest case an error is reported (this is very useful for stress + * when this depth reaches a certain limit. In the simplest case a failure is reported (this is very useful for stress * testing and finding callstack wasting bugs), in the other case the forked call is scheduled via a list -- i.e. instead * of the stack the heap is used. */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index 8dead5c510..f3a574e327 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -34,7 +34,7 @@ private[akka] object IteratorInterpreter { private var done = false private var nextElem: T = _ private var needsPull = true - private var lastError: Throwable = null + private var lastFailure: Throwable = null override def onPush(elem: Any, ctx: BoundaryContext): Directive = { nextElem = elem.asInstanceOf[T] @@ -52,7 +52,7 @@ private[akka] object IteratorInterpreter { override def onUpstreamFailure(cause: Throwable, ctx: BoundaryContext): TerminationDirective = { done = true - lastError = cause + lastFailure = cause ctx.finish() } @@ -64,13 +64,13 @@ private[akka] object IteratorInterpreter { override def hasNext: Boolean = { if (!done) pullIfNeeded() - !(done && needsPull) || (lastError ne null) + !(done && needsPull) || (lastFailure ne null) } override def next(): T = { - if (lastError ne null) { - val e = lastError - lastError = null + if (lastFailure ne null) { + val e = lastFailure + lastFailure = null throw e } else if (!hasNext) Iterator.empty.next() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 5cf745685f..70bc750c52 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -207,8 +207,8 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt if (buffer.isFull) ctx.hold() else ctx.pull() } - case Error ⇒ { (ctx, elem) ⇒ - if (buffer.isFull) ctx.fail(new Error.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) + case Fail ⇒ { (ctx, elem) ⇒ + if (buffer.isFull) 
ctx.fail(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
       else {
         buffer.enqueue(elem)
         ctx.pull()
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala
index 6b24496dee..8db7ac2b99 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala
@@ -220,7 +220,7 @@ object FlexiMerge {
 
     /**
      * Will continue to operate until a read becomes unsatisfiable, then it completes.
-     * Errors are immediately propagated.
+     * Failures are immediately propagated.
      */
     def defaultCompletionHandling[A]: CompletionHandling[Out] =
       new CompletionHandling[Out] {
@@ -234,7 +234,7 @@ object FlexiMerge {
 
     /**
      * Completes as soon as any input completes.
-     * Errors are immediately propagated.
+     * Failures are immediately propagated.
      */
     def eagerClose[A]: CompletionHandling[Out] =
       new CompletionHandling[Out] {
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala
index f294a9c29e..c459591284 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala
@@ -191,7 +191,7 @@ object FlexiRoute {
 
     /**
      * When an output cancels it continues with remaining outputs.
-     * Error or completion from upstream are immediately propagated.
+     * Failure or completion from upstream are immediately propagated.
      */
     def defaultCompletionHandling: CompletionHandling[In] =
       new CompletionHandling[In] {
@@ -203,7 +203,7 @@ object FlexiRoute {
 
     /**
      * Completes as soon as any output cancels.
-     * Error or completion from upstream are immediately propagated.
+     * Failure or completion from upstream are immediately propagated.
      */
     def eagerClose[A]: CompletionHandling[In] =
       new CompletionHandling[In] {
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
index 2fc8f77157..5c26a5501e 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
@@ -27,7 +27,7 @@ object Sink {
    * output (or the given `zero` value) and the element as input.
    * The returned [[scala.concurrent.Future]] will be completed with value of the final
    * function evaluation when the input stream ends, or completed with `Failure`
-   * if there is an error is signaled in the stream.
+   * if a failure is signaled in the stream.
    */
   def fold[U, In](zero: U, f: japi.Function2[U, In, U]): javadsl.KeyedSink[In, Future[U]] =
     new KeyedSink(scaladsl.Sink.fold[U, In](zero)(f.apply))
@@ -82,7 +82,7 @@ object Sink {
   /**
    * A `Sink` that will invoke the given procedure for each received element. The sink is materialized
    * into a [[scala.concurrent.Future]] that will be completed with `Success` when reaching the
-   * normal end of the stream, or completed with `Failure` if there is an error is signaled in
+   * normal end of the stream, or completed with `Failure` if a failure is signaled in
    * the stream.
   */
  def foreach[T](f: japi.Procedure[T]): KeyedSink[T, Future[Unit]] =
@@ -96,7 +96,7 @@
     new KeyedSink(scaladsl.Sink.fanoutPublisher(initialBufferSize, maximumBufferSize))
 
   /**
-   * A `Sink` that when the flow is completed, either through an error or normal
+   * A `Sink` that when the flow is completed, either through a failure or normal
    * completion, apply the provided function with [[scala.util.Success]]
    * or [[scala.util.Failure]].
    */
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index bc5c043798..5498d2516f 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -100,7 +100,7 @@ object Source {
    * Start a new `Source` from the given `Future`. The stream will consist of
    * one element when the `Future` is completed with a successful value, which
    * may happen before or after materializing the `Flow`.
-   * The stream terminates with an error if the `Future` is completed with a failure.
+   * The stream terminates with a failure if the `Future` is completed with a failure.
    */
  def from[O](future: Future[O]): javadsl.Source[O] =
    new Source(scaladsl.Source(future))
@@ -145,7 +145,7 @@ object Source {
    new Source(scaladsl.Source.single(element))
 
  /**
-   * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
+   * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`.
    */
  def failed[T](cause: Throwable): Source[T] =
    new Source(scaladsl.Source.failed(cause))
@@ -214,7 +214,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
    * output (or the given `zero` value) and the element as input.
    * The returned [[scala.concurrent.Future]] will be completed with value of the final
    * function evaluation when the input stream ends, or completed with `Failure`
-   * if there is an error is signaled in the stream.
+   * if a failure is signaled in the stream.
    */
  def runFold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] =
    runWith(Sink.fold(zero, f), materializer)
@@ -231,7 +231,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
    * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
    * for each received element.
    * The returned [[scala.concurrent.Future]] will be completed with `Success` when reaching the
-   * normal end of the stream, or completed with `Failure` if there is an error is signaled in
+   * normal end of the stream, or completed with `Failure` if a failure is signaled in
    * the stream.
    */
  def runForeach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] =
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala
index 64083dafa9..064b4b2772 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala
@@ -162,7 +162,7 @@ object OnCompleteSink {
 }
 
 /**
- * When the flow is completed, either through an error or normal
+ * When the flow is completed, either through failure or normal
  * completion, apply the provided function with [[scala.util.Success]]
  * or [[scala.util.Failure]].
  */
@@ -191,7 +191,7 @@ final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Unit) extends Simple
 
 /**
  * Invoke the given procedure for each received element. 
The sink holds a [[scala.concurrent.Future]] * that will be completed with `Success` when reaching the normal end of the stream, or completed - * with `Failure` if there is an error is signaled in the stream. + * with `Failure` if there is a failure signaled in the stream. */ final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In, Future[Unit]] { @@ -225,7 +225,7 @@ final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In, * output (or the given `zero` value) and the element as input. The sink holds a * [[scala.concurrent.Future]] that will be completed with value of the final * function evaluation when the input stream ends, or completed with `Failure` - * if there is an error is signaled in the stream. + * if there is a failure signaled in the stream. */ final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) extends KeyedActorFlowSink[In, Future[U]] { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index 345076372d..fa1f2c8191 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -133,7 +133,7 @@ final class FuncIterable[Out](f: () ⇒ Iterator[Out]) extends immutable.Iterabl * Start a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which * may happen before or after materializing the `Flow`. - * The stream terminates with an error if the `Future` is completed with a failure. + * The stream terminates with a failure if the `Future` is completed with a failure. */ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors? override def attach(flowSubscriber: Subscriber[Out], materializer: ActorFlowMaterializer, flowName: String) = @@ -161,7 +161,7 @@ final case class LazyEmptySource[Out]() extends KeyedActorFlowSource[Out, Promis override def create(materializer: ActorFlowMaterializer, flowName: String) = { val p = Promise[Unit]() - // Not TCK verified as RC1 does not allow "empty publishers", + // Not TCK verified as RC1 does not allow "empty publishers", // reactive-streams on master now contains support for empty publishers. // so we can enable it then, though it will require external completing of the promise val pub = new Publisher[Unit] { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala index c60ef898f8..65dd134697 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala @@ -202,7 +202,7 @@ object FlexiMerge { /** * Will continue to operate until a read becomes unsatisfiable, then it completes. - * Errors are immediately propagated. + * Failures are immediately propagated. */ val defaultCompletionHandling: CompletionHandling = CompletionHandling( onUpstreamFinish = (_, _) ⇒ SameState, @@ -210,7 +210,7 @@ object FlexiMerge { /** * Completes as soon as any input completes. - * Errors are immediately propagated. + * Failures are immediately propagated. 
*/ def eagerClose: CompletionHandling = CompletionHandling( onUpstreamFinish = (ctx, _) ⇒ { ctx.finish(); SameState }, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index aa796db99b..3f9c020c65 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -173,7 +173,7 @@ object FlexiRoute { /** * When an output cancels it continues with remaining outputs. - * Error or completion from upstream are immediately propagated. + * Failure or completion from upstream are immediately propagated. */ val defaultCompletionHandling: CompletionHandling = CompletionHandling( onUpstreamFinish = _ ⇒ (), @@ -182,7 +182,7 @@ object FlexiRoute { /** * Completes as soon as any output cancels. - * Error or completion from upstream are immediately propagated. + * Failure or completion from upstream are immediately propagated. */ val eagerClose: CompletionHandling = CompletionHandling( onUpstreamFinish = _ ⇒ (), diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7810dcea89..1ce84ab054 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -416,7 +416,7 @@ trait FlowOps[+Out] { * * [[akka.stream.TransformerLike#onError]] is called when failure is signaled from upstream. * - * After normal completion or error the [[akka.stream.TransformerLike#cleanup]] function is called. + * After normal completion or failure the [[akka.stream.TransformerLike#cleanup]] function is called. * * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with * ordinary instance variables. 
The [[akka.stream.Transformer]] is executed by an actor and diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala index ecd5fb97ca..c399fbc8e3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala @@ -51,9 +51,9 @@ private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Ke } override def to(sink: Sink[Out]): Sink[In] = sink match { - case sp: SinkPipe[Out] ⇒ sp.prependPipe(this) + case sp: SinkPipe[Out] ⇒ sp.prependPipe(this) case gs: GraphBackedSink[Out, _] ⇒ gs.prepend(this) - case d: Sink[Out] ⇒ this.withSink(d) + case d: Sink[Out] ⇒ this.withSink(d) } override def join(flow: Flow[Out, In]): RunnableFlow = flow match { @@ -99,9 +99,9 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As } override def to(sink: Sink[Out]): RunnableFlow = sink match { - case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ::: ops, keys ::: sp.keys) // FIXME raw addition of AstNodes + case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ::: ops, keys ::: sp.keys) // FIXME raw addition of AstNodes case g: GraphBackedSink[Out, _] ⇒ g.prepend(this) - case d: Sink[Out] ⇒ this.withSink(d) + case d: Sink[Out] ⇒ this.withSink(d) } override def withKey(key: Key[_]): SourcePipe[Out] = SourcePipe(input, ops, keys :+ key) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 34f5fadaba..b80f5cef09 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -86,7 +86,7 @@ object Sink { /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized * into a [[scala.concurrent.Future]] will be completed with `Success` when reaching the - * normal end of the stream, or completed with `Failure` if there is an error is signaled in + * normal end of the stream, or completed with `Failure` if there is a failure signaled in * the stream.. */ def foreach[T](f: T ⇒ Unit): ForeachSink[T] = ForeachSink(f) @@ -96,12 +96,12 @@ object Sink { * output (or the given `zero` value) and the element as input. * The returned [[scala.concurrent.Future]] will be completed with value of the final * function evaluation when the input stream ends, or completed with `Failure` - * if there is an error is signaled in the stream. + * if there is a failure signaled in the stream. */ def fold[U, T](zero: U)(f: (U, T) ⇒ U): FoldSink[U, T] = FoldSink(zero)(f) /** - * A `Sink` that when the flow is completed, either through an error or normal + * A `Sink` that when the flow is completed, either through a failure or normal * completion, apply the provided function with [[scala.util.Success]] * or [[scala.util.Failure]]. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index eef654f780..35e1e8fe73 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -43,7 +43,7 @@ trait Source[+Out] extends FlowOps[Out] with Materializable { * output (or the given `zero` value) and the element as input. 
* The returned [[scala.concurrent.Future]] will be completed with value of the final * function evaluation when the input stream ends, or completed with `Failure` - * if there is an error is signaled in the stream. + * if there is a failure signaled in the stream. */ def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = runWith(FoldSink(zero)(f)) // FIXME why is fold always an end step? @@ -51,7 +51,7 @@ trait Source[+Out] extends FlowOps[Out] with Materializable { * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked * for each received element. * The returned [[scala.concurrent.Future]] will be completed with `Success` when reaching the - * normal end of the stream, or completed with `Failure` if there is an error is signaled in + * normal end of the stream, or completed with `Failure` if there is a failure signaled in * the stream. */ def runForeach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(ForeachSink(f)) @@ -125,7 +125,7 @@ object Source { * Start a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which * may happen before or after materializing the `Flow`. - * The stream terminates with an error if the `Future` is completed with a failure. + * The stream terminates with a failure if the `Future` is completed with a failure. */ def apply[T](future: Future[T]): Source[T] = FutureSource(future) @@ -189,7 +189,7 @@ object Source { def lazyEmpty[T]() = LazyEmptySource[T]() /** - * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. + * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`. */ def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause, "failed")) diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 5e9054c954..6def1b0e3b 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -70,12 +70,12 @@ private[stream] abstract class AbstractStage[-In, Out, PushD <: Directive, PullD /** * `onUpstreamFailure` is called when upstream has signaled that the stream is completed - * with error. It is not called if [[#onPull]] or [[#onPush]] of the stage itself + * with failure. It is not called if [[#onPull]] or [[#onPush]] of the stage itself * throws an exception. * - * Note that elements that were emitted by upstream before the error happened might + * Note that elements that were emitted by upstream before the failure happened might * not have been received by this stage when `onUpstreamFailure` is called, i.e. - * errors are not backpressured and might be propagated as soon as possible. + * failures are not backpressured and might be propagated as soon as possible. * * Here you cannot call [[akka.stream.stage.Context#push]], because there might not * be any demand from downstream. To emit additional elements before terminating you @@ -314,7 +314,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] { /** * Scala API: Can be used from [[#onUpstreamFinish]] to push final elements downstreams * before completing the stream successfully. 
Note that if this is used from - * [[#onUpstreamFailure]] the error will be absorbed and the stream will be completed + * [[#onUpstreamFailure]] the failure will be absorbed and the stream will be completed * successfully. */ final def terminationEmit(iter: Iterator[Out], ctx: Context[Out]): TerminationDirective = {
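
----
Reviewer notes, illustrative sketches that are not part of the patch:

The renamed strategy is easiest to evaluate end to end. Below is a minimal sketch in the style of the touched tests, using only calls that appear in the files above (`Source`, `buffer`, `OverflowStrategy.fail`, `runForeach`, `ActorFlowMaterializer`); the `Source(() => Iterator)` overload is inferred from the `FuncIterable` helper in ActorFlowSource.scala, and the buffer size, the slow `map` stage and the object name are made-up illustration values:

    import akka.actor.ActorSystem
    import akka.stream.{ ActorFlowMaterializer, OverflowStrategy }
    import akka.stream.scaladsl.Source
    import scala.util.{ Failure, Success }

    object BufferFailSketch extends App {
      implicit val system = ActorSystem("buffer-fail-sketch")
      implicit val materializer = ActorFlowMaterializer()
      import system.dispatcher

      // An unbounded producer feeding a bounded buffer. With OverflowStrategy.fail
      // the stream is completed with a failure ("Buffer overflow (max capacity
      // was: 1000)!", per the Ops.scala hunk above) as soon as an element arrives
      // while the buffer is full, instead of dropping elements or backpressuring.
      Source(() => Iterator.from(0))
        .buffer(1000, OverflowStrategy.fail)
        .map { n => Thread.sleep(1); n } // deliberately slow downstream so the buffer fills
        .runForeach(_ => ())
        .onComplete {
          case Success(_) => println("completed normally (not expected here)")
          case Failure(e) => println("failed as configured: " + e.getMessage); system.shutdown()
        }
    }

Since `OverflowStrategy.Fail` and its nested `BufferOverflowException` stay `private[akka]` in this patch, user code observes the overflow as a generic stream failure rather than by matching on the exception type.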
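The failure wording in Stage.scala is also clearer next to a concrete stage. A sketch, assuming `PushStage` and the `Directive`/`TerminationDirective` signatures of this snapshot; the stage itself is hypothetical:

    import akka.stream.stage.{ Context, Directive, PushStage, TerminationDirective }

    // Doubles every element. onUpstreamFailure is overridden only to log before
    // propagating; per the updated scaladoc, failures are not backpressured, so
    // elements emitted by upstream before the failure may never reach this stage.
    class DoublingStage extends PushStage[Int, Int] {
      override def onPush(elem: Int, ctx: Context[Int]): Directive =
        ctx.push(elem * 2)

      override def onUpstreamFailure(cause: Throwable, ctx: Context[Int]): TerminationDirective = {
        println("upstream failed: " + cause.getMessage)
        ctx.fail(cause) // propagate downstream as a failure
      }
    }

It would be wired in with something like `Flow[Int].transform(() => new DoublingStage)`, assuming the `transform` combinator of this era. Note the asymmetry documented in the patch: `StatefulStage#terminationEmit` used from `onUpstreamFailure` absorbs the failure and completes downstream successfully.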
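Finally, the FlexiMerge completion-handling prose in stream-customize.rst maps to code roughly as follows. Only `MergeLogic`, `State`, `SameState`, `CompletionHandling`, `onUpstreamFinish`, `eagerClose` and `ctx.changeCompletionHandling` are visible in this patch; the scaffolding (`createInputPort`, `inputHandles`, `createMergeLogic`, `ReadAny`) and the `onUpstreamFailure` parameter shape are recalled from this era of the API, so treat this strictly as a sketch:

    import akka.stream.scaladsl.FlexiMerge

    class ImportantWithBackups[T] extends FlexiMerge[T] {
      import FlexiMerge._

      val important = createInputPort[T]()
      val replica1 = createInputPort[T]()
      val replica2 = createInputPort[T]()

      def createMergeLogic() = new MergeLogic[T] {
        override def inputHandles(inputCount: Int) = Vector(important, replica1, replica2)

        // Emit whatever arrives on any input; SameState means no state transition.
        override def initialState = State[T](ReadAny(important, replica1, replica2)) {
          (ctx, input, element) =>
            ctx.emit(element)
            SameState
        }

        // Completion or failure of `important` is propagated downstream; the first
        // failed replica is swallowed, after which eagerClose propagates the next
        // completion or failure immediately.
        override def initialCompletionHandling = CompletionHandling(
          onUpstreamFinish = { (ctx, input) =>
            if (input == important) ctx.finish()
            else ctx.changeCompletionHandling(eagerClose)
            SameState
          },
          onUpstreamFailure = { (ctx, input, cause) => // parameter shape assumed
            if (input == important) ctx.fail(cause)
            else ctx.changeCompletionHandling(eagerClose)
            SameState
          })
      }
    }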