Streaming & Iterative Emulation¶
OpenHosta supports streaming results from Large Language Models (LLMs) both for raw text (via ask_stream) and for structured Python objects (via emulate).
This is particularly useful for:
- Long responses: Avoiding timeouts and providing immediate feedback in user interfaces.
- Processing sequences: Handling lists of items as they are generated.
- Parallelism: Starting the next processing step as soon as the first item of a collection is available.
Streaming Raw Text with ask_stream¶
The ask_stream function allows you to receive tokens from the model as they are generated. To avoid overwhelming your application with too many events, you can specify an interval_ms to buffer tokens.
Synchronous Usage¶
from OpenHosta import ask_stream
# Receive groups of tokens every 100ms
for chunk in ask_stream("Tell me a long story about space exploration.", interval_ms=100):
    print(chunk, end="", flush=True)
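To make the buffering idea behind interval_ms concrete, here is a minimal sketch of time-based token grouping in plain Python. The helper name buffer_by_interval and its clock parameter are assumptions made for illustration; they are not part of OpenHosta, and the library's own buffering may work differently.

```python
import time
from typing import Callable, Iterable, Iterator, List


def buffer_by_interval(
    tokens: Iterable[str],
    interval_ms: int,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[str]:
    """Group incoming tokens and emit them at most once per interval.

    Hypothetical helper for illustration only; not an OpenHosta function.
    """
    interval_s = interval_ms / 1000.0
    pending: List[str] = []
    last_flush = clock()
    for token in tokens:
        pending.append(token)
        now = clock()
        if now - last_flush >= interval_s:
            # Enough time has passed: emit everything collected so far.
            yield "".join(pending)
            pending.clear()
            last_flush = now
    if pending:
        # Flush whatever is left once the stream ends.
        yield "".join(pending)
```

A larger interval produces fewer, longer chunks; the clock is injectable only so the behavior can be checked without real delays.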
Asynchronous Usage¶
import asyncio
from OpenHosta import ask_stream_async
async def main():
    async for chunk in ask_stream_async("Explain quantum computing simply.", interval_ms=50):
        print(chunk, end="", flush=True)
asyncio.run(main())
Iterative Emulation with emulate¶
When a function is defined as a generator (using yield), emulate() automatically switches to Streaming Mode. Instead of waiting for the full response, it parses individual Python objects out of the LLM stream as they are completed.
How it works¶
- Meta-Prompt instruction: OpenHosta tells the LLM to output a sequence of items, each wrapped in its own ```python block.
- On-the-fly parsing: The pipeline monitors the stream. As soon as a code block is closed (```), it extracts the content, casts it to the target type, and yields it immediately.
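The two steps above can be sketched in plain Python as follows. This is an illustrative approximation, not OpenHosta's actual pipeline: the function name iter_fenced_items is hypothetical, and ast.literal_eval stands in for the cast to the target type.

```python
import ast
from typing import Any, Iterable, Iterator

FENCE = "`" * 3               # three backticks
OPEN_FENCE = FENCE + "python"


def iter_fenced_items(chunks: Iterable[str]) -> Iterator[Any]:
    """Yield one parsed Python literal per completed fenced block.

    Hypothetical sketch: chunks may split a fence or a block anywhere.
    """
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while True:
            start = buffer.find(OPEN_FENCE)
            if start == -1:
                break  # no complete opening fence yet
            body_start = start + len(OPEN_FENCE)
            end = buffer.find(FENCE, body_start)
            if end == -1:
                break  # block still open: wait for more text
            body = buffer[body_start:end].strip()
            # Drop the consumed block so the next search starts fresh.
            buffer = buffer[end + len(FENCE):]
            yield ast.literal_eval(body)
```

Because the function is itself a generator, each item is handed to the caller as soon as its closing fence arrives, before later chunks are read.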
Example: Processing Paragraphs¶
If you want to process a long text paragraph by paragraph:
from typing import Iterator
from OpenHosta import emulate
def split_into_paragraphs(text: str) -> Iterator[str]:
    """
    Analyzes the input text and yields each paragraph as a separate string.
    Each paragraph should be a coherent unit of thought.
    """
    yield from emulate()
# Note: You can also use 'return emulate()' if you prefer,
# as long as the return type is annotated as an Iterator.
def split_into_paragraphs_alt(text: str) -> Iterator[str]:
    return emulate()
long_text = "... some very long text ..."
for paragraph in split_into_paragraphs(long_text):
    print(f"--- Received Paragraph ---\n{paragraph}\n")
Example: Generating Complex Objects¶
Streaming works with any supported type, including Dataclasses and Pydantic models.
from typing import Iterator
from dataclasses import dataclass
from OpenHosta import emulate
@dataclass
class ActionItem:
    task: str
    priority: int
def extract_action_items(meeting_notes: str) -> Iterator[ActionItem]:
    """
    Extracts action items from the meeting notes.
    Yields one ActionItem at a time.
    """
    yield from emulate()
notes = "We need to fix the bug by tomorrow (high priority). Also, remember to buy milk."
for item in extract_action_items(notes):
    print(f"Task: {item.task} (Priority: {item.priority})")
The "Streaming Upgrade"¶
Changing a single return type from list[T] to Iterator[T] is enough to enable streaming. This simple change allows you to start downstream LLM calls or process data before the entire list is fully generated by the model.
# Standard mode: waits for the full list
def extract_clauses(contract_text: str) -> list[Clause]:
    """List every distinct liability or obligation clause from the contract."""
    return emulate()
# Streaming mode: yields items as they are generated
def extract_clauses(contract_text: str) -> Iterator[Clause]:
    """Yield every distinct liability or obligation clause from the contract."""
    return emulate()
1. Synchronous Iteration & Network Buffering¶
Even in a simple synchronous loop, using an iterator shortens the time to the first result. When you use return emulate() (or yield from emulate()), OpenHosta yields each object as soon as the LLM finishes its block (e.g., one item in a list), instead of waiting for the full response.
While your code is processing the first item, the LLM is already streaming the next tokens into your network socket buffer. You don't "waste" time waiting for the entire response to be downloaded; you process it piece by piece as it arrives.
def generate_ideas(topic: str) -> Iterator[str]:
    """Yields creative ideas about the topic."""
    yield from emulate()
# The loop starts as soon as the first idea is fully received!
for idea in generate_ideas("Future of Transportation"):
    print(f"Processing: {idea}")
    # During this print, the next idea is already being buffered by the OS
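The difference between consuming an iterator lazily and materializing a list first can be observed without any LLM call. In the sketch below, fake_generate_ideas and consume are hypothetical stand-ins written for illustration; they only record the order in which items are produced and processed.

```python
from typing import Iterator, List


def fake_generate_ideas(log: List[str]) -> Iterator[str]:
    """Stand-in for an emulated generator; logs when each item is produced."""
    for idea in ("hyperloop", "airships", "e-bikes"):
        log.append(f"produced {idea}")
        yield idea


def consume(materialize_first: bool) -> List[str]:
    """Process every idea and return the ordered event log."""
    log: List[str] = []
    source = fake_generate_ideas(log)
    # list(...) mimics the list[T] return type: everything is produced up front.
    items = list(source) if materialize_first else source
    for idea in items:
        log.append(f"processed {idea}")
    return log


streamed = consume(materialize_first=False)
batched = consume(materialize_first=True)
print(streamed[:2])  # ['produced hyperloop', 'processed hyperloop']
print(batched[:2])   # ['produced hyperloop', 'produced airships']
```

In the streamed case, processing of the first item starts before the second item exists, which is the behavior the Iterator[T] return type is meant to enable.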
2. Asynchronous Iteration & Task Parallelism¶
Asynchronous iteration (async for) is used when you want true concurrency. This is particularly powerful in two cases:
* Chained AI Tasks: You can start an expensive AI task (like cost estimation) on the first item while the generator is still waiting for the next items from the LLM.
* Local Models (Torch/CUDA): If you are running local models, asyncio allows one model to be computing on the GPU/CPU while another part of your code handles I/O or prepares the next request.
from typing import AsyncIterator
from OpenHosta import emulate_async
async def generate_ideas(topic: str) -> AsyncIterator[str]:
    """Yields creative ideas about the topic."""
    # Option 1: Direct delegation (preferred)
    async for idea in emulate_async():
        yield idea
# Option 2: Return the generator directly
async def generate_ideas_alt(topic: str) -> AsyncIterator[str]:
    return emulate_async()
async def estimate_cost(idea: str) -> float:
    """AI task to estimate cost."""
    return await emulate_async()
async def process_topic(topic: str):
    # Both tasks can overlap!
    # generate_ideas yields an idea -> we start estimate_cost ->
    # estimate_cost waits for its API response -> generate_ideas continues in parallel.
    async for idea in generate_ideas(topic):
        cost = await estimate_cost(idea)
        print(f"Idea: {idea} | Cost: ${cost}M")
[!TIP] Using async def allows the execution of estimate_cost to overlap with the ongoing generation of generate_ideas. With local models (Torch), this lets the system use computing resources (like the GPU) for one task while the other is managing data or waiting for the next generation step.
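When the loop body awaits each estimate before asking for the next idea, how much the two tasks overlap depends on how the stream is buffered underneath. If explicit overlap is wanted, one standard asyncio pattern is to schedule each downstream call with asyncio.create_task and gather the results at the end. The sketch below uses hypothetical stand-in coroutines (fake_generate_ideas, fake_estimate_cost) with simulated delays rather than OpenHosta calls.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def fake_generate_ideas() -> AsyncIterator[str]:
    """Stand-in for a streaming emulated generator."""
    for idea in ("hyperloop", "airships", "e-bikes"):
        await asyncio.sleep(0.01)  # simulated wait for the next item
        yield idea


async def fake_estimate_cost(idea: str) -> float:
    """Stand-in for a second model call."""
    await asyncio.sleep(0.03)  # simulated API latency
    return float(len(idea))


async def process_concurrently() -> List[Tuple[str, float]]:
    ideas: List[str] = []
    tasks: List["asyncio.Task[float]"] = []
    async for idea in fake_generate_ideas():
        ideas.append(idea)
        # Schedule the estimate and go straight back to the stream.
        tasks.append(asyncio.create_task(fake_estimate_cost(idea)))
    # Wait for all estimates; gather preserves the scheduling order.
    costs = await asyncio.gather(*tasks)
    return list(zip(ideas, costs))


results = asyncio.run(process_concurrently())
print(results)
```

The trade-off is that results are only printed once all estimates are done; for per-item output as each estimate finishes, asyncio.as_completed is a possible alternative.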
- Atomicity: Items are yielded only when their corresponding code block is fully received. For very large objects, there might still be a delay.
- Rate Limits: Streaming uses the SSE (Server-Sent Events) API of the model provider. Ensure your provider and API key support streaming.
- Context Detection: emulate() detects the generator context automatically. You must use the yield (or yield from) keyword in your function body for this to work.
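At the language level, Python itself distinguishes a function whose body contains yield from one that merely returns an iterator. The sketch below shows that difference with the standard inspect module; it illustrates the Python semantics only and makes no claim about how OpenHosta performs its own detection. The function names are hypothetical.

```python
import inspect
from typing import Iterator


def with_yield() -> Iterator[int]:
    """Contains 'yield from', so Python compiles it as a generator function."""
    yield from (1, 2, 3)


def with_return() -> Iterator[int]:
    """Returns an iterator, but is an ordinary function."""
    return iter((1, 2, 3))


print(inspect.isgeneratorfunction(with_yield))   # True
print(inspect.isgeneratorfunction(with_return))  # False
print(list(with_yield()) == list(with_return())) # True
```

Both functions produce the same items for the caller, so any difference in behavior comes from how the function body is written, not from the return annotation alone.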