Rename start/endContextPropagation functions to asFlow/Source.. (#26353)

To match FlowWithContext.asFlow
This commit is contained in:
Luc Bourlier 2019-03-04 15:48:24 +01:00 committed by Patrik Nordwall
parent ff9289a089
commit 5a425c1dc7
10 changed files with 49 additions and 43 deletions

View file

@ -1,4 +1,4 @@
# Source.startContextPropagation
# Source.asSourceWithContext
Turns a Source into a SourceWithContext which can propagate a context per element along a stream.
@ -8,11 +8,11 @@ Turns a Source into a SourceWithContext which can propagate a context per elemen
## Signature
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #startContextPropagation }
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #asSourceWithContext }
@@@
## Description
Turns a Source into a SourceWithContext which can propagate a context per element along a stream.
The function passed into startContextPropagation must turn elements into contexts, one context for every element.
The function passed into asSourceWithContext must turn elements into contexts, one context for every element.

View file

@ -8,6 +8,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
| |Operator|Description|
|--|--|--|
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.|
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|Source|<a name="cycle"></a>@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.|
@ -27,7 +28,6 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|<a name="range"></a>@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.|
|Source|<a name="repeat"></a>@ref[repeat](Source/repeat.md)|Stream a single object repeatedly|
|Source|<a name="single"></a>@ref[single](Source/single.md)|Stream a single object|
|Source|<a name="startcontextpropagation"></a>@ref[startContextPropagation](Source/startContextPropagation.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|Source|<a name="tick"></a>@ref[tick](Source/tick.md)|A periodical repetition of an arbitrary object.|
|Source|<a name="unfold"></a>@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].|
|Source|<a name="unfoldasync"></a>@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].|
@ -286,7 +286,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
@@@ index
* [combine](Source/combine.md)
* [startContextPropagation](Source/startContextPropagation.md)
* [asSourceWithContext](Source/asSourceWithContext.md)
* [fromPublisher](Source/fromPublisher.md)
* [fromIterator](Source/fromIterator.md)
* [cycle](Source/cycle.md)

View file

@ -34,9 +34,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
"log each element" in {
val logging = FlowWithContext[Message, Long].log("my-log")
Source(List(Message("a", 1L), Message("b", 2L)))
.startContextPropagation(m m.offset)
.asSourceWithContext(m m.offset)
.via(logging)
.endContextPropagation
.asSource
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log] Element: Message(a,1)"))
@ -47,9 +47,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
"allow extracting value to be logged" in {
val logging = FlowWithContext[Message, Long].log("my-log2", m m.data)
Source(List(Message("a", 1L)))
.startContextPropagation(m m.offset)
.asSourceWithContext(m m.offset)
.via(logging)
.endContextPropagation
.asSource
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log2] Element: a"))
@ -64,9 +64,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
val logging = FlowWithContext[Message, Long].log("my-log3")
Source(List(Message("a", 1L), Message("b", 2L)))
.startContextPropagation(m m.offset)
.asSourceWithContext(m m.offset)
.via(logging)
.endContextPropagation
.asSource
.withAttributes(disableElementLogging)
.runWith(Sink.ignore)
@ -79,9 +79,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
"log each element" in {
Source(List(Message("a", 1L), Message("b", 2L)))
.startContextPropagation(m m.offset)
.asSourceWithContext(m m.offset)
.log("my-log4")
.endContextPropagation
.asSource
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log4] Element: Message(a,1)"))
@ -91,9 +91,9 @@ class FlowWithContextLogSpec extends StreamSpec("""
"allow extracting value to be logged" in {
Source(List(Message("a", 1L)))
.startContextPropagation(m m.offset)
.asSourceWithContext(m m.offset)
.log("my-log5", m m.data)
.endContextPropagation
.asSource
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log5] Element: a"))

View file

@ -17,11 +17,11 @@ class SourceWithContextSpec extends StreamSpec {
"A SourceWithContext" must {
"get created from Source.startContextPropagation" in {
"get created from Source.asSourceWithContext" in {
val msg = Message("a", 1L)
Source(Vector(msg))
.startContextPropagation(_.offset)
.endContextPropagation
.asSourceWithContext(_.offset)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(1)
.expectNext((msg, 1L))
@ -31,9 +31,9 @@ class SourceWithContextSpec extends StreamSpec {
"be able to get turned back into a normal Source" in {
val msg = Message("a", 1L)
Source(Vector(msg))
.startContextPropagation(_.offset)
.asSourceWithContext(_.offset)
.map(_.data)
.endContextPropagation.map { case (e, _) e }
.asSource.map { case (e, _) e }
.runWith(TestSink.probe[String])
.request(1)
.expectNext("a")
@ -44,11 +44,11 @@ class SourceWithContextSpec extends StreamSpec {
Source(
Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))
)
.startContextPropagation(_.offset)
.asSourceWithContext(_.offset)
.map(_.data.toLowerCase)
.filter(_ != "b")
.filterNot(_ == "d")
.endContextPropagation
.asSource
.runWith(TestSink.probe[(String, Long)])
.request(2)
.expectNext(("a", 1L))
@ -61,10 +61,10 @@ class SourceWithContextSpec extends StreamSpec {
def flowWithContext[T] = FlowWithContext[T, Long]
Source(Vector(Message("a", 1L)))
.startContextPropagation(_.offset)
.asSourceWithContext(_.offset)
.map(_.data)
.via(flowWithContext.map(s s + "b"))
.endContextPropagation
.asSource
.runWith(TestSink.probe[(String, Long)])
.request(1)
.expectNext(("ab", 1L))
@ -73,12 +73,12 @@ class SourceWithContextSpec extends StreamSpec {
"pass through contexts via mapConcat" in {
Source(Vector(Message("a", 1L)))
.startContextPropagation(_.offset)
.asSourceWithContext(_.offset)
.map(_.data)
.mapConcat { str
List(1, 2, 3).map(i s"$str-$i")
}
.endContextPropagation
.asSource
.runWith(TestSink.probe[(String, Long)])
.request(3)
.expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L))
@ -87,13 +87,13 @@ class SourceWithContextSpec extends StreamSpec {
"pass through a sequence of contexts per element via grouped" in {
Source(Vector(Message("a", 1L)))
.startContextPropagation(_.offset)
.asSourceWithContext(_.offset)
.map(_.data)
.mapConcat { str
List(1, 2, 3, 4).map(i s"$str-$i")
}
.grouped(2)
.endContextPropagation
.asSource
.runWith(TestSink.probe[(Seq[String], Seq[Long])])
.request(2)
.expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L)))
@ -109,10 +109,10 @@ class SourceWithContextSpec extends StreamSpec {
}
}
Source(Vector(Message("a", 1L), Message("z", 2L)))
.startContextPropagation(_.offset)
.asSourceWithContext(_.offset)
.map(_.data)
.statefulMapConcat(statefulFunction)
.endContextPropagation
.asSource
.runWith(TestSink.probe[(String, Long)])
.request(3)
.expectNext(("a", 1L), ("z", 2L), ("z", 2L))

View file

@ -28,7 +28,7 @@ class WithContextUsageSpec extends StreamSpec {
val src = createSourceWithContext(input)
.map(f)
.endContextPropagation
.asSource
src.map { case (e, _) e }
.runWith(TestSink.probe[Record])
@ -54,7 +54,7 @@ class WithContextUsageSpec extends StreamSpec {
val src = createSourceWithContext(input)
.filter(f)
.endContextPropagation
.asSource
src.map { case (e, _) e }
.runWith(TestSink.probe[Record])
@ -80,7 +80,7 @@ class WithContextUsageSpec extends StreamSpec {
val src = createSourceWithContext(input)
.mapConcat(f)
.endContextPropagation
.asSource
src.map { case (e, _) e }
.runWith(TestSink.probe[Record])
@ -108,7 +108,7 @@ class WithContextUsageSpec extends StreamSpec {
.grouped(groupSize)
.map(l MultiRecord(l))
.mapContext(_.last)
.endContextPropagation
.asSource
src.map { case (e, _) e }
.runWith(TestSink.probe[MultiRecord])
@ -141,7 +141,7 @@ class WithContextUsageSpec extends StreamSpec {
.grouped(groupSize)
.map(l MultiRecord(l))
.mapContext(_.last)
.endContextPropagation
.asSource
src.map { case (e, _) e }
.runWith(TestSink.probe[MultiRecord])
@ -166,7 +166,7 @@ class WithContextUsageSpec extends StreamSpec {
def createSourceWithContext(committableMessages: Vector[Consumer.CommittableMessage[Record]]): SourceWithContext[Record, Offset, NotUsed] =
Consumer
.committableSource(committableMessages)
.startContextPropagation(m Offset(m.committableOffset.offset))
.asSourceWithContext(m Offset(m.committableOffset.offset))
.map(_.record)
def commitOffsets = commit[Offset](Offset.Uninitialized)

View file

@ -1,3 +1,9 @@
# #26353 Rename start/endContextPropagation functions to asFlow/Source
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.startContextPropagation")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SourceWithContext.endContextPropagation")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.startContextPropagation")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.SourceWithContext.endContextPropagation")
# FlowWithContextOps.log() #26386
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$2")

View file

@ -3472,6 +3472,6 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* API MAY CHANGE
*/
@ApiMayChange
def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] =
def asSourceWithContext[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] =
new scaladsl.SourceWithContext(this.asScala.map(x (x, extractContext.apply(x)))).asJava
}

View file

@ -22,7 +22,7 @@ import scala.compat.java8.FutureConverters._
* use [[SourceWithContext.via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
* Can be created by calling [[Source.startContextPropagation()]]
* Can be created by calling [[Source.asSourceWithContext()]]
*
* API MAY CHANGE
*/
@ -44,8 +44,8 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
* Stops automatic context propagation from here and converts this to a regular
* stream of a pair of (data, context).
*/
def endContextPropagation(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] =
delegate.endContextPropagation.map { case (o, c) Pair(o, c) }.asJava
def asSource(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] =
delegate.asSource.map { case (o, c) Pair(o, c) }.asJava
// remaining operations in alphabetic order

View file

@ -220,7 +220,7 @@ final class Source[+Out, +Mat](
* API MAY CHANGE
*/
@ApiMayChange
def startContextPropagation[Ctx](f: Out Ctx): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(this.map(e (e, f(e))))
def asSourceWithContext[Ctx](f: Out Ctx): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(this.map(e (e, f(e))))
}
object Source {

View file

@ -15,7 +15,7 @@ import akka.stream._
* use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
* Can be created by calling [[Source.startContextPropagation()]]
* Can be created by calling [[Source.asSourceWithContext()]]
*
* API MAY CHANGE
*/
@ -35,7 +35,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (
* Stops automatic context propagation from here and converts this to a regular
* stream of a pair of (data, context).
*/
def endContextPropagation: Source[(Out, Ctx), Mat] = delegate
def asSource: Source[(Out, Ctx), Mat] = delegate
def asJava[JOut >: Out, JCtx >: Ctx, JMat >: Mat]: javadsl.SourceWithContext[JOut, JCtx, JMat] =
new javadsl.SourceWithContext(this)