Merge pull request #26465 from akka/wip-from-tuple-pair-source-patriknw

Adds fromPairs to SourceWithContent object
This commit is contained in:
Patrik Nordwall 2019-03-05 11:56:44 +01:00 committed by GitHub
commit 7f0837a550
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 48 additions and 5 deletions

View file

@ -28,6 +28,16 @@ class SourceWithContextSpec extends StreamSpec {
.expectComplete()
}
"get created from a source of tuple2" in {
val msg = Message("a", 1L)
SourceWithContext.fromTuples(Source(Vector((msg, msg.offset))))
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(1)
.expectNext((msg, 1L))
.expectComplete()
}
"be able to get turned back into a normal Source" in {
val msg = Message("a", 1L)
Source(Vector(msg))

View file

@ -21,3 +21,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.snapshot.Materia
# Sets correct return type for withAttributes on Source/FlowWithContext #26411
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.GraphDelegate.withAttributes")
# rename `from` to `fromTuples` in WithContext Scala dsl #26370
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContext.from")

View file

@ -26,8 +26,11 @@ object FlowWithContext {
new FlowWithContext(scaladsl.FlowWithContext[In, Ctx])
}
/**
* Creates a FlowWithContext from a regular flow that operates on `Pair<data, context>` elements.
*/
def fromPairs[In, CtxIn, Out, CtxOut, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = {
new FlowWithContext(scaladsl.FlowWithContext.from(scaladsl.Flow[(In, CtxIn)].map { case (i, c) Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right)))
new FlowWithContext(scaladsl.FlowWithContext.fromTuples(scaladsl.Flow[(In, CtxIn)].map { case (i, c) Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right)))
}
}

View file

@ -16,6 +16,20 @@ import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
/**
* API MAY CHANGE
*/
@ApiMayChange
object SourceWithContext {
/**
* Creates a SourceWithContext from a regular flow that operates on `Pair<data, context>` elements.
*/
def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = {
new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala)))
}
}
/**
* A source that provides operations which automatically propagate the context of an element.
* Only a subset of common operations from [[Source]] is supported. As an escape hatch you can

View file

@ -23,9 +23,9 @@ object FlowWithContext {
}
/**
* Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements.
* Creates a FlowWithContext from a regular flow that operates on a tuple of `(data, context)` elements.
*/
def from[In, CtxIn, Out, CtxOut, Mat](flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
def fromTuples[In, CtxIn, Out, CtxOut, Mat](flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(flow)
}
@ -46,10 +46,10 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](
override type ReprMat[+O, +C, +M] = FlowWithContext[In @uncheckedVariance, CtxIn @uncheckedVariance, O, C, M @uncheckedVariance]
override def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] =
FlowWithContext.from(delegate.via(viaFlow))
new FlowWithContext(delegate.via(viaFlow))
override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] =
FlowWithContext.from(delegate.viaMat(flow)(combine))
new FlowWithContext(delegate.viaMat(flow)(combine))
/**
* Context-preserving variant of [[akka.stream.scaladsl.Flow.withAttributes]].

View file

@ -9,6 +9,19 @@ import scala.annotation.unchecked.uncheckedVariance
import akka.annotation.ApiMayChange
import akka.stream._
/**
* API MAY CHANGE
*/
@ApiMayChange
object SourceWithContext {
/**
* Creates a SourceWithContext from a regular source that operates on a tuple of `(data, context)` elements.
*/
def fromTuples[Out, CtxOut, Mat](source: Source[(Out, CtxOut), Mat]): SourceWithContext[Out, CtxOut, Mat] =
new SourceWithContext(source)
}
/**
* A source that provides operations which automatically propagate the context of an element.
* Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can