diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala index 174a49429f..c41f6a832c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala @@ -7,6 +7,8 @@ package akka.stream.scaladsl import akka.stream.testkit.StreamSpec import akka.stream.testkit.scaladsl.TestSink +import scala.util.control.NoStackTrace + class FlowWithContextSpec extends StreamSpec { "A FlowWithContext" must { @@ -42,5 +44,27 @@ class FlowWithContextSpec extends StreamSpec { matValue shouldBe (42 -> materializedValue) probe.request(1).expectNext(((Message("a", 1L), 1L))).expectComplete() } + + "be able to map error via FlowWithContext.mapError" in { + val ex = new RuntimeException("ex") with NoStackTrace + val boom = new Exception("BOOM!") with NoStackTrace + val mapErrorFlow = FlowWithContext[Message, Long] + .map { + case m @ Message(_, offset) => if (offset == 3) throw ex else m + } + .mapError { case _: Throwable => boom } + + Source(1L to 4L) + .map { offset => + Message("a", offset) + } + .asSourceWithContext(_.offset) + .via(mapErrorFlow) + .runWith(TestSink.probe[(Message, Long)]) + .request(3) + .expectNext((Message("a", 1L), 1L)) + .expectNext((Message("a", 2L), 2L)) + .expectError(boom) + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala index 2c65d418d7..d5740d7882 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -7,6 +7,8 @@ package akka.stream.scaladsl import akka.stream.testkit.StreamSpec import akka.stream.testkit.scaladsl.TestSink +import scala.util.control.NoStackTrace + case class Message(data: String, offset: Long) class SourceWithContextSpec extends StreamSpec { @@ -104,7 +106,7 @@ class SourceWithContextSpec extends StreamSpec { .expectComplete() } - "be able to change meterialized value via mapMaterializedValue" in { + "be able to change materialized value via mapMaterializedValue" in { val materializedValue = "MatedValue" Source .empty[Message] @@ -113,5 +115,25 @@ class SourceWithContextSpec extends StreamSpec { .to(Sink.ignore) .run() shouldBe materializedValue } + + "be able to map error via mapError" in { + val ex = new RuntimeException("ex") with NoStackTrace + val boom = new Exception("BOOM!") with NoStackTrace + + Source(1L to 4L) + .map { offset => + Message("a", offset) + } + .map { + case m @ Message(_, offset) => if (offset == 3) throw ex else m + } + .asSourceWithContext(_.offset) + .mapError { case _: Throwable => boom } + .runWith(TestSink.probe[(Message, Long)]) + .request(3) + .expectNext((Message("a", 1L), 1L)) + .expectNext((Message("a", 2L), 2L)) + .expectError(boom) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index 5210865079..a07db5bfc2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -66,10 +66,18 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( override def withAttributes(attr: Attributes): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = viaScala(_.withAttributes(attr)) + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.mapError]]. + * + * @see [[akka.stream.javadsl.Flow.mapError]] + */ + def mapError(pf: PartialFunction[Throwable, Throwable]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.mapError(pf)) + /** * Context-preserving variant of [[akka.stream.javadsl.Flow.mapMaterializedValue]]. * - * @see [[akka.stream.scaladsl.Flow.mapMaterializedValue]] + * @see [[akka.stream.javadsl.Flow.mapMaterializedValue]] */ def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] = new FlowWithContext(delegate.mapMaterializedValue[Mat2](f)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index 2affc07dd1..27cda17b38 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -61,6 +61,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] = viaScala(_.withAttributes(attr)) + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.mapError]]. + * + * @see [[akka.stream.javadsl.Source.mapError]] + */ + def mapError(pf: PartialFunction[Throwable, Throwable]): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.mapError(pf)) + /** * Context-preserving variant of [[akka.stream.javadsl.Source.mapMaterializedValue]]. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index d392183fc3..3c9e7c60c9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -59,6 +59,14 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { def map[Out2](f: Out => Out2): Repr[Out2, Ctx] = via(flow.map { case (e, ctx) => (f(e), ctx) }) + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapError]]. + * + * @see [[akka.stream.scaladsl.FlowOps.mapError]] + */ + def mapError(pf: PartialFunction[Throwable, Throwable]): Repr[Out, Ctx] = + via(flow.mapError(pf)) + /** * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapAsync]]. *