diff --git a/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md b/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md
similarity index 65%
rename from akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md
rename to akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md
index 1a326093c0..1d18ddf496 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source/startContextPropagation.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source/asSourceWithContext.md
@@ -1,4 +1,4 @@
-# Source.startContextPropagation
+# Source.asSourceWithContext
Turns a Source into a SourceWithContext which can propagate a context per element along a stream.
@@ -8,11 +8,11 @@ Turns a Source into a SourceWithContext which can propagate a context per elemen
## Signature
-@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #startContextPropagation }
+@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #asSourceWithContext }
@@@
## Description
Turns a Source into a SourceWithContext which can propagate a context per element along a stream.
-The function passed into startContextPropagation must turn elements into contexts, one context for every element.
+The function passed into asSourceWithContext must turn elements into contexts, one context for every element.
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index 51e4c1a133..87fddd207a 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -8,6 +8,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
| |Operator|Description|
|--|--|--|
|Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
+|Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.|
|Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|Source|@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.|
@@ -27,7 +28,6 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.|
|Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly|
|Source|@ref[single](Source/single.md)|Stream a single object|
-|Source|@ref[startContextPropagation](Source/startContextPropagation.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|Source|@ref[tick](Source/tick.md)|A periodical repetition of an arbitrary object.|
|Source|@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].|
|Source|@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].|
@@ -286,7 +286,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
@@@ index
* [combine](Source/combine.md)
-* [startContextPropagation](Source/startContextPropagation.md)
+* [asSourceWithContext](Source/asSourceWithContext.md)
* [fromPublisher](Source/fromPublisher.md)
* [fromIterator](Source/fromIterator.md)
* [cycle](Source/cycle.md)
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala
index 67280e0e3d..6e8267ad6e 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala
@@ -34,9 +34,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
"log each element" in {
val logging = FlowWithContext[Message, Long].log("my-log")
Source(List(Message("a", 1L), Message("b", 2L)))
- .startContextPropagation(m ⇒ m.offset)
+ .asSourceWithContext(m ⇒ m.offset)
.via(logging)
- .endContextPropagation
+ .asSource
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log] Element: Message(a,1)"))
@@ -47,9 +47,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
"allow extracting value to be logged" in {
val logging = FlowWithContext[Message, Long].log("my-log2", m ⇒ m.data)
Source(List(Message("a", 1L)))
- .startContextPropagation(m ⇒ m.offset)
+ .asSourceWithContext(m ⇒ m.offset)
.via(logging)
- .endContextPropagation
+ .asSource
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log2] Element: a"))
@@ -64,9 +64,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
val logging = FlowWithContext[Message, Long].log("my-log3")
Source(List(Message("a", 1L), Message("b", 2L)))
- .startContextPropagation(m ⇒ m.offset)
+ .asSourceWithContext(m ⇒ m.offset)
.via(logging)
- .endContextPropagation
+ .asSource
.withAttributes(disableElementLogging)
.runWith(Sink.ignore)
@@ -79,9 +79,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
"log each element" in {
Source(List(Message("a", 1L), Message("b", 2L)))
- .startContextPropagation(m ⇒ m.offset)
+ .asSourceWithContext(m ⇒ m.offset)
.log("my-log4")
- .endContextPropagation
+ .asSource
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log4] Element: Message(a,1)"))
@@ -91,9 +91,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
"allow extracting value to be logged" in {
Source(List(Message("a", 1L)))
- .startContextPropagation(m ⇒ m.offset)
+ .asSourceWithContext(m ⇒ m.offset)
.log("my-log5", m ⇒ m.data)
- .endContextPropagation
+ .asSource
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log5] Element: a"))
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala
index 5efde24383..fb845d7342 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala
@@ -17,11 +17,11 @@ class SourceWithContextSpec extends StreamSpec {
"A SourceWithContext" must {
- "get created from Source.startContextPropagation" in {
+ "get created from Source.asSourceWithContext" in {
val msg = Message("a", 1L)
Source(Vector(msg))
- .startContextPropagation(_.offset)
- .endContextPropagation
+ .asSourceWithContext(_.offset)
+ .asSource
.runWith(TestSink.probe[(Message, Long)])
.request(1)
.expectNext((msg, 1L))
@@ -31,9 +31,9 @@ class SourceWithContextSpec extends StreamSpec {
"be able to get turned back into a normal Source" in {
val msg = Message("a", 1L)
Source(Vector(msg))
- .startContextPropagation(_.offset)
+ .asSourceWithContext(_.offset)
.map(_.data)
- .endContextPropagation.map { case (e, _) ⇒ e }
+ .asSource.map { case (e, _) ⇒ e }
.runWith(TestSink.probe[String])
.request(1)
.expectNext("a")
@@ -44,11 +44,11 @@ class SourceWithContextSpec extends StreamSpec {
Source(
Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))
)
- .startContextPropagation(_.offset)
+ .asSourceWithContext(_.offset)
.map(_.data.toLowerCase)
.filter(_ != "b")
.filterNot(_ == "d")
- .endContextPropagation
+ .asSource
.runWith(TestSink.probe[(String, Long)])
.request(2)
.expectNext(("a", 1L))
@@ -61,10 +61,10 @@ class SourceWithContextSpec extends StreamSpec {
def flowWithContext[T] = FlowWithContext[T, Long]
Source(Vector(Message("a", 1L)))
- .startContextPropagation(_.offset)
+ .asSourceWithContext(_.offset)
.map(_.data)
.via(flowWithContext.map(s ⇒ s + "b"))
- .endContextPropagation
+ .asSource
.runWith(TestSink.probe[(String, Long)])
.request(1)
.expectNext(("ab", 1L))
@@ -73,12 +73,12 @@ class SourceWithContextSpec extends StreamSpec {
"pass through contexts via mapConcat" in {
Source(Vector(Message("a", 1L)))
- .startContextPropagation(_.offset)
+ .asSourceWithContext(_.offset)
.map(_.data)
.mapConcat { str ⇒
List(1, 2, 3).map(i ⇒ s"$str-$i")
}
- .endContextPropagation
+ .asSource
.runWith(TestSink.probe[(String, Long)])
.request(3)
.expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L))
@@ -87,13 +87,13 @@ class SourceWithContextSpec extends StreamSpec {
"pass through a sequence of contexts per element via grouped" in {
Source(Vector(Message("a", 1L)))
- .startContextPropagation(_.offset)
+ .asSourceWithContext(_.offset)
.map(_.data)
.mapConcat { str ⇒
List(1, 2, 3, 4).map(i ⇒ s"$str-$i")
}
.grouped(2)
- .endContextPropagation
+ .asSource
.runWith(TestSink.probe[(Seq[String], Seq[Long])])
.request(2)
.expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L)))
@@ -109,10 +109,10 @@ class SourceWithContextSpec extends StreamSpec {
}
}
Source(Vector(Message("a", 1L), Message("z", 2L)))
- .startContextPropagation(_.offset)
+ .asSourceWithContext(_.offset)
.map(_.data)
.statefulMapConcat(statefulFunction)
- .endContextPropagation
+ .asSource
.runWith(TestSink.probe[(String, Long)])
.request(3)
.expectNext(("a", 1L), ("z", 2L), ("z", 2L))
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala
index dd24b34291..57066f1f7f 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala
@@ -28,7 +28,7 @@ class WithContextUsageSpec extends StreamSpec {
val src = createSourceWithContext(input)
.map(f)
- .endContextPropagation
+ .asSource
src.map { case (e, _) ⇒ e }
.runWith(TestSink.probe[Record])
@@ -54,7 +54,7 @@ class WithContextUsageSpec extends StreamSpec {
val src = createSourceWithContext(input)
.filter(f)
- .endContextPropagation
+ .asSource
src.map { case (e, _) ⇒ e }
.runWith(TestSink.probe[Record])
@@ -80,7 +80,7 @@ class WithContextUsageSpec extends StreamSpec {
val src = createSourceWithContext(input)
.mapConcat(f)
- .endContextPropagation
+ .asSource
src.map { case (e, _) ⇒ e }
.runWith(TestSink.probe[Record])
@@ -108,7 +108,7 @@ class WithContextUsageSpec extends StreamSpec {
.grouped(groupSize)
.map(l ⇒ MultiRecord(l))
.mapContext(_.last)
- .endContextPropagation
+ .asSource
src.map { case (e, _) ⇒ e }
.runWith(TestSink.probe[MultiRecord])
@@ -141,7 +141,7 @@ class WithContextUsageSpec extends StreamSpec {
.grouped(groupSize)
.map(l ⇒ MultiRecord(l))
.mapContext(_.last)
- .endContextPropagation
+ .asSource
src.map { case (e, _) ⇒ e }
.runWith(TestSink.probe[MultiRecord])
@@ -166,7 +166,7 @@ class WithContextUsageSpec extends StreamSpec {
def createSourceWithContext(committableMessages: Vector[Consumer.CommittableMessage[Record]]): SourceWithContext[Record, Offset, NotUsed] =
Consumer
.committableSource(committableMessages)
- .startContextPropagation(m ⇒ Offset(m.committableOffset.offset))
+ .asSourceWithContext(m ⇒ Offset(m.committableOffset.offset))
.map(_.record)
def commitOffsets = commit[Offset](Offset.Uninitialized)
diff --git a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes
index f2bc0a59c2..65ae5539bb 100644
--- a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes
+++ b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes
@@ -1,3 +1,9 @@
+# #26353 Rename start/endContextPropagation functions to asFlow/Source
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.startContextPropagation")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SourceWithContext.endContextPropagation")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.startContextPropagation")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.SourceWithContext.endContextPropagation")
+
# FlowWithContextOps.log() #26386
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$2")
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 02740ebfc9..44757c4147 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -3472,6 +3472,6 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* API MAY CHANGE
*/
@ApiMayChange
- def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] =
+ def asSourceWithContext[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] =
new scaladsl.SourceWithContext(this.asScala.map(x ⇒ (x, extractContext.apply(x)))).asJava
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala
index a353e4a8c8..acf63b4ef7 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala
@@ -22,7 +22,7 @@ import scala.compat.java8.FutureConverters._
* use [[SourceWithContext.via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
- * Can be created by calling [[Source.startContextPropagation()]]
+ * Can be created by calling [[Source.asSourceWithContext()]]
*
* API MAY CHANGE
*/
@@ -44,8 +44,8 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
* Stops automatic context propagation from here and converts this to a regular
* stream of a pair of (data, context).
*/
- def endContextPropagation(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] =
- delegate.endContextPropagation.map { case (o, c) ⇒ Pair(o, c) }.asJava
+ def asSource(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] =
+ delegate.asSource.map { case (o, c) ⇒ Pair(o, c) }.asJava
// remaining operations in alphabetic order
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
index a4d19d4f3d..d34abf6d97 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
@@ -220,7 +220,7 @@ final class Source[+Out, +Mat](
* API MAY CHANGE
*/
@ApiMayChange
- def startContextPropagation[Ctx](f: Out ⇒ Ctx): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(this.map(e ⇒ (e, f(e))))
+ def asSourceWithContext[Ctx](f: Out ⇒ Ctx): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(this.map(e ⇒ (e, f(e))))
}
object Source {
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala
index e4ef91198a..d8637ac9e1 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala
@@ -15,7 +15,7 @@ import akka.stream._
* use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
- * Can be created by calling [[Source.startContextPropagation()]]
+ * Can be created by calling [[Source.asSourceWithContext()]]
*
* API MAY CHANGE
*/
@@ -35,7 +35,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (
* Stops automatic context propagation from here and converts this to a regular
* stream of a pair of (data, context).
*/
- def endContextPropagation: Source[(Out, Ctx), Mat] = delegate
+ def asSource: Source[(Out, Ctx), Mat] = delegate
def asJava[JOut >: Out, JCtx >: Ctx, JMat >: Mat]: javadsl.SourceWithContext[JOut, JCtx, JMat] =
new javadsl.SourceWithContext(this)