Skip to main content

Slow Inference Performance

When experiencing slow inference times, first establish a performance baseline. Use the VRAM Calculator to determine your GPU’s expected throughput and compare it against your model specifications. Zylon provides a benchmarking script that simulates concurrent inference requests to measure Time To First Token (TTFT), latency, and throughput. This can help identify if performance is below expectations and whether it degrades under load.
Zylon allocates compute resources to maintain consistent response times under concurrent load (8-10 simultaneous users). This means single-inference benchmarks may show lower tokens/s than the hardware’s theoretical maximum, but real-world performance with multiple users will meet or exceed expectations.
import asyncio
import json
import random
import time
from dataclasses import dataclass
from typing import Optional

BASE_URL = "https://<host>/api/gpt"
BEARER_TOKEN = "your_token_here"
MODEL = "qwen-3-14b-awq"
MAX_TOKENS = 4096
PROMPT = "Write a paragraph about artificial intelligence."
CONCURRENCY_LEVELS = [1, 4, 8, 16, 32]
DEBUG = False
JITTER_MAX_MS = 200


@dataclass
class RequestResult:
    success: bool
    ttft: Optional[float] = None
    latency: float = 0.0
    generation_time: float = 0.0
    tokens: int = 0
    throughput: float = 0.0
    error: Optional[str] = None


@dataclass
class BenchmarkStatistics:
    success_rate: float
    avg_ttft: float
    avg_latency: float
    avg_generation_time: float
    avg_throughput: float
    p50_ttft: float
    p95_ttft: float
    p99_ttft: float


@dataclass
class ConnectionConfig:
    host: str
    port: int
    use_ssl: bool
    path: str


def debug_log(message: str, force: bool = False) -> None:
    if DEBUG or force:
        timestamp = time.strftime("%H:%M:%S", time.localtime())
        print(f"[DEBUG {timestamp}] {message}")


def parse_url(url: str) -> ConnectionConfig:
    use_ssl = url.startswith("https")
    host_and_path = url.split("//")[1]
    parts = host_and_path.split("/", 1)
    host = parts[0]
    path = "/" + parts[1] if len(parts) > 1 else "/"
    port = 443 if use_ssl else 80

    return ConnectionConfig(host=host, port=port, use_ssl=use_ssl, path=path)


def build_http_request(
    config: ConnectionConfig, payload: dict[str, object], bearer_token: str
) -> bytes:
    path = config.path + "/v1/messages"
    body = json.dumps(payload)

    request_lines = [
        f"POST {path} HTTP/1.1",
        f"Host: {config.host}",
        "Content-Type: application/json",
        f"Authorization: Bearer {bearer_token}",
        f"Content-Length: {len(body)}",
        "Connection: close",
        "",
        body,
    ]

    return "\r\n".join(request_lines).encode()


async def read_stream_response(
    reader: asyncio.StreamReader, session_id: int, start_time: float
) -> tuple[Optional[float], Optional[float], int, int]:
    ttft: Optional[float] = None
    content_block_stop_time: Optional[float] = None
    output_tokens = 0
    event_count = 0
    headers_done = False
    buffer = b""

    while True:
        chunk = await reader.read(8192)
        if not chunk:
            break

        buffer += chunk

        if not headers_done:
            if b"\r\n\r\n" in buffer:
                headers_done = True
                buffer = buffer.split(b"\r\n\r\n", 1)[1]

        if headers_done:
            lines = buffer.split(b"\n")
            buffer = lines[-1]

            for line in lines[:-1]:
                line_str = line.decode("utf-8").strip()

                if not line_str or not line_str.startswith("data: "):
                    continue

                data_str = line_str[6:]
                if data_str == "[DONE]":
                    debug_log(f"Session {session_id}: Stream complete")
                    continue

                try:
                    event = json.loads(data_str)
                    event_count += 1

                    if ttft is None and event.get("type") == "content_block_delta":
                        ttft = time.perf_counter() - start_time
                        debug_log(f"Session {session_id}: TTFT = {ttft:.3f}s")

                    if event.get("type") == "content_block_stop":
                        content_block_stop_time = time.perf_counter()
                        debug_log(
                            f"Session {session_id}: Content block stopped at {content_block_stop_time - start_time:.3f}s"
                        )

                    if event.get("type") == "message_delta":
                        usage = event.get("usage", {})
                        output_tokens = usage.get("output_tokens", 0)
                        debug_log(
                            f"Session {session_id}: Received usage data - {output_tokens} tokens"
                        )

                except json.JSONDecodeError as e:
                    debug_log(f"Session {session_id}: JSON decode error - {e}")
                    continue

    return ttft, content_block_stop_time, output_tokens, event_count


async def make_request(session_id: int) -> RequestResult:
    jitter = random.randint(0, JITTER_MAX_MS) / 1000.0
    debug_log(f"Session {session_id}: Waiting {jitter:.3f}s before starting")
    await asyncio.sleep(jitter)

    debug_log(f"Session {session_id}: Starting request")

    payload: dict[str, object] = {
        "model": MODEL,
        "max_tokens": MAX_TOKENS,
        "messages": [{"role": "user", "content": PROMPT}],
        "stream": True,
        "correlation_id": f"session-{session_id}",
    }

    start_time = time.perf_counter()

    try:
        debug_log(f"Session {session_id}: Opening connection")

        config = parse_url(BASE_URL)
        reader, writer = await asyncio.open_connection(
            config.host, config.port, ssl=config.use_ssl
        )

        request_bytes = build_http_request(config, payload, BEARER_TOKEN)
        writer.write(request_bytes)
        await writer.drain()

        debug_log(f"Session {session_id}: Connection established")

        ttft, content_block_stop_time, output_tokens, event_count = (
            await read_stream_response(reader, session_id, start_time)
        )

        writer.close()
        await writer.wait_closed()

        end_time = time.perf_counter()
        total_latency = end_time - start_time

        generation_time = 0.0
        throughput = 0.0
        if ttft is not None and content_block_stop_time is not None:
            generation_time = content_block_stop_time - (start_time + ttft)
            if generation_time > 0:
                throughput = output_tokens / generation_time

        debug_log(
            f"Session {session_id}: Completed - "
            f"Latency: {total_latency:.3f}s, "
            f"Generation: {generation_time:.3f}s, "
            f"Tokens: {output_tokens}, "
            f"Events: {event_count}, "
            f"Throughput: {throughput:.2f} tok/s"
        )

        return RequestResult(
            success=True,
            ttft=ttft,
            latency=total_latency,
            generation_time=generation_time,
            tokens=output_tokens,
            throughput=throughput,
        )

    except Exception as e:
        debug_log(f"Session {session_id}: Error - {type(e).__name__}: {e}")
        return RequestResult(success=False, error=str(e))


async def run_concurrent_requests(num_users: int) -> list[RequestResult]:
    debug_log(f"Starting {num_users} concurrent requests")
    tasks = [make_request(i) for i in range(num_users)]
    results = await asyncio.gather(*tasks)
    debug_log(f"All {num_users} requests completed")
    return list(results)


def calculate_percentile(data: list[float], p: float) -> float:
    if not data:
        return 0.0
    k = (len(data) - 1) * p
    f = int(k)
    c = f + 1
    if c >= len(data):
        return data[f]
    return data[f] + (k - f) * (data[c] - data[f])


def calculate_statistics(results: list[RequestResult]) -> BenchmarkStatistics:
    successful = [r for r in results if r.success]
    debug_log(
        f"Calculating statistics for {len(successful)}/{len(results)} successful requests"
    )

    if not successful:
        debug_log("No successful requests to calculate statistics")
        return BenchmarkStatistics(
            success_rate=0.0,
            avg_ttft=0.0,
            avg_latency=0.0,
            avg_generation_time=0.0,
            avg_throughput=0.0,
            p50_ttft=0.0,
            p95_ttft=0.0,
            p99_ttft=0.0,
        )

    ttfts = sorted([r.ttft for r in successful if r.ttft is not None])
    latencies = [r.latency for r in successful]
    generation_times = [r.generation_time for r in successful]
    throughputs = [r.throughput for r in successful if r.throughput > 0]

    return BenchmarkStatistics(
        success_rate=len(successful) / len(results),
        avg_ttft=sum(ttfts) / len(ttfts) if ttfts else 0.0,
        avg_latency=sum(latencies) / len(latencies),
        avg_generation_time=sum(generation_times) / len(generation_times)
        if generation_times
        else 0.0,
        avg_throughput=sum(throughputs) / len(throughputs) if throughputs else 0.0,
        p50_ttft=calculate_percentile(ttfts, 0.5) if ttfts else 0.0,
        p95_ttft=calculate_percentile(ttfts, 0.95) if ttfts else 0.0,
        p99_ttft=calculate_percentile(ttfts, 0.99) if ttfts else 0.0,
    )


def print_statistics(concurrency: int, stats: BenchmarkStatistics) -> None:
    print(f"\nTesting with {concurrency} concurrent inferences...")
    print(f"  Success Rate: {stats.success_rate:.2%}")
    print(f"  Avg TTFT: {stats.avg_ttft:.3f}s")
    print(f"  P50 TTFT: {stats.p50_ttft:.3f}s")
    print(f"  P95 TTFT: {stats.p95_ttft:.3f}s")
    print(f"  P99 TTFT: {stats.p99_ttft:.3f}s")
    print(f"  Avg Latency: {stats.avg_latency:.3f}s")
    print(f"  Avg Generation Time: {stats.avg_generation_time:.3f}s")
    print(f"  Avg Throughput: {stats.avg_throughput:.2f} tok/s")


async def benchmark() -> None:
    print(f"Benchmarking API: {BASE_URL}")
    print(f"Model: {MODEL}")
    print(f"Prompt: {PROMPT[:50]}...")
    print("-" * 80)

    for concurrency in CONCURRENCY_LEVELS:
        debug_log(f"Starting concurrency level: {concurrency}")

        results = await run_concurrent_requests(concurrency)
        stats = calculate_statistics(results)
        print_statistics(concurrency, stats)


if __name__ == "__main__":
    asyncio.run(benchmark())
Update BASE_URL, BEARER_TOKEN, and MODEL in the script, then run it to measure TTFT, throughput, and latency under different concurrency levels. Compare your results against the theoretical maximum from the VRAM calculator to identify if performance is below expectations.