Merge pull request #15384 from 13h3r/release-2.3-dev
Fix Duct#append type parameter
This commit is contained in:
commit
5eaa2bfa18
5 changed files with 19 additions and 15 deletions
|
|
@ -82,10 +82,10 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[
|
|||
// Storing ops in reverse order
|
||||
override protected def andThen[U](op: Ast.AstNode): Duct[In, U] = this.copy(ops = op :: ops)
|
||||
|
||||
override def append[U](duct: Duct[_ >: In, U]): Duct[In, U] =
|
||||
override def append[U](duct: Duct[_ >: Out, U]): Duct[In, U] =
|
||||
copy(ops = duct.ops ++: ops)
|
||||
|
||||
override def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: In, U]): Duct[In, U] =
|
||||
override def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: Out, U]): Duct[In, U] =
|
||||
copy(ops = duct.ops ++: ops)
|
||||
|
||||
override def produceTo(materializer: FlowMaterializer, consumer: Consumer[Out]): Consumer[In] =
|
||||
|
|
|
|||
|
|
@ -238,7 +238,7 @@ abstract class Duct[In, Out] {
|
|||
/**
|
||||
* Append the operations of a [[Duct]] to this `Duct`.
|
||||
*/
|
||||
def append[U](duct: Duct[_ >: In, U]): Duct[In, U]
|
||||
def append[U](duct: Duct[_ >: Out, U]): Duct[In, U]
|
||||
|
||||
/**
|
||||
* Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary
|
||||
|
|
@ -412,7 +412,7 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In,
|
|||
override def flatten[U](strategy: FlattenStrategy[T, U]): Duct[In, U] =
|
||||
new DuctAdapter(delegate.flatten(strategy))
|
||||
|
||||
override def append[U](duct: Duct[_ >: In, U]): Duct[In, U] =
|
||||
override def append[U](duct: Duct[_ >: T, U]): Duct[In, U] =
|
||||
new DuctAdapter(delegate.appendJava(duct))
|
||||
|
||||
override def produceTo(materializer: FlowMaterializer, consumer: Consumer[T]): Consumer[In] =
|
||||
|
|
|
|||
|
|
@ -265,12 +265,12 @@ trait Duct[In, +Out] {
|
|||
/**
|
||||
* Append the operations of a [[Duct]] to this `Duct`.
|
||||
*/
|
||||
def append[U](duct: Duct[_ >: In, U]): Duct[In, U]
|
||||
def append[U](duct: Duct[_ >: Out, U]): Duct[In, U]
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: In, U]): Duct[In, U]
|
||||
private[akka] def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: Out, U]): Duct[In, U]
|
||||
|
||||
/**
|
||||
* Materialize this `Duct` by attaching it to the specified downstream `consumer`
|
||||
|
|
|
|||
|
|
@ -140,19 +140,19 @@ public class DuctTest {
|
|||
public void mustBeAppendableToDuct() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
|
||||
Duct<Integer, Integer> duct1 = Duct.create(Integer.class).map(new Function<Integer, Integer>() {
|
||||
public Integer apply(Integer elem) {
|
||||
return elem + 10;
|
||||
Duct<String, Integer> duct1 = Duct.create(String.class).map(new Function<String, Integer>() {
|
||||
public Integer apply(String elem) {
|
||||
return Integer.parseInt(elem);
|
||||
}
|
||||
});
|
||||
|
||||
Consumer<Integer> ductInConsumer = Duct.create(Integer.class).map(new Function<Integer, Integer>() {
|
||||
public Integer apply(Integer elem) {
|
||||
return elem * 2;
|
||||
Consumer<Integer> ductInConsumer = Duct.create(Integer.class).map(new Function<Integer, String>() {
|
||||
public String apply(Integer elem) {
|
||||
return Integer.toString(elem * 2);
|
||||
}
|
||||
}).append(duct1).map(new Function<Integer, String>() {
|
||||
public String apply(Integer elem) {
|
||||
return "elem-" + elem;
|
||||
return "elem-" + (elem + 10);
|
||||
}
|
||||
}).foreach(new Procedure<String>() {
|
||||
public void apply(String elem) {
|
||||
|
|
|
|||
|
|
@ -182,8 +182,12 @@ class DuctSpec extends AkkaSpec {
|
|||
|
||||
"be appendable to a Duct" in {
|
||||
val c = StreamTestKit.consumerProbe[String]
|
||||
val duct1 = Duct[Int].map(_ + 10).map(_.toString)
|
||||
val ductInConsumer = Duct[Int].map(_ * 2).append(duct1).map((s: String) ⇒ "elem-" + s).produceTo(materializer, c)
|
||||
val duct1 = Duct[String].map(Integer.parseInt)
|
||||
val ductInConsumer = Duct[Int]
|
||||
.map { i ⇒ (i * 2).toString }
|
||||
.append(duct1)
|
||||
.map { i ⇒ "elem-" + (i + 10) }
|
||||
.produceTo(materializer, c)
|
||||
|
||||
Flow(List(1, 2, 3)).produceTo(materializer, ductInConsumer)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue