Slow Inference Performance
When experiencing slow inference times, first establish a performance baseline. Use the VRAM Calculator to determine your GPU’s expected throughput and compare it against your model specifications. Zylon provides a benchmarking script that simulates concurrent inference requests to measure Time To First Token (TTFT), latency, and throughput. This can help identify whether performance is below expectations and whether it degrades under load. Zylon allocates compute resources to maintain consistent response times under concurrent load (8–10 simultaneous users). This means single-inference benchmarks may show lower tokens/s than the hardware’s theoretical maximum, but real-world performance with multiple users will meet or exceed expectations.
Benchmark script (Python):
import asyncio
import json
import random
import time
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlsplit
# --- Benchmark configuration (edit these before running) ---
BASE_URL = "https://<host>/api/gpt"  # Zylon GPT API base URL; replace <host>
BEARER_TOKEN = "your_token_here"  # API bearer token used in the Authorization header
MODEL = "qwen-3-14b-awq"  # model identifier to benchmark
MAX_TOKENS = 4096  # generation cap per request
PROMPT = "Write a paragraph about artificial intelligence."  # prompt sent by every session
CONCURRENCY_LEVELS = [1, 4, 8, 16, 32]  # simultaneous-user counts tested in order
DEBUG = False  # set True to enable per-session debug_log() output
JITTER_MAX_MS = 200  # max random start delay (ms) so sessions don't all connect at once
@dataclass
class RequestResult:
    """Outcome and timing metrics of a single streaming inference request."""

    success: bool  # True when the request completed without raising
    ttft: Optional[float] = None  # time to first token, seconds (None if no token arrived)
    latency: float = 0.0  # total wall-clock time of the request, seconds
    generation_time: float = 0.0  # first token -> content_block_stop, seconds
    tokens: int = 0  # output_tokens reported by the server's usage event
    throughput: float = 0.0  # tokens / generation_time (0.0 when not measurable)
    error: Optional[str] = None  # stringified exception on failure
@dataclass
class BenchmarkStatistics:
    """Aggregated metrics for one concurrency level (averages over successes)."""

    success_rate: float  # successful requests / total requests, in [0, 1]
    avg_ttft: float  # mean time-to-first-token, seconds
    avg_latency: float  # mean total request latency, seconds
    avg_generation_time: float  # mean first-token -> stop duration, seconds
    avg_throughput: float  # mean tokens/s over requests with throughput > 0
    p50_ttft: float  # TTFT percentiles (linear interpolation), seconds
    p95_ttft: float
    p99_ttft: float
@dataclass
class ConnectionConfig:
    """Connection parameters parsed from BASE_URL by parse_url()."""

    host: str  # hostname only (no scheme, no path)
    port: int  # TCP port (443 for https, 80 for http)
    use_ssl: bool  # True when the URL scheme is https
    path: str  # base path prefix, e.g. "/api/gpt"
def debug_log(message: str, force: bool = False) -> None:
    """Print *message* with a HH:MM:SS timestamp when DEBUG is on (or *force*)."""
    if not (DEBUG or force):
        return
    now = time.strftime("%H:%M:%S", time.localtime())
    print(f"[DEBUG {now}] {message}")
def parse_url(url: str) -> ConnectionConfig:
    """Split *url* into host, port, TLS flag, and base path.

    Uses urllib.parse.urlsplit so URLs with an explicit port
    ("https://host:8443/api") work — the original hand-rolled split passed
    "host:8443" straight to open_connection as the hostname and failed, and
    crashed outright on URLs without "//".

    Defaults: port 443 for https, 80 otherwise; path "/" when the URL has none.
    """
    parts = urlsplit(url)
    use_ssl = parts.scheme == "https"
    host = parts.hostname or ""
    # Explicit port wins; otherwise fall back to the scheme default.
    port = parts.port if parts.port is not None else (443 if use_ssl else 80)
    path = parts.path or "/"
    return ConnectionConfig(host=host, port=port, use_ssl=use_ssl, path=path)
def build_http_request(
    config: ConnectionConfig, payload: dict[str, object], bearer_token: str
) -> bytes:
    """Serialize a raw HTTP/1.1 POST to the streaming /v1/messages endpoint.

    Fix: Content-Length is computed from the *encoded* body bytes rather than
    the str length, so a payload containing non-ASCII text (e.g. json.dumps
    with ensure_ascii=False elsewhere) cannot desynchronize the HTTP framing.

    Returns the complete request (status line, headers, blank line, body)
    ready to be written to the socket.
    """
    path = config.path + "/v1/messages"
    body = json.dumps(payload).encode("utf-8")
    head = [
        f"POST {path} HTTP/1.1",
        f"Host: {config.host}",
        "Content-Type: application/json",
        f"Authorization: Bearer {bearer_token}",
        f"Content-Length: {len(body)}",  # byte length, not character count
        "Connection: close",  # one request per connection; server closes when done
        "",
        "",  # second empty entry yields the CRLFCRLF header terminator
    ]
    return "\r\n".join(head).encode("utf-8") + body
async def read_stream_response(
    reader: asyncio.StreamReader, session_id: int, start_time: float
) -> tuple[Optional[float], Optional[float], int, int]:
    """Consume the SSE response stream and extract timing/usage signals.

    Returns (ttft, content_block_stop_time, output_tokens, event_count):
      - ttft: seconds from *start_time* to the first content_block_delta event,
        or None if no content delta was ever seen
      - content_block_stop_time: perf_counter() at the content_block_stop
        event, or None
      - output_tokens: usage.output_tokens from the message_delta event
        (0 if never reported)
      - event_count: number of SSE JSON events successfully parsed

    NOTE(review): the parser strips HTTP headers at the first CRLFCRLF, then
    treats the rest of the body as newline-delimited "data: ..." lines. That
    assumes the server does NOT use chunked transfer encoding (chunk-size
    lines would be interleaved with the SSE data) — confirm against the
    actual server behavior.
    """
    ttft: Optional[float] = None
    content_block_stop_time: Optional[float] = None
    output_tokens = 0
    event_count = 0
    headers_done = False
    buffer = b""
    while True:
        chunk = await reader.read(8192)
        if not chunk:
            break  # EOF: server closed the connection ("Connection: close")
        buffer += chunk
        if not headers_done:
            # Skip the status line + headers; the body starts after CRLFCRLF.
            if b"\r\n\r\n" in buffer:
                headers_done = True
                buffer = buffer.split(b"\r\n\r\n", 1)[1]
        if headers_done:
            # Process only complete lines; keep the trailing partial line
            # buffered so UTF-8 sequences are never decoded mid-line.
            lines = buffer.split(b"\n")
            buffer = lines[-1]
            for line in lines[:-1]:
                line_str = line.decode("utf-8").strip()
                if not line_str or not line_str.startswith("data: "):
                    continue
                data_str = line_str[6:]  # drop the "data: " SSE prefix
                if data_str == "[DONE]":
                    debug_log(f"Session {session_id}: Stream complete")
                    continue
                try:
                    event = json.loads(data_str)
                    event_count += 1
                    # First content delta marks time-to-first-token.
                    if ttft is None and event.get("type") == "content_block_delta":
                        ttft = time.perf_counter() - start_time
                        debug_log(f"Session {session_id}: TTFT = {ttft:.3f}s")
                    if event.get("type") == "content_block_stop":
                        content_block_stop_time = time.perf_counter()
                        debug_log(
                            f"Session {session_id}: Content block stopped at {content_block_stop_time - start_time:.3f}s"
                        )
                    if event.get("type") == "message_delta":
                        # Token usage arrives in the final message_delta event.
                        usage = event.get("usage", {})
                        output_tokens = usage.get("output_tokens", 0)
                        debug_log(
                            f"Session {session_id}: Received usage data - {output_tokens} tokens"
                        )
                except json.JSONDecodeError as e:
                    # Tolerate malformed/partial SSE payloads and keep reading.
                    debug_log(f"Session {session_id}: JSON decode error - {e}")
                    continue
    return ttft, content_block_stop_time, output_tokens, event_count
async def make_request(session_id: int) -> RequestResult:
    """Execute one streaming inference request and measure its timings.

    Sleeps a small random jitter first so concurrent sessions do not all
    connect at the exact same instant, then opens a raw TCP/TLS connection,
    sends the hand-built HTTP request, and parses the SSE stream for TTFT
    and token usage.

    Returns:
        RequestResult with success=True and the measured metrics, or
        success=False plus the stringified exception on any failure.
    """
    # De-synchronize session starts: sleep 0..JITTER_MAX_MS milliseconds.
    jitter = random.randint(0, JITTER_MAX_MS) / 1000.0
    debug_log(f"Session {session_id}: Waiting {jitter:.3f}s before starting")
    await asyncio.sleep(jitter)
    debug_log(f"Session {session_id}: Starting request")
    payload: dict[str, object] = {
        "model": MODEL,
        "max_tokens": MAX_TOKENS,
        "messages": [{"role": "user", "content": PROMPT}],
        "stream": True,
        "correlation_id": f"session-{session_id}",
    }
    start_time = time.perf_counter()
    writer = None  # bound once the connection opens; released in `finally`
    try:
        debug_log(f"Session {session_id}: Opening connection")
        config = parse_url(BASE_URL)
        reader, writer = await asyncio.open_connection(
            config.host, config.port, ssl=config.use_ssl
        )
        request_bytes = build_http_request(config, payload, BEARER_TOKEN)
        writer.write(request_bytes)
        await writer.drain()
        debug_log(f"Session {session_id}: Connection established")
        ttft, content_block_stop_time, output_tokens, event_count = (
            await read_stream_response(reader, session_id, start_time)
        )
        writer.close()
        await writer.wait_closed()
        end_time = time.perf_counter()
        total_latency = end_time - start_time
        generation_time = 0.0
        throughput = 0.0
        # Generation time excludes TTFT: first token -> content_block_stop.
        if ttft is not None and content_block_stop_time is not None:
            generation_time = content_block_stop_time - (start_time + ttft)
            if generation_time > 0:
                throughput = output_tokens / generation_time
        debug_log(
            f"Session {session_id}: Completed - "
            f"Latency: {total_latency:.3f}s, "
            f"Generation: {generation_time:.3f}s, "
            f"Tokens: {output_tokens}, "
            f"Events: {event_count}, "
            f"Throughput: {throughput:.2f} tok/s"
        )
        return RequestResult(
            success=True,
            ttft=ttft,
            latency=total_latency,
            generation_time=generation_time,
            tokens=output_tokens,
            throughput=throughput,
        )
    except Exception as e:
        debug_log(f"Session {session_id}: Error - {type(e).__name__}: {e}")
        return RequestResult(success=False, error=str(e))
    finally:
        # Fix: the original leaked the connection when an exception fired
        # anywhere between open_connection and the normal close above.
        if writer is not None and not writer.is_closing():
            writer.close()
async def run_concurrent_requests(num_users: int) -> list[RequestResult]:
    """Fire *num_users* inference requests concurrently and collect the results in order."""
    debug_log(f"Starting {num_users} concurrent requests")
    outcomes = await asyncio.gather(
        *(make_request(session) for session in range(num_users))
    )
    debug_log(f"All {num_users} requests completed")
    return list(outcomes)
def calculate_percentile(data: list[float], p: float) -> float:
    """Return the p-quantile (p in [0, 1]) of *data* via linear interpolation.

    *data* must already be sorted ascending. Returns 0.0 for an empty list.
    """
    if not data:
        return 0.0
    # Fractional rank into the sorted samples.
    rank = (len(data) - 1) * p
    lo = int(rank)
    hi = lo + 1
    if hi >= len(data):
        return data[lo]
    # Interpolate between the two bracketing samples.
    return data[lo] + (rank - lo) * (data[hi] - data[lo])
def calculate_statistics(results: list[RequestResult]) -> BenchmarkStatistics:
    """Aggregate per-request results into summary statistics.

    Averages are taken over successful requests only (throughput additionally
    skips zero values); TTFT percentiles use the sorted TTFT samples. When no
    request succeeded, an all-zero BenchmarkStatistics is returned.
    """
    ok = [r for r in results if r.success]
    debug_log(
        f"Calculating statistics for {len(ok)}/{len(results)} successful requests"
    )
    if not ok:
        debug_log("No successful requests to calculate statistics")
        return BenchmarkStatistics(
            success_rate=0.0,
            avg_ttft=0.0,
            avg_latency=0.0,
            avg_generation_time=0.0,
            avg_throughput=0.0,
            p50_ttft=0.0,
            p95_ttft=0.0,
            p99_ttft=0.0,
        )

    def mean(values: list[float]) -> float:
        # Average of a possibly-empty sample list (0.0 when empty).
        return sum(values) / len(values) if values else 0.0

    ttft_samples = sorted(r.ttft for r in ok if r.ttft is not None)
    return BenchmarkStatistics(
        success_rate=len(ok) / len(results),
        avg_ttft=mean(ttft_samples),
        avg_latency=mean([r.latency for r in ok]),
        avg_generation_time=mean([r.generation_time for r in ok]),
        avg_throughput=mean([r.throughput for r in ok if r.throughput > 0]),
        p50_ttft=calculate_percentile(ttft_samples, 0.5) if ttft_samples else 0.0,
        p95_ttft=calculate_percentile(ttft_samples, 0.95) if ttft_samples else 0.0,
        p99_ttft=calculate_percentile(ttft_samples, 0.99) if ttft_samples else 0.0,
    )
def print_statistics(concurrency: int, stats: BenchmarkStatistics) -> None:
    """Pretty-print one concurrency level's benchmark summary to stdout."""
    print(f"\nTesting with {concurrency} concurrent inferences...")
    report = [
        f"  Success Rate: {stats.success_rate:.2%}",
        f"  Avg TTFT: {stats.avg_ttft:.3f}s",
        f"  P50 TTFT: {stats.p50_ttft:.3f}s",
        f"  P95 TTFT: {stats.p95_ttft:.3f}s",
        f"  P99 TTFT: {stats.p99_ttft:.3f}s",
        f"  Avg Latency: {stats.avg_latency:.3f}s",
        f"  Avg Generation Time: {stats.avg_generation_time:.3f}s",
        f"  Avg Throughput: {stats.avg_throughput:.2f} tok/s",
    ]
    print("\n".join(report))
async def benchmark() -> None:
    """Run the benchmark across every configured concurrency level and print results."""
    header = [
        f"Benchmarking API: {BASE_URL}",
        f"Model: {MODEL}",
        f"Prompt: {PROMPT[:50]}...",
        "-" * 80,
    ]
    print("\n".join(header))
    # Levels run sequentially so each measurement sees an otherwise idle server.
    for level in CONCURRENCY_LEVELS:
        debug_log(f"Starting concurrency level: {level}")
        outcomes = await run_concurrent_requests(level)
        print_statistics(level, calculate_statistics(outcomes))
if __name__ == "__main__":
    # Entry point: drive the async benchmark over all concurrency levels.
    asyncio.run(benchmark())
Set BASE_URL, BEARER_TOKEN, and MODEL in the script, then run it to measure TTFT, throughput, and latency under different concurrency levels. Compare your results against the theoretical maximum from the VRAM calculator to identify whether performance is below expectations.