前言
在使用 Cloudfalre
边缘计算开发产品时,有些耗时比较长的接口会导致超时,因为Cloudflare限制传入请求 100s 超时
而 SSE
配合心跳机制可以绕过此限制,但浏览器提供的 EventSource
是基于 GET
请求的,于是这里提供一个封装 POST
请求的实现
服务器发送事件 (SSE) 是一种单向的服务器推送技术,允许服务器通过持久的 HTTP 连接向客户端实时发送数据更新。
代码
PostSSE.ts
/**
* SSE 配置选项接口
*/
export interface SSEOptions {
/** 处理服务器发送的消息回调函数 */
onMessage: (event: string, data: any, raw: string) => void
/** 错误处理回调函数 */
onError?: (error: any) => void
/** 连接关闭回调函数 */
onClose?: () => void
/** 自定义请求头 */
headers?: HeadersInit
}
/**
* 发送 POST 请求并处理 SSE 响应的函数
* @param url - 请求地址
* @param params - POST 请求参数
* @param options - SSE 配置选项
*/
export async function postSSE(
url: string,
params: Record<string, any>,
options: SSEOptions
): Promise<void> {
let buffer = '' // 用于存储不完整的行
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...options.headers
},
body: JSON.stringify(params)
})
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`)
}
const reader = response.body?.getReader()
const decoder = new TextDecoder()
if (!reader) {
throw new Error('Failed to get response stream')
}
// 事件解析函数
const parseEvent = (data: string) => {
const lines = data.split('\n')
let currentEvent = 'message'
let currentData = ''
for (const line of lines) {
const trimmedLine = line.trim()
if (!trimmedLine) continue
if (trimmedLine.startsWith('event:')) {
currentEvent = trimmedLine.slice(6).trim()
} else if (trimmedLine.startsWith('data:')) {
currentData = trimmedLine.slice(5).trim()
try {
const obj = JSON.parse(currentData)
options.onMessage(currentEvent, obj, currentData)
} catch (e) {
console.warn('Failed to parse SSE data:', e)
options.onMessage(currentEvent, {}, currentData)
}
}
}
}
while (true) {
const { value, done } = await reader.read()
if (done) {
if (buffer.trim()) {
// 处理最后可能残留的数据
parseEvent(buffer)
}
options.onClose?.()
break
}
const chunk = decoder.decode(value, { stream: true }) // 使用 stream 选项以支持多字节字符
buffer += chunk
// 查找最后一个完整的换行符
const lastNewlineIndex = buffer.lastIndexOf('\n')
if (lastNewlineIndex !== -1) {
// 处理完整的行
const completeLines = buffer.substring(0, lastNewlineIndex)
// 保留不完整的部分到 buffer
buffer = buffer.substring(lastNewlineIndex + 1)
// 解析完整的行
parseEvent(completeLines)
}
}
} catch (error) {
console.error('SSE Error:', error)
options.onError?.(error)
options.onClose?.()
throw error
}
}
使用示例
postSSE(
`${toyTemplate.value?.apiPath}`,
{
param1: 114,
param2: '514'
},
{
onMessage: (event, data) => {
if (event === 'result') {
console.log(event, data)
} else {
console.log(event, data)
}
},
onError: (error) => {
console.error(error)
},
onClose: () => {
// do something
}
}
)
服务端参考
使用 Nuxt3
,基于 Nitro
服务器引擎,代码示例
export default defineEventHandler(async (event) => {
const sse = createEventStream(event)
const heartbeat = setInterval(() => {
sse.push({ event: 'heartbeat', data: JSON.stringify(Date.now()) })
}, 10000)
sse.onClosed(() => {
clearInterval(heartbeat)
})
new Promise<void>(async (resolve) => {
try {
// do something
await sse.push({ event: 'result', data: JSON.stringify(result) })
} finally {
clearInterval(heartbeat)
sse.close()
resolve()
}
})
return sse.send()
}