§串流遷移指南
Play 2.5 對串流資料和回應主體的方式進行了多項重大變更。
-
Play 2.5 使用Akka 串流進行串流。Play 的先前版本使用迭代器進行串流,以及其他幾種類型的串流,例如
WebSocket
、Chunks
等。變更為使用 Akka 串流有兩個主要好處。首先,Java 使用者現在可以存取 Play 的完整功能集,例如撰寫主體剖析器和篩選器。其次,串流程式庫現在在 Play 中更加一致。
-
Play 2.5 使用
ByteString
來存放位元組封包。先前,Play 使用位元組陣列 (byte[]
/ArrayByte
) 來存放位元組。ByteString
類別與 Java 的String
類別一樣是不可變的,因此使用上更安全且容易。與String
類別一樣,在建構時會複製資料,因此效能會稍微下降,但這點會被其便宜的串接和子字串操作所平衡。 -
Play 2.5 有新的
HttpEntity
類型來處理回應主體。先前,回應主體是位元組的純粹串流。現在,HTTP 主體是HttpEntity
的一種類型:Strict
、Streamed
或Chunked
。透過告訴 Play 要使用哪種類型的實體,應用程式可以更進一步控制 Play 傳送 HTTP 回應的方式。這也讓 Play 更容易最佳化傳送主體的方式。
§變更摘要
Play API 的下列部分已更新
- 結果 (
Result
主體、chunked
/feed
/stream
方法) - 動作 (
EssentialAction
) - 主體剖析 (
BodyParser
) - WebSockets (
WebSocket
) - 伺服器傳送事件 (
EventSource
)
下列類型已變更
目的 | 舊類型 | 新類型 |
---|---|---|
存放位元組 | byte[] /Array[Byte] |
ByteString |
產生串流 | Enumerator 、WebSocket.Out 、Chunks.Out 、EventSource.Out |
Source |
將串流轉換為另一個串流 | Enumeratee |
Flow |
將串流轉換為單一值 | Iteratee |
Accumulator |
使用串流 | Iteratee |
Sink |
§如何遷移 (依 API)
下列部分概述如何遷移使用 API 不同部分的程式碼。
§將分塊結果移轉 (chunked
、Results.Chunked
)
在 Play 2.4 中,你會使用 Enumerator
在 Scala 中建立分塊結果,並使用 Results.Chunked
物件在 Java 中建立分塊結果。在 Play 2.5 中,API 的這些部分仍然可用,但已標示為不建議使用。
如果你選擇移轉到新的 API,你可以呼叫 StatusHeader
物件上的 chunked
方法來建立分塊結果,並提供 Akka Streams Source
物件作為分塊串流。
進階使用者可能偏好明確建立 HttpEntity.Chunked
物件,並將其傳遞到 Result
物件建構函式中。
- 要了解如何將 Enumerator 移轉到 Source,請參閱 將 Enumerator 移轉到 Source。
§將串流結果移轉 (feed
、stream
) (僅限 Scala)
在 Play 2.4 中,Scala 使用者可以透過將 Enumerator
傳遞到 feed
或 stream
方法來串流結果。(Java 使用者沒有串流結果的方法,除了分塊結果之外。) feed
方法會串流 Enumerator
的資料,然後關閉連線。stream
方法會串流或分塊結果,並可能關閉連線,這取決於連線的 HTTP 版本,以及是否存在 Content-Length
標頭。
在 Play 2.5 中,stream
方法已移除,feed
方法已標示為不建議使用。你可以選擇是否將 feed
方法移轉到新的 API。如果你使用 stream
方法,你的程式碼將需要變更。
新 API 是直接建立一個 Result
物件,並選擇一個 HttpEntity
來表示其主體。對於串流結果,您可以使用 HttpEntity.Streamed
類別。Streamed
類別將 Source
作為主體,並將 Content-Length
標頭值設為選用。Source
的內容將傳送給用戶端。如果實體具有 Content-Length
標頭,則連線將保持開啟,否則會關閉以表示串流結束。
- 要了解如何將 Enumerator 移轉到 Source,請參閱 將 Enumerator 移轉到 Source。
§移轉 WebSocket (WebSocket
)
在 Play 2.4 中,WebSocket 的雙向串流在 Java 中以一對 WebSocket.In
和 WebSocket.Out
物件表示,在 Scala 中則以一對 Enumerator
和 Iteratee
物件表示。在 Play 2.5 中,Java 和 Scala 現在都使用 Akka Streams Flow
來表示雙向串流。
若要在 Play 2.5 中移轉您的 WebSocket 程式碼,您有兩個選項。
第一個選項是使用舊的 Play API,該 API 已棄用並重新命名為 LegacyWebSocket
。這是最簡單的選項。您只需要將參考 WebSocket
的程式碼變更為參考 LegacyWebSocket
即可。LegacyWebSocket
類別提供從 Play 2.4 到 Play 2.5 的簡易移轉路徑。
第二個選項是變更為新的 Play API。為此,您需要將 WebSocket 程式碼變更為使用 Akka Streams Flow
物件。
§移轉 Scala WebSocket
Play 2.4 Scala WebSocket API 需要一個 Enumerator
/Iteratee
對,該對會產生 In
物件並使用 Out
物件。一對 FrameFormatter
負責將資料從 In
和 Out
物件中取出。
case class WebSocket[In, Out](f: RequestHeader => Future[Either[Result, (Enumerator[In], Iteratee[Out, Unit]) => Unit]])(implicit val inFormatter: WebSocket.FrameFormatter[In], val outFormatter: WebSocket.FrameFormatter[Out]) extends Handler {
trait FrameFormatter[A] {
def transform[B](fba: B => A, fab: A => B): FrameFormatter[B]
}
Play 2.5 Scala WebSocket API 是圍繞著 Message
的 Flow
所建構的。Message
代表 WebSocket 框架。MessageFlowTransformer
類型處理將高階物件(例如 JSON、XML 和位元組)轉換成 Message
框架。提供一組內建的隱式 MessageFlowTransformer
,您也可以撰寫自己的 MessageFlowTransformer
。
trait WebSocket extends Handler {
def apply(request: RequestHeader): Future[Either[Result, Flow[Message, Message, _]]]
}
sealed trait Message
case class TextMessage(data: String) extends Message
case class BinaryMessage(data: ByteString) extends Message
case class CloseMessage(statusCode: Option[Int] = Some(CloseCodes.Regular), reason: String = "") extends Message
case class PingMessage(data: ByteString) extends Message
case class PongMessage(data: ByteString) extends Message
trait MessageFlowTransformer[+In, -Out] { self =>
def transform(flow: Flow[In, Out, _]): Flow[Message, Message, _]
}
若要進行移轉,您需要將雙向 Enumerator
/Iteratee
串流轉換成 Flow
。您可能還需要使用 MessageFlowTransformer
將您的 In
/Out
物件轉換成 Message
,不過對於 JSON 等常見類型來說,這是沒有必要的,因為提供了一些內建的隱式轉換。
- 要了解如何將 Enumerator 移轉到 Source,請參閱 將 Enumerator 移轉到 Source。
- 若要了解如何將 Iteratee 移轉至 Sink,請參閱 將 Iteratees 移轉至 Sinks 和 Accumulators。
§移轉 Java WebSockets
Play 2.4 Java WebSocket API 使用 WebSocket.In
物件來處理傳入訊息,並使用 WebSocket.Out
物件來傳送傳出訊息。該 API 支援傳輸文字、位元組或 JSON 框架的 WebSockets。
return WebSocket.whenReady((in, out) -> {
out.write("Hello!");
out.close();
});
新的 Play 2.5 API 強大許多。您現在可以建立 WebSocket
,並傳回任意 WebSocket Message
框架。雙向 Message
串流表示為 Flow
。
public abstract class WebSocket {
public abstract CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request);
}
如果您想將 WebSocket Message
框架轉換成您自己的類型,可以使用 MappedWebSocketAcceptor
類別。為您提供了幾個這樣的類別:Text
、Binary
和 Json
。例如
return WebSocket.Text.accept(requestHeader -> {
// return a Flow<String, String, ?>
})
您也可以透過定義如何轉換傳入傳出訊息來建立自己的 MappedWebSocketAcceptor
。
§移轉 Comet
要在 Play 中使用 [Comet](https://en.wikipedia.org/wiki/Comet_(programming)),您需要產生一個分塊的 HTTP 回應,其中包含特別格式化的分塊。Play 有 Comet
類別,用於協助在伺服器上產生可傳送至瀏覽器的事件。在 Play 2.4.x 中,必須建立新的 Comet 實例,並為 Java 使用回呼,而 Scala 則使用枚舉。在 Play 2.5 中,新增了基於 Akka Streams 的新 API。
§移轉 Java Comet
為您的物件建立 Akka Streams 來源,並將它們轉換成 String
或 JsonNode
物件。從那裡,您可以使用 play.libs.Comet.string
或 play.libs.Comet.json
將您的物件轉換成適合 Results.ok().chunked()
的格式。在 JavaComet 中有其他文件。
由於 Java Comet 輔助程式是圍繞回呼建立的,因此將基於回呼的類別直接轉換成 org.reactivestreams.Publisher
並使用 Source.fromPublisher
建立來源可能會比較容易。
§移轉 Scala Comet
為您的物件建立 Akka Streams 來源,並將它們轉換成 String
或 JsValue
物件。從那裡,您可以使用 play.api.libs.Comet.string
或 play.api.libs.Comet.json
將您的物件轉換成適合 Ok.chunked()
的格式。在 ScalaComet 中有其他文件。
§移轉伺服器傳送事件 (EventSource
)
要使用 Server-Sent Events,您需要產生一個具有特別格式區塊的區塊 HTTP 回應。Play 有個 EventSource
介面,用於協助在伺服器上產生事件,這些事件可傳送至瀏覽器。在 Play 2.4 中,Java 和 Scala 各有非常不同的 API,但在 Play 2.5 中,它們已變更,因此都基於 Akka Streams。
§遷移 Java Server-Sent 事件
在 Play 2.4 的 Java API 中,您可以使用 EventSource
產生區塊串流,這是一個延伸 Chunks<String>
的類別。您可以從字串或 JSON 物件建構 Event
物件,然後透過呼叫 EventSource
的 send
方法將它們傳送至回應中。
EventSource eventSource = new EventSource() {
@Override
public void onConnected() {
send(Event.event("hello"));
send(Event.event("world"));
...
}
};
return ok(eventSource);
在 Play 2.5 中,您通常會為應用程式物件建立一個 Akka Streams Source
,使用 Source.map
將物件轉換成 Event
,最後使用 EventSource.chunked
將 Event
轉換成區塊值。以下範例顯示如何將其用於傳送字串串流。
Source<String, ?> stringSource = ...;
Source<EventSource.Event, ?> eventSource = myStrings.map(Event::event);
return ok().chunked(EventSource.chunked(eventSource)).as("text/event-stream");
- 要將
EventSource.onConnected
、EventSource.send
等遷移至Source
,請在類別上實作org.reactivestreams.Publisher
,並使用Source.fromPublisher
從回呼建立來源。
如果您仍想使用與 Play 2.4 相同的 API,您可以使用 LegacyEventSource
類別。此類別與 Play 2.4 API 相同,但已重新命名並標示為不建議使用。如果您想使用新的 API,但保留與舊命令式 API 相同的感覺,您可以嘗試 GraphStage
。
§遷移 Scala Server-Sent 事件
要使用 Play 2.4 的 Scala API,您可以提供一個應用程式物件的 Enumerator
,然後使用 EventSource
Enumeratee
將它們轉換成 Event
。最後,您將 Event
傳遞給 chunked
方法,將它們轉換成區塊。
val someDataStream: Enumerator[SomeData] = ???
Ok.chunked(someDataStream &> EventSource())
在 Play 2.5 中,使用 EventSource
搭配 Enumerator
和 Enumeratee
已被棄用。您仍然可以使用 Enumerator
和 Enumeratee
,但建議您將程式碼轉換為使用 Source
和 Flow
。Source
會產生物件串流,而 EventSource.flow
的 Flow
會將它們轉換成 Event
。例如,上述程式碼會改寫成
val someDataStream: Source[SomeData, Unit] = ???
Ok.chunked(someDataStream via EventSource.flow).as("text/event-stream")
- 要了解如何將 Enumerator 移轉到 Source,請參閱 將 Enumerator 移轉到 Source。
§遷移自訂動作 (EssentialAction
)(僅限 Scala)
大多數 Scala 使用者會為其動作使用 Action
類別。Action
類別是一種 EssentialAction
,它會在執行其邏輯並傳送結果之前,總是完整地解析其主體。有些使用者可能已撰寫自己的自訂 EssentialAction
,以便執行諸如遞增處理請求主體等動作。
如果您在 Play 2.4 應用程式中只使用一般的 Action
,則它們不需要任何遷移。但是,如果您已撰寫 EssentialAction
,則需要將它遷移到 Play 2.5 中的新 API。EssentialAction
的行為仍然相同,但簽章已從 Play 2.4
trait EssentialAction extends (RequestHeader => Iteratee[Array[Byte], Result])
變更為 Play 2.5 中的新簽章
trait EssentialAction extends (RequestHeader => Accumulator[ByteString, Result])
要進行遷移,您需要將 Iteratee
取代為 Accumulator
,並將 Array[Byte]
取代為 ByteString
。
- 如需瞭解如何將 Iteratee 移轉至 Accumulator,請參閱 將 Iteratees 移轉至 Sinks 和 Accumulators。
- 如需瞭解如何將
Array[Byte]
移轉至ByteString
,請參閱 將位元組陣列移轉至 ByteStrings。
§移轉自訂主體剖析器 (BodyParser
)(僅限 Scala)
如果您是 Scala 使用者,且在 Play 2.4 應用程式中擁有自訂 BodyParser
,則需要將其移轉至新的 Play 2.5 API。BodyParser
特質簽章在 Play 2.4 中如下所示
trait BodyParser[+A] extends (RequestHeader => Iteratee[Array[Byte], Either[Result, A]])
在 Play 2.5 中,它已變更為使用 Akka Streams 類型
trait BodyParser[+A] extends (RequestHeader => Accumulator[ByteString, Either[Result, A]])
要進行遷移,您需要將 Iteratee
取代為 Accumulator
,並將 Array[Byte]
取代為 ByteString
。
- 如需瞭解如何將 Iteratee 移轉至 Accumulator,請參閱 將 Iteratees 移轉至 Sinks 和 Accumulators。
- 如需瞭解如何將
Array[Byte]
移轉至ByteString
,請參閱 將位元組陣列移轉至 ByteStrings。
§移轉 Result
主體(僅限 Scala)
Result
物件已變更其表示結果主體和連線關閉標記的方式。它不再採用 body: Enumerator[Array[Byte]], connection: Connection
,而是採用 body: HttpEntity
。HttpEntity
類型包含有關主體的資訊,以及有關如何關閉連線的隱含資訊。
您可以透過使用包含 Source
以及選用 Content-Length
和 Content-Type
標頭的 Streamed
實體,來移轉現有的 Enumerator
。
val bodyPublisher: Publisher[ByteString] = Streams.enumeratorToPublisher(bodyEnumerator)
val bodySource: Source[ByteString, _] = Source.fromPublisher(bodyPublisher)
val entity: HttpEntity = HttpEntity.Streamed(bodySource)
new Result(headers, entity)
請參閱有關移轉 Enumerator
和移轉至 ByteString
的區段,以取得有關移轉至這些類型的更多資訊。
- 如需瞭解如何將 Iteratee 移轉至 Accumulator,請參閱 將 Iteratees 移轉至 Sinks 和 Accumulators。
- 如需瞭解如何將
Array[Byte]
移轉至ByteString
,請參閱 將位元組陣列移轉至 ByteStrings。
您可能會發現您根本不需要 Result
主體的串流。如果是這樣,您可能想對主體使用 Strict
實體。
new Result(headers, HttpEntity.Strict(bytes))
§如何移轉(依類型)
本區段說明如何將您的位元組陣列和串流移轉至新的 Akka Streams API。
Akka Streams 是 Akka 專案的一部分。Play 使用 Akka Streams 提供串流功能:傳送和接收位元組序列和其他物件。Akka 專案有許多關於 Akka Streams 的良好文件。在 Play 中開始使用 Akka Streams 之前,值得查看 Akka Streams 文件,以了解有哪些資訊可用。
可以在主要 Akka API 文件中的 akka.stream
套件中找到 API 文件
在您第一次開始使用 Akka Streams 時,Akka 文件中的基礎和使用流程部分值得一看。它將向您介紹 Akka Streams API 中最重要的部分。
您不必一次轉換整個應用程式。應用程式的部分可以繼續使用迭代器,而其他部分則使用 Akka Streams。Akka Streams 提供 反應式串流 實作,而 Play 的迭代器函式庫也提供反應式串流實作,因此,Play 的迭代器可以輕鬆封裝在 Akka Streams 中,反之亦然。
§將位元組陣列 (byte[]
/Array[Byte]
) 移轉到 ByteString
請參閱 Java 和 Scala API 文件以取得 ByteString
。
範例
Scala
// Get the empty ByteString (this instance is cached)
ByteString.empty
// Create a ByteString from a String
ByteString("hello")
ByteString.fromString("hello")
// Create a ByteString from an Array[Byte]
ByteString(arr)
ByteString.fromArray(arr)
Java
// Get the empty ByteString (this instance is cached)
ByteString.empty();
// Create a ByteString from a String
ByteString.fromString("hello");
// Create a ByteString from an Array[Byte]
ByteString.fromArray(arr);
§將 *.Out
移轉到 Source
Play 現在使用 Source
來產生事件,而不是舊有的 WebSocket.Out
、Chunks.Out
和 EventSource.Out
類別。這些類別雖然使用簡單,但缺乏彈性,而且沒有正確實作 背壓。
你可以用任何產生串流的 Source
取代你的 *.Out
類別。有許多方法可以建立 Source
(Java/Scala.
如果你想用一個簡單的物件取代你的 *.Out
,你可以寫入訊息並關閉它,而不必擔心背壓,那麼你可以使用 Source.actorRef
方法
Java
Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
.mapMaterializedValue(sourceActor -> {
sourceActor.tell(ByteString.fromString("hello"), null);
sourceActor.tell(ByteString.fromString("world"), null);
sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
return null;
});
Scala
val source = Source.actorRef[ByteString](256, OverflowStrategy.dropNew).mapMaterializedValue { sourceActor =>
sourceActor ! ByteString("hello")
sourceActor ! ByteString("world")
sourceActor ! Status.Success(()) // close the source
}
§將 Enumerator
遷移到 Source
Play 在許多地方使用 Enumerator
來產生值串流。
步驟 1:使用過渡 API(如果可用)
如果你使用 Results.chunked
或 Results.feed
,你可以繼續使用現有的方法。這些方法已經標示為已過時,所以你可能還是想變更你的程式碼。
步驟 2:使用轉接器將 Enumerator
轉換為 Source
你可以先使用 Streams.enumeratorToPublisher
將現有的 Enumerator
轉換為 Reactive Streams Publisher
,然後再使用 Source.fromPublisher
將 Publisher 轉換為 Source,例如
val enumerator: Enumerator[T] = ...
val source = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator))
步驟 3:(選用)改寫為 Source
以下是 Enumerator 工廠方法的一些常見對應
Iteratee | Akka Streams | 備註 |
---|---|---|
Enumerator.apply(a) |
Source.single(a) |
|
Enumerator.apply(a, b) |
Source.apply(List(a, b))) |
|
Enumerator.enumerate(seq) |
Source.apply(seq) |
seq 必須是不可變的 |
Enumerator.repeat |
Source.repeat |
在 Akka Streams 中,重複的元素並非每次都經過評估 |
Enumerator.empty |
Source.empty |
|
Enumerator.unfold |
Source.unfold |
|
Enumerator.generateM |
Source.unfoldAsync |
|
Enumerator.fromStream |
StreamConverters.fromInputStream |
|
Enumerator.fromFile |
StreamConverters.fromInputStream |
您必須為 java.io.File 建立一個 InputStream |
§將 Iteratee
遷移至 Sink
和 Accumulator
步驟 1:使用轉接器進行轉換
您可以使用 Streams.iterateeToSubscriber
將現有的 Iteratee
轉換為 Reactive Streams Subscriber
,然後再使用 Sink.fromSubscriber
將訂閱者轉換為接收器,例如
val iteratee: Iteratee[T, U] = ...
val (subscriber, resultIteratee) = Streams.iterateeToSubscriber(iteratee)
val sink = Sink.fromSubscriber(subscriber)
如果您需要傳回 Accumulator
,您可以改用 Streams.iterateeToAccumulator
方法。
步驟 2:(選用)改寫為 Sink
以下是常見的 Iteratee 工廠方法對應的清單
Iteratee | Akka Streams | 備註 |
---|---|---|
Iteratee.fold |
Sink.fold |
|
Iteratee.head |
Sink.headOption |
|
Iteratee.getChunks |
Sink.seq |
|
Iteratee.foreach |
Sink.foreach |
|
Iteratee.ignore |
Sink.ignore |
|
Done |
Sink.cancelled |
可以對已具體化的值進行對應,以產生結果,或者如果使用累加器,則可以使用 Accumulator.done 取代。 |
§將 Enumeratee
遷移至 Processor
步驟 1:使用轉接器進行轉換
你可以使用 Streams.enumerateeToProcessor
將現有的 Enumeratee
轉換為 reactive streams Processor
,然後使用 Flow.fromProcessor
將處理器轉換為流程,例如
val enumeratee: Enumeratee[A, B] = ...
val flow = Flow.fromProcessor(() => Streams.enumerateeToProcessor(enumeratee))
步驟 2:(選用)改寫為 Flow
以下列出一些列舉工廠方法的常見對應
Iteratee | Akka Streams | 備註 |
---|---|---|
Enumeratee.map |
Flow.map |
|
Enumeratee.mapM |
Flow.mapAsync |
你必須在 Akka Streams 中指定並行度,即一次並行對應的元素數量。 |
Enumeratee.mapConcat |
Flow.mapConcat |
|
Enumeratee.filter |
Flow.filter |
|
Enumeratee.take |
Flow.take |
|
Enumeratee.takeWhile |
Flow.takeWhile |
|
Enumeratee.drop |
Flow.drop |
|
Enumeratee.dropWhile |
Flow.dropWhile |
|
Enumeratee.collect |
Flow.collect |
下一步:Java 遷移指南
在這個文件中發現錯誤?此頁面的原始程式碼可以在 這裡 找到。閱讀 文件指南 後,請隨時提交 pull request。有問題或建議要分享?前往 我們的社群論壇 與社群展開對話。