Parallelism

Overview

Inspect runs evaluations using a parallel async architecture, eagerly executing many samples in parallel while at the same time ensuring that resources aren’t over-saturated by enforcing various limits (e.g. maximum number of concurrent model connections, maximum number of subprocesses, etc.).

There is a progression of concurrency concerns, and while most evaluations can rely on the Inspect default behaviour, others will benefit from more customisation. Below we’ll cover the following:

  1. Model API connection concurrency.
  2. Evaluating multiple models in parallel.
  3. Tool environment concurrency.
  4. Writing parallel code in custom solvers and scorers.

Model Connections

Max Connections

Connections to model APIs are the most fundamental unit of concurrency to manage. The main thing that limits model API concurrency is not local compute or network availability, but rather rate limits imposed by model API providers. Here we run an evaluation and set the maximum connections to 20:

$ inspect eval --model openai/gpt-4 --max-connections 20

The default value for max connections is 10. By increasing it we might get better performance due to higher parallelism, however we might get worse performance if this causes us to frequently hit rate limits (which are retried with exponential backoff). The “correct” max connections for your evaluations will vary based on your actual rate limit and the size and complexity of your evaluations.
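
If you are running evaluations from Python rather than the CLI, a minimal sketch of the same setting looks like this (this assumes that eval() accepts generation config options such as max_connections as keyword arguments; the mytask.py task file is hypothetical):

from inspect_ai import eval

# hypothetical task file; max_connections raises the per-model
# connection limit from the default of 10 to 20
eval("mytask.py", model="openai/gpt-4", max_connections=20)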

Rate Limits

When you run an eval you’ll see information reported on the current active connection usage as well as the number of HTTP rate limit errors that have been encountered (note that Inspect will automatically retry on rate limits and other errors likely to be transient):

The Inspect task results displayed in the terminal. The number of HTTP rate limit errors that have occurred (25) is printed in the bottom right of the task results.

Here we’ve set max connections to 30 (higher than the default of 10). While you might be tempted to set this very high to see how much concurrent traffic you can sustain, more often than not setting max connections too high will result in slower evaluations, because retries are done using exponential backoff, and bouncing off of rate limits too frequently will have you waiting minutes for retries to fire.

You should experiment with various values for max connections at different times of day (evening is often very different from daytime!). Generally speaking, you want to see some number of HTTP rate limits enforced so you know that you are somewhere close to ideal utilisation, but if you see hundreds of these you are likely over-saturating and experiencing a net slowdown.

Limiting Retries

By default, Inspect will continue to retry model API calls (with exponential backoff) indefinitely when a rate limit error (HTTP status 429) is returned. You can limit these retries by using the max_retries and timeout eval options. For example:

$ inspect eval --model openai/gpt-4 --max-retries 10 --timeout 600

If you want more insight into Model API connections and retries, specify log_level=http. For example:

$ inspect eval --model openai/gpt-4 --log-level=http

Note that max connections is applied per-model. This means that if you use a grader model from a provider distinct from the one you are evaluating you will get extra concurrency (as each model will enforce its own max connections).
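
For example, here is a sketch of a task that grades with a model from a different provider than the one being evaluated, so that each provider’s max connections is enforced independently (the dataset and plan below are illustrative, not prescriptive):

from inspect_ai import Task, task
from inspect_ai.dataset import example_dataset
from inspect_ai.scorer import model_graded_fact
from inspect_ai.solver import generate

@task
def theory_of_mind():
    return Task(
        # illustrative example dataset
        dataset=example_dataset("theory_of_mind"),
        plan=[generate()],
        # grader model from a different provider than the model being
        # evaluated, so it gets its own max_connections pool
        scorer=model_graded_fact(model="anthropic/claude-3-opus-20240229"),
    )

Evaluating this task against (say) an OpenAI model then gives you up to max connections for generation plus, independently, up to max connections for grading.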

Multiple Models

The multiple models feature described below is available only in the development version of Inspect (it is not yet published to PyPI). You can install the development version with:

$ pip install git+https://github.com/UKGovernmentBEIS/inspect_ai

You can evaluate multiple models in parallel by passing a list of models to the eval() function. For example:

eval("mathematics.py", model=[
    "openai/gpt-4-turbo",
    "anthropic/claude-3-opus-20240229",
    "google/gemini-1.5-pro"
])

An evaluation task display showing the progress for 3 different models.

Since each model provider has its own max_connections they don’t contend with each other for resources. If you need to evaluate multiple models, doing so concurrently is highly recommended.

If you want to specify multiple models when using the --model CLI argument or INSPECT_EVAL_MODEL environment variable, just separate the model names with commas. For example:

INSPECT_EVAL_MODEL=openai/gpt-4-turbo,google/gemini-1.5-pro
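
The same comma-separated form works with the --model CLI argument. For example (reusing the mathematics.py task from above):

$ inspect eval mathematics.py --model openai/gpt-4-turbo,google/gemini-1.5-pro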

Tool Environments

Tool Environments (e.g. Docker containers) often allocate resources on a per-sample basis, and also make use of the Inspect subprocess() function for executing commands within the environment.

Max Samples

The max_samples option determines how many samples are executed in parallel (and in the case of Docker containers, how many containers are run in parallel). By default, max_samples is set to max_connections so that the connection to the Model API can be fully utilised.

Since tool environments include additional expensive operations beyond calling models, you may want to increase max_samples to fully saturate both the Model API and the container subprocesses used for tool execution. When running an evaluation you’ll see an indicator of how many connections and how many subprocesses are currently active. If neither is at capacity then you will likely benefit from increasing max_samples.
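
For example, here’s a sketch of raising max_samples above max_connections when tool execution (rather than the model connection) is the bottleneck:

$ inspect eval --model openai/gpt-4 --max-connections 20 --max-samples 40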

Note that setting max_samples to an arbitrarily high number does have some disadvantages: you will consume more memory (especially if using tool environments) as well as wait longer for completed samples to be logged (so could be subject to losing more work if your eval task fails).

Max Subprocesses

The max_subprocesses option determines how many subprocess calls can run in parallel. By default, this is set to os.cpu_count(). Depending on the nature of execution done inside tool environments, you might benefit from increasing or decreasing max_subprocesses.

Solvers and Scorers

REST APIs

It’s possible that your custom solvers, tools, or scorers will call other REST APIs. Two things to keep in mind when doing this are:

  1. It’s critical that connections to other APIs use async HTTP APIs (i.e. the httpx module rather than the requests module). This is because Inspect’s parallelism relies on everything being async, so if you make a blocking HTTP call with requests it will hold up all of the rest of the work in the system!

  2. As with model APIs, rate limits may be in play, so it’s important not to over-saturate these connections. Recall that Inspect runs all samples in parallel so if you have 500 samples and don’t do anything to limit concurrency, you will likely end up making hundreds of calls at a time to the API.

Here’s some (oversimplified) example code that illustrates how to call a REST API within an Inspect component. We use the async interface of the httpx module, and we use Inspect’s concurrency() function to limit simultaneous connections to 10:

import httpx
from inspect_ai.util import concurrency
from inspect_ai.solver import Generate, TaskState

client = httpx.AsyncClient()

async def solve(state: TaskState, generate: Generate):
  ...
  # wrap the call to client.get() in an async concurrency 
  # block to limit simultaneous connections to 10
  async with concurrency("my-rest-api", 10):
    response = await client.get("https://example.com/api")

Note that we pass a name (“my-rest-api”) to the concurrency() function. This provides a named scope for managing concurrency for calls to that specific API/service.
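
If a solver or scorer calls more than one external service, each can be given its own named scope. Continuing the sketch above (the service names and limits here are purely illustrative):

# each named scope gets its own independent concurrency limit
async with concurrency("geocoding-api", 5):
    geo = await client.get("https://example.com/geocode")

async with concurrency("weather-api", 10):
    weather = await client.get("https://example.com/weather")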

Parallel Code

Generally speaking, you should try to make all of the code you write within Inspect solvers, tools, and scorers as parallel as possible. The main idea is to eagerly post as much work as you can, and then allow the various concurrency gates described above to take care of not overloading remote APIs or local resources. There are two keys to writing parallel code:

  1. Use async for all potentially expensive operations. If you are calling a remote API, use the httpx.AsyncClient. If you are running local code, use the subprocess() function described above.
  2. If your async work can be parallelised, do it using asyncio.gather(). For example, if you are calling three different model APIs to score a task, you can call them all in parallel. Or if you need to retrieve 10 web pages you don’t need to do it in a loop—rather, you can fetch them all at once.

Model Requests

Let’s say you have a scorer that uses three different models to score based on majority vote. You could make all of the model API calls in parallel as follows:

import asyncio

from inspect_ai.model import get_model

models = [
  get_model("openai/gpt-4"),
  get_model("anthropic/claude-3-sonnet-20240229"),
  get_model("mistral/mistral-large-latest")
]

output = "Output to be scored"
prompt = f"Could you please score the following output?\n\n{output}"

graders = [model.generate(prompt) for model in models]

grader_outputs = await asyncio.gather(*graders)

Note that we don’t await the call to model.generate() when building our list of graders. Rather, the call to asyncio.gather() will await each of these requests and return when they have all completed. Inspect’s internal handling of max_connections for model APIs will apply to these requests, so you need not worry about how many you put in flight; they will be throttled as appropriate.
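
To complete the sketch, the grader outputs could then be combined into a simple majority vote. The grading convention below (each grader replying with the word “CORRECT”) is purely illustrative, not part of Inspect:

# hypothetical convention: a grader replies with the word "CORRECT"
# when it judges the output to be correct
votes = sum(
    "CORRECT" in output.completion.upper()
    for output in grader_outputs
)
majority_correct = votes > len(models) / 2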

Web Requests

Here’s an example of using asyncio.gather() to parallelise web requests:

import asyncio
import httpx
client = httpx.AsyncClient()

pages = [
  "https://www.openai.com",
  "https://www.anthropic.com",
  "https://www.google.com",
  "https://mistral.ai/"
]

downloads = [client.get(page) for page in pages]

results = await asyncio.gather(*downloads)

Note that we don’t await the client requests when building up our list of downloads. Rather, we let asyncio.gather() await all of them, returning only when all of the results are available. Compared to looping over each page download this will execute much, much quicker. Note that if you are sending requests to a REST API that might have rate limits, you should consider wrapping your HTTP requests in a concurrency() block. For example:

from inspect_ai.util import concurrency

async def download(page):
  async with concurrency("my-web-api", 2):
    return await client.get(page)
  
downloads = [download(page) for page in pages]

results = await asyncio.gather(*downloads)

Subprocesses

It’s possible that your custom solvers, tools, or scorers will need to launch child processes to perform various tasks. Subprocesses have similar considerations to calling APIs: you want to make sure that they don’t block the rest of the work in Inspect (so they should be invoked with async) and you also want to make sure they don’t create too much concurrency (i.e. you wouldn’t want to launch 200 processes at once on a 4-core machine!).

To assist with this, Inspect provides the subprocess() function. This async function takes a command and arguments and invokes the specified command asynchronously, collecting and returning stdout and stderr. The subprocess() function also automatically limits concurrent child processes to the number of CPUs on your system (os.cpu_count()). Here’s an example from the implementation of a list_files() tool:

@tool(prompt=(
   "If you are asked to list the files in a directory you "
   + "should call the list_files function to access the listing."
))
def list_files():
    async def execute(dir: str):
        """List the files in a directory.

        Args:
            dir (str): Directory

        Returns:
            File listing of the directory
        """
        result = await subprocess(["ls", dir])
        if result.success:
            return result.stdout
        else:
            raise ToolError(result.stderr)

    return execute

The maximum number of concurrent subprocesses can be modified using the --max-subprocesses option. For example:

$ inspect eval --model openai/gpt-4 --max-subprocesses 4

Note that if you need to execute computationally expensive code in an eval, you should always factor it into a call to subprocess() so that you get optimal concurrency and performance.
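
For example, here’s a sketch of running an expensive local computation via subprocess() rather than calling it inline (the compute.py script and its arguments are hypothetical):

from inspect_ai.util import subprocess

# run the expensive computation in a child process so that it doesn't
# block the async event loop (compute.py is purely illustrative)
result = await subprocess(["python3", "compute.py", "--input", "data.json"])
if result.success:
    analysis = result.stdout
else:
    raise RuntimeError(result.stderr)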

Timeouts

If you need to ensure that your subprocess runs for no longer than a specified interval, you can use the timeout option. For example:

result = await subprocess(["ls", dir], timeout=30)

If a timeout occurs, then result.success will be False and a timeout error message will be included in result.stderr.
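
Following the pattern above, a short sketch of handling that case:

result = await subprocess(["ls", dir], timeout=30)
if not result.success:
    # on timeout, result.stderr contains a timeout error message
    print(f"command failed: {result.stderr}")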