You’d get this:
Chunk 1 (len=2)
{"
Chunk 2 (len=10)
translated
Chunk 3 (len=5)
_text
Chunk 4 (len=3)
":"
Chunk 5 (len=4)
Sono
Chunk 6 (len=4)
pre
Chunk 7 (len=5)
occup
Chunk 8 (len=3)
ato
Chunk 9 (len=4)
per
Chunk 10 (len=3)
la
Chunk 11 (len=6)
barri
Chunk 12 (len=3)
era
Chunk 13 (len=6)
lingu
Chunk 14 (len=6)
istica
Chunk 15 (len=1)
.
Chunk 16 (len=3)
Il
Chunk 17 (len=6)
collo
Chunk 18 (len=2)
qu
Chunk 19 (len=2)
io
Chunk 20 (len=5)
sarà
Chunk 21 (len=3)
in
Chunk 22 (len=7)
ingles
Chunk 23 (len=1)
e
Chunk 24 (len=1)
,
Chunk 25 (len=3)
ma
Chunk 26 (len=9)
potrebbe
Chunk 27 (len=4)
non
Chunk 28 (len=7)
essere
Chunk 29 (len=3)
la
Chunk 30 (len=5)
loro
Chunk 31 (len=7)
lingua
Chunk 32 (len=6)
madre
Chunk 33 (len=1)
.
Chunk 34 (len=5)
Come
Chunk 35 (len=6)
posso
Chunk 36 (len=5)
gest
Chunk 37 (len=3)
ire
Chunk 38 (len=7)
questa
Chunk 39 (len=11)
situazione
Chunk 40 (len=2)
?"
Chunk 41 (len=1)
}
And the collected JSON would be:
{
"translated_text": "Sono preoccupato per la barriera linguistica. Il colloquio sarà in inglese, ma potrebbe non essere la loro lingua madre. Come posso gestire questa situazione?"
}
Below is the Python code for the structured Italian translation app. Besides live streaming output to the console, it saves the collected result to ai_response.json and writes ai_response_log.txt containing the same chunk log shown above.
import os
import json
import httpx
from typing import Iterator, List
# Chat Completions REST endpoint and the model used for every translation request.
API_ENDPOINT = "https://api.openai.com/v1/chat/completions"
MODEL = "gpt-4.1-mini"

# Fail fast at import time if the API key is missing, rather than failing
# later on the first streamed request.
API_KEY = os.environ.get("OPENAI_API_KEY")
if not API_KEY:
    raise RuntimeError("OPENAI_API_KEY environment variable is not set.")

# Common headers for every request; bearer-token auth per the OpenAI REST API.
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}",
}

# Demo input: English text to be translated into Italian (the target language
# is fixed by the system prompt in translate_with_openai).
input_string = r"""
I'm worried about the language barrier. The interview will be in English, but it might not be their native language. How can I handle this?
""".strip()
class OpenAIStream:
    """Stream a chat-completions response and collect its text deltas.

    Used as a context manager: on entry it opens the HTTP stream and
    validates the status code; iterating the instance then yields each
    incremental text piece parsed from the server-sent events, while also
    accumulating them in ``chunks`` (one entry per delta) and
    ``collected_text`` (the running concatenation).
    """

    def __init__(self, client: "httpx.Client", url: str, headers: dict, payload: dict) -> None:
        self.client = client
        self.url = url
        self.headers = headers
        self.payload = payload
        # Annotated with the public ContextManager protocol (quoted, so it is
        # never evaluated at runtime) instead of the private
        # ``httpx._client._StreamContextManager``: complex-target annotations
        # ARE evaluated at runtime (PEP 526), and that private name is not a
        # stable API and can disappear on an httpx upgrade.
        self._cm: "ContextManager[httpx.Response] | None" = None
        self._response: "httpx.Response | None" = None
        self.chunks: List[str] = []
        self.collected_text: str = ""

    def __enter__(self) -> "OpenAIStream":
        """Open the streaming request; raise RuntimeError on a non-200 status."""
        self._cm = self.client.stream(
            "POST",
            self.url,
            headers=self.headers,
            json=self.payload,
            timeout=60.0,
        )
        self._response = self._cm.__enter__()
        if self._response.status_code != 200:
            # A streamed response body must be read explicitly before ``.text``
            # is available (accessing it unread raises httpx.ResponseNotRead).
            # Also close the stream before raising: the caller's ``with`` body
            # never runs, so __exit__ would otherwise never be called and the
            # connection would leak.
            try:
                self._response.read()
                body = self._response.text
            finally:
                self._cm.__exit__(None, None, None)
            raise RuntimeError(
                f"OpenAI streaming request failed with status {self._response.status_code}: {body}"
            )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the underlying HTTP stream."""
        if self._cm is not None:
            self._cm.__exit__(exc_type, exc_val, exc_tb)

    def __iter__(self) -> Iterator[str]:
        """Yield text deltas from the SSE stream until ``[DONE]``.

        Blank lines, non-``data:`` lines (e.g. keep-alive comments),
        malformed JSON payloads, and deltas carrying no text are skipped.
        """
        if self._response is None:
            # Not entered yet: behave as an empty iterator.
            return iter(())
        for raw_line in self._response.iter_lines():
            if not raw_line:
                continue
            line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line
            if not line.startswith("data: "):
                continue
            data = line[6:].strip()
            if data == "[DONE]":
                break
            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                continue
            choices = payload.get("choices") or []
            if not choices:
                continue
            delta = choices[0].get("delta", {})
            if not delta:
                continue
            text_piece = self._extract_text_from_delta(delta)
            if not text_piece:
                continue
            self.chunks.append(text_piece)
            self.collected_text += text_piece
            yield text_piece

    @staticmethod
    def _extract_text_from_delta(delta: dict) -> str:
        """Return the text carried by a streaming delta.

        Handles both observed formats: ``content`` as a plain string, or
        ``content`` as a list of part dicts with ``"text"`` keys. Returns
        "" when no text is present.
        """
        content = delta.get("content")
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            return "".join(
                str(item["text"])
                for item in content
                if isinstance(item, dict) and "text" in item
            )
        return ""
def translate_with_openai(translation_string: str) -> None:
    """Stream an Italian translation of *translation_string* from OpenAI.

    Prints each streamed chunk as it arrives, then persists two reports:
    ``ai_response.json`` (the parsed structured response, or the raw text
    under a ``"raw_text"`` key when it is not valid JSON) and
    ``ai_response_log.txt`` (a markdown-style log of every chunk received).
    """
    # Strict JSON-schema response format: a single required string field.
    schema_spec = {
        "name": "translation_response",
        "strict": True,
        "schema": {
            "type": "object",
            "required": ["translated_text"],
            "properties": {
                "translated_text": {
                    "type": "string",
                    "description": "The user input text translated into the target language.",
                }
            },
            "additionalProperties": False,
        },
    }

    system_message = {
        "role": "system",
        "content": [
            {
                "type": "text",
                "text": "You are a language translator. Destination: Italian.",
            }
        ],
    }
    user_message = {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": f"Translate all text: \n\n{translation_string}",
            }
        ],
    }

    request_body = {
        "model": MODEL,
        "max_completion_tokens": 50,
        "response_format": {
            "type": "json_schema",
            "json_schema": schema_spec,
        },
        "messages": [system_message, user_message],
        "stream": True,
    }

    with httpx.Client() as client, OpenAIStream(
        client=client,
        url=API_ENDPOINT,
        headers=headers,
        payload=request_body,
    ) as stream:
        print("Streaming response:\n")
        for piece in stream:
            print(piece, end="", flush=True)
        print("\n\nStreaming complete.\n")
        raw_text = stream.collected_text
        received_chunks = stream.chunks

    # Prefer the parsed structured object; fall back to wrapping the raw
    # text when the accumulated stream is not valid JSON.
    try:
        report_obj = json.loads(raw_text)
    except json.JSONDecodeError:
        report_obj = {"raw_text": raw_text}

    with open("ai_response.json", "w", encoding="utf-8") as f:
        json.dump(report_obj, f, ensure_ascii=False, indent=2)

    # One markdown-style entry per chunk, assembled up front and written in
    # a single call.
    log_entries = [
        f"### Chunk {i} (len={len(chunk)})\n\n```text\n{chunk}\n```\n\n"
        for i, chunk in enumerate(received_chunks, start=1)
    ]
    with open("ai_response_log.txt", "w", encoding="utf-8") as f:
        f.write("".join(log_entries))
def main() -> None:
    """Entry point: run the translation demo, then report the output files."""
    translate_with_openai(input_string)
    print("Reports have been saved to ai_response.json and ai_response_log.txt.")
# Run only when executed as a script, so the module can be imported
# without triggering a network request.
if __name__ == "__main__":
    main()
Generally, OpenAI streams its output token by token, so each chunk corresponds to one token — except when a single token cannot represent a complete character, in which case adjacent pieces are combined into one chunk.