misc: refine StreamResponse

This commit is contained in:
WJG 2024-02-25 00:55:04 +08:00
parent f31f64c286
commit b59d358a1f
No known key found for this signature in database
GPG Key ID: 258474EF8590014A
4 changed files with 60 additions and 64 deletions

View File

@ -7,6 +7,13 @@ import {
import { kEnvs } from "../utils/env";
import { kProxyAgent } from "./http";
export interface ChatOptions {
user: string;
system?: string;
tools?: Array<ChatCompletionTool>;
jsonMode?: boolean;
}
class OpenAIClient {
private _client = new OpenAI({
httpAgent: kProxyAgent,
@ -24,12 +31,7 @@ class OpenAIClient {
}
}
async chat(options: {
user: string;
system?: string;
tools?: Array<ChatCompletionTool>;
jsonMode?: boolean;
}) {
async chat(options: ChatOptions) {
const { user, system, tools, jsonMode } = options;
const systemMsg: ChatCompletionMessageParam[] = system
? [{ role: "system", content: system }]
@ -48,14 +50,12 @@ class OpenAIClient {
return chatCompletion?.choices?.[0]?.message;
}
async chatStream(options: {
user: string;
system?: string;
tools?: Array<ChatCompletionTool>;
jsonMode?: boolean;
requestId?: string;
onStream?: (text: string) => void;
}) {
async chatStream(
options: ChatOptions & {
requestId?: string;
onStream?: (text: string) => void;
}
) {
const { user, system, tools, jsonMode, onStream, requestId } = options;
const systemMsg: ChatCompletionMessageParam[] = system
? [{ role: "system", content: system }]

View File

@ -155,6 +155,7 @@ export class BaseSpeaker {
speaker = this._defaultSpeaker,
} = options ?? {};
const ttsText = text?.replace(/\n\s*\n/g, "\n")?.trim();
const ttsNotXiaoai = !stream && !!text && !audio && tts !== "xiaoai";
playSFX = ttsNotXiaoai && playSFX;
@ -169,7 +170,7 @@ export class BaseSpeaker {
await this.unWakeUp();
}
await this.MiNA!.play(args);
console.log("✅ " + text ?? audio);
console.log("✅ " + ttsText ?? audio);
// 等待回答播放完毕
while (true) {
const res = await this.MiNA!.getStatus();
@ -200,18 +201,18 @@ export class BaseSpeaker {
if (audio) {
// 音频回复
res = await play({ url: audio });
} else if (text) {
} else if (ttsText) {
// 文字回复
switch (tts) {
case "doubao":
const _text = encodeURIComponent(text);
const _text = encodeURIComponent(ttsText);
const doubaoTTS = process.env.TTS_DOUBAO;
const url = `${doubaoTTS}?speaker=${speaker}&text=${_text}`;
res = await play({ url });
break;
case "xiaoai":
default:
res = await play({ tts: text });
res = await play({ tts: ttsText });
break;
}
}

View File

@ -1,3 +1,5 @@
import { sleep } from "../../utils/base";
type ResponseStatus = "idle" | "responding" | "finished" | "canceled";
interface StreamResponseOptions {
@ -10,7 +12,7 @@ interface StreamResponseOptions {
*
* 100ms => 100ms Response
*
* 200(0 )
* 200(100)
*/
firstSubmitTimeout?: number;
}
@ -22,7 +24,7 @@ export class StreamResponse {
if (text.length > maxSentenceLength) {
const stream = new StreamResponse(options);
stream.addResponse(text);
stream.finish();
stream.finish(text);
return stream;
}
}
@ -32,7 +34,8 @@ export class StreamResponse {
constructor(options?: StreamResponseOptions) {
const { maxSentenceLength = 100, firstSubmitTimeout = 200 } = options ?? {};
this.maxSentenceLength = maxSentenceLength;
this.firstSubmitTimeout = firstSubmitTimeout;
this.firstSubmitTimeout =
firstSubmitTimeout < 100 ? 100 : firstSubmitTimeout;
}
status: ResponseStatus = "responding";
@ -56,8 +59,7 @@ export class StreamResponse {
private _nextChunkIdx = 0;
getNextResponse() {
const isFirstSubmit = this._preSubmitTimestamp === 0;
if (!isFirstSubmit) {
if (this._submitCount > 0) {
// 在请求下一条消息前,提交当前收到的所有消息
this._batchSubmitImmediately();
}
@ -71,28 +73,44 @@ export class StreamResponse {
return { nextSentence, noMore };
}
finish() {
private _finalResult?: string;
finish(finalResult?: string) {
if (["idle", "responding"].includes(this.status)) {
this._batchSubmitImmediately();
if (this._remainingText) {
// 提交完整句子
this._chunks.push(this._remainingText);
this._remainingText = "";
}
this._forceChunkText();
this._finalResult = finalResult;
this.status = "finished";
}
return this.status === "finished";
}
private _forceChunkText() {
if (this._remainingText) {
this._addResponse("", { force: true });
}
}
async getFinalResult() {
while (true) {
if (this.status === "finished") {
return this._finalResult;
} else if (this.status === "canceled") {
return undefined;
}
await sleep(10);
}
}
private _chunks: string[] = [];
private _tempText = "";
private _remainingText: string = "";
private _preSubmitTimestamp = 0;
private _submitCount = 0;
private _batchSubmitImmediately() {
if (this._tempText) {
this._addResponse(this._tempText);
this._tempText = "";
this._submitCount++;
}
}
@ -101,38 +119,29 @@ export class StreamResponse {
*
* 使 AI stream /
*/
private _batchSubmit(text: string, immediately?: boolean) {
private _batchSubmit(text: string) {
this._tempText += text;
immediately = immediately ?? this.firstSubmitTimeout < 100;
if (immediately) {
return this._batchSubmitImmediately();
}
const isFirstSubmit = this._preSubmitTimestamp === 0;
if (isFirstSubmit) {
this._preSubmitTimestamp = Date.now();
const batchSubmit = (timeout: number) => {
setTimeout(() => {
// 当消息长度积攒到一定长度,或达到一定时间间隔后,批量提交消息
if (
Date.now() - this._preSubmitTimestamp >= timeout ||
this._tempText.length >= this.maxSentenceLength
this._tempText.length > this.maxSentenceLength ||
Date.now() - this._preSubmitTimestamp > this.firstSubmitTimeout
) {
this._batchSubmitImmediately();
}
};
const submit = (timeout: number) => {
batchSubmit(timeout);
setTimeout(() => {
batchSubmit(timeout);
}, timeout);
};
submit(this.firstSubmitTimeout);
}, this.firstSubmitTimeout);
}
}
private _addResponse(text: string) {
private _addResponse(text: string, options?: { force: boolean }) {
this._remainingText += text;
while (this._remainingText.length > 0) {
let lastCutIndex = this._findLastCutIndex(this._remainingText);
let lastCutIndex = options?.force
? this.maxSentenceLength
: this._findLastCutIndex(this._remainingText);
if (lastCutIndex > 0) {
const currentChunk = this._remainingText.substring(0, lastCutIndex);
this._chunks.push(currentChunk);
@ -145,7 +154,7 @@ export class StreamResponse {
}
private _findLastCutIndex(text: string): number {
const punctuations = "。?!……,.?!:;";
const punctuations = "。?!;?!;";
let lastCutIndex = -1;
for (let i = 0; i < Math.min(text.length, this.maxSentenceLength); i++) {
if (punctuations.includes(text[i])) {
@ -155,17 +164,3 @@ export class StreamResponse {
return lastCutIndex;
}
}
// ai onNewText
// {
// onNewText(text:string){
// if(stream.status==='canceled'){
// return 'canceled';
// }
// if(finished){
// stream.finish()
// }else{
// stream.addResponse(text)
// }
// }
// }

View File

@ -21,6 +21,6 @@ async function testStreamResponse() {
const speaker = new AISpeaker(config);
await speaker.initMiServices();
await speaker.response({ stream });
const res = await stream.wasFinished();
const res = await stream.getFinalResult();
console.log("\nFinal result 222:\n", res);
}