diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index cd9212e0d8..29808429f8 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -743,6 +743,8 @@ recover ^^^^^^^ Allow sending of one last element downstream when a failure has happened upstream. +Throwing an exception inside ``recover`` _will_ be logged on ERROR level automatically. + **emits** when the element is available from the upstream or upstream is failed and pf returns an element **backpressures** when downstream backpressures, not when failure happened @@ -753,12 +755,45 @@ recoverWith ^^^^^^^^^^^ Allow switching to alternative Source when a failure has happened upstream. +Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically. + **emits** the element is available from the upstream or upstream is failed and pf returns alternative Source **backpressures** downstream backpressures, after failure happened it backprssures to alternative Source **completes** upstream completes or upstream failed with exception pf can handle +recoverWithRetries +^^^^^^^^^^^^^^^^^^ +RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after +a failure has been recovered up to `attempts` number of times so that each time there is a failure +it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't +attempt to recover at all. Passing -1 will behave exactly the same as `recoverWith`. + +Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. +This stage can recover the failure signal, but not the skipped elements, which will be dropped. + +**emits** when element is available from the upstream or upstream is failed and element is available from alternative Source + +**backpressures** when downstream backpressures + +**completes** when upstream completes or upstream failed with exception pf can handle + +mapError +^^^^^^^^ +While similar to ``recover`` this stage can be used to transform an error signal to a different one *without* logging +it as an error in the process. So in that sense it is NOT exactly equivalent to ``recover(t -> throw t2)`` since recover +would log the ``t2`` error. + +Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. +This stage can recover the failure signal, but not the skipped elements, which will be dropped. + +Similarily to ``recover`` throwing an exception inside ``mapError`` _will_ be logged on ERROR level automatically. + +**emits** when element is available from the upstream or upstream is failed and pf returns an element +**backpressures** when downstream backpressures +**completes** when upstream completes or upstream failed with exception pf can handle + detach ^^^^^^ Detach upstream demand from downstream demand without detaching the stream rates. diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 2bd6cfb341..f455bafaec 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -732,6 +732,8 @@ recover ^^^^^^^ Allow sending of one last element downstream when a failure has happened upstream. +Throwing an exception inside ``recover`` _will_ be logged on ERROR level automatically. + **emits** when the element is available from the upstream or upstream is failed and pf returns an element **backpressures** when downstream backpressures, not when failure happened @@ -742,12 +744,45 @@ recoverWith ^^^^^^^^^^^ Allow switching to alternative Source when a failure has happened upstream. +Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically. + **emits** the element is available from the upstream or upstream is failed and pf returns alternative Source **backpressures** downstream backpressures, after failure happened it backprssures to alternative Source **completes** upstream completes or upstream failed with exception pf can handle +recoverWithRetries +^^^^^^^^^^^^^^^^^^ +RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after +a failure has been recovered up to `attempts` number of times so that each time there is a failure +it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't +attempt to recover at all. Passing -1 will behave exactly the same as `recoverWith`. + +Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. +This stage can recover the failure signal, but not the skipped elements, which will be dropped. + +**emits** when element is available from the upstream or upstream is failed and element is available from alternative Source + +**backpressures** when downstream backpressures + +**completes** when upstream completes or upstream failed with exception pf can handle + +mapError +^^^^^^^^ +While similar to ``recover`` this stage can be used to transform an error signal to a different one *without* logging +it as an error in the process. So in that sense it is NOT exactly equivalent to ``recover(t => throw t2)`` since recover +would log the ``t2`` error. + +Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. +This stage can recover the failure signal, but not the skipped elements, which will be dropped. + +Similarily to ``recover`` throwing an exception inside ``mapError`` _will_ be logged on ERROR level automatically. + +**emits** when element is available from the upstream or upstream is failed and pf returns an element +**backpressures** when downstream backpressures +**completes** when upstream completes or upstream failed with exception pf can handle + detach ^^^^^^ Detach upstream demand from downstream demand without detaching the stream rates. diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapErrorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapErrorSpec.scala new file mode 100644 index 0000000000..226a7e9d5e --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapErrorSpec.scala @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.StreamSpec +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } + +import scala.util.control.NoStackTrace + +class FlowMapErrorSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 1) + + implicit val materializer = ActorMaterializer(settings) + + val ex = new RuntimeException("ex") with NoStackTrace + val boom = new Exception("BOOM!") with NoStackTrace + + "A MapError" must { + "mapError when there is a handler" in assertAllStagesStopped { + Source(1 to 4).map { a ⇒ if (a == 3) throw ex else a } + .mapError { case t: Throwable ⇒ boom } + .runWith(TestSink.probe[Int]) + .request(3) + .expectNext(1) + .expectNext(2) + .expectError(boom) + } + + "fail the stream with exception thrown in handler (and log it)" in assertAllStagesStopped { + Source(1 to 3).map { a ⇒ if (a == 2) throw ex else a } + .mapError { case t: Exception ⇒ throw boom } + .runWith(TestSink.probe[Int]) + .requestNext(1) + .request(1) + .expectError(boom) + } + + "pass through the original exception if partial function does not handle it" in assertAllStagesStopped { + Source(1 to 3).map { a ⇒ if (a == 2) throw ex else a } + .mapError { case t: IndexOutOfBoundsException ⇒ boom } + .runWith(TestSink.probe[Int]) + .requestNext(1) + .request(1) + .expectError(ex) + } + + "not influence stream when there is no exceptions" in assertAllStagesStopped { + Source(1 to 3).map(identity) + .mapError { case t: Throwable ⇒ boom } + .runWith(TestSink.probe[Int]) + .request(3) + .expectNextN(1 to 3) + .expectComplete() + } + + "finish stream if it's empty" in assertAllStagesStopped { + Source.empty.map(identity) + .mapError { case t: Throwable ⇒ boom } + .runWith(TestSink.probe[Int]) + .request(1) + .expectComplete() + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d2b1f70fb5..89f641cdb5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -269,6 +269,28 @@ final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLin } } +/** + * Maps error with the provided function if it is defined for an error or, otherwise, passes it on unchanged. + * + * While similar to [[Recover]] this stage can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + */ +final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] { + override def createLogic(attr: Attributes) = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(out, grab(in)) + + override def onUpstreamFailure(ex: Throwable): Unit = + if (f.isDefinedAt(ex)) super.onUpstreamFailure(f(ex)) + else super.onUpstreamFailure(ex) + + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 01a3692c87..85d735d7db 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -4,8 +4,10 @@ package akka.stream.impl.io import java.net.InetSocketAddress -import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } +import akka.NotUsed import akka.actor.{ ActorRef, Terminated } import akka.dispatch.ExecutionContexts import akka.io.Inet.SocketOption @@ -15,13 +17,14 @@ import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.impl.fusing.GraphStages.detacher import akka.stream.scaladsl.Tcp.{ OutgoingConnection, ServerBinding } -import akka.stream.scaladsl.{ BidiFlow, Flow, Tcp ⇒ StreamTcp } +import akka.stream.scaladsl.{ BidiFlow, Flow, TcpIdleTimeoutException, Tcp ⇒ StreamTcp } import akka.stream.stage._ import akka.util.ByteString import scala.collection.immutable import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.{ Future, Promise } +import scala.util.Try /** * INTERNAL API @@ -111,7 +114,7 @@ private[stream] class ConnectionSourceStage( // FIXME: Previous code was wrong, must add new tests val handler = idleTimeout match { - case d: FiniteDuration ⇒ tcpFlow.join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d)) + case d: FiniteDuration ⇒ tcpFlow.join(TcpIdleTimeout(d, Some(connected.remoteAddress))) case _ ⇒ tcpFlow } @@ -354,3 +357,26 @@ private[stream] class OutgoingConnectionStage( override def toString = s"TCP-to($remoteAddress)" } + +/** INTERNAL API */ +private[akka] object TcpIdleTimeout { + def apply(idleTimeout: FiniteDuration, remoteAddress: Option[InetSocketAddress]): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = { + val connectionToString = remoteAddress match { + case Some(addr) ⇒ s"on connection to [$addr]" + case _ ⇒ "" + } + + val toNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = + BidiFlow.fromFlows( + Flow[ByteString].mapError { case t: TimeoutException ⇒ new TcpIdleTimeoutException(s"TCP idle-timeout encountered $connectionToString, no bytes passed in the last $idleTimeout", idleTimeout) }, + Flow[ByteString] + ) + val fromNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = + BidiFlow.fromFlows( + Flow[ByteString], + Flow[ByteString].mapError { case t: TimeoutException ⇒ new TcpIdleTimeoutException(s"TCP idle-timeout encountered $connectionToString, no bytes passed in the last $idleTimeout", idleTimeout) } + ) + + fromNetTimeout atop BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](idleTimeout) atop toNetTimeout + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b0dd0b209f..0835cc1c56 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -835,6 +835,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element * * '''Backpressures when''' downstream backpressures @@ -846,6 +848,28 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.recover(pf)) + /** + * While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.mapError(pf)) + /** * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new @@ -854,6 +878,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * @@ -877,6 +903,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index e3d7e4ac46..e773284db5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -929,6 +929,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element * * '''Backpressures when''' downstream backpressures @@ -941,6 +943,28 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Source[T, Mat] = new Source(delegate.recover(pf)) + /** + * While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Source[Out, Mat] = + new Source(delegate.mapError(pf)) + /** * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new @@ -949,6 +973,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * @@ -971,6 +997,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 40044932de..65b557e4e0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -8,14 +8,18 @@ import akka.event.LoggingAdapter import akka.japi.function import akka.stream._ import akka.stream.impl.ConstantFun + import scala.collection.JavaConverters._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration import akka.japi.Util import java.util.Comparator + import scala.compat.java8.FutureConverters._ import java.util.concurrent.CompletionStage +import akka.stream.impl.fusing.MapError + /** * A “stream of streams” sub-flow of data elements, e.g. produced by `groupBy`. * SubFlows cannot contribute to the super-flow’s materialized value since they @@ -664,6 +668,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element * * '''Backpressures when''' downstream backpressures @@ -684,6 +690,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * @@ -707,6 +715,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * @@ -720,6 +730,28 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] = new SubFlow(delegate.recoverWithRetries(attempts, pf)) + /** + * While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def mapError(pf: PartialFunction[Throwable, Throwable]): SubFlow[In, Out, Mat @uncheckedVariance] = + new SubFlow(delegate.mapError(pf)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 96f6d4c943..c1f42116fa 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -8,12 +8,16 @@ import akka.event.LoggingAdapter import akka.japi.function import akka.stream._ import akka.stream.impl.ConstantFun + import scala.collection.JavaConverters._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration import akka.japi.Util import java.util.Comparator import java.util.concurrent.CompletionStage + +import akka.stream.impl.fusing.MapError + import scala.compat.java8.FutureConverters._ /** @@ -718,6 +722,28 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] = new SubSource(delegate.recoverWithRetries(attempts, pf)) + /** + * While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def mapError(pf: PartialFunction[Throwable, Throwable]): SubSource[Out, Mat] = + new SubSource(delegate.mapError(pf)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index b27d817df7..0323c40772 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -238,4 +238,5 @@ object BidiFlow { */ def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, NotUsed] = fromGraph(new Timers.IdleTimeoutBidi(timeout)) + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1f89de2f25..b14a30276d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -405,6 +405,8 @@ trait FlowOps[+Out, +Mat] { * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element * * '''Backpressures when''' downstream backpressures @@ -424,6 +426,8 @@ trait FlowOps[+Out, +Mat] { * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * @@ -447,6 +451,8 @@ trait FlowOps[+Out, +Mat] { * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * + * Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * @@ -464,6 +470,27 @@ trait FlowOps[+Out, +Mat] { def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = via(new RecoverWith(attempts, pf)) + /** + * While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def mapError(pf: PartialFunction[Throwable, Throwable]): Repr[Out] = via(MapError(pf)) + /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 43e2dd2b2e..69fc2372f1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -4,6 +4,7 @@ package akka.stream.scaladsl import java.net.InetSocketAddress +import java.util.concurrent.TimeoutException import akka.NotUsed import akka.actor._ @@ -11,7 +12,7 @@ import akka.io.Inet.SocketOption import akka.io.{ IO, Tcp ⇒ IoTcp } import akka.stream._ import akka.stream.impl.fusing.GraphStages.detacher -import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage } +import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage, TcpIdleTimeout } import akka.util.ByteString import scala.collection.immutable @@ -172,7 +173,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { connectTimeout)).via(detacher[ByteString]) // must read ahead for proper completions idleTimeout match { - case d: FiniteDuration ⇒ tcpFlow.join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d)) + case d: FiniteDuration ⇒ tcpFlow.join(TcpIdleTimeout(d, Some(remoteAddress))) case _ ⇒ tcpFlow } @@ -185,3 +186,5 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] = outgoingConnection(InetSocketAddress.createUnresolved(host, port)) } + +final class TcpIdleTimeoutException(msg: String, timeout: Duration) extends TimeoutException(msg: String)