From 666bfade1e03662bb131df78da50ac49fcebd552 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 6 Mar 2015 12:22:14 +0100 Subject: [PATCH] !str #16993 Separate scaladsl/javadsl for FlattenStategy `abstract class FlattenStrategy` was used in both javadsl and scaladsl, but the concrete concat for the javadsl was in javadsl.FlattenStrategy and the concrete concat for the scaladsl is in stream.FlattenStrategy. Now there are separate FlattenStategy in scaladsl and javadsl packages and conversion as we have in other places. * replace JavaConverters with explicit methods * remove asJava/asScala for FlattenStrategy --- .../main/scala/akka/http/util/package.scala | 3 +- .../engine/parsing/RequestParserSpec.scala | 2 +- .../engine/parsing/ResponseParserSpec.scala | 2 +- .../marshalling/MultipartMarshallers.scala | 2 +- .../scala/akka/stream/tck/FlattenTest.scala | 2 +- .../stream/scaladsl/FlowConcatAllSpec.scala | 1 - .../javadsl/BidiFlowCreate.scala.template | 1 - .../stream/javadsl/FlowCreate.scala.template | 1 - .../stream/javadsl/GraphCreate.scala.template | 1 - .../stream/javadsl/SinkCreate.scala.template | 1 - .../javadsl/SourceCreate.scala.template | 1 - .../akka/stream/javadsl/FlattenStrategy.scala | 16 +++++-- .../main/scala/akka/stream/javadsl/Flow.scala | 7 +-- .../scala/akka/stream/javadsl/Graph.scala | 2 +- .../main/scala/akka/stream/javadsl/Sink.scala | 4 +- .../scala/akka/stream/javadsl/Source.scala | 8 +--- .../{ => scaladsl}/FlattenStrategy.scala | 10 +++-- .../scala/akka/stream/scaladsl/Flow.scala | 7 ++- .../scala/akka/stream/scaladsl/Graph.scala | 3 ++ .../akka/stream/scaladsl/JavaConverters.scala | 45 ------------------- .../scala/akka/stream/scaladsl/Sink.scala | 4 ++ .../scala/akka/stream/scaladsl/Source.scala | 4 ++ 22 files changed, 45 insertions(+), 82 deletions(-) rename akka-stream/src/main/scala/akka/stream/{ => scaladsl}/FlattenStrategy.scala (77%) delete mode 100644 akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index b26931c1d7..0ac503fdf7 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -8,8 +8,7 @@ import language.implicitConversions import language.higherKinds import java.nio.charset.Charset import com.typesafe.config.Config -import akka.stream.FlattenStrategy -import akka.stream.scaladsl.{ Flow, Source } +import akka.stream.scaladsl.{ FlattenStrategy, Flow, Source } import akka.stream.stage._ import scala.concurrent.duration.Duration import scala.concurrent.{ Await, Future } diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index dcb29884f9..006eda03e7 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher import akka.stream.scaladsl._ import akka.stream.scaladsl.OperationAttributes._ -import akka.stream.FlattenStrategy +import akka.stream.scaladsl.FlattenStrategy import akka.stream.ActorFlowMaterializer import akka.util.ByteString import akka.actor.ActorSystem diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index bc661930e1..ddd8adaeb6 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher import akka.stream.scaladsl._ import akka.stream.scaladsl.OperationAttributes._ -import akka.stream.FlattenStrategy +import akka.stream.scaladsl.FlattenStrategy import akka.stream.ActorFlowMaterializer import akka.util.ByteString import akka.actor.ActorSystem diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala index 64c92165d2..79060a288f 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala @@ -8,7 +8,7 @@ import akka.event.{ NoLogging, LoggingAdapter } import scala.concurrent.forkjoin.ThreadLocalRandom import akka.parboiled2.util.Base64 -import akka.stream.FlattenStrategy +import akka.stream.scaladsl.FlattenStrategy import akka.stream.scaladsl._ import akka.http.engine.rendering.BodyPartRenderer import akka.http.util.FastFuture diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala index 72a7600065..15510414fd 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala @@ -3,10 +3,10 @@ */ package akka.stream.tck +import akka.stream.scaladsl.FlattenStrategy import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import org.reactivestreams.Publisher -import akka.stream.FlattenStrategy class FlattenTest extends AkkaPublisherVerification[Int] { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index a5132a0953..e3ec71c402 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -5,7 +5,6 @@ package akka.stream.scaladsl import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import akka.stream.FlattenStrategy import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.{ StreamTestKit, AkkaSpec } diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template index eb8641d4a8..a443368b43 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Inlet, Outlet, Shape, Graph, BidiShape } -import akka.stream.scaladsl.JavaConverters._ import akka.japi.Pair trait BidiFlowCreate { diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template index 64ca7ddd32..01e4f4445a 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Inlet, Outlet, Shape, Graph } -import akka.stream.scaladsl.JavaConverters._ import akka.japi.Pair trait FlowCreate { diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template index 37cf4a82b1..c3ed28ff9d 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Inlet, Shape, Graph } -import akka.stream.scaladsl.JavaConverters._ trait GraphCreate { diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template index fa33ff0a54..b78be90d2d 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Inlet, Shape, Graph } -import akka.stream.scaladsl.JavaConverters._ trait SinkCreate { diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template index e12a470106..cd0b097358 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Outlet, Shape, Graph } -import akka.stream.scaladsl.JavaConverters._ trait SourceCreate { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala index 18a140659a..bcda86b3c6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala @@ -3,7 +3,12 @@ */ package akka.stream.javadsl -import akka.stream.javadsl +import akka.stream.scaladsl + +/** + * Strategy that defines how a stream of streams should be flattened into a stream of simple elements. + */ +abstract class FlattenStrategy[-S, T] extends scaladsl.FlattenStrategy[S, T] object FlattenStrategy { @@ -12,8 +17,11 @@ object FlattenStrategy { * emitting its elements directly to the output until it completes and then taking the next stream. This has the * consequence that if one of the input stream is infinite, no other streams after that will be consumed from. */ - def concat[T]: akka.stream.FlattenStrategy[javadsl.Source[T, Unit], T] = - akka.stream.FlattenStrategy.Concat[T]().asInstanceOf[akka.stream.FlattenStrategy[javadsl.Source[T, _], T]] - // TODO so in theory this should be safe, but let's rethink the design later + def concat[T]: FlattenStrategy[Source[T, Unit], T] = Concat[T]() + + /** + * INTERNAL API + */ + private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, _], T] } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 67bec8b44c..e54c1d8e7a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -14,8 +14,6 @@ import akka.stream.impl.StreamLayout object Flow { - import akka.stream.scaladsl.JavaConverters._ - val factory: FlowCreate = new FlowCreate {} /** Adapt [[scaladsl.Flow]] for use within Java DSL */ @@ -44,12 +42,11 @@ object Flow { /** Create a `Flow` which can process elements of type `T`. */ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph[FlowShape[In, Out], Mat] { import scala.collection.JavaConverters._ - import akka.stream.scaladsl.JavaConverters._ override def shape: FlowShape[In, Out] = delegate.shape private[stream] def module: StreamLayout.Module = delegate.module - /** Converts this Flow to it's Scala DSL counterpart */ + /** Converts this Flow to its Scala DSL counterpart */ def asScala: scaladsl.Flow[In, Out, Mat] = delegate /** @@ -401,7 +398,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * This operation can be used on a stream of element type [[Source]]. */ - def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] = + def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] = new Flow(delegate.flatten(strategy)) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 85e3e5eb73..3a765947aa 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -279,7 +279,7 @@ object FlowGraph { */ def builder[M](): Builder[M] = new Builder()(new scaladsl.FlowGraph.Builder[M]) - final class Builder[Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self ⇒ + final class Builder[+Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self ⇒ import akka.stream.scaladsl.FlowGraph.Implicits._ def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 6010845443..96dc483c39 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -17,8 +17,6 @@ import scala.util.Try /** Java API */ object Sink { - import akka.stream.scaladsl.JavaConverters._ - val factory: SinkCreate = new SinkCreate {} /** Adapt [[scaladsl.Sink]] for use within Java DSL */ @@ -116,7 +114,7 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ override def shape: SinkShape[In] = delegate.shape private[stream] def module: StreamLayout.Module = delegate.module - /** Converts this Sink to it's Scala DSL counterpart */ + /** Converts this Sink to its Scala DSL counterpart */ def asScala: scaladsl.Sink[In, Mat] = delegate /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 5875727e63..eba226a3bd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -23,8 +23,6 @@ import scala.annotation.varargs /** Java API */ object Source { - import scaladsl.JavaConverters._ - val factory: SourceCreate = new SourceCreate {} /** Adapt [[scaladsl.Source]] for use within JavaDSL */ @@ -181,14 +179,12 @@ object Source { * Can be used as a `Publisher` */ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[SourceShape[Out], Mat] { - import akka.stream.scaladsl.JavaConverters._ - import scala.collection.JavaConverters._ override def shape: SourceShape[Out] = delegate.shape private[stream] def module: StreamLayout.Module = delegate.module - /** Converts this Java DSL element to it's Scala DSL counterpart. */ + /** Converts this Java DSL element to its Scala DSL counterpart. */ def asScala: scaladsl.Source[Out, Mat] = delegate /** @@ -472,7 +468,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * This operation can be used on a stream of element type [[Source]]. */ - def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = + def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = new Source(delegate.flatten(strategy)) /** diff --git a/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala similarity index 77% rename from akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala index 64b88105d3..290db28f6b 100644 --- a/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala @@ -1,12 +1,14 @@ /** * Copyright (C) 2009-2014 Typesafe Inc. */ -package akka.stream +package akka.stream.scaladsl + +import akka.stream.javadsl /** * Strategy that defines how a stream of streams should be flattened into a stream of simple elements. */ -abstract class FlattenStrategy[-T, U] +abstract class FlattenStrategy[-S, T] object FlattenStrategy { @@ -15,7 +17,7 @@ object FlattenStrategy { * emitting its elements directly to the output until it completes and then taking the next stream. This has the * consequence that if one of the input stream is infinite, no other streams after that will be consumed from. */ - def concat[T]: FlattenStrategy[scaladsl.Source[T, _], T] = Concat[T]() + def concat[T]: FlattenStrategy[Source[T, _], T] = Concat[T]() - private[akka] final case class Concat[T]() extends FlattenStrategy[scaladsl.Source[T, _], T] + private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, _], T] } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c11b587595..96dcd09382 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -286,6 +286,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) this.section[O, O2, Mat2, Mat2](attributes, Keep.right)(section) } + /** Converts this Scala DSL element to it's Java DSL counterpart. */ + def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) + } object Flow extends FlowApply { @@ -627,8 +630,8 @@ trait FlowOps[+Out, +Mat] { * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]]. */ - def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match { - case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll()) + def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match { + case _: FlattenStrategy.Concat[Out] | _: javadsl.FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll()) case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index d17c3f572c..574330107f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -297,6 +297,9 @@ object FlowGraph extends GraphApply { private[stream] def module: Module = moduleInProgress + /** Converts this Scala DSL element to it's Java DSL counterpart. */ + def asJava: javadsl.FlowGraph.Builder[M] = new javadsl.FlowGraph.Builder()(this) + } object Implicits { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala deleted file mode 100644 index 093aba8d5e..0000000000 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.scaladsl - -import akka.stream.javadsl -import akka.stream.scaladsl - -/** - * Implicit converters allowing to convert between Java and Scala DSL elements. - */ -private[akka] object JavaConverters { - - implicit final class AddAsJavaSource[Out, Mat](val source: scaladsl.Source[Out, Mat]) extends AnyVal { - def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(source) - } - implicit final class AddAsJavaFlow[In, Out, Mat](val flow: scaladsl.Flow[In, Out, Mat]) extends AnyVal { - def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(flow) - } - implicit final class AddAsJavaBidiFlow[I1, O1, I2, O2, Mat](val flow: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends AnyVal { - def asJava: javadsl.BidiFlow[I1, O1, I2, O2, Mat] = new javadsl.BidiFlow(flow) - } - implicit final class AddAsJavaSink[In, Mat](val sink: scaladsl.Sink[In, Mat]) extends AnyVal { - def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink) - } - implicit final class AsAsJavaFlowGraphBuilder[Out, Mat](val builder: scaladsl.FlowGraph.Builder[Mat]) extends AnyVal { - def asJava: javadsl.FlowGraph.Builder[Mat] = new javadsl.FlowGraph.Builder()(builder) - } - - implicit final class AddAsScalaSource[Out, Mat](val source: javadsl.Source[Out, Mat]) extends AnyVal { - def asScala: scaladsl.Source[Out, Mat] = source.asScala - } - implicit final class AddAsScalaFlow[In, Out, Mat](val flow: javadsl.Flow[In, Out, Mat]) extends AnyVal { - def asScala: scaladsl.Flow[In, Out, Mat] = flow.asScala - } - implicit final class AddAsScalaBidiFlow[I1, O1, I2, O2, Mat](val flow: javadsl.BidiFlow[I1, O1, I2, O2, Mat]) extends AnyVal { - def asScala: scaladsl.BidiFlow[I1, O1, I2, O2, Mat] = flow.asScala - } - implicit final class AddAsScalaSink[In, Mat](val sink: javadsl.Sink[In, Mat]) extends AnyVal { - def asScala: scaladsl.Sink[In, Mat] = sink.asScala - } - implicit final class AsAsScalaFlowGraphBuilder[Out, Mat](val builder: javadsl.FlowGraph.Builder[Mat]) extends AnyVal { - def asScala: FlowGraph.Builder[Mat] = builder.asScala - } -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index f5436c7024..f8faaf94be 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl +import akka.stream.javadsl import akka.actor.{ ActorRef, Props } import akka.stream.impl._ import akka.stream.{ SinkShape, Inlet, Outlet, Graph } @@ -40,6 +41,9 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) new Sink(module.withAttributes(attr).wrap()) def named(name: String): Sink[In, Mat] = withAttributes(OperationAttributes.name(name)) + + /** Converts this Scala DSL element to it's Java DSL counterpart. */ + def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this) } object Sink extends SinkApply { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 14ed935920..776c918086 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl +import akka.stream.javadsl import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } import akka.stream.{ SourceShape, Inlet, Outlet } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } @@ -152,6 +153,9 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = new Source(module.withAttributes(attr).wrap()) + /** Converts this Scala DSL element to it's Java DSL counterpart. */ + def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this) + } object Source extends SourceApply {