backgroundbackground

基于Post方法的SSE封装

SSE / Cloudflare / Timeout

技术

2025-03-12 03:00

前言

在使用 Cloudfalre 边缘计算开发产品时,有些耗时比较长的接口会导致超时,因为Cloudflare限制传入请求 100s 超时

SSE 配合心跳机制可以绕过此限制,但浏览器提供的 EventSource 是基于 GET 请求的,于是这里提供一个封装 POST 请求的实现

服务器发送事件 (SSE) 是一种单向的服务器推送技术,允许服务器通过持久的 HTTP 连接向客户端实时发送数据更新。

代码

PostSSE.ts

/**
 * SSE 配置选项接口
 */
export interface SSEOptions {
  /** 处理服务器发送的消息回调函数 */
  onMessage: (event: string, data: any, raw: string) => void
  /** 错误处理回调函数 */
  onError?: (error: any) => void
  /** 连接关闭回调函数 */
  onClose?: () => void
  /** 自定义请求头 */
  headers?: HeadersInit
}

/**
 * 发送 POST 请求并处理 SSE 响应的函数
 * @param url - 请求地址
 * @param params - POST 请求参数
 * @param options - SSE 配置选项
 */
export async function postSSE(
  url: string,
  params: Record<string, any>,
  options: SSEOptions
): Promise<void> {
  let buffer = '' // 用于存储不完整的行

  try {
    const response = await fetch(url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...options.headers
      },
      body: JSON.stringify(params)
    })

    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`)
    }

    const reader = response.body?.getReader()
    const decoder = new TextDecoder()

    if (!reader) {
      throw new Error('Failed to get response stream')
    }

    // 事件解析函数
    const parseEvent = (data: string) => {
      const lines = data.split('\n')
      let currentEvent = 'message'
      let currentData = ''

      for (const line of lines) {
        const trimmedLine = line.trim()
        if (!trimmedLine) continue

        if (trimmedLine.startsWith('event:')) {
          currentEvent = trimmedLine.slice(6).trim()
        } else if (trimmedLine.startsWith('data:')) {
          currentData = trimmedLine.slice(5).trim()
          try {
            const obj = JSON.parse(currentData)
            options.onMessage(currentEvent, obj, currentData)
          } catch (e) {
            console.warn('Failed to parse SSE data:', e)
            options.onMessage(currentEvent, {}, currentData)
          }
        }
      }
    }

    while (true) {
      const { value, done } = await reader.read()

      if (done) {
        if (buffer.trim()) {
          // 处理最后可能残留的数据
          parseEvent(buffer)
        }
        options.onClose?.()
        break
      }

      const chunk = decoder.decode(value, { stream: true }) // 使用 stream 选项以支持多字节字符
      buffer += chunk

      // 查找最后一个完整的换行符
      const lastNewlineIndex = buffer.lastIndexOf('\n')

      if (lastNewlineIndex !== -1) {
        // 处理完整的行
        const completeLines = buffer.substring(0, lastNewlineIndex)
        // 保留不完整的部分到 buffer
        buffer = buffer.substring(lastNewlineIndex + 1)
        // 解析完整的行
        parseEvent(completeLines)
      }
    }
  } catch (error) {
    console.error('SSE Error:', error)
    options.onError?.(error)
    options.onClose?.()
    throw error
  }
}

使用示例

postSSE(
  `${toyTemplate.value?.apiPath}`,
  {
    param1: 114,
    param2: '514'
  },
  {
    onMessage: (event, data) => {
      if (event === 'result') {
        console.log(event, data)
      } else {
        console.log(event, data)
      }
    },
    onError: (error) => {
      console.error(error)
    },
    onClose: () => {
      // do something
    }
  }
)

服务端参考

使用 Nuxt3,基于 Nitro 服务器引擎,代码示例

export default defineEventHandler(async (event) => {
  const sse = createEventStream(event)
  const heartbeat = setInterval(() => {
    sse.push({ event: 'heartbeat', data: JSON.stringify(Date.now()) })
  }, 10000)

  sse.onClosed(() => {
    clearInterval(heartbeat)
  })


  new Promise<void>(async (resolve) => {
    try {
      // do something

      await sse.push({ event: 'result', data: JSON.stringify(result) })

    } finally {
      clearInterval(heartbeat)
      sse.close()
      resolve()
    }
  })

  return sse.send()
}