From c344287b9dc71028dc9c0a9acde878fcb113e190 Mon Sep 17 00:00:00 2001 From: Luc Bourlier Date: Tue, 12 Feb 2019 17:05:07 +0100 Subject: [PATCH] Adds fromPairs to SourceWithContent object Like in FlowWithContext. To simplify the creation of sourceWithContext from sources with compatible types. Renames from to fromPairs in FlowWithContext. --- .../stream/scaladsl/SourceWithContextSpec.scala | 10 ++++++++++ .../main/mima-filters/2.5.21.backwards.excludes | 3 +++ .../akka/stream/javadsl/FlowWithContext.scala | 5 ++++- .../akka/stream/javadsl/SourceWithContext.scala | 14 ++++++++++++++ .../akka/stream/scaladsl/FlowWithContext.scala | 8 ++++---- .../akka/stream/scaladsl/SourceWithContext.scala | 13 +++++++++++++ 6 files changed, 48 insertions(+), 5 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala index 83edfb2247..b37aa31174 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -28,6 +28,16 @@ class SourceWithContextSpec extends StreamSpec { .expectComplete() } + "get created from a source of tuple2" in { + val msg = Message("a", 1L) + SourceWithContext.fromTuples(Source(Vector((msg, msg.offset)))) + .asSource + .runWith(TestSink.probe[(Message, Long)]) + .request(1) + .expectNext((msg, 1L)) + .expectComplete() + } + "be able to get turned back into a normal Source" in { val msg = Message("a", 1L) Source(Vector(msg)) diff --git a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes index d16117cbcb..6573730526 100644 --- a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes @@ -21,3 +21,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.snapshot.Materia # Sets correct return type for withAttributes on Source/FlowWithContext #26411 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.GraphDelegate.withAttributes") + +# rename `from` to `fromTuples` in WithContext Scala dsl #26370 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContext.from") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index 681bde134c..b8f5c30b3a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -26,8 +26,11 @@ object FlowWithContext { new FlowWithContext(scaladsl.FlowWithContext[In, Ctx]) } + /** + * Creates a FlowWithContext from a regular flow that operates on `Pair` elements. + */ def fromPairs[In, CtxIn, Out, CtxOut, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = { - new FlowWithContext(scaladsl.FlowWithContext.from(scaladsl.Flow[(In, CtxIn)].map { case (i, c) ⇒ Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right))) + new FlowWithContext(scaladsl.FlowWithContext.fromTuples(scaladsl.Flow[(In, CtxIn)].map { case (i, c) ⇒ Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right))) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index eb1b39d96c..b12fc5dc63 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -16,6 +16,20 @@ import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ +/** + * API MAY CHANGE + */ +@ApiMayChange +object SourceWithContext { + + /** + * Creates a SourceWithContext from a regular flow that operates on `Pair` elements. + */ + def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = { + new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala))) + } +} + /** * A source that provides operations which automatically propagate the context of an element. * Only a subset of common operations from [[Source]] is supported. As an escape hatch you can diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala index 522202ebe5..d8bb4adb5d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -23,9 +23,9 @@ object FlowWithContext { } /** - * Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements. + * Creates a FlowWithContext from a regular flow that operates on a tuple of `(data, context)` elements. */ - def from[In, CtxIn, Out, CtxOut, Mat](flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + def fromTuples[In, CtxIn, Out, CtxOut, Mat](flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = new FlowWithContext(flow) } @@ -46,10 +46,10 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat]( override type ReprMat[+O, +C, +M] = FlowWithContext[In @uncheckedVariance, CtxIn @uncheckedVariance, O, C, M @uncheckedVariance] override def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] = - FlowWithContext.from(delegate.via(viaFlow)) + new FlowWithContext(delegate.via(viaFlow)) override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] = - FlowWithContext.from(delegate.viaMat(flow)(combine)) + new FlowWithContext(delegate.viaMat(flow)(combine)) /** * Context-preserving variant of [[akka.stream.scaladsl.Flow.withAttributes]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index f7296ff2bb..232b54e7fa 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -9,6 +9,19 @@ import scala.annotation.unchecked.uncheckedVariance import akka.annotation.ApiMayChange import akka.stream._ +/** + * API MAY CHANGE + */ +@ApiMayChange +object SourceWithContext { + + /** + * Creates a SourceWithContext from a regular source that operates on a tuple of `(data, context)` elements. + */ + def fromTuples[Out, CtxOut, Mat](source: Source[(Out, CtxOut), Mat]): SourceWithContext[Out, CtxOut, Mat] = + new SourceWithContext(source) +} + /** * A source that provides operations which automatically propagate the context of an element. * Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can