Added mapError to SourceWithContext and FlowWithContext. (#27900)

* Added mapError to SourceWithContext and FlowWithContext.

* Formatting.

* Doc fix.

* Formatted tests.
This commit is contained in:
Raymond Roestenburg 2019-10-07 17:29:26 +02:00 committed by Arnout Engelen
parent 888a638ed2
commit e5772b1107
5 changed files with 72 additions and 2 deletions

View file

@ -7,6 +7,8 @@ package akka.stream.scaladsl
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import scala.util.control.NoStackTrace
class FlowWithContextSpec extends StreamSpec { class FlowWithContextSpec extends StreamSpec {
"A FlowWithContext" must { "A FlowWithContext" must {
@ -42,5 +44,27 @@ class FlowWithContextSpec extends StreamSpec {
matValue shouldBe (42 -> materializedValue) matValue shouldBe (42 -> materializedValue)
probe.request(1).expectNext(((Message("a", 1L), 1L))).expectComplete() probe.request(1).expectNext(((Message("a", 1L), 1L))).expectComplete()
} }
"be able to map error via FlowWithContext.mapError" in {
val ex = new RuntimeException("ex") with NoStackTrace
val boom = new Exception("BOOM!") with NoStackTrace
val mapErrorFlow = FlowWithContext[Message, Long]
.map {
case m @ Message(_, offset) => if (offset == 3) throw ex else m
}
.mapError { case _: Throwable => boom }
Source(1L to 4L)
.map { offset =>
Message("a", offset)
}
.asSourceWithContext(_.offset)
.via(mapErrorFlow)
.runWith(TestSink.probe[(Message, Long)])
.request(3)
.expectNext((Message("a", 1L), 1L))
.expectNext((Message("a", 2L), 2L))
.expectError(boom)
}
} }
} }

View file

@ -7,6 +7,8 @@ package akka.stream.scaladsl
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import scala.util.control.NoStackTrace
case class Message(data: String, offset: Long) case class Message(data: String, offset: Long)
class SourceWithContextSpec extends StreamSpec { class SourceWithContextSpec extends StreamSpec {
@ -104,7 +106,7 @@ class SourceWithContextSpec extends StreamSpec {
.expectComplete() .expectComplete()
} }
"be able to change meterialized value via mapMaterializedValue" in { "be able to change materialized value via mapMaterializedValue" in {
val materializedValue = "MatedValue" val materializedValue = "MatedValue"
Source Source
.empty[Message] .empty[Message]
@ -113,5 +115,25 @@ class SourceWithContextSpec extends StreamSpec {
.to(Sink.ignore) .to(Sink.ignore)
.run() shouldBe materializedValue .run() shouldBe materializedValue
} }
"be able to map error via mapError" in {
val ex = new RuntimeException("ex") with NoStackTrace
val boom = new Exception("BOOM!") with NoStackTrace
Source(1L to 4L)
.map { offset =>
Message("a", offset)
}
.map {
case m @ Message(_, offset) => if (offset == 3) throw ex else m
}
.asSourceWithContext(_.offset)
.mapError { case _: Throwable => boom }
.runWith(TestSink.probe[(Message, Long)])
.request(3)
.expectNext((Message("a", 1L), 1L))
.expectNext((Message("a", 2L), 2L))
.expectError(boom)
}
} }
} }

View file

@ -66,10 +66,18 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
override def withAttributes(attr: Attributes): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = override def withAttributes(attr: Attributes): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.withAttributes(attr)) viaScala(_.withAttributes(attr))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.mapError]].
*
* @see [[akka.stream.javadsl.Flow.mapError]]
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.mapError(pf))
/** /**
* Context-preserving variant of [[akka.stream.javadsl.Flow.mapMaterializedValue]]. * Context-preserving variant of [[akka.stream.javadsl.Flow.mapMaterializedValue]].
* *
* @see [[akka.stream.scaladsl.Flow.mapMaterializedValue]] * @see [[akka.stream.javadsl.Flow.mapMaterializedValue]]
*/ */
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] = def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] =
new FlowWithContext(delegate.mapMaterializedValue[Mat2](f)) new FlowWithContext(delegate.mapMaterializedValue[Mat2](f))

View file

@ -61,6 +61,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] = override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] =
viaScala(_.withAttributes(attr)) viaScala(_.withAttributes(attr))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.mapError]].
*
* @see [[akka.stream.javadsl.Source.mapError]]
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): SourceWithContext[Out, Ctx, Mat] =
viaScala(_.mapError(pf))
/** /**
* Context-preserving variant of [[akka.stream.javadsl.Source.mapMaterializedValue]]. * Context-preserving variant of [[akka.stream.javadsl.Source.mapMaterializedValue]].
* *

View file

@ -59,6 +59,14 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
def map[Out2](f: Out => Out2): Repr[Out2, Ctx] = def map[Out2](f: Out => Out2): Repr[Out2, Ctx] =
via(flow.map { case (e, ctx) => (f(e), ctx) }) via(flow.map { case (e, ctx) => (f(e), ctx) })
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapError]].
*
* @see [[akka.stream.scaladsl.FlowOps.mapError]]
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): Repr[Out, Ctx] =
via(flow.mapError(pf))
/** /**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapAsync]]. * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapAsync]].
* *