Skip to content

流插件

WARNING

此插件处于维护模式,不会接收新特性。我们建议使用 生成器流替代

此插件添加了对流响应或向客户端发送服务器发送事件的支持。

安装命令:

bash
bun add @elysiajs/stream

然后使用它:

typescript
import { Elysia } from 'elysia'
import { Stream } from '@elysiajs/stream'

new Elysia()
    .get('/', () => new Stream(async (stream) => {
        stream.send('hello')

        await stream.wait(1000)
        stream.send('world')

        stream.close()
    }))
    .listen(3000)

默认情况下,Stream 将返回 Response,其 content-typetext/event-stream; charset=utf8

构造函数

以下是 Stream 接受的构造函数参数:

  1. Stream:
    • 自动:从提供的值自动流响应
      • Iterable
      • AsyncIterable
      • ReadableStream
      • Response
    • 手动:(stream: this) => unknownundefined 的回调
  2. 选项: StreamOptions
    • event:标识所描述事件类型的字符串
    • retry:重新连接的时间,单位为毫秒

方法

以下是 Stream 提供的方法:

send

将数据入队以发送回客户端

close

关闭流

wait

返回一个承诺,在提供的毫秒值后解析

value

ReadableStream 的内部值

模式

以下是使用该插件的常见模式。

OpenAI

当参数为 IterableAsyncIterable 时,自动模式会被触发,自动将响应流发送回客户端。

以下是将 ChatGPT 集成到 Elysia 的示例。

ts
new Elysia()
    .get(
        '/ai',
        ({ query: { prompt } }) =>
            new Stream(
                openai.chat.completions.create({
                    model: 'gpt-3.5-turbo',
                    stream: true,
                    messages: [{
                        role: 'user',
                        content: prompt
                    }]
                })
            )
    )

默认情况下,openai 的 chatGPT 完成返回 AsyncIterable,因此您应该能够将 OpenAI 包装在 Stream 中。

抓取流

您可以传递来自返回流的端点的抓取以代理流。

这对使用 AI 文本生成的端点非常有用,因为您可以直接代理它,例如 Cloudflare AI

ts
const model = '@cf/meta/llama-2-7b-chat-int8'
const endpoint = `https://api.cloudflare.com/client/v4/accounts/${process.env.ACCOUNT_ID}/ai/run/${model}`

new Elysia()
    .get('/ai', ({ query: { prompt } }) =>
        fetch(endpoint, {
            method: 'POST',
            headers: {
                authorization: `Bearer ${API_TOKEN}`,
                'content-type': 'application/json'
            },
            body: JSON.stringify({
                messages: [
                    { role: 'system', content: '你是一个友好的助手' },
                    { role: 'user', content: prompt }
                ]
            })
        })
    )

服务器发送事件

当参数为 callbackundefined 时,手动模式被触发,允许您控制流。

基于回调

以下是使用构造函数回调创建服务器发送事件端点的示例。

ts
new Elysia()
    .get('/source', () =>
        new Stream((stream) => {
            const interval = setInterval(() => {
                stream.send('hello world')
            }, 500)

            setTimeout(() => {
                clearInterval(interval)
                stream.close()
            }, 3000)
        })
    )

基于值

以下是使用基于值的方法创建服务器发送事件端点的示例。

ts
new Elysia()
    .get('/source', () => {
        const stream = new Stream()

        const interval = setInterval(() => {
            stream.send('hello world')
        }, 500)

        setTimeout(() => {
            clearInterval(interval)
            stream.close()
        }, 3000)

        return stream
    })

基于回调和基于值的流在同样的工作方式,但语法有所不同以符合您的偏好。