Skip to content

Commit 9e11c03

Browse files
committed
Support text streaming in chat implementations in both PHP and JavaScript (see #3).
1 parent 3d80f9b commit 9e11c03

File tree

3 files changed

+185
-1
lines changed

3 files changed

+185
-1
lines changed

includes/Services/API/Types/Chat_Session.php

+55
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use Felix_Arntz\AI_Services\Services\Contracts\With_Text_Generation;
1414
use Felix_Arntz\AI_Services\Services\Exception\Generative_AI_Exception;
1515
use Felix_Arntz\AI_Services\Services\Util\Formatter;
16+
use Generator;
1617
use InvalidArgumentException;
1718

1819
/**
@@ -107,6 +108,60 @@ public function send_message( $content, array $request_options = array() ): Cont
107108
return $response_content;
108109
}
109110

111+
/**
 * Sends a chat message to the model, streaming the response.
 *
 * The prompt sent to the model consists of the entire chat history plus the new message. Each yielded
 * chunk is the first relevant candidate's content; the full exchange is appended to the history only
 * after the stream has completed successfully.
 *
 * @since n.e.x.t
 *
 * @param string|Parts|Content $content         The message to send.
 * @param array<string, mixed> $request_options Optional. The request options. Default empty array.
 * @return Generator<Content> Generator that yields the chunks of content with generated text.
 *
 * @throws Generative_AI_Exception Thrown if the request fails or the response is invalid.
 */
public function stream_send_message( $content, array $request_options = array() ): Generator {
	$message = Formatter::format_new_content( $content );

	$conversation   = $this->history;
	$conversation[] = $message;

	// Candidate filter arguments are handled here rather than passed through to the model.
	$filter_args = array();
	if ( isset( $request_options['candidate_filter_args'] ) ) {
		$filter_args = $request_options['candidate_filter_args'];
		unset( $request_options['candidate_filter_args'] );
	}

	$chunks_generator = $this->model->stream_generate_text( $conversation, $request_options );
	$processor        = Helpers::process_candidates_stream( $chunks_generator );

	foreach ( $chunks_generator as $chunk_candidates ) {
		if ( $filter_args ) {
			$chunk_candidates = $chunk_candidates->filter( $filter_args );
		}

		if ( 0 === count( $chunk_candidates ) ) {
			throw new Generative_AI_Exception(
				esc_html__( 'The response did not include any relevant candidates.', 'ai-services' )
			);
		}

		$processor->add_chunk( $chunk_candidates );

		$chunk_contents = Helpers::get_candidate_contents( $chunk_candidates );
		yield $chunk_contents[0];
	}

	$final_candidates = $processor->get_complete();
	$final_contents   = Helpers::get_candidate_contents( $final_candidates );

	// Commit both messages to the history only once the full response has streamed.
	$this->history[] = $message;
	$this->history[] = $final_contents[0];
}
164+
110165
/**
111166
* Validates the chat history.
112167
*

src/ai/chat.js

+69
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,22 @@ function sanitizeHistory( history ) {
5656
} );
5757
}
5858

59+
/**
 * Processes a stream of content and yields its chunks.
 *
 * Each chunk from the underlying generator is passed through unchanged; once the
 * generator is exhausted, the provided callback is invoked exactly once.
 *
 * @since n.e.x.t
 *
 * @param {Object}   stream     The generator that yields the chunks of content.
 * @param {Function} onComplete Callback that is called once the generator has been processed.
 * @return {Object} The generator that yields the chunks of content.
 */
async function* processContentStream( stream, onComplete ) {
	for await ( const chunk of stream ) {
		yield chunk;
	}
	onComplete();
}
74+
5975
const actions = {
6076
/**
6177
* Starts a chat session.
@@ -165,6 +181,59 @@ const actions = {
165181
};
166182
},
167183

184+
/**
185+
* Sends a message to the chat, streaming the response.
186+
*
187+
* @since n.e.x.t
188+
*
189+
* @param {string} chatId Identifier of the chat.
190+
* @param {string|Object|Object[]} content Chat message content.
191+
* @return {Function} Action creator.
192+
*/
193+
streamSendMessage( chatId, content ) {
194+
return async ( { dispatch } ) => {
195+
const session = chatSessionInstances[ chatId ];
196+
if ( ! session ) {
197+
// eslint-disable-next-line no-console
198+
console.error( `Chat ${ chatId } not found.` );
199+
return;
200+
}
201+
202+
const newContent = formatNewContent( content );
203+
dispatch.receiveContent( chatId, newContent );
204+
205+
await dispatch( {
206+
type: LOAD_CHAT_START,
207+
payload: { chatId },
208+
} );
209+
210+
let responseGenerator;
211+
try {
212+
responseGenerator =
213+
await session.streamSendMessage( newContent );
214+
} catch ( error ) {
215+
dispatch.revertContent( chatId );
216+
await dispatch( {
217+
type: LOAD_CHAT_FINISH,
218+
payload: { chatId },
219+
} );
220+
throw error;
221+
}
222+
223+
await dispatch( {
224+
type: LOAD_CHAT_FINISH,
225+
payload: { chatId },
226+
} );
227+
228+
return processContentStream( responseGenerator, () => {
229+
// Once the stream is complete, get the final response from the chat session and dispatch it.
230+
const history = session.getHistory();
231+
const response = { ...history[ history.length - 1 ] };
232+
dispatch.receiveContent( chatId, response );
233+
} );
234+
};
235+
},
236+
168237
/**
169238
* Receives a chat session.
170239
*

src/ai/classes/chat-session.js

+61-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* Internal dependencies
33
*/
44
import GenerativeAiModel from './generative-ai-model';
5-
import { getCandidateContents } from '../helpers';
5+
import { getCandidateContents, processCandidatesStream } from '../helpers';
66
import { validateChatHistory, formatNewContent } from '../util';
77

88
/**
@@ -65,4 +65,64 @@ export default class ChatSession {
6565

6666
return responseContent;
6767
}
68+
69+
/**
70+
* Sends a chat message to the model, streaming the response.
71+
*
72+
* @since n.e.x.t
73+
*
74+
* @param {string|Object|Object[]} content Chat message content.
75+
* @return {Promise<Object>} The generator that yields chunks of the response content.
76+
*/
77+
async streamSendMessage( content ) {
78+
const newContent = formatNewContent( content );
79+
80+
const contents = [ ...this.history, newContent ];
81+
82+
const candidatesGenerator =
83+
await this.model.streamGenerateText( contents );
84+
85+
return processCandidatesStreamToContent(
86+
candidatesGenerator,
87+
( responseContent ) => {
88+
this.history = [ ...this.history, newContent, responseContent ];
89+
}
90+
);
91+
}
92+
}
93+
94+
/**
 * Processes a stream of candidates and yields their first content's chunks.
 *
 * Other than yielding the content chunks, this generator function aggregates the candidates chunks into a single
 * candidates instance, and once completed invokes the provided callback with the complete response content.
 *
 * @since n.e.x.t
 *
 * @param {Object}   chunkStream The generator that yields the chunks of response candidates.
 * @param {Function} onComplete  Callback that is called with the complete response content.
 * @return {Object} The generator that yields chunks of the response content.
 */
async function* processCandidatesStreamToContent( chunkStream, onComplete ) {
	const aggregator = processCandidatesStream( chunkStream );

	for await ( const chunkCandidates of chunkStream ) {
		aggregator.addChunk( chunkCandidates );

		// TODO: Support optional candidateFilterArgs, similar to PHP implementation.
		const chunkContents = getCandidateContents( chunkCandidates );
		yield chunkContents[ 0 ];
	}

	// TODO: Support optional candidateFilterArgs, similar to PHP implementation.
	const finalContents = getCandidateContents( aggregator.getComplete() );
	onComplete( finalContents[ 0 ] );
}

0 commit comments

Comments (0)