diff --git a/akka-docs/rst/java/code/docs/stream/CompositionDocTest.java b/akka-docs/rst/java/code/docs/stream/CompositionDocTest.java index ec45cfe451..ff4a30265c 100644 --- a/akka-docs/rst/java/code/docs/stream/CompositionDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/CompositionDocTest.java @@ -4,6 +4,7 @@ package docs.stream; import java.util.Arrays; +import java.util.Optional; import akka.stream.ClosedShape; import org.junit.AfterClass; @@ -213,21 +214,21 @@ public class CompositionDocTest { //#mat-combine-4a static class MyClass { - private Promise> p; + private Promise> p; private OutgoingConnection conn; - public MyClass(Promise> p, OutgoingConnection conn) { + public MyClass(Promise> p, OutgoingConnection conn) { this.p = p; this.conn = conn; } public void close() { - p.success(Option.empty()); + p.success(Optional.empty()); } } static class Combiner { - static Future f(Promise> p, + static Future f(Promise> p, Pair, Future> rest) { return rest.first().map(new Mapper() { public MyClass apply(OutgoingConnection c) { @@ -242,13 +243,13 @@ public class CompositionDocTest { public void materializedValues() throws Exception { //#mat-combine-1 // Materializes to Promise (red) - final Source>> source = Source.maybe(); + final Source>> source = Source.maybe(); // Materializes to BoxedUnit (black) final Flow flow1 = Flow.of(Integer.class).take(100); // Materializes to Promise> (red) - final Source>> nestedSource = + final Source>> nestedSource = source.viaMat(flow1, Keep.left()).named("nestedSource"); //#mat-combine-1 diff --git a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java index 8d486cdfe0..f7c6ccab48 100644 --- a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java @@ -5,10 +5,7 @@ package docs.stream; import static org.junit.Assert.assertEquals; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -225,7 +222,7 @@ public class FlowDocTest { //#flow-mat-combine // An empty source that can be shut down explicitly from the outside - Source>> source = Source.maybe(); + Source>> source = Source.maybe(); // A flow that internally throttles elements to 1/second, and returns a Cancellable // which can be used to shut down the stream @@ -236,7 +233,7 @@ public class FlowDocTest { // By default, the materialized value of the leftmost stage is preserved - RunnableGraph>> r1 = source.via(flow).to(sink); + RunnableGraph>> r1 = source.via(flow).to(sink); // Simple selection of materialized values by using Keep.right RunnableGraph r2 = source.viaMat(flow, Keep.right()).to(sink); @@ -245,17 +242,17 @@ public class FlowDocTest { // Using runWith will always give the materialized values of the stages added // by runWith() itself Future r4 = source.via(flow).runWith(sink, mat); - Promise> r5 = flow.to(sink).runWith(source, mat); - Pair>, Future> r6 = flow.runWith(source, sink, mat); + Promise> r5 = flow.to(sink).runWith(source, mat); + Pair>, Future> r6 = flow.runWith(source, sink, mat); // Using more complext combinations - RunnableGraph>, Cancellable>> r7 = + RunnableGraph>, Cancellable>> r7 = source.viaMat(flow, Keep.both()).to(sink); - RunnableGraph>, Future>> r8 = + RunnableGraph>, Future>> r8 = source.via(flow).toMat(sink, Keep.both()); - RunnableGraph>, Cancellable>, Future>> r9 = + RunnableGraph>, Cancellable>, Future>> r9 = source.viaMat(flow, Keep.both()).toMat(sink, Keep.both()); RunnableGraph>> r10 = @@ -267,7 +264,7 @@ public class FlowDocTest { RunnableGraph r11 = r9.mapMaterializedValue( (nestedTuple) -> { - Promise> p = nestedTuple.first().first(); + Promise> p = nestedTuple.first().first(); Cancellable c = nestedTuple.first().second(); Future f = nestedTuple.second(); diff --git a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java index 7300bb66f4..2877dce516 100644 --- a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java +++ b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.nio.charset.Charset; @@ -148,7 +149,7 @@ public class MigrationsJava { } //#source-creators - Source>> src = Source.maybe(); + Source>> src = Source.maybe(); // Complete the promise with an empty option to emulate the old lazyEmpty promise.trySuccess(scala.Option.empty()); diff --git a/akka-stream/src/main/scala/akka/stream/impl/Emit.scala b/akka-stream/src/main/scala/akka/stream/impl/Emit.scala deleted file mode 100644 index f757d6910e..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/Emit.scala +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.collection.immutable - -/** - * INTERNAL API - */ -private[akka] trait Emit { this: ActorProcessorImpl with Pump ⇒ - - // TODO performance improvement: mutable buffer? - var emits = immutable.Seq.empty[Any] - - // Save previous phase we should return to in a var to avoid allocation - private var phaseAfterFlush: TransferPhase = _ - - // Enters flushing phase if there are emits pending - def emitAndThen(andThen: TransferPhase): Unit = - if (emits.nonEmpty) { - phaseAfterFlush = andThen - nextPhase(emitting) - } else nextPhase(andThen) - - // Emits all pending elements, then returns to savedPhase - private val emitting = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - primaryOutputs.enqueueOutputElement(emits.head) - emits = emits.tail - if (emits.isEmpty) nextPhase(phaseAfterFlush) - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index a74c4376c3..7845645f0e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -3,18 +3,16 @@ */ package akka.stream.javadsl -import java.io.{ InputStream, OutputStream, File } +import java.util.Optional import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts import akka.japi.function -import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout import akka.stream.{ javadsl, scaladsl, _ } -import akka.util.ByteString import org.reactivestreams.{ Publisher, Subscriber } -import scala.concurrent.duration.FiniteDuration +import scala.compat.java8.OptionConverters._ import scala.concurrent.{ ExecutionContext, Future } import scala.util.Try @@ -110,9 +108,9 @@ object Sink { * * See also [[head]]. */ - def headOption[In](): Sink[In, Future[akka.japi.Option[In]]] = + def headOption[In](): Sink[In, Future[Optional[In]]] = new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue( - _.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext))) + _.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext))) /** * A `Sink` that materializes into a `Future` of the last value received. @@ -131,9 +129,9 @@ object Sink { * * See also [[head]]. */ - def lastOption[In](): Sink[In, Future[akka.japi.Option[In]]] = + def lastOption[In](): Sink[In, Future[Optional[In]]] = new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue( - _.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext))) + _.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext))) /** * A `Sink` that keeps on collecting incoming elements until upstream terminates. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 583a68ef38..8051d46b4a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -5,6 +5,7 @@ package akka.stream.javadsl import java.io.{ OutputStream, InputStream, File } import java.util +import java.util.Optional import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter @@ -24,6 +25,8 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } import scala.language.{ higherKinds, implicitConversions } +import scala.compat.java8.OptionConverters._ + /** Java API */ object Source { private[this] val _empty = new Source[Any, Unit](scaladsl.Source.empty) @@ -45,8 +48,16 @@ object Source { * If the downstream of this source cancels before the promise has been completed, then the promise will be completed * with None. */ - def maybe[T]: Source[T, Promise[Option[T]]] = - new Source(scaladsl.Source.maybe[T]) + def maybe[T]: Source[T, Promise[Optional[T]]] = { + new Source(scaladsl.Source.maybe[T].mapMaterializedValue { scalaOptionPromise: Promise[Option[T]] ⇒ + val javaOptionPromise = Promise[Optional[T]]() + scalaOptionPromise.completeWith( + javaOptionPromise.future + .map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)) + + javaOptionPromise + }) + } /** * Helper to create [[Source]] from `Publisher`. @@ -175,14 +186,16 @@ object Source { * Create a `Source` that will unfold a value of type `S` into * a pair of the next state `S` and output elements of type `E`. */ - def unfold[S, E](s: S, f: function.Function[S, Option[(S, E)]]): Source[E, Unit] = - new Source(scaladsl.Source.unfold(s)((s: S) ⇒ f.apply(s))) + def unfold[S, E](s: S, f: function.Function[S, Optional[(S, E)]]): Source[E, Unit] = + new Source(scaladsl.Source.unfold(s)((s: S) ⇒ f.apply(s).asScala)) /** * Same as [[unfold]], but uses an async function to generate the next state-element tuple. */ - def unfoldAsync[S, E](s: S, f: function.Function[S, Future[Option[(S, E)]]]): Source[E, Unit] = - new Source(scaladsl.Source.unfoldAsync(s)((s: S) ⇒ f.apply(s))) + def unfoldAsync[S, E](s: S, f: function.Function[S, Future[Optional[(S, E)]]]): Source[E, Unit] = + new Source( + scaladsl.Source.unfoldAsync(s)( + (s: S) ⇒ f.apply(s).map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext))) /** * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index d7aa15935e..de64224cc1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -4,6 +4,7 @@ package akka.stream.javadsl import java.lang.{ Iterable ⇒ JIterable } +import java.util.Optional import scala.collection.immutable import scala.concurrent.duration._ import java.net.InetSocketAddress @@ -19,6 +20,8 @@ import akka.util.ByteString import akka.japi.Util.immutableSeq import akka.io.Inet.SocketOption +import scala.compat.java8.OptionConverters._ + object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** @@ -158,12 +161,12 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * independently whether the server is still attempting to write. */ def outgoingConnection(remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress], + localAddress: Optional[InetSocketAddress], options: JIterable[SocketOption], halfClose: Boolean, connectTimeout: Duration, idleTimeout: Duration): Flow[ByteString, ByteString, Future[OutgoingConnection]] = - Flow.fromGraph(delegate.outgoingConnection(remoteAddress, localAddress, immutableSeq(options), halfClose, connectTimeout, idleTimeout) + Flow.fromGraph(delegate.outgoingConnection(remoteAddress, localAddress.asScala, immutableSeq(options), halfClose, connectTimeout, idleTimeout) .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec))) /**