diff --git a/src/core/config.ts b/src/core/config.ts
index f661337..d2f286d 100644
--- a/src/core/config.ts
+++ b/src/core/config.ts
@@ -106,7 +106,7 @@ export const config: BotConfig = {
   ),
   classificationFallbackModels: getCsvEnvOrDefault("AI_CLASSIFICATION_FALLBACK_MODELS", [
     "meta-llama/llama-3.3-70b-instruct:free",
-    "mistralai/mistral-small-3.1-24b-instruct:free",
+    "qwen/qwen-2.5-7b-instruct",
   ]),
   maxTokens: parseInt(getEnvOrDefault("AI_MAX_TOKENS", "500")),
   temperature: parseFloat(getEnvOrDefault("AI_TEMPERATURE", "1.2")),
diff --git a/src/features/joel/responder.ts b/src/features/joel/responder.ts
index 915b26e..d72b5e4 100644
--- a/src/features/joel/responder.ts
+++ b/src/features/joel/responder.ts
@@ -13,6 +13,8 @@ import { buildStyledPrompt, STYLE_MODIFIERS } from "./personalities";
 import { getRandomMention } from "./mentions";
 import { speakVoiceover } from "./voice";
 import { TypingIndicator } from "./typing";
+import { StreamingReply } from "./streaming-reply";
+import { splitMessage } from "../../utils";
 
 const logger = createLogger("Features:Joel");
 
@@ -128,18 +130,12 @@
     }
 
     const typing = new TypingIndicator(message.channel);
+    const streamingReply = new StreamingReply(message);
 
     try {
       typing.start();
-
-      let response = await this.generateResponse(message);
-
-      if (!response) {
-        await message.reply("\\*Ignorerar dig\\*");
-        return;
-      }
-
       // If Joel is rebelling against channel restriction, add a prefix
+      let rebellionPrefix = "";
       if (channelCheck.rebellionResponse) {
        const rebellionPrefixes = [
          "*sneaks in from the shadows*\n\n",
@@ -149,20 +145,34 @@
          "I'm not supposed to be here but I don't care.\n\n",
          "*escapes from his designated channel*\n\n",
        ];
-        const prefix = rebellionPrefixes[Math.floor(Math.random() * rebellionPrefixes.length)];
-        response = prefix + response;
+        rebellionPrefix = rebellionPrefixes[Math.floor(Math.random() * rebellionPrefixes.length)];
       }
+
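+      // Stream partial output into a placeholder reply as tokens arrive; the
+      // callback applies the rebellion prefix so streamed edits match the final text.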
+      let response = await this.generateResponse(message, async (partialResponse) => {
+        const content = partialResponse ? rebellionPrefix + partialResponse : "";
+        await streamingReply.update(content);
+      });
+
+      if (!response) {
+        await streamingReply.finalize("");
+        await message.reply("\\*Ignorerar dig\\*");
+        return;
+      }
+
+      if (rebellionPrefix) {
+        response = rebellionPrefix + response;
+      }
 
       // Occasionally add a random mention
       const mention = await getRandomMention(message);
       const fullResponse = response + mention;
 
-      await this.sendResponse(message, fullResponse);
+      await streamingReply.finalize(fullResponse);
 
       speakVoiceover(message, fullResponse).catch((error) => {
         logger.error("Failed to play voiceover", error);
       });
     } catch (error) {
       logger.error("Failed to respond", error);
+      await streamingReply.finalize("");
       await this.handleError(message, error);
     } finally {
       typing.stop();
@@ -301,7 +311,10 @@
   /**
    * Generate a response using AI with tool calling support
    */
-  async generateResponse(message: Message): Promise<string | null> {
+  async generateResponse(
+    message: Message,
+    onTextStream?: (text: string) => Promise<void> | void,
+  ): Promise<string | null> {
     const ai = getAiService();
     const author = message.author.displayName;
     const userId = message.author.id;
@@ -431,7 +444,8 @@ The image URL will appear in your response for the user to see.`;
       const response = await ai.generateResponseWithTools(
         prompt,
         systemPromptWithTools,
-        toolContext
+        toolContext,
+        onTextStream,
       );
 
       return response.text || null;
@@ -662,17 +676,12 @@ The image URL will appear in your response for the user to see.`;
   /**
    * Send response, splitting if necessary
    */
   async sendResponse(message: Message, content: string): Promise<void> {
-    // Discord message limit is 2000, stay under to be safe
-    const MAX_LENGTH = 1900;
-
-    if (content.length <= MAX_LENGTH) {
+    const chunks = splitMessage(content, 1900);
+    if (chunks.length === 1) {
       await message.reply(content);
       return;
     }
 
-    // Split into chunks
-    const chunks = content.match(/.{1,1900}/gs) ?? [content];
-
     // First chunk as reply
     await message.reply(chunks[0]);
diff --git a/src/features/joel/streaming-reply.ts b/src/features/joel/streaming-reply.ts
new file mode 100644
index 0000000..26fa903
--- /dev/null
+++ b/src/features/joel/streaming-reply.ts
@@ -0,0 +1,138 @@
+import type { Message } from "discord.js";
+import { createLogger } from "../../core/logger";
+import { splitMessage } from "../../utils";
+
+const logger = createLogger("Features:Joel:StreamingReply");
+
+const MAX_MESSAGE_LENGTH = 1900;
+const EDIT_INTERVAL_MS = 1250;
+
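+/**
+ * Keeps a live-edited Discord reply in sync with a streaming response:
+ * throttles edits to one per EDIT_INTERVAL_MS, splits long content across
+ * follow-up messages, and serializes Discord calls through a promise chain.
+ */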
+export class StreamingReply {
+  private sourceMessage: Message;
+  private sentMessages: Message[] = [];
+  private targetContent = "";
+  private sentContent = "";
+  private lastFlushAt = 0;
+  private flushTimer: ReturnType<typeof setTimeout> | null = null;
+  private flushChain: Promise<void> = Promise.resolve();
+
+  constructor(sourceMessage: Message) {
+    this.sourceMessage = sourceMessage;
+  }
+
+  async update(content: string): Promise<void> {
+    this.targetContent = content;
+
+    if (this.targetContent === this.sentContent) {
+      return;
+    }
+
+    const now = Date.now();
+    if (this.lastFlushAt === 0 || now - this.lastFlushAt >= EDIT_INTERVAL_MS) {
+      await this.enqueueFlush();
+      return;
+    }
+
+    this.scheduleFlush();
+  }
+
+  async finalize(content: string): Promise<void> {
+    this.targetContent = content;
+
+    if (this.flushTimer) {
+      clearTimeout(this.flushTimer);
+      this.flushTimer = null;
+    }
+
+    await this.enqueueFlush();
+  }
+
+  private scheduleFlush(): void {
+    if (this.flushTimer) {
+      return;
+    }
+
+    const remaining = Math.max(0, EDIT_INTERVAL_MS - (Date.now() - this.lastFlushAt));
+    this.flushTimer = setTimeout(() => {
+      this.flushTimer = null;
+      void this.enqueueFlush().catch((error) => {
+        logger.error("Scheduled stream flush failed", error);
+      });
+    }, remaining);
+  }
+
+  private enqueueFlush(): Promise<void> {
+    this.flushChain = this.flushChain
+      .catch(() => undefined)
+      .then(() => this.flush());
+    return this.flushChain;
+  }
+
+  private async flush(): Promise<void> {
+    const desiredContent = this.targetContent;
+    if (desiredContent === this.sentContent) {
+      return;
+    }
+
+    const desiredChunks = desiredContent.length > 0
+      ? splitMessage(desiredContent, MAX_MESSAGE_LENGTH)
+      : [];
+
+    if (desiredChunks.length === 0) {
+      await this.deleteAllMessages();
+      this.sentContent = "";
+      this.lastFlushAt = Date.now();
+      return;
+    }
+
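+    // Edit existing messages in place and only create or delete at the tail,
+    // so readers see one reply growing instead of messages churning.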
+    for (let i = 0; i < desiredChunks.length; i++) {
+      const chunk = desiredChunks[i];
+      const existingMessage = this.sentMessages[i];
+
+      if (!existingMessage) {
+        const createdMessage = i === 0
+          ? await this.sourceMessage.reply(chunk)
+          : await this.sourceMessage.channel.send(chunk);
+        this.sentMessages.push(createdMessage);
+        continue;
+      }
+
+      if (existingMessage.content !== chunk) {
+        this.sentMessages[i] = await existingMessage.edit(chunk);
+      }
+    }
+
+    while (this.sentMessages.length > desiredChunks.length) {
+      const extraMessage = this.sentMessages.pop();
+      if (!extraMessage) {
+        continue;
+      }
+
+      try {
+        await extraMessage.delete();
+      } catch (error) {
+        logger.error("Failed to delete extra streamed message", error);
+      }
+    }
+
+    this.sentContent = desiredContent;
+    this.lastFlushAt = Date.now();
+
+    if (this.targetContent !== this.sentContent) {
+      this.scheduleFlush();
+    }
+  }
+
+  private async deleteAllMessages(): Promise<void> {
+    const messages = [...this.sentMessages];
+    this.sentMessages = [];
+
+    for (const sentMessage of messages) {
+      try {
+        await sentMessage.delete();
+      } catch (error) {
+        logger.error("Failed to delete streamed message", error);
+      }
+    }
+  }
+}
diff --git a/src/features/joel/voice.ts b/src/features/joel/voice.ts
index 7e467cc..edfd8fe 100644
--- a/src/features/joel/voice.ts
+++ b/src/features/joel/voice.ts
@@ -26,6 +26,8 @@ const logger = createLogger("Features:Joel:Voice");
 const MAX_VOICE_TEXT_LENGTH = 800;
 const PLAYBACK_TIMEOUT_MS = 60_000;
 const READY_TIMEOUT_MS = 15_000;
+const READY_RETRY_DELAY_MS = 1_000;
+const READY_MAX_ATTEMPTS = 3;
 const VOICE_DEPENDENCY_REPORT = generateDependencyReport();
 
 type VoiceDependencyHealth = {
@@ -109,6 +111,10 @@ function getErrorMessage(error: unknown): string {
   return typeof error === "string" ? error : "Unknown error";
 }
 
+function delay(ms: number): Promise<void> {
+  return new Promise((resolve) => setTimeout(resolve, ms));
+}
+
 function resolveMentions(message: Message, content: string): string {
   let text = content;
 
@@ -196,61 +202,88 @@ async function getOrCreateConnection(message: Message): Promise<VoiceConnection> {
diff --git a/src/services/ai/index.ts b/src/services/ai/index.ts
--- a/src/services/ai/index.ts
+++ b/src/services/ai/index.ts
   async generateResponse(
     prompt: string,
     systemPrompt: string,
+    onTextStream?: TextStreamHandler,
   ): Promise<AiResponse> {
     logger.debug("Generating response", { promptLength: prompt.length });
-    return this.provider.ask({ prompt, systemPrompt });
+    return this.provider.ask({ prompt, systemPrompt, onTextStream });
   }
 
   /**
@@ -35,14 +36,15 @@ export class AiService {
   async generateResponseWithTools(
     prompt: string,
     systemPrompt: string,
-    context: ToolContext
+    context: ToolContext,
+    onTextStream?: TextStreamHandler,
   ): Promise<AiResponse> {
     if (this.provider.askWithTools) {
       logger.debug("Generating response with tools", { promptLength: prompt.length });
-      return this.provider.askWithTools({ prompt, systemPrompt, context });
+      return this.provider.askWithTools({ prompt, systemPrompt, context, onTextStream });
     }
 
     // Fallback to regular response if tools not supported
-    return this.generateResponse(prompt, systemPrompt);
+    return this.generateResponse(prompt, systemPrompt, onTextStream);
   }
 
   /**
@@ -90,7 +92,7 @@ export function getAiService(): AiService {
   return aiService;
 }
 
-export type { AiProvider, AiResponse, MessageStyle } from "./types";
+export type { AiProvider, AiResponse, MessageStyle, TextStreamHandler } from "./types";
 export type { ToolContext, ToolCall, ToolResult } from "./tools";
 export { JOEL_TOOLS, MEMORY_EXTRACTION_TOOLS } from "./tools";
 export { getEmbeddingService, EmbeddingService } from "./embeddings";
diff --git a/src/services/ai/openrouter.ts b/src/services/ai/openrouter.ts
index 4b021c7..180d887 100644
--- a/src/services/ai/openrouter.ts
+++ b/src/services/ai/openrouter.ts
@@ -3,11 +3,15 @@
  */
 
 import OpenAI from "openai";
-import type { ChatCompletionMessageParam, ChatCompletionTool } from "openai/resources/chat/completions";
+import type {
+  ChatCompletionMessageParam,
+  ChatCompletionMessageToolCall,
+  ChatCompletionTool,
+} from "openai/resources/chat/completions";
 import { config } from "../../core/config";
 import { createLogger } from "../../core/logger";
-import type { AiProvider, AiResponse, AskOptions, AskWithToolsOptions, MessageStyle } from "./types";
-import { JOEL_TOOLS, MEMORY_EXTRACTION_TOOLS, getToolsForContext, type ToolCall, type ToolContext } from "./tools";
+import type { AiProvider, AiResponse, AskOptions, AskWithToolsOptions, MessageStyle, TextStreamHandler } from "./types";
+import { MEMORY_EXTRACTION_TOOLS, getToolsForContext, type ToolCall, type ToolContext } from "./tools";
 import { executeTools } from "./tool-handlers";
 
 const logger = createLogger("AI:OpenRouter");
@@ -18,6 +22,20 @@ const STYLE_OPTIONS: MessageStyle[] = ["story", "snarky", "insult", "explicit",
 
 // Maximum tool call iterations to prevent infinite loops
 const MAX_TOOL_ITERATIONS = 5;
 
+interface StreamedToolCall {
+  id: string;
+  type: "function";
+  function: {
+    name: string;
+    arguments: string;
+  };
+}
+
+interface StreamedCompletionResult {
+  text: string;
+  toolCalls: StreamedToolCall[];
+}
+
 export class OpenRouterProvider implements AiProvider {
   private client: OpenAI;
@@ -70,10 +88,24 @@ export class OpenRouterProvider implements AiProvider {
   }
 
   async ask(options: AskOptions): Promise<AiResponse> {
-    const { prompt, systemPrompt, maxTokens, temperature } = options;
+    const { prompt, systemPrompt, maxTokens, temperature, onTextStream } = options;
     const model = config.ai.model;
 
     try {
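+      // With a stream handler attached, use the streaming API so partial text
+      // reaches Discord while the model is still generating.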
+      if (onTextStream) {
+        const streamed = await this.streamChatCompletion({
+          model,
+          messages: [
+            { role: "system", content: systemPrompt },
+            { role: "user", content: prompt },
+          ],
+          max_tokens: maxTokens ?? config.ai.maxTokens,
+          temperature: temperature ?? config.ai.temperature,
+        }, onTextStream);
+
+        return { text: streamed.text };
+      }
+
       const completion = await this.client.chat.completions.create({
         model,
         messages: [
@@ -85,9 +117,7 @@
       });
 
       const text = completion.choices[0]?.message?.content ?? "";
-
-      // Discord message limit safety
-      return { text: text.slice(0, 1900) };
+      return { text };
     } catch (error: unknown) {
       logger.error("Failed to generate response (ask)", {
         method: "ask",
@@ -105,7 +135,7 @@
    * The AI can call tools (like looking up memories) during response generation
    */
   async askWithTools(options: AskWithToolsOptions): Promise<AiResponse> {
-    const { prompt, systemPrompt, context, maxTokens, temperature } = options;
+    const { prompt, systemPrompt, context, maxTokens, temperature, onTextStream } = options;
 
     const messages: ChatCompletionMessageParam[] = [
       { role: "system", content: systemPrompt },
@@ -121,6 +151,53 @@
       iterations++;
 
       try {
+        if (onTextStream) {
+          const streamed = await this.streamChatCompletion({
+            model: config.ai.model,
+            messages,
+            tools,
+            tool_choice: "auto",
+            max_tokens: maxTokens ?? config.ai.maxTokens,
+            temperature: temperature ?? config.ai.temperature,
+          }, onTextStream);
+
+          if (streamed.toolCalls.length > 0) {
+            logger.debug("AI requested tool calls", {
+              count: streamed.toolCalls.length,
+              tools: streamed.toolCalls.map((tc) => tc.function.name),
+            });
+
+            messages.push({
+              role: "assistant",
+              content: streamed.text || null,
+              tool_calls: streamed.toolCalls,
+            });
+
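+            // Clear the streamed preview while the tools execute; the next
+            // loop iteration streams the follow-up answer from scratch.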
+            await onTextStream("");
+
+            const toolCalls = this.parseToolCalls(streamed.toolCalls);
+            const results = await executeTools(toolCalls, context);
+
+            for (let i = 0; i < toolCalls.length; i++) {
+              messages.push({
+                role: "tool",
+                tool_call_id: toolCalls[i].id,
+                content: results[i].result,
+              });
+            }
+
+            continue;
+          }
+
+          logger.debug("AI response generated", {
+            iterations,
+            textLength: streamed.text.length,
+            streamed: true,
+          });
+
+          return { text: streamed.text };
+        }
+
         const completion = await this.client.chat.completions.create({
           model: config.ai.model,
           messages,
@@ -177,7 +254,7 @@
          textLength: text.length
        });
 
-        return { text: text.slice(0, 1900) };
+        return { text };
      } catch (error: unknown) {
        logger.error("Failed to generate response with tools (askWithTools)", {
          method: "askWithTools",
@@ -196,6 +273,92 @@
     return { text: "I got stuck in a loop thinking about that..." };
   }
 
+  private async streamChatCompletion(
+    params: {
+      model: string;
+      messages: ChatCompletionMessageParam[];
+      tools?: ChatCompletionTool[];
+      tool_choice?: "auto" | "none";
+      max_tokens: number;
+      temperature: number;
+    },
+    onTextStream: TextStreamHandler,
+  ): Promise<StreamedCompletionResult> {
+    const stream = await this.client.chat.completions.create({
+      ...params,
+      stream: true,
+    });
+
+    let text = "";
+    const toolCalls = new Map<number, StreamedToolCall>();
+
+    for await (const chunk of stream) {
+      const choice = chunk.choices[0];
+      if (!choice) {
+        continue;
+      }
+
+      const delta = choice.delta;
+      const content = delta.content ?? "";
+      if (content) {
+        text += content;
+        await onTextStream(text);
+      }
+
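+      // Tool-call deltas arrive as fragments keyed by index: the id and name
+      // show up once, while the JSON argument string accumulates across chunks.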
+      for (const toolCallDelta of delta.tool_calls ?? []) {
+        const current = toolCalls.get(toolCallDelta.index) ?? {
+          id: "",
+          type: "function" as const,
+          function: {
+            name: "",
+            arguments: "",
+          },
+        };
+
+        if (toolCallDelta.id) {
+          current.id = toolCallDelta.id;
+        }
+
+        if (toolCallDelta.function?.name) {
+          current.function.name = toolCallDelta.function.name;
+        }
+
+        if (toolCallDelta.function?.arguments) {
+          current.function.arguments += toolCallDelta.function.arguments;
+        }
+
+        toolCalls.set(toolCallDelta.index, current);
+      }
+    }
+
+    return {
+      text,
+      toolCalls: Array.from(toolCalls.entries())
+        .sort((a, b) => a[0] - b[0])
+        .map(([, toolCall]) => toolCall),
+    };
+  }
+
+  private parseToolCalls(toolCalls: ChatCompletionMessageToolCall[]): ToolCall[] {
+    return toolCalls.map((toolCall) => {
+      try {
+        return {
+          id: toolCall.id,
+          name: toolCall.function.name,
+          arguments: JSON.parse(toolCall.function.arguments || "{}"),
+        };
+      } catch (error) {
+        logger.error("Failed to parse streamed tool call arguments", {
+          toolName: toolCall.function.name,
+          toolCallId: toolCall.id,
+          arguments: toolCall.function.arguments,
+          error,
+        });
+        throw error;
+      }
+    });
+  }
+
   /**
    * Analyze a message to extract memorable information
    */
diff --git a/src/services/ai/types.ts b/src/services/ai/types.ts
index cd8784b..2a0c760 100644
--- a/src/services/ai/types.ts
+++ b/src/services/ai/types.ts
@@ -9,6 +9,8 @@ export interface AiResponse {
   text: string;
 }
 
+export type TextStreamHandler = (text: string) => Promise<void> | void;
+
 /**
  * Message style classification options
  */
@@ -55,6 +57,7 @@ export interface AskOptions {
   systemPrompt: string;
   maxTokens?: number;
   temperature?: number;
+  onTextStream?: TextStreamHandler;
 }
 
 export interface AskWithToolsOptions extends AskOptions {
diff --git a/src/services/ai/voiceover.ts b/src/services/ai/voiceover.ts
index 0d4a882..4fc1bf8 100644
--- a/src/services/ai/voiceover.ts
+++ b/src/services/ai/voiceover.ts
@@ -11,12 +11,45 @@ const DEFAULT_OUTPUT_FORMAT = "mp3_44100_128" as const;
 const DEFAULT_STABILITY = 0.1;
 const DEFAULT_SIMILARITY = 0.90;
 const DEFAULT_STYLE = 0.25;
-const DEFAULT_SPEED = 1.20
+const DEFAULT_SPEED = 1.20;
+
+const IMPORTANT_RESPONSE_HEADERS = [
+  "content-type",
+  "content-length",
+  "request-id",
+  "x-request-id",
+  "cf-ray",
+  "ratelimit-limit",
+  "ratelimit-remaining",
+  "ratelimit-reset",
+  "current-concurrent-requests",
+] as const;
 
 function clamp01(value: number): number {
   return Math.max(0, Math.min(1, value));
 }
 
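+// Collect a compact, log-friendly summary of an ElevenLabs response,
+// including rate-limit headers, for debugging quota and latency issues.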
+function getResponseMetadata(response: Response, durationMs: number): Record<string, unknown> {
+  const headers: Record<string, string> = {};
+
+  for (const header of IMPORTANT_RESPONSE_HEADERS) {
+    const value = response.headers.get(header);
+    if (value) {
+      headers[header] = value;
+    }
+  }
+
+  return {
+    ok: response.ok,
+    status: response.status,
+    statusText: response.statusText,
+    url: response.url,
+    redirected: response.redirected,
+    durationMs,
+    headers,
+  };
+}
+
 export interface VoiceoverOptions {
   text: string;
   voiceId?: string;
@@ -64,6 +97,7 @@ export class VoiceoverService {
       modelId,
     });
 
+    const requestStartedAt = Date.now();
     const response = await fetch(url.toString(), {
       method: "POST",
       headers: {
@@ -77,17 +111,22 @@ export class VoiceoverService {
         voice_settings: voiceSettings,
       }),
     });
+    const responseDurationMs = Date.now() - requestStartedAt;
 
     if (!response.ok) {
       const errorBody = await response.text();
       logger.error("ElevenLabs API error", {
-        status: response.status,
-        body: errorBody.slice(0, 300),
+        ...getResponseMetadata(response, responseDurationMs),
+        bodyPreview: errorBody.slice(0, 500),
       });
       throw new Error(`ElevenLabs API error (HTTP ${response.status}).`);
     }
 
     const audioBuffer = await response.arrayBuffer();
+    logger.debug("ElevenLabs API response", {
+      ...getResponseMetadata(response, responseDurationMs),
+      audioBytes: audioBuffer.byteLength,
+    });
 
     return Buffer.from(audioBuffer);
   }