=str #25045 adding Java/Scala interop to SourceQueue and SinkQueue

adding MiMa filters

preserving binary compat
This commit is contained in:
Stefano Bonetti 2018-05-14 12:11:20 +01:00 committed by Patrik Nordwall
parent 4399e499c4
commit 1ad174cf8c
11 changed files with 205 additions and 95 deletions

View file

@ -4,32 +4,36 @@
package akka.stream.impl
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal
import akka.NotUsed
import akka.actor.{ ActorRef, Props }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.actor.ActorRef
import akka.actor.Props
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.QueueSink.{ Output, Pull }
import akka.stream.impl.QueueSink.Output
import akka.stream.impl.QueueSink.Pull
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.scaladsl.{ Sink, SinkQueueWithCancel, Source }
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.SinkQueueWithCancel
import akka.stream.scaladsl.Source
import akka.stream.stage._
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.{ immutable, mutable }
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import scala.collection.immutable
import akka.util.ccompat._
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
/**
* INTERNAL API
@ -441,17 +445,6 @@ import akka.util.ccompat._
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T])
extends akka.stream.javadsl.SinkQueueWithCancel[T] {
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext => same }
def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava
def cancel(): Unit = delegate.cancel()
}
/**
* INTERNAL API
*