#20564 Fix compose method of EmptyModule to be able to Keep.left or right

... and use the proper version from the GraphDSL depending on whether materialized value must be altered on an import or not.
This commit is contained in:
Endre Sándor Varga 2016-05-24 12:35:32 +02:00
parent 03395d5739
commit e80d7e1801
4 changed files with 72 additions and 6 deletions

View file

@ -170,6 +170,7 @@ object StreamLayout {
/**
* Fuses this Module to `that` Module by wiring together `from` and `to`,
* retaining the materialized value of `this` in the result
*
* @param that a Module to fuse with
* @param from the data source to wire
* @param to the data sink to wire
@ -182,6 +183,7 @@ object StreamLayout {
* Fuses this Module to `that` Module by wiring together `from` and `to`,
* transforming the materialized values of `this` and `that` using the
* provided function `f`
*
* @param that a Module to fuse with
* @param from the data source to wire
* @param to the data sink to wire
@ -242,6 +244,7 @@ object StreamLayout {
* Creates a new Module which is `this` Module composed with `that` Module,
* using the given function `f` to compose the materialized value of `this` with
* the materialized value of `that`.
*
* @param that a Module to be composed with (cannot be itself)
* @param f a function which combines the materialized values
* @tparam A the type of the materialized value of `this`
@ -345,10 +348,32 @@ object StreamLayout {
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule")
else this
override def compose(that: Module): Module = that
override def compose(that: Module): Module = compose(that, scaladsl.Keep.left)
override def compose[A, B, C](that: Module, f: (A, B) C): Module =
throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule")
override def compose[A, B, C](that: Module, f: (A, B) C): Module = {
if (f eq scaladsl.Keep.right) {
that
} else if (f eq scaladsl.Keep.left) {
// If "that" has a fully ignorable materialized value, we ignore it, otherwise we keep the side effect and
// explicitly map to NotUsed
val mat =
if (IgnorableMatValComp(that))
Ignore
else
Transform(_ => NotUsed, that.materializedValueComputation)
CompositeModule(
if (that.isSealed) Set(that) else that.subModules,
that.shape,
that.downstreams,
that.upstreams,
mat,
if (this.isSealed) Attributes.none else attributes)
} else {
throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule " +
"except with Keep.left or Keep.right")
}
}
override def withAttributes(attributes: Attributes): Module =
throw new UnsupportedOperationException("EmptyModule cannot carry attributes")