diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index b26931c1d7..0ac503fdf7 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -8,8 +8,7 @@ import language.implicitConversions import language.higherKinds import java.nio.charset.Charset import com.typesafe.config.Config -import akka.stream.FlattenStrategy -import akka.stream.scaladsl.{ Flow, Source } +import akka.stream.scaladsl.{ FlattenStrategy, Flow, Source } import akka.stream.stage._ import scala.concurrent.duration.Duration import scala.concurrent.{ Await, Future } diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index dcb29884f9..006eda03e7 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher import akka.stream.scaladsl._ import akka.stream.scaladsl.OperationAttributes._ -import akka.stream.FlattenStrategy +import akka.stream.scaladsl.FlattenStrategy import akka.stream.ActorFlowMaterializer import akka.util.ByteString import akka.actor.ActorSystem diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index bc661930e1..ddd8adaeb6 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher import akka.stream.scaladsl._ import akka.stream.scaladsl.OperationAttributes._ -import akka.stream.FlattenStrategy +import akka.stream.scaladsl.FlattenStrategy import akka.stream.ActorFlowMaterializer import akka.util.ByteString import akka.actor.ActorSystem diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala index 64c92165d2..79060a288f 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala @@ -8,7 +8,7 @@ import akka.event.{ NoLogging, LoggingAdapter } import scala.concurrent.forkjoin.ThreadLocalRandom import akka.parboiled2.util.Base64 -import akka.stream.FlattenStrategy +import akka.stream.scaladsl.FlattenStrategy import akka.stream.scaladsl._ import akka.http.engine.rendering.BodyPartRenderer import akka.http.util.FastFuture diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala index 72a7600065..15510414fd 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala @@ -3,10 +3,10 @@ */ package akka.stream.tck +import akka.stream.scaladsl.FlattenStrategy import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import org.reactivestreams.Publisher -import akka.stream.FlattenStrategy class FlattenTest extends AkkaPublisherVerification[Int] { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index a5132a0953..e3ec71c402 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -5,7 +5,6 @@ package akka.stream.scaladsl import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import akka.stream.FlattenStrategy import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.{ StreamTestKit, AkkaSpec } diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template index eb8641d4a8..a443368b43 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Inlet, Outlet, Shape, Graph, BidiShape } -import akka.stream.scaladsl.JavaConverters._ import akka.japi.Pair trait BidiFlowCreate { diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template index 64ca7ddd32..01e4f4445a 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Inlet, Outlet, Shape, Graph } -import akka.stream.scaladsl.JavaConverters._ import akka.japi.Pair trait FlowCreate { diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template index 37cf4a82b1..c3ed28ff9d 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Inlet, Shape, Graph } -import akka.stream.scaladsl.JavaConverters._ trait GraphCreate { diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template index fa33ff0a54..b78be90d2d 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Inlet, Shape, Graph } -import akka.stream.scaladsl.JavaConverters._ trait SinkCreate { diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template index e12a470106..cd0b097358 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template @@ -5,7 +5,6 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.{ Outlet, Shape, Graph } -import akka.stream.scaladsl.JavaConverters._ trait SourceCreate { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala index 18a140659a..bcda86b3c6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala @@ -3,7 +3,12 @@ */ package akka.stream.javadsl -import akka.stream.javadsl +import akka.stream.scaladsl + +/** + * Strategy that defines how a stream of streams should be flattened into a stream of simple elements. + */ +abstract class FlattenStrategy[-S, T] extends scaladsl.FlattenStrategy[S, T] object FlattenStrategy { @@ -12,8 +17,11 @@ object FlattenStrategy { * emitting its elements directly to the output until it completes and then taking the next stream. This has the * consequence that if one of the input stream is infinite, no other streams after that will be consumed from. */ - def concat[T]: akka.stream.FlattenStrategy[javadsl.Source[T, Unit], T] = - akka.stream.FlattenStrategy.Concat[T]().asInstanceOf[akka.stream.FlattenStrategy[javadsl.Source[T, _], T]] - // TODO so in theory this should be safe, but let's rethink the design later + def concat[T]: FlattenStrategy[Source[T, Unit], T] = Concat[T]() + + /** + * INTERNAL API + */ + private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, _], T] } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 67bec8b44c..e54c1d8e7a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -14,8 +14,6 @@ import akka.stream.impl.StreamLayout object Flow { - import akka.stream.scaladsl.JavaConverters._ - val factory: FlowCreate = new FlowCreate {} /** Adapt [[scaladsl.Flow]] for use within Java DSL */ @@ -44,12 +42,11 @@ object Flow { /** Create a `Flow` which can process elements of type `T`. */ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph[FlowShape[In, Out], Mat] { import scala.collection.JavaConverters._ - import akka.stream.scaladsl.JavaConverters._ override def shape: FlowShape[In, Out] = delegate.shape private[stream] def module: StreamLayout.Module = delegate.module - /** Converts this Flow to it's Scala DSL counterpart */ + /** Converts this Flow to its Scala DSL counterpart */ def asScala: scaladsl.Flow[In, Out, Mat] = delegate /** @@ -401,7 +398,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * This operation can be used on a stream of element type [[Source]]. */ - def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] = + def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] = new Flow(delegate.flatten(strategy)) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 85e3e5eb73..3a765947aa 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -279,7 +279,7 @@ object FlowGraph { */ def builder[M](): Builder[M] = new Builder()(new scaladsl.FlowGraph.Builder[M]) - final class Builder[Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self ⇒ + final class Builder[+Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self ⇒ import akka.stream.scaladsl.FlowGraph.Implicits._ def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 6010845443..96dc483c39 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -17,8 +17,6 @@ import scala.util.Try /** Java API */ object Sink { - import akka.stream.scaladsl.JavaConverters._ - val factory: SinkCreate = new SinkCreate {} /** Adapt [[scaladsl.Sink]] for use within Java DSL */ @@ -116,7 +114,7 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ override def shape: SinkShape[In] = delegate.shape private[stream] def module: StreamLayout.Module = delegate.module - /** Converts this Sink to it's Scala DSL counterpart */ + /** Converts this Sink to its Scala DSL counterpart */ def asScala: scaladsl.Sink[In, Mat] = delegate /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 5875727e63..eba226a3bd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -23,8 +23,6 @@ import scala.annotation.varargs /** Java API */ object Source { - import scaladsl.JavaConverters._ - val factory: SourceCreate = new SourceCreate {} /** Adapt [[scaladsl.Source]] for use within JavaDSL */ @@ -181,14 +179,12 @@ object Source { * Can be used as a `Publisher` */ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[SourceShape[Out], Mat] { - import akka.stream.scaladsl.JavaConverters._ - import scala.collection.JavaConverters._ override def shape: SourceShape[Out] = delegate.shape private[stream] def module: StreamLayout.Module = delegate.module - /** Converts this Java DSL element to it's Scala DSL counterpart. */ + /** Converts this Java DSL element to its Scala DSL counterpart. */ def asScala: scaladsl.Source[Out, Mat] = delegate /** @@ -472,7 +468,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * This operation can be used on a stream of element type [[Source]]. */ - def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = + def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = new Source(delegate.flatten(strategy)) /** diff --git a/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala similarity index 77% rename from akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala index 64b88105d3..290db28f6b 100644 --- a/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala @@ -1,12 +1,14 @@ /** * Copyright (C) 2009-2014 Typesafe Inc. */ -package akka.stream +package akka.stream.scaladsl + +import akka.stream.javadsl /** * Strategy that defines how a stream of streams should be flattened into a stream of simple elements. */ -abstract class FlattenStrategy[-T, U] +abstract class FlattenStrategy[-S, T] object FlattenStrategy { @@ -15,7 +17,7 @@ object FlattenStrategy { * emitting its elements directly to the output until it completes and then taking the next stream. This has the * consequence that if one of the input stream is infinite, no other streams after that will be consumed from. */ - def concat[T]: FlattenStrategy[scaladsl.Source[T, _], T] = Concat[T]() + def concat[T]: FlattenStrategy[Source[T, _], T] = Concat[T]() - private[akka] final case class Concat[T]() extends FlattenStrategy[scaladsl.Source[T, _], T] + private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, _], T] } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c11b587595..96dcd09382 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -286,6 +286,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) this.section[O, O2, Mat2, Mat2](attributes, Keep.right)(section) } + /** Converts this Scala DSL element to it's Java DSL counterpart. */ + def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) + } object Flow extends FlowApply { @@ -627,8 +630,8 @@ trait FlowOps[+Out, +Mat] { * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]]. */ - def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match { - case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll()) + def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match { + case _: FlattenStrategy.Concat[Out] | _: javadsl.FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll()) case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index d17c3f572c..574330107f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -297,6 +297,9 @@ object FlowGraph extends GraphApply { private[stream] def module: Module = moduleInProgress + /** Converts this Scala DSL element to it's Java DSL counterpart. */ + def asJava: javadsl.FlowGraph.Builder[M] = new javadsl.FlowGraph.Builder()(this) + } object Implicits { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala deleted file mode 100644 index 093aba8d5e..0000000000 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.scaladsl - -import akka.stream.javadsl -import akka.stream.scaladsl - -/** - * Implicit converters allowing to convert between Java and Scala DSL elements. - */ -private[akka] object JavaConverters { - - implicit final class AddAsJavaSource[Out, Mat](val source: scaladsl.Source[Out, Mat]) extends AnyVal { - def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(source) - } - implicit final class AddAsJavaFlow[In, Out, Mat](val flow: scaladsl.Flow[In, Out, Mat]) extends AnyVal { - def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(flow) - } - implicit final class AddAsJavaBidiFlow[I1, O1, I2, O2, Mat](val flow: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends AnyVal { - def asJava: javadsl.BidiFlow[I1, O1, I2, O2, Mat] = new javadsl.BidiFlow(flow) - } - implicit final class AddAsJavaSink[In, Mat](val sink: scaladsl.Sink[In, Mat]) extends AnyVal { - def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink) - } - implicit final class AsAsJavaFlowGraphBuilder[Out, Mat](val builder: scaladsl.FlowGraph.Builder[Mat]) extends AnyVal { - def asJava: javadsl.FlowGraph.Builder[Mat] = new javadsl.FlowGraph.Builder()(builder) - } - - implicit final class AddAsScalaSource[Out, Mat](val source: javadsl.Source[Out, Mat]) extends AnyVal { - def asScala: scaladsl.Source[Out, Mat] = source.asScala - } - implicit final class AddAsScalaFlow[In, Out, Mat](val flow: javadsl.Flow[In, Out, Mat]) extends AnyVal { - def asScala: scaladsl.Flow[In, Out, Mat] = flow.asScala - } - implicit final class AddAsScalaBidiFlow[I1, O1, I2, O2, Mat](val flow: javadsl.BidiFlow[I1, O1, I2, O2, Mat]) extends AnyVal { - def asScala: scaladsl.BidiFlow[I1, O1, I2, O2, Mat] = flow.asScala - } - implicit final class AddAsScalaSink[In, Mat](val sink: javadsl.Sink[In, Mat]) extends AnyVal { - def asScala: scaladsl.Sink[In, Mat] = sink.asScala - } - implicit final class AsAsScalaFlowGraphBuilder[Out, Mat](val builder: javadsl.FlowGraph.Builder[Mat]) extends AnyVal { - def asScala: FlowGraph.Builder[Mat] = builder.asScala - } -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index f5436c7024..f8faaf94be 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl +import akka.stream.javadsl import akka.actor.{ ActorRef, Props } import akka.stream.impl._ import akka.stream.{ SinkShape, Inlet, Outlet, Graph } @@ -40,6 +41,9 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) new Sink(module.withAttributes(attr).wrap()) def named(name: String): Sink[In, Mat] = withAttributes(OperationAttributes.name(name)) + + /** Converts this Scala DSL element to it's Java DSL counterpart. */ + def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this) } object Sink extends SinkApply { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 14ed935920..776c918086 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl +import akka.stream.javadsl import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } import akka.stream.{ SourceShape, Inlet, Outlet } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } @@ -152,6 +153,9 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = new Source(module.withAttributes(attr).wrap()) + /** Converts this Scala DSL element to it's Java DSL counterpart. */ + def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this) + } object Source extends SourceApply {