From 308b13f9ed22fa9cf4ace9c1aea9af8290ef59bd Mon Sep 17 00:00:00 2001 From: Alexey Romanchuk Date: Tue, 10 Jun 2014 16:41:47 +0700 Subject: [PATCH] Fix Duct#append type parameter --- .../src/main/scala/akka/stream/impl/FlowImpl.scala | 4 ++-- .../src/main/scala/akka/stream/javadsl/Duct.scala | 4 ++-- .../src/main/scala/akka/stream/scaladsl/Duct.scala | 4 ++-- .../test/java/akka/stream/javadsl/DuctTest.java | 14 +++++++------- .../src/test/scala/akka/stream/DuctSpec.scala | 8 ++++++-- 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 03c0e73a1b..69999eda2c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -82,10 +82,10 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ // Storing ops in reverse order override protected def andThen[U](op: Ast.AstNode): Duct[In, U] = this.copy(ops = op :: ops) - override def append[U](duct: Duct[_ >: In, U]): Duct[In, U] = + override def append[U](duct: Duct[_ >: Out, U]): Duct[In, U] = copy(ops = duct.ops ++: ops) - override def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: In, U]): Duct[In, U] = + override def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: Out, U]): Duct[In, U] = copy(ops = duct.ops ++: ops) override def produceTo(materializer: FlowMaterializer, consumer: Consumer[Out]): Consumer[In] = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index c4f7fb1175..3354b55bec 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -238,7 +238,7 @@ abstract class Duct[In, Out] { /** * Append the operations of a [[Duct]] to this `Duct`. */ - def append[U](duct: Duct[_ >: In, U]): Duct[In, U] + def append[U](duct: Duct[_ >: Out, U]): Duct[In, U] /** * Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary @@ -412,7 +412,7 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def flatten[U](strategy: FlattenStrategy[T, U]): Duct[In, U] = new DuctAdapter(delegate.flatten(strategy)) - override def append[U](duct: Duct[_ >: In, U]): Duct[In, U] = + override def append[U](duct: Duct[_ >: T, U]): Duct[In, U] = new DuctAdapter(delegate.appendJava(duct)) override def produceTo(materializer: FlowMaterializer, consumer: Consumer[T]): Consumer[In] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index 9d4ab36633..5ea651963a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -265,12 +265,12 @@ trait Duct[In, +Out] { /** * Append the operations of a [[Duct]] to this `Duct`. */ - def append[U](duct: Duct[_ >: In, U]): Duct[In, U] + def append[U](duct: Duct[_ >: Out, U]): Duct[In, U] /** * INTERNAL API */ - private[akka] def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: In, U]): Duct[In, U] + private[akka] def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: Out, U]): Duct[In, U] /** * Materialize this `Duct` by attaching it to the specified downstream `consumer` diff --git a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java index 3eb6b82a80..2cf64f19fc 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java @@ -140,19 +140,19 @@ public class DuctTest { public void mustBeAppendableToDuct() { final JavaTestKit probe = new JavaTestKit(system); - Duct duct1 = Duct.create(Integer.class).map(new Function() { - public Integer apply(Integer elem) { - return elem + 10; + Duct duct1 = Duct.create(String.class).map(new Function() { + public Integer apply(String elem) { + return Integer.parseInt(elem); } }); - Consumer ductInConsumer = Duct.create(Integer.class).map(new Function() { - public Integer apply(Integer elem) { - return elem * 2; + Consumer ductInConsumer = Duct.create(Integer.class).map(new Function() { + public String apply(Integer elem) { + return Integer.toString(elem * 2); } }).append(duct1).map(new Function() { public String apply(Integer elem) { - return "elem-" + elem; + return "elem-" + (elem + 10); } }).foreach(new Procedure() { public void apply(String elem) { diff --git a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala index a3ddf957ff..346756c8f7 100644 --- a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala @@ -182,8 +182,12 @@ class DuctSpec extends AkkaSpec { "be appendable to a Duct" in { val c = StreamTestKit.consumerProbe[String] - val duct1 = Duct[Int].map(_ + 10).map(_.toString) - val ductInConsumer = Duct[Int].map(_ * 2).append(duct1).map((s: String) ⇒ "elem-" + s).produceTo(materializer, c) + val duct1 = Duct[String].map(Integer.parseInt) + val ductInConsumer = Duct[Int] + .map { i ⇒ (i * 2).toString } + .append(duct1) + .map { i ⇒ "elem-" + (i + 10) } + .produceTo(materializer, c) Flow(List(1, 2, 3)).produceTo(materializer, ductInConsumer)