diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java index 48313c1742..e3d9ce49a4 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntities.java @@ -7,7 +7,7 @@ package akka.http.model.japi; import java.io.File; import akka.util.ByteString; -import akka.stream.scaladsl.Source; +import akka.stream.javadsl.Source; import akka.http.model.HttpEntity$; /** Constructors for HttpEntity instances */ @@ -43,20 +43,20 @@ public final class HttpEntities { } public static HttpEntityDefault create(ContentType contentType, long contentLength, Source data) { - return new akka.http.model.HttpEntity.Default((akka.http.model.ContentType) contentType, contentLength, data); + return new akka.http.model.HttpEntity.Default((akka.http.model.ContentType) contentType, contentLength, data.asScala()); } public static HttpEntityCloseDelimited createCloseDelimited(ContentType contentType, Source data) { - return new akka.http.model.HttpEntity.CloseDelimited((akka.http.model.ContentType) contentType, data); + return new akka.http.model.HttpEntity.CloseDelimited((akka.http.model.ContentType) contentType, data.asScala()); } public static HttpEntityIndefiniteLength createIndefiniteLength(ContentType contentType, Source data) { - return new akka.http.model.HttpEntity.IndefiniteLength((akka.http.model.ContentType) contentType, data); + return new akka.http.model.HttpEntity.IndefiniteLength((akka.http.model.ContentType) contentType, data.asScala()); } public static HttpEntityChunked createChunked(ContentType contentType, Source data) { return akka.http.model.HttpEntity.Chunked$.MODULE$.fromData( (akka.http.model.ContentType) contentType, - data); + data.asScala()); } } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java index 47cbed0d4c..75361129b1 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntity.java @@ -5,7 +5,7 @@ package akka.http.model.japi; import akka.http.model.HttpEntity$; -import akka.stream.scaladsl.Source; +import akka.stream.javadsl.Source; import akka.util.ByteString; /** diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java index 76bc026b95..a506353e86 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityChunked.java @@ -4,7 +4,7 @@ package akka.http.model.japi; -import akka.stream.scaladsl.Source; +import akka.stream.javadsl.Source; /** * Represents an entity transferred using `Transfer-Encoding: chunked`. It consists of a diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java index f163347bd2..40913ad811 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityCloseDelimited.java @@ -4,14 +4,9 @@ package akka.http.model.japi; -import akka.util.ByteString; -import akka.stream.scaladsl.Source; - /** * Represents an entity without a predetermined content-length. Its length is implicitly * determined by closing the underlying connection. Therefore, this entity type is only * available for Http responses. */ -public abstract class HttpEntityCloseDelimited implements ResponseEntity { - public abstract Source data(); -} +public abstract class HttpEntityCloseDelimited implements ResponseEntity {} diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java index 006db272f6..aa18204371 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityDefault.java @@ -4,13 +4,9 @@ package akka.http.model.japi; -import akka.util.ByteString; -import akka.stream.scaladsl.Source; - /** * The default entity type which has a predetermined length and a stream of data bytes. */ public abstract class HttpEntityDefault implements BodyPartEntity, RequestEntity, ResponseEntity { public abstract long contentLength(); - public abstract Source data(); } diff --git a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java index c889de5c14..7b37b7bc5c 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/HttpEntityIndefiniteLength.java @@ -4,12 +4,7 @@ package akka.http.model.japi; -import akka.util.ByteString; -import akka.stream.scaladsl.Source; - /** * Represents an entity without a predetermined content-length to use in a BodyParts. */ -public abstract class HttpEntityIndefiniteLength implements BodyPartEntity { - public abstract Source data(); -} \ No newline at end of file +public abstract class HttpEntityIndefiniteLength implements BodyPartEntity {} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 51373de1d4..efd6e06cd4 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -14,6 +14,7 @@ import akka.util.ByteString import akka.stream.OperationAttributes._ import akka.stream.FlowMaterializer import akka.stream.scaladsl._ +import akka.stream import akka.stream.TimerTransformer import akka.http.util._ import japi.JavaMapping.Implicits._ @@ -80,7 +81,7 @@ sealed trait HttpEntity extends japi.HttpEntity { def withContentType(contentType: ContentType): HttpEntity /** Java API */ - def getDataBytes: Source[ByteString, _] = dataBytes + def getDataBytes: stream.javadsl.Source[ByteString, _] = stream.javadsl.Source.adapt(dataBytes) // default implementations, should be overridden def isCloseDelimited: Boolean = false @@ -277,7 +278,7 @@ object HttpEntity { override def productPrefix = "HttpEntity.Chunked" /** Java API */ - def getChunks: Source[japi.ChunkStreamPart, Any] = chunks.asInstanceOf[Source[japi.ChunkStreamPart, Any]] + def getChunks: stream.javadsl.Source[japi.ChunkStreamPart, Any] = stream.javadsl.Source.adapt(chunks) } object Chunked { /** diff --git a/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala index bbe635db35..c47ec4d5d9 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/ws/MessageSpec.scala @@ -433,7 +433,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) messageIn.expectComplete() - netIn.expectNoMsg(1.second) // especially the cancellation not yet + netIn.expectNoMsg(100.millis) // especially the cancellation not yet expectNoNetworkData() messageOutSub.sendComplete() @@ -478,6 +478,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { netOut.expectComplete() } "after receiving regular close frame when fragmented message is still open" in pendingUntilFixed { + pending new ServerTestSetup { netOutSub.request(10) messageInSub.request(10) @@ -534,7 +535,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { messageOutSub.sendComplete() expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) - netOut.expectNoMsg(1.second) // wait for peer to close regularly + netOut.expectNoMsg(100.millis) // wait for peer to close regularly pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) messageIn.expectComplete() @@ -562,7 +563,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { dataSub.sendComplete() expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true) expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) - netOut.expectNoMsg(1.second) // wait for peer to close regularly + netOut.expectNoMsg(100.millis) // wait for peer to close regularly val mask = Random.nextInt() pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true)) @@ -596,6 +597,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { "reason is no valid utf8 data" in pending } "timeout if user handler closes and peer doesn't send a close frame" in new ServerTestSetup { + override protected def closeTimeout: FiniteDuration = 100.millis + netInSub.expectRequest() messageOutSub.sendComplete() expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular) @@ -604,6 +607,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { netInSub.expectCancellation() } "timeout after we close after error and peer doesn't send a close frame" in new ServerTestSetup { + override protected def closeTimeout: FiniteDuration = 100.millis + netInSub.expectRequest() pushInput(frameHeader(Opcode.Binary, 0, fin = true, rsv1 = true))