From 5a425c1dc71868cfdd6fdab8d71abde5ebb19b84 Mon Sep 17 00:00:00 2001 From: Luc Bourlier Date: Mon, 4 Mar 2019 15:48:24 +0100 Subject: [PATCH] Rename start/endContextPropagation functions to asFlow/Source.. (#26353) To match FlowWithContext.asFlow --- ...tPropagation.md => asSourceWithContext.md} | 6 ++-- .../main/paradox/stream/operators/index.md | 4 +-- .../scaladsl/FlowWithContextLogSpec.scala | 20 ++++++------- .../scaladsl/SourceWithContextSpec.scala | 30 +++++++++---------- .../scaladsl/WithContextUsageSpec.scala | 12 ++++---- .../mima-filters/2.5.21.backwards.excludes | 6 ++++ .../scala/akka/stream/javadsl/Source.scala | 2 +- .../stream/javadsl/SourceWithContext.scala | 6 ++-- .../scala/akka/stream/scaladsl/Source.scala | 2 +- .../stream/scaladsl/SourceWithContext.scala | 4 +-- 10 files changed, 49 insertions(+), 43 deletions(-) rename akka-docs/src/main/paradox/stream/operators/Source/{startContextPropagation.md => asSourceWithContext.md} (65%) diff --git a/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md b/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md similarity index 65% rename from akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md rename to akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md index 1a326093c0..1d18ddf496 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md @@ -1,4 +1,4 @@ -# Source.startContextPropagation +# Source.asSourceWithContext Turns a Source into a SourceWithContext which can propagate a context per element along a stream. @@ -8,11 +8,11 @@ Turns a Source into a SourceWithContext which can propagate a context per elemen ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #startContextPropagation } +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #asSourceWithContext } @@@ ## Description Turns a Source into a SourceWithContext which can propagate a context per element along a stream. -The function passed into startContextPropagation must turn elements into contexts, one context for every element. +The function passed into asSourceWithContext must turn elements into contexts, one context for every element. diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 51e4c1a133..87fddd207a 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -8,6 +8,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] | |Operator|Description| |--|--|--| |Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.| +|Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| |Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| |Source|@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.| @@ -27,7 +28,6 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.| |Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly| |Source|@ref[single](Source/single.md)|Stream a single object| -|Source|@ref[startContextPropagation](Source/startContextPropagation.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| |Source|@ref[tick](Source/tick.md)|A periodical repetition of an arbitrary object.| |Source|@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].| |Source|@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].| @@ -286,7 +286,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) @@@ index * [combine](Source/combine.md) -* [startContextPropagation](Source/startContextPropagation.md) +* [asSourceWithContext](Source/asSourceWithContext.md) * [fromPublisher](Source/fromPublisher.md) * [fromIterator](Source/fromIterator.md) * [cycle](Source/cycle.md) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala index 67280e0e3d..6e8267ad6e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala @@ -34,9 +34,9 @@ class FlowWithContextLogSpec extends StreamSpec(""" "log each element" in { val logging = FlowWithContext[Message, Long].log("my-log") Source(List(Message("a", 1L), Message("b", 2L))) - .startContextPropagation(m ⇒ m.offset) + .asSourceWithContext(m ⇒ m.offset) .via(logging) - .endContextPropagation + .asSource .runWith(Sink.ignore) logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log] Element: Message(a,1)")) @@ -47,9 +47,9 @@ class FlowWithContextLogSpec extends StreamSpec(""" "allow extracting value to be logged" in { val logging = FlowWithContext[Message, Long].log("my-log2", m ⇒ m.data) Source(List(Message("a", 1L))) - .startContextPropagation(m ⇒ m.offset) + .asSourceWithContext(m ⇒ m.offset) .via(logging) - .endContextPropagation + .asSource .runWith(Sink.ignore) logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log2] Element: a")) @@ -64,9 +64,9 @@ class FlowWithContextLogSpec extends StreamSpec(""" val logging = FlowWithContext[Message, Long].log("my-log3") Source(List(Message("a", 1L), Message("b", 2L))) - .startContextPropagation(m ⇒ m.offset) + .asSourceWithContext(m ⇒ m.offset) .via(logging) - .endContextPropagation + .asSource .withAttributes(disableElementLogging) .runWith(Sink.ignore) @@ -79,9 +79,9 @@ class FlowWithContextLogSpec extends StreamSpec(""" "log each element" in { Source(List(Message("a", 1L), Message("b", 2L))) - .startContextPropagation(m ⇒ m.offset) + .asSourceWithContext(m ⇒ m.offset) .log("my-log4") - .endContextPropagation + .asSource .runWith(Sink.ignore) logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log4] Element: Message(a,1)")) @@ -91,9 +91,9 @@ class FlowWithContextLogSpec extends StreamSpec(""" "allow extracting value to be logged" in { Source(List(Message("a", 1L))) - .startContextPropagation(m ⇒ m.offset) + .asSourceWithContext(m ⇒ m.offset) .log("my-log5", m ⇒ m.data) - .endContextPropagation + .asSource .runWith(Sink.ignore) logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log5] Element: a")) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala index 5efde24383..fb845d7342 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -17,11 +17,11 @@ class SourceWithContextSpec extends StreamSpec { "A SourceWithContext" must { - "get created from Source.startContextPropagation" in { + "get created from Source.asSourceWithContext" in { val msg = Message("a", 1L) Source(Vector(msg)) - .startContextPropagation(_.offset) - .endContextPropagation + .asSourceWithContext(_.offset) + .asSource .runWith(TestSink.probe[(Message, Long)]) .request(1) .expectNext((msg, 1L)) @@ -31,9 +31,9 @@ class SourceWithContextSpec extends StreamSpec { "be able to get turned back into a normal Source" in { val msg = Message("a", 1L) Source(Vector(msg)) - .startContextPropagation(_.offset) + .asSourceWithContext(_.offset) .map(_.data) - .endContextPropagation.map { case (e, _) ⇒ e } + .asSource.map { case (e, _) ⇒ e } .runWith(TestSink.probe[String]) .request(1) .expectNext("a") @@ -44,11 +44,11 @@ class SourceWithContextSpec extends StreamSpec { Source( Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) ) - .startContextPropagation(_.offset) + .asSourceWithContext(_.offset) .map(_.data.toLowerCase) .filter(_ != "b") .filterNot(_ == "d") - .endContextPropagation + .asSource .runWith(TestSink.probe[(String, Long)]) .request(2) .expectNext(("a", 1L)) @@ -61,10 +61,10 @@ class SourceWithContextSpec extends StreamSpec { def flowWithContext[T] = FlowWithContext[T, Long] Source(Vector(Message("a", 1L))) - .startContextPropagation(_.offset) + .asSourceWithContext(_.offset) .map(_.data) .via(flowWithContext.map(s ⇒ s + "b")) - .endContextPropagation + .asSource .runWith(TestSink.probe[(String, Long)]) .request(1) .expectNext(("ab", 1L)) @@ -73,12 +73,12 @@ class SourceWithContextSpec extends StreamSpec { "pass through contexts via mapConcat" in { Source(Vector(Message("a", 1L))) - .startContextPropagation(_.offset) + .asSourceWithContext(_.offset) .map(_.data) .mapConcat { str ⇒ List(1, 2, 3).map(i ⇒ s"$str-$i") } - .endContextPropagation + .asSource .runWith(TestSink.probe[(String, Long)]) .request(3) .expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L)) @@ -87,13 +87,13 @@ class SourceWithContextSpec extends StreamSpec { "pass through a sequence of contexts per element via grouped" in { Source(Vector(Message("a", 1L))) - .startContextPropagation(_.offset) + .asSourceWithContext(_.offset) .map(_.data) .mapConcat { str ⇒ List(1, 2, 3, 4).map(i ⇒ s"$str-$i") } .grouped(2) - .endContextPropagation + .asSource .runWith(TestSink.probe[(Seq[String], Seq[Long])]) .request(2) .expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L))) @@ -109,10 +109,10 @@ class SourceWithContextSpec extends StreamSpec { } } Source(Vector(Message("a", 1L), Message("z", 2L))) - .startContextPropagation(_.offset) + .asSourceWithContext(_.offset) .map(_.data) .statefulMapConcat(statefulFunction) - .endContextPropagation + .asSource .runWith(TestSink.probe[(String, Long)]) .request(3) .expectNext(("a", 1L), ("z", 2L), ("z", 2L)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala index dd24b34291..57066f1f7f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala @@ -28,7 +28,7 @@ class WithContextUsageSpec extends StreamSpec { val src = createSourceWithContext(input) .map(f) - .endContextPropagation + .asSource src.map { case (e, _) ⇒ e } .runWith(TestSink.probe[Record]) @@ -54,7 +54,7 @@ class WithContextUsageSpec extends StreamSpec { val src = createSourceWithContext(input) .filter(f) - .endContextPropagation + .asSource src.map { case (e, _) ⇒ e } .runWith(TestSink.probe[Record]) @@ -80,7 +80,7 @@ class WithContextUsageSpec extends StreamSpec { val src = createSourceWithContext(input) .mapConcat(f) - .endContextPropagation + .asSource src.map { case (e, _) ⇒ e } .runWith(TestSink.probe[Record]) @@ -108,7 +108,7 @@ class WithContextUsageSpec extends StreamSpec { .grouped(groupSize) .map(l ⇒ MultiRecord(l)) .mapContext(_.last) - .endContextPropagation + .asSource src.map { case (e, _) ⇒ e } .runWith(TestSink.probe[MultiRecord]) @@ -141,7 +141,7 @@ class WithContextUsageSpec extends StreamSpec { .grouped(groupSize) .map(l ⇒ MultiRecord(l)) .mapContext(_.last) - .endContextPropagation + .asSource src.map { case (e, _) ⇒ e } .runWith(TestSink.probe[MultiRecord]) @@ -166,7 +166,7 @@ class WithContextUsageSpec extends StreamSpec { def createSourceWithContext(committableMessages: Vector[Consumer.CommittableMessage[Record]]): SourceWithContext[Record, Offset, NotUsed] = Consumer .committableSource(committableMessages) - .startContextPropagation(m ⇒ Offset(m.committableOffset.offset)) + .asSourceWithContext(m ⇒ Offset(m.committableOffset.offset)) .map(_.record) def commitOffsets = commit[Offset](Offset.Uninitialized) diff --git a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes index f2bc0a59c2..65ae5539bb 100644 --- a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes @@ -1,3 +1,9 @@ +# #26353 Rename start/endContextPropagation functions to asFlow/Source +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.startContextPropagation") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SourceWithContext.endContextPropagation") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.startContextPropagation") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.SourceWithContext.endContextPropagation") + # FlowWithContextOps.log() #26386 ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$2") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 02740ebfc9..44757c4147 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3472,6 +3472,6 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * API MAY CHANGE */ @ApiMayChange - def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] = + def asSourceWithContext[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] = new scaladsl.SourceWithContext(this.asScala.map(x ⇒ (x, extractContext.apply(x)))).asJava } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index a353e4a8c8..acf63b4ef7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -22,7 +22,7 @@ import scala.compat.java8.FutureConverters._ * use [[SourceWithContext.via]] to manually provide the context propagation for otherwise unsupported * operations. * - * Can be created by calling [[Source.startContextPropagation()]] + * Can be created by calling [[Source.asSourceWithContext()]] * * API MAY CHANGE */ @@ -44,8 +44,8 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon * Stops automatic context propagation from here and converts this to a regular * stream of a pair of (data, context). */ - def endContextPropagation(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] = - delegate.endContextPropagation.map { case (o, c) ⇒ Pair(o, c) }.asJava + def asSource(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] = + delegate.asSource.map { case (o, c) ⇒ Pair(o, c) }.asJava // remaining operations in alphabetic order diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index a4d19d4f3d..d34abf6d97 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -220,7 +220,7 @@ final class Source[+Out, +Mat]( * API MAY CHANGE */ @ApiMayChange - def startContextPropagation[Ctx](f: Out ⇒ Ctx): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(this.map(e ⇒ (e, f(e)))) + def asSourceWithContext[Ctx](f: Out ⇒ Ctx): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(this.map(e ⇒ (e, f(e)))) } object Source { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index e4ef91198a..d8637ac9e1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -15,7 +15,7 @@ import akka.stream._ * use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported * operations. * - * Can be created by calling [[Source.startContextPropagation()]] + * Can be created by calling [[Source.asSourceWithContext()]] * * API MAY CHANGE */ @@ -35,7 +35,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] ( * Stops automatic context propagation from here and converts this to a regular * stream of a pair of (data, context). */ - def endContextPropagation: Source[(Out, Ctx), Mat] = delegate + def asSource: Source[(Out, Ctx), Mat] = delegate def asJava[JOut >: Out, JCtx >: Ctx, JMat >: Mat]: javadsl.SourceWithContext[JOut, JCtx, JMat] = new javadsl.SourceWithContext(this)