!str #16448 Change error to failure

* one API change OverflowStrategy.error -> OverflowStrategy.fail
* error is still kept in the internals where we are at the reactive
  streams level
This commit is contained in:
Patrik Nordwall 2015-01-30 10:30:56 +01:00
parent 31497cac9b
commit 01646d10ff
32 changed files with 87 additions and 88 deletions

View file

@ -184,7 +184,7 @@ If we run this example we see that
This example highlights that one solution to avoid deadlocks in the presence of potentially unbalanced cycles
(cycles where the number of circulating elements are unbounded) is to drop elements. An alternative would be to
define a larger buffer with ``OverflowStrategy.error`` which would fail the stream instead of deadlocking it after
define a larger buffer with ``OverflowStrategy.fail`` which would fail the stream instead of deadlocking it after
all buffer space has been consumed.
As we discovered in the previous examples, the core problem was the unbalanced nature of the feedback loop. We

View file

@ -120,7 +120,7 @@ elements*" this can be expressed using the ``buffer`` element:
The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
when it receives another element element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best.
dropping the entire buffer, signalling failures etc. Be sure to pick and choose the strategy that fits your use case best.
Materialized values
-------------------

View file

@ -79,9 +79,9 @@ class StreamBuffersRateSpec extends AkkaSpec {
jobs.buffer(1000, OverflowStrategy.dropBuffer)
//#explicit-buffers-dropbuffer
//#explicit-buffers-error
jobs.buffer(1000, OverflowStrategy.error)
//#explicit-buffers-error
//#explicit-buffers-fail
jobs.buffer(1000, OverflowStrategy.fail)
//#explicit-buffers-fail
}

View file

@ -31,7 +31,7 @@ class RecipeReduceByKey extends RecipeSpec {
// get a stream of word counts
val counts: Source[(String, Int)] =
countedWords
.buffer(MaximumDistinctWords, OverflowStrategy.error)
.buffer(MaximumDistinctWords, OverflowStrategy.fail)
.mapAsync(identity)
//#word-count
@ -62,7 +62,7 @@ class RecipeReduceByKey extends RecipeSpec {
}
}
reducedValues.buffer(maximumGroupSize, OverflowStrategy.error).mapAsync(identity)
reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(identity)
}
val wordCounts = words.via(reduceByKey(

View file

@ -134,7 +134,7 @@ that the substreams produced by ``groupBy()`` can only complete when the origina
that ``mapAsync()`` cannot pull for more substreams because it still waits on folding futures to finish, but these
futures never finish if the additional group streams are not consumed. This typical deadlock situation is resolved by
this buffer which either able to contain all the group streams (which ensures that they are already running and folding)
or fails with an explicit error instead of a silent deadlock.
or fails with an explicit failure instead of a silent deadlock.
.. includecode:: code/docs/stream/cookbook/RecipeReduceByKey.scala#word-count

View file

@ -198,7 +198,7 @@ In our case we use the :class:`ReadPreferred` read condition which has the exact
our preferring merge it pulls elements from the preferred input port if there are any available, otherwise reverting
to pulling from the secondary inputs. The context object passed into the state function allows us to interact with the
connected streams, for example by emitting an ``element``, which was just pulled from the given ``input``, or signalling
completion or errors to the merges downstream stage.
completion or failure to the merges downstream stage.
The state function must always return the next behaviour to be used when an element should be pulled from its upstreams,
we use the special :class:`SameState` object which signals :class:`FlexiMerge` that no state transition is needed.
@ -233,21 +233,21 @@ Connecting your custom junction is as simple as creating an instance and connect
Completion handling
^^^^^^^^^^^^^^^^^^^
Completion handling in :class:`FlexiMerge` is defined by an :class:`CompletionHandling` object which can react on
completion and error signals from its upstream input ports. The default strategy is to remain running while at-least-one
completion and failure signals from its upstream input ports. The default strategy is to remain running while at-least-one
upstream input port which are declared to be consumed in the current state is still running (i.e. has not signalled
completion or error).
completion or failure).
Customising completion can be done via overriding the ``MergeLogic#initialCompletionHandling`` method, or from within
a :class:`State` by calling ``ctx.changeCompletionHandling(handling)``. Other than the default completion handling (as
late as possible) :class:`FlexiMerge` also provides an ``eagerClose`` completion handling which completes (or errors) its
downstream as soon as at least one of its upstream inputs completes (or errors).
late as possible) :class:`FlexiMerge` also provides an ``eagerClose`` completion handling which completes (or fails) its
downstream as soon as at least one of its upstream inputs completes (or fails).
In the example below the we implement an ``ImportantWithBackups`` fan-in stage which can only keep operating while
the ``important`` and at-least-one of the ``replica`` inputs are active. Therefore in our custom completion strategy we
have to investigate which input has completed or errored and act accordingly. If the important input completed or errored
have to investigate which input has completed or failed and act accordingly. If the important input completed or failed
we propagate this downstream completing the stream, on the other hand if the first replicated input fails, we log the
exception and instead of erroring the downstream swallow this exception (as one failed replica is still acceptable).
Then we change the completion strategy to ``eagerClose`` which will propagate any future completion or error event right
exception and instead of failing the downstream swallow this exception (as one failed replica is still acceptable).
Then we change the completion strategy to ``eagerClose`` which will propagate any future completion or failure event right
to this stages downstream effectively shutting down the stream.
.. includecode:: code/docs/stream/FlexiDocSpec.scala#fleximerge-completion
@ -294,10 +294,10 @@ we use the special :class:`SameState` object which signals :class:`FlexiRoute` t
Completion handling
^^^^^^^^^^^^^^^^^^^
Completion handling in :class:`FlexiRoute` is handled similarily to :class:`FlexiMerge` (which is explained in depth in
:ref:`flexi-merge-completion-handling-scala`), however in addition to reacting to its upstreams *completion* or *error*
:ref:`flexi-merge-completion-handling-scala`), however in addition to reacting to its upstreams *completion* or *failure*
it can also react to its downstream stages *cancelling* their subscriptions. The default completion handling for
:class:`FlexiRoute` (defined in ``RouteLogic#defaultCompletionHandling``) is to continue running until all of its
downstreams have cancelled their subscriptions, or the upstream has completed / errored.
downstreams have cancelled their subscriptions, or the upstream has completed / failed.
In order to customise completion handling we can override overriding the ``RouteLogic#initialCompletionHandling`` method,
or call ``ctx.changeCompletionHandling(handling)`` from within a :class:`State`. Other than the default completion handling
@ -309,7 +309,7 @@ downstream cancels, otherwise (if any other downstream cancels their subscriptio
.. includecode:: code/docs/stream/FlexiDocSpec.scala#flexiroute-completion
Notice that State changes are only allowed in reaction to downstream cancellations, and not in the upstream completion/error
Notice that State changes are only allowed in reaction to downstream cancellations, and not in the upstream completion/failure
cases. This is because since there is only one upstream, there is nothing else to do than possibly flush buffered elements
and continue with shutting down the entire stream.

View file

@ -190,7 +190,7 @@ If we run this example we see that
This example highlights that one solution to avoid deadlocks in the presence of potentially unbalanced cycles
(cycles where the number of circulating elements are unbounded) is to drop elements. An alternative would be to
define a larger buffer with ``OverflowStrategy.error`` which would fail the stream instead of deadlocking it after
define a larger buffer with ``OverflowStrategy.fail`` which would fail the stream instead of deadlocking it after
all buffer space has been consumed.
As we discovered in the previous examples, the core problem was the unbalanced nature of the feedback loop. We

View file

@ -127,7 +127,7 @@ otherwise we consider it flooding and terminate the connection. This is
easily achievable by the error strategy which simply fails the stream
once the buffer gets full.
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-error
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-fail
Rate transformation
===================

View file

@ -10,7 +10,7 @@ import scala.concurrent.duration._
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.OverflowStrategy
import akka.stream.OverflowStrategy.Error.BufferOverflowException
import akka.stream.OverflowStrategy.Fail.BufferOverflowException
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
class FlowBufferSpec extends AkkaSpec {
@ -155,11 +155,11 @@ class FlowBufferSpec extends AkkaSpec {
sub.cancel()
}
"error upstream if buffer is full and configured so" in {
"fail upstream if buffer is full and configured so" in {
val publisher = StreamTestKit.PublisherProbe[Int]
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.error).runWith(Sink(subscriber))
Source(publisher).buffer(100, overflowStrategy = OverflowStrategy.fail).runWith(Sink(subscriber))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
val sub = subscriber.expectSubscription()

View file

@ -149,7 +149,7 @@ abstract class FlowMaterializer {
/**
* The `namePrefix` shall be used for deriving the names of processing
* entities that are created during materialization. This is meant to aid
* logging and error reporting both during materialization and while the
* logging and failure reporting both during materialization and while the
* stream is running.
*/
def withNamePrefix(name: String): FlowMaterializer

View file

@ -33,7 +33,7 @@ object OverflowStrategy {
/**
* INTERNAL API
*/
private[akka] final case object Error extends OverflowStrategy {
private[akka] final case object Fail extends OverflowStrategy {
final case class BufferOverflowException(msg: String) extends RuntimeException(msg)
}
@ -61,8 +61,7 @@ object OverflowStrategy {
def backpressure: OverflowStrategy = Backpressure
/**
* If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
* space becomes available in the buffer.
* If the buffer is full when a new element is available this strategy completes the stream with failure.
*/
def error: OverflowStrategy = Error
def fail: OverflowStrategy = Fail
}

View file

@ -25,7 +25,7 @@ private[akka] abstract class TransformerLike[-T, +U] {
* end-of-stream event.
*
* This method is only called if [[#onError]] does not throw an exception. The default implementation
* of [[#onError]] throws the received cause forcing the error to propagate downstream immediately.
* of [[#onError]] throws the received cause forcing the failure to propagate downstream immediately.
*
* @param e Contains a non-empty option with the error causing the termination or an empty option
* if the Transformer was completed normally
@ -40,7 +40,7 @@ private[akka] abstract class TransformerLike[-T, +U] {
def onError(cause: Throwable): Unit = throw cause
/**
* Invoked after normal completion or error.
* Invoked after normal completion or failure.
*/
def cleanup(): Unit = ()

View file

@ -49,7 +49,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
@tailrec def doSubscribe(subscriber: Subscriber[_ >: T]): Unit = {
val current = pendingSubscribers.get
if (current eq null)
reportSubscribeError(subscriber)
reportSubscribeFailure(subscriber)
else {
if (pendingSubscribers.compareAndSet(current, subscriber +: current))
impl ! wakeUpMsg
@ -71,13 +71,13 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
shutdownReason = reason
pendingSubscribers.getAndSet(null) match {
case null // already called earlier
case pending pending foreach reportSubscribeError
case pending pending foreach reportSubscribeFailure
}
}
@volatile private var shutdownReason: Option[Throwable] = None
private def reportSubscribeError(subscriber: Subscriber[_ >: T]): Unit =
private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit =
shutdownReason match {
case Some(e) subscriber.onError(e)
case None subscriber.onComplete()

View file

@ -61,7 +61,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
// this is mutable for speed
var n = 0
var elements = mutable.ListBuffer.empty[Any]
var error: Option[Throwable] = None
var failure: Option[Throwable] = None
val iter = orderedBuffer.iterator
@tailrec def split(): Unit =
if (iter.hasNext) {
@ -111,7 +111,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
}
override def onError(e: Throwable): Unit = {
// propagate upstream error immediately
// propagate upstream failure immediately
fail(e)
}
@ -134,7 +134,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
}.pipeTo(self)
} catch {
case NonFatal(err)
// f threw, propagate error immediately
// f threw, propagate failure immediately
fail(err)
}
}

View file

@ -51,7 +51,7 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali
}
override def onError(e: Throwable): Unit = {
// propagate upstream error immediately
// propagate upstream failure immediately
fail(e)
}
@ -73,7 +73,7 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali
}.pipeTo(self)
} catch {
case NonFatal(err)
// f threw, propagate error immediately
// f threw, propagate failure immediately
fail(err)
}
}

View file

@ -71,7 +71,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
context.become(active)
}
def handleError(error: Throwable): Unit = {
def handleFailure(error: Throwable): Unit = {
try {
if (!error.isInstanceOf[SpecViolation])
tryOnError(subscriber, error)
@ -90,16 +90,16 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
tryOnNext(subscriber, tick)
}
} catch {
case NonFatal(e) handleError(e)
case NonFatal(e) handleFailure(e)
}
case RequestMore(elements)
if (elements < 1) {
handleError(numberOfElementsInRequestMustBePositiveException)
handleFailure(numberOfElementsInRequestMustBePositiveException)
} else {
demand += elements
if (demand < 0) // Long has overflown, reactive-streams specification rule 3.17
handleError(totalPendingDemandMustNotExceedLongMaxValueException)
handleFailure(totalPendingDemandMustNotExceedLongMaxValueException)
}
case Cancel

View file

@ -133,7 +133,7 @@ private[akka] object OneBoundedInterpreter {
* - ctx.pushAndFinish() which is different from the forking ops above because the execution of push and finish happens on
* the same execution region and they are order dependent, too.
* The interpreter tracks the depth of recursive forking and allows various strategies of dealing with the situation
* when this depth reaches a certain limit. In the simplest case an error is reported (this is very useful for stress
* when this depth reaches a certain limit. In the simplest case a failure is reported (this is very useful for stress
* testing and finding callstack wasting bugs), in the other case the forked call is scheduled via a list -- i.e. instead
* of the stack the heap is used.
*/

View file

@ -34,7 +34,7 @@ private[akka] object IteratorInterpreter {
private var done = false
private var nextElem: T = _
private var needsPull = true
private var lastError: Throwable = null
private var lastFailure: Throwable = null
override def onPush(elem: Any, ctx: BoundaryContext): Directive = {
nextElem = elem.asInstanceOf[T]
@ -52,7 +52,7 @@ private[akka] object IteratorInterpreter {
override def onUpstreamFailure(cause: Throwable, ctx: BoundaryContext): TerminationDirective = {
done = true
lastError = cause
lastFailure = cause
ctx.finish()
}
@ -64,13 +64,13 @@ private[akka] object IteratorInterpreter {
override def hasNext: Boolean = {
if (!done) pullIfNeeded()
!(done && needsPull) || (lastError ne null)
!(done && needsPull) || (lastFailure ne null)
}
override def next(): T = {
if (lastError ne null) {
val e = lastError
lastError = null
if (lastFailure ne null) {
val e = lastFailure
lastFailure = null
throw e
} else if (!hasNext)
Iterator.empty.next()

View file

@ -207,8 +207,8 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
if (buffer.isFull) ctx.hold()
else ctx.pull()
}
case Error { (ctx, elem)
if (buffer.isFull) ctx.fail(new Error.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
case Fail { (ctx, elem)
if (buffer.isFull) ctx.fail(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
else {
buffer.enqueue(elem)
ctx.pull()

View file

@ -220,7 +220,7 @@ object FlexiMerge {
/**
* Will continue to operate until a read becomes unsatisfiable, then it completes.
* Errors are immediately propagated.
* Failures are immediately propagated.
*/
def defaultCompletionHandling[A]: CompletionHandling[Out] =
new CompletionHandling[Out] {
@ -234,7 +234,7 @@ object FlexiMerge {
/**
* Completes as soon as any input completes.
* Errors are immediately propagated.
* Failures are immediately propagated.
*/
def eagerClose[A]: CompletionHandling[Out] =
new CompletionHandling[Out] {

View file

@ -191,7 +191,7 @@ object FlexiRoute {
/**
* When an output cancels it continues with remaining outputs.
* Error or completion from upstream are immediately propagated.
* Failure or completion from upstream are immediately propagated.
*/
def defaultCompletionHandling: CompletionHandling[In] =
new CompletionHandling[In] {
@ -203,7 +203,7 @@ object FlexiRoute {
/**
* Completes as soon as any output cancels.
* Error or completion from upstream are immediately propagated.
* Failure or completion from upstream are immediately propagated.
*/
def eagerClose[A]: CompletionHandling[In] =
new CompletionHandling[In] {

View file

@ -27,7 +27,7 @@ object Sink {
* output (or the given `zero` value) and the element as input.
* The returned [[scala.concurrent.Future]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is an error is signaled in the stream.
* if there is a failure is signaled in the stream.
*/
def fold[U, In](zero: U, f: japi.Function2[U, In, U]): javadsl.KeyedSink[In, Future[U]] =
new KeyedSink(scaladsl.Sink.fold[U, In](zero)(f.apply))
@ -82,7 +82,7 @@ object Sink {
/**
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
* into a [[scala.concurrent.Future]] will be completed with `Success` when reaching the
* normal end of the stream, or completed with `Failure` if there is an error is signaled in
* normal end of the stream, or completed with `Failure` if there is a failure is signaled in
* the stream..
*/
def foreach[T](f: japi.Procedure[T]): KeyedSink[T, Future[Unit]] =
@ -96,7 +96,7 @@ object Sink {
new KeyedSink(scaladsl.Sink.fanoutPublisher(initialBufferSize, maximumBufferSize))
/**
* A `Sink` that when the flow is completed, either through an error or normal
* A `Sink` that when the flow is completed, either through a failure or normal
* completion, apply the provided function with [[scala.util.Success]]
* or [[scala.util.Failure]].
*/

View file

@ -100,7 +100,7 @@ object Source {
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
* The stream terminates with a failure if the `Future` is completed with a failure.
*/
def from[O](future: Future[O]): javadsl.Source[O] =
new Source(scaladsl.Source(future))
@ -145,7 +145,7 @@ object Source {
new Source(scaladsl.Source.single(element))
/**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
* Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`.
*/
def failed[T](cause: Throwable): Source[T] =
new Source(scaladsl.Source.failed(cause))
@ -214,7 +214,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
* output (or the given `zero` value) and the element as input.
* The returned [[scala.concurrent.Future]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is an error is signaled in the stream.
* if there is a failure is signaled in the stream.
*/
def runFold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] =
runWith(Sink.fold(zero, f), materializer)
@ -231,7 +231,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
* Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
* for each received element.
* The returned [[scala.concurrent.Future]] will be completed with `Success` when reaching the
* normal end of the stream, or completed with `Failure` if there is an error is signaled in
* normal end of the stream, or completed with `Failure` if there is a failure is signaled in
* the stream.
*/
def runForeach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] =

View file

@ -162,7 +162,7 @@ object OnCompleteSink {
}
/**
* When the flow is completed, either through an error or normal
* When the flow is completed, either through failure or normal
* completion, apply the provided function with [[scala.util.Success]]
* or [[scala.util.Failure]].
*/
@ -191,7 +191,7 @@ final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Unit) extends Simple
/**
* Invoke the given procedure for each received element. The sink holds a [[scala.concurrent.Future]]
* that will be completed with `Success` when reaching the normal end of the stream, or completed
* with `Failure` if there is an error is signaled in the stream.
* with `Failure` if there is a failure signaled in the stream.
*/
final case class ForeachSink[In](f: In Unit) extends KeyedActorFlowSink[In, Future[Unit]] {
@ -225,7 +225,7 @@ final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In,
* output (or the given `zero` value) and the element as input. The sink holds a
* [[scala.concurrent.Future]] that will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is an error is signaled in the stream.
* if there is a failure signaled in the stream.
*/
final case class FoldSink[U, In](zero: U)(f: (U, In) U) extends KeyedActorFlowSink[In, Future[U]] {

View file

@ -133,7 +133,7 @@ final class FuncIterable[Out](f: () ⇒ Iterator[Out]) extends immutable.Iterabl
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
* The stream terminates with a failure if the `Future` is completed with a failure.
*/
final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors?
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorFlowMaterializer, flowName: String) =

View file

@ -202,7 +202,7 @@ object FlexiMerge {
/**
* Will continue to operate until a read becomes unsatisfiable, then it completes.
* Errors are immediately propagated.
* Failures are immediately propagated.
*/
val defaultCompletionHandling: CompletionHandling = CompletionHandling(
onUpstreamFinish = (_, _) SameState,
@ -210,7 +210,7 @@ object FlexiMerge {
/**
* Completes as soon as any input completes.
* Errors are immediately propagated.
* Failures are immediately propagated.
*/
def eagerClose: CompletionHandling = CompletionHandling(
onUpstreamFinish = (ctx, _) { ctx.finish(); SameState },

View file

@ -173,7 +173,7 @@ object FlexiRoute {
/**
* When an output cancels it continues with remaining outputs.
* Error or completion from upstream are immediately propagated.
* Failure or completion from upstream are immediately propagated.
*/
val defaultCompletionHandling: CompletionHandling = CompletionHandling(
onUpstreamFinish = _ (),
@ -182,7 +182,7 @@ object FlexiRoute {
/**
* Completes as soon as any output cancels.
* Error or completion from upstream are immediately propagated.
* Failure or completion from upstream are immediately propagated.
*/
val eagerClose: CompletionHandling = CompletionHandling(
onUpstreamFinish = _ (),

View file

@ -416,7 +416,7 @@ trait FlowOps[+Out] {
*
* [[akka.stream.TransformerLike#onError]] is called when failure is signaled from upstream.
*
* After normal completion or error the [[akka.stream.TransformerLike#cleanup]] function is called.
* After normal completion or failure the [[akka.stream.TransformerLike#cleanup]] function is called.
*
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and

View file

@ -86,7 +86,7 @@ object Sink {
/**
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
* into a [[scala.concurrent.Future]] will be completed with `Success` when reaching the
* normal end of the stream, or completed with `Failure` if there is an error is signaled in
* normal end of the stream, or completed with `Failure` if there is a failure signaled in
* the stream..
*/
def foreach[T](f: T Unit): ForeachSink[T] = ForeachSink(f)
@ -96,12 +96,12 @@ object Sink {
* output (or the given `zero` value) and the element as input.
* The returned [[scala.concurrent.Future]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is an error is signaled in the stream.
* if there is a failure signaled in the stream.
*/
def fold[U, T](zero: U)(f: (U, T) U): FoldSink[U, T] = FoldSink(zero)(f)
/**
* A `Sink` that when the flow is completed, either through an error or normal
* A `Sink` that when the flow is completed, either through a failure or normal
* completion, apply the provided function with [[scala.util.Success]]
* or [[scala.util.Failure]].
*/

View file

@ -43,7 +43,7 @@ trait Source[+Out] extends FlowOps[Out] with Materializable {
* output (or the given `zero` value) and the element as input.
* The returned [[scala.concurrent.Future]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is an error is signaled in the stream.
* if there is a failure signaled in the stream.
*/
def runFold[U](zero: U)(f: (U, Out) U)(implicit materializer: FlowMaterializer): Future[U] = runWith(FoldSink(zero)(f)) // FIXME why is fold always an end step?
@ -51,7 +51,7 @@ trait Source[+Out] extends FlowOps[Out] with Materializable {
* Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
* for each received element.
* The returned [[scala.concurrent.Future]] will be completed with `Success` when reaching the
* normal end of the stream, or completed with `Failure` if there is an error is signaled in
* normal end of the stream, or completed with `Failure` if there is a failure signaled in
* the stream.
*/
def runForeach(f: Out Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(ForeachSink(f))
@ -125,7 +125,7 @@ object Source {
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
* The stream terminates with a failure if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T]): Source[T] = FutureSource(future)
@ -189,7 +189,7 @@ object Source {
def lazyEmpty[T]() = LazyEmptySource[T]()
/**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
* Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`.
*/
def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause, "failed"))

View file

@ -70,12 +70,12 @@ private[stream] abstract class AbstractStage[-In, Out, PushD <: Directive, PullD
/**
* `onUpstreamFailure` is called when upstream has signaled that the stream is completed
* with error. It is not called if [[#onPull]] or [[#onPush]] of the stage itself
* with failure. It is not called if [[#onPull]] or [[#onPush]] of the stage itself
* throws an exception.
*
* Note that elements that were emitted by upstream before the error happened might
* Note that elements that were emitted by upstream before the failure happened might
* not have been received by this stage when `onUpstreamFailure` is called, i.e.
* errors are not backpressured and might be propagated as soon as possible.
* failures are not backpressured and might be propagated as soon as possible.
*
* Here you cannot call [[akka.stream.stage.Context#push]], because there might not
* be any demand from downstream. To emit additional elements before terminating you
@ -314,7 +314,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
/**
* Scala API: Can be used from [[#onUpstreamFinish]] to push final elements downstreams
* before completing the stream successfully. Note that if this is used from
* [[#onUpstreamFailure]] the error will be absorbed and the stream will be completed
* [[#onUpstreamFailure]] the failure will be absorbed and the stream will be completed
* successfully.
*/
final def terminationEmit(iter: Iterator[Out], ctx: Context[Out]): TerminationDirective = {