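WebSocket Streaming Chat

Introduction

WebSocket streaming chat provides real-time, bidirectional AI conversations and suits scenarios such as chat applications and online customer service. Compared with a traditional HTTP request/response cycle, a WebSocket connection stays open, so the server can push each fragment of the reply as soon as it is generated, which lowers the latency the user perceives.

Core features:

  • Real-time communication - bidirectional, real-time messaging over WebSocket
  • Streaming output - AI responses arrive fragment by fragment and are visible immediately
  • Session persistence - session state and history are managed automatically
  • Connection management - connection setup, reconnection after drops, and related handling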

Quick Start

1. Create the WebSocket endpoint

java
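// Imports assume Spring Boot 3 (jakarta.*); on Spring Boot 2 use javax.websocket.* instead.
// ChatService, ChatMemoryManager, ChatRequest, ChatResponse, ChatMode, AiChatRequest and
// AiChatResponse are project classes, and JSON is the project's JSON utility
// (its calls look like Fastjson); their imports are omitted here.
import java.io.IOException;

import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@ServerEndpoint(value = "/ws/chat/{sessionId}")
@Slf4j
public class AiChatWebSocket {

    // The WebSocket container creates a new endpoint instance for every connection,
    // outside of Spring, so dependencies are injected once into static fields
    // through the setter methods below.
    private static ChatService chatService;
    private static ChatMemoryManager memoryManager;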

    @Autowired
    public void setChatService(ChatService chatService) {
        AiChatWebSocket.chatService = chatService;
    }

    @Autowired
    public void setMemoryManager(ChatMemoryManager memoryManager) {
        AiChatWebSocket.memoryManager = memoryManager;
    }

    private Session session;
    private String sessionId;

    /**
     * Connection opened
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("sessionId") String sessionId) {
        this.session = session;
        this.sessionId = sessionId;
        log.info("WebSocket connection opened: {}", sessionId);

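        // No welcome text is sent from here: sendMessage() tags everything as "content",
        // so a greeting would be merged into the first AI reply by the clients below.
        // The front-end examples display their own greeting in ws.onopen.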
    }

    /**
     * Message received
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("Message received: {}", message);

        try {
            // Parse the client request
            AiChatRequest request = parseRequest(message);

            // Build the chat request
            ChatRequest chatRequest = new ChatRequest()
                .setProvider(request.getProvider())
                .setSessionId(sessionId)
                .setMessage(request.getMessage())
                .setMode(ChatMode.CONTINUOUS)
                .setStream(true);

            // Stream the reply; the callback runs once per response chunk
            chatService.streamChat(chatRequest, response -> {
                if (!response.getFinished()) {
                    // Forward the content fragment
                    sendMessage(response.getContent());
                } else {
                    // Send the completion marker
                    sendComplete(response);
                }
            });

        } catch (Exception e) {
            log.error("Failed to process message", e);
            sendError(e.getMessage());
        }
    }

    /**
     * Connection closed
     */
    @OnClose
    public void onClose() {
        log.info("WebSocket connection closed: {}", sessionId);
    }

    /**
     * Error handling
     */
    @OnError
    public void onError(Throwable error) {
        log.error("WebSocket error: {}", sessionId, error);
    }

    /**
     * Send a content fragment (type "content")
     */
    private void sendMessage(String content) {
        try {
            AiChatResponse response = new AiChatResponse()
                .setType("content")
                .setContent(content);
            session.getBasicRemote().sendText(toJson(response));
        } catch (IOException e) {
            log.error("Failed to send message", e);
        }
    }

    /**
     * Send the completion marker (type "complete") with usage statistics
     */
    private void sendComplete(ChatResponse response) {
        try {
            AiChatResponse wsResponse = new AiChatResponse()
                .setType("complete")
                .setTokenUsage(response.getTokenUsage())
                .setResponseTime(response.getResponseTime());
            session.getBasicRemote().sendText(toJson(wsResponse));
        } catch (IOException e) {
            log.error("Failed to send completion marker", e);
        }
    }

    /**
     * Send an error (type "error")
     */
    private void sendError(String error) {
        try {
            AiChatResponse response = new AiChatResponse()
                .setType("error")
                .setError(error);
            session.getBasicRemote().sendText(toJson(response));
        } catch (IOException e) {
            log.error("Failed to send error message", e);
        }
    }

    private AiChatRequest parseRequest(String message) {
        return JSON.parseObject(message, AiChatRequest.class);
    }

    private String toJson(Object obj) {
        return JSON.toJSONString(obj);
    }
}

2. Configure WebSocket

java
@Configuration
@EnableWebSocket
public class WebSocketConfig {

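    // Registers @ServerEndpoint-annotated beans with the embedded servlet container.
    // Omit this bean when deploying to an external container, which scans for endpoints itself.
    @Bean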
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3. Front-end connection example

javascript
// Open the WebSocket connection
const sessionId = 'user-123';
const ws = new WebSocket(`ws://localhost:8080/ws/chat/${sessionId}`);

// Connection opened
ws.onopen = () => {
  console.log('WebSocket connection opened');
};

// Message received
ws.onmessage = (event) => {
  const response = JSON.parse(event.data);

  if (response.type === 'content') {
    // Append the content fragment (appendMessage is your own rendering function)
    appendMessage(response.content);
  } else if (response.type === 'complete') {
    // Streaming finished
    console.log('Response complete');
    console.log('Token usage:', response.tokenUsage);
  } else if (response.type === 'error') {
    // Error reported by the server
    console.error('Error:', response.error);
  }
};

// Send a message
function sendMessage(message) {
  const request = {
    provider: 'deepseek',
    message: message
  };
  ws.send(JSON.stringify(request));
}

// Connection closed
ws.onclose = () => {
  console.log('WebSocket connection closed');
};

// Connection error
ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};
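
The three message types handled above (content, complete, error) can also be folded into a small pure function, which keeps the protocol logic testable without a live socket. The following is only a sketch: the `applyServerMessage` name and the shape of the state object are illustrative assumptions, not part of the original API.

```javascript
// Hypothetical helper (not part of the original API): folds one raw server
// frame into the chat state and returns the new state.
// Assumed state shape: { reply, done, error, tokenUsage }
function applyServerMessage(state, raw) {
  let msg;
  try {
    msg = JSON.parse(raw);
  } catch (e) {
    // Frames that are not valid JSON are surfaced as an error
    return { ...state, error: 'Malformed message from server' };
  }
  switch (msg.type) {
    case 'content':
      // Append the fragment to the reply that is being streamed
      return { ...state, reply: state.reply + (msg.content ?? ''), done: false };
    case 'complete':
      return { ...state, done: true, tokenUsage: msg.tokenUsage ?? null };
    case 'error':
      return { ...state, done: true, error: msg.error ?? 'Unknown error' };
    default:
      // Unknown types (for example heartbeat frames) leave the state unchanged
      return state;
  }
}

const initialState = { reply: '', done: false, error: null, tokenUsage: null };
const frames = [
  '{"type":"content","content":"Hel"}',
  '{"type":"content","content":"lo"}',
  '{"type":"heartbeat"}',
  '{"type":"complete","tokenUsage":12}',
];
const finalState = frames.reduce(applyServerMessage, initialState);
console.log(finalState.reply, finalState.done); // Hello true
```

In a page, `ws.onmessage` would call `applyServerMessage(state, event.data)` and re-render from the returned state.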

Complete Examples

Chat UI

html
<!DOCTYPE html>
<html>
<head>
  <title>AI Chat</title>
  <style>
    .chat-container {
      width: 600px;
      height: 500px;
      border: 1px solid #ccc;
      overflow-y: auto;
      padding: 10px;
    }
    .message {
      margin: 10px 0;
      padding: 8px;
      border-radius: 5px;
    }
    .user-message {
      background: #e3f2fd;
      text-align: right;
    }
    .ai-message {
      background: #f5f5f5;
    }
    .input-area {
      margin-top: 10px;
    }
    #messageInput {
      width: 500px;
      padding: 8px;
    }
    #sendButton {
      padding: 8px 20px;
    }
  </style>
</head>
<body>
  <div class="chat-container" id="chatContainer"></div>
  <div class="input-area">
    <input type="text" id="messageInput" placeholder="Type a message...">
    <button id="sendButton">Send</button>
  </div>

  <script>
    let ws;
    let currentAiMessage = null;

    // Initialize the WebSocket connection
    function initWebSocket() {
      const sessionId = 'user-' + Date.now();
      ws = new WebSocket(`ws://localhost:8080/ws/chat/${sessionId}`);

      ws.onopen = () => {
        console.log('Connected');
        addSystemMessage('Connected. Start chatting!');
      };

      ws.onmessage = (event) => {
        const response = JSON.parse(event.data);

        if (response.type === 'content') {
          // Append to the current AI reply, creating its bubble on the first fragment
          if (!currentAiMessage) {
            currentAiMessage = addAiMessage('');
          }
          currentAiMessage.textContent += response.content;
        } else if (response.type === 'complete') {
          // Reply finished; the next fragment starts a new bubble
          currentAiMessage = null;
          console.log('Token usage:', response.tokenUsage);
        } else if (response.type === 'error') {
          // Close the current bubble so the next reply does not append to it
          currentAiMessage = null;
          addSystemMessage('Error: ' + response.error);
        }
      };

      ws.onclose = () => {
        addSystemMessage('Connection closed');
      };

      ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        addSystemMessage('Connection error');
      };
    }

    // Send a message
    function sendMessage() {
      const input = document.getElementById('messageInput');
      const message = input.value.trim();

      if (!message) return;

      // Do not send unless the socket is open
      if (!ws || ws.readyState !== WebSocket.OPEN) {
        addSystemMessage('Not connected');
        return;
      }

      // Show the user's message
      addUserMessage(message);

      // Send it to the server
      const request = {
        provider: 'deepseek',
        message: message
      };
      ws.send(JSON.stringify(request));

      // Clear the input box
      input.value = '';
    }

    // Add a user message
    function addUserMessage(content) {
      const container = document.getElementById('chatContainer');
      const div = document.createElement('div');
      div.className = 'message user-message';
      div.textContent = content;
      container.appendChild(div);
      container.scrollTop = container.scrollHeight;
    }

    // Add an AI message and return its element so fragments can be appended
    function addAiMessage(content) {
      const container = document.getElementById('chatContainer');
      const div = document.createElement('div');
      div.className = 'message ai-message';
      div.textContent = content;
      container.appendChild(div);
      container.scrollTop = container.scrollHeight;
      return div;
    }

    // Add a system message
    function addSystemMessage(content) {
      const container = document.getElementById('chatContainer');
      const div = document.createElement('div');
      div.style.color = '#999';
      div.style.textAlign = 'center';
      div.textContent = content;
      container.appendChild(div);
      container.scrollTop = container.scrollHeight;
    }

    // Bind event handlers
    document.getElementById('sendButton').onclick = sendMessage;
    document.getElementById('messageInput').onkeypress = (e) => {
      if (e.key === 'Enter') sendMessage();
    };

    // Initialize
    initWebSocket();
  </script>
</body>
</html>

Vue 3 example

vue
<template>
  <div class="chat-container">
    <!-- Message list -->
    <div class="message-list" ref="messageList">
      <div
        v-for="(msg, index) in messages"
        :key="index"
        :class="['message', msg.role === 'user' ? 'user' : 'ai']"
      >
        {{ msg.content }}
      </div>
    </div>

    <!-- Input area -->
    <div class="input-area">
      <input
        v-model="inputMessage"
        @keypress.enter="sendMessage"
        placeholder="Type a message..."
      />
      <button @click="sendMessage" :disabled="!connected">
        Send
      </button>
    </div>
  </div>
</template>

<script setup>
import { ref, onMounted, onUnmounted, nextTick } from 'vue';

const messages = ref([]);
const inputMessage = ref('');
const connected = ref(false);
const currentAiMessage = ref(null);

let ws = null;

// Initialize the WebSocket connection
const initWebSocket = () => {
  const sessionId = 'user-' + Date.now();
  ws = new WebSocket(`ws://localhost:8080/ws/chat/${sessionId}`);

  ws.onopen = () => {
    connected.value = true;
    addSystemMessage('Connected');
  };

  ws.onmessage = (event) => {
    const response = JSON.parse(event.data);

    if (response.type === 'content') {
      // Create the AI message on the first fragment, then keep appending to it
      if (!currentAiMessage.value) {
        currentAiMessage.value = addAiMessage('');
      }
      currentAiMessage.value.content += response.content;
      scrollToBottom();
    } else if (response.type === 'complete') {
      currentAiMessage.value = null;
    } else if (response.type === 'error') {
      // Close the current message so the next reply does not append to it
      currentAiMessage.value = null;
      addSystemMessage('Error: ' + response.error);
    }
  };

  ws.onclose = () => {
    connected.value = false;
    addSystemMessage('Connection closed');
  };
};

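// Send a message
const sendMessage = () => {
  const text = inputMessage.value.trim();
  // Ignore empty input, and skip sending while the socket is not open
  // (the Enter key handler is not covered by the button's disabled state)
  if (!text || !connected.value) return;

  addUserMessage(text);

  const request = {
    provider: 'deepseek',
    message: text
  };
  ws.send(JSON.stringify(request));

  inputMessage.value = '';
};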

// Add a user message
const addUserMessage = (content) => {
  messages.value.push({
    role: 'user',
    content: content
  });
  scrollToBottom();
};

// Add an AI message and return it so fragments can be appended
const addAiMessage = (content) => {
  const message = {
    role: 'ai',
    content: content
  };
  messages.value.push(message);
  scrollToBottom();
  return message;
};

// Add a system message
const addSystemMessage = (content) => {
  messages.value.push({
    role: 'system',
    content: content
  });
};

// Scroll to the bottom after the DOM has updated
const scrollToBottom = () => {
  nextTick(() => {
    const list = messageList.value;
    if (list) {
      list.scrollTop = list.scrollHeight;
    }
  });
};

onMounted(() => {
  initWebSocket();
});

onUnmounted(() => {
  if (ws) {
    ws.close();
  }
});

const messageList = ref(null);
</script>

<style scoped>
.chat-container {
  display: flex;
  flex-direction: column;
  height: 600px;
  border: 1px solid #ddd;
  border-radius: 8px;
}

.message-list {
  flex: 1;
  overflow-y: auto;
  padding: 16px;
}

.message {
  margin: 8px 0;
  padding: 10px;
  border-radius: 8px;
  max-width: 70%;
}

.message.user {
  background: #e3f2fd;
  margin-left: auto;
  text-align: right;
}

.message.ai {
  background: #f5f5f5;
}

.input-area {
  display: flex;
  padding: 16px;
  border-top: 1px solid #ddd;
}

.input-area input {
  flex: 1;
  padding: 8px 12px;
  border: 1px solid #ddd;
  border-radius: 4px;
  margin-right: 8px;
}

.input-area button {
  padding: 8px 24px;
  background: #1890ff;
  color: white;
  border: none;
  border-radius: 4px;
  cursor: pointer;
}

.input-area button:disabled {
  background: #d9d9d9;
  cursor: not-allowed;
}
</style>

Best Practices

1. Heartbeat keep-alive

java
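// Endpoint instances are created per connection by the WebSocket container, so the
// instance field `session` is never set on the Spring-managed bean that runs this
// scheduled method. Iterate over the static `sessions` map from section 3 below instead.
// Scheduling must be enabled with @EnableScheduling on a configuration class.
@Scheduled(fixedRate = 30000)  // every 30 seconds
public void sendHeartbeat() {
    sessions.values().forEach(s -> {
        if (s.isOpen()) {
            try {
                s.getBasicRemote().sendText("{\"type\":\"heartbeat\"}");
            } catch (IOException e) {
                log.error("Failed to send heartbeat", e);
            }
        }
    });
}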
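
On the client, heartbeat frames can drive a simple staleness check: if nothing has arrived for several heartbeat intervals, the connection is probably dead even though no close event has fired yet. A minimal sketch follows; `createHeartbeatWatchdog` and the 90-second timeout are assumptions chosen for illustration, and the clock is injectable so the logic can be exercised without waiting.

```javascript
// Hypothetical helper (not part of the original API): remembers when the
// last frame arrived and reports whether the connection looks stale.
function createHeartbeatWatchdog(timeoutMs, now = () => Date.now()) {
  let lastSeen = now();
  return {
    // Call from ws.onmessage for every frame, including {"type":"heartbeat"}
    touch() {
      lastSeen = now();
    },
    // True when no frame has arrived within timeoutMs
    isStale() {
      return now() - lastSeen > timeoutMs;
    },
  };
}

// Demonstration with a fake clock (milliseconds)
let clock = 0;
const watchdog = createHeartbeatWatchdog(90000, () => clock); // about three missed 30 s heartbeats
clock = 60000;
const staleAt60s = watchdog.isStale(); // false: only 60 s since the last frame
watchdog.touch();                      // a frame arrives at t = 60 s
clock = 160000;
const staleAt160s = watchdog.isStale(); // true: 100 s without a frame
console.log(staleAt60s, staleAt160s);
```

In a page, `watchdog.touch()` would go at the top of `ws.onmessage`, and a `setInterval` could call `ws.close()` when `isStale()` returns true so that the reconnection logic takes over.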

2. Reconnect on failure

javascript
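let ws;
let reconnectAttempts = 0;
const maxReconnectAttempts = 5;
const url = 'ws://localhost:8080/ws/chat/user-123';  // same endpoint as in the quick start

function connect() {
  ws = new WebSocket(url);

  ws.onopen = () => {
    // Reset the counter after a successful connection
    reconnectAttempts = 0;
  };

  // The close event also fires after an error, so reconnecting here covers both
  // failed connection attempts and connections that drop later
  ws.onclose = () => {
    if (reconnectAttempts < maxReconnectAttempts) {
      reconnectAttempts++;
      setTimeout(connect, 3000);  // retry after 3 seconds
    }
  };
}

connect();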
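
A fixed retry delay makes every client hit the server again at the same moment after an outage. A common refinement is exponential backoff with an upper bound. The sketch below is one possible formula, not something prescribed by this guide; the `backoffDelay` name, the 1-second base, and the 30-second cap are assumptions.

```javascript
// Hypothetical helper: delay before the given reconnect attempt (1-based).
// The delay doubles on each attempt and is capped at maxMs.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

// Delays for the first six attempts
const delays = [1, 2, 3, 4, 5, 6].map((n) => backoffDelay(n));
console.log(delays); // [ 1000, 2000, 4000, 8000, 16000, 30000 ]
```

To use it with a reconnect loop, pass `backoffDelay(reconnectAttempts)` to `setTimeout` in place of the constant delay. Adding a small random offset to each delay spreads clients out further.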

3. Connection management

java
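// Registry of all open connections, keyed by session ID
private static final Map<String, Session> sessions = new ConcurrentHashMap<>();

@OnOpen
public void onOpen(Session session, @PathParam("sessionId") String sessionId) {
    sessions.put(sessionId, session);
}

// Take the ID from the path so this method does not depend on an instance field
@OnClose
public void onClose(@PathParam("sessionId") String sessionId) {
    sessions.remove(sessionId);
}

// Broadcast a message to every open connection
public static void broadcast(String message) {
    sessions.values().forEach(session -> {
        if (!session.isOpen()) return;
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("Broadcast failed", e);
        }
    });
}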

4. Message compression

java
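// Enable compression.
// MessageCompressionConfigurator is not a standard WebSocket API class and is not
// defined in this guide; it is presumably an application-specific
// ServerEndpointConfig.Configurator subclass.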
@ServerEndpoint(
    value = "/ws/chat/{sessionId}",
    configurator = MessageCompressionConfigurator.class
)

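FAQ

1. What should I do when the connection drops?

Implement automatic reconnection (see the reconnection example under Best Practices).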

2. How do I limit the number of concurrent connections?

java
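private static final int MAX_CONNECTIONS = 1000;

@OnOpen
public void onOpen(Session session, @PathParam("sessionId") String sessionId) throws IOException {
    if (sessions.size() >= MAX_CONNECTIONS) {
        // Reject with a close frame so the client receives a close code and reason,
        // instead of throwing an exception from the lifecycle method
        session.close(new CloseReason(
            CloseReason.CloseCodes.TRY_AGAIN_LATER, "Connection limit reached"));
        return;
    }
    sessions.put(sessionId, session);
}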

3. How do I handle large messages?

java
// Set the maximum size of an incoming message
@OnMessage(maxMessageSize = 1024 * 1024)  // 1MB
public void onMessage(String message) {
    // ...
}
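
The client can check a request against the same limit before sending, rather than sending an oversized message that the server will not process. The sketch below assumes the limit is compared against the UTF-8 byte length of the serialized request; `serializeIfWithinLimit` and `MAX_MESSAGE_BYTES` are illustrative names.

```javascript
// Mirrors the 1 MB maxMessageSize configured on the server (assumed to be bytes)
const MAX_MESSAGE_BYTES = 1024 * 1024;

// Hypothetical helper: returns the JSON payload if it fits, otherwise null.
function serializeIfWithinLimit(request, maxBytes = MAX_MESSAGE_BYTES) {
  const payload = JSON.stringify(request);
  // Measure bytes, not characters: non-ASCII text takes more than one byte each
  const size = new TextEncoder().encode(payload).length;
  return size <= maxBytes ? payload : null;
}

const small = serializeIfWithinLimit({ provider: 'deepseek', message: 'hello' });
const large = serializeIfWithinLimit({
  provider: 'deepseek',
  message: 'x'.repeat(MAX_MESSAGE_BYTES), // the JSON envelope pushes this over the limit
});
console.log(small !== null, large === null); // true true
```

When the helper returns null, the page can show a "message too long" notice instead of calling `ws.send`.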

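Summary

WebSocket streaming chat delivers a real-time, fluid AI conversation experience and suits modern chat applications. With sound connection management and error handling, such as heartbeats, reconnection, and connection limits, you can build a stable and reliable real-time chat system.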