Port docker code executor, make async, make code executor restart async (#489)

* Port docker code executor, make async, make code executor restart async

* add export

* fmt

* fix async file
This commit is contained in:
Jack Gerrits 2024-09-12 16:33:30 -04:00 committed by GitHub
parent ae10608427
commit 3ab51d3006
9 changed files with 494 additions and 72 deletions

View File

@ -23,6 +23,7 @@ dependencies = [
"protobuf~=4.25.1",
"tiktoken",
"azure-core",
"docker~=7.0",
"opentelemetry-api~=1.27.0"
]
@ -59,6 +60,7 @@ dev-dependencies = [
"types-pillow",
"types-protobuf",
"types-requests",
"types-docker",
"wikipedia",
"opentelemetry-sdk>=1.27.0",
]

View File

@ -2,6 +2,7 @@ from ._base import CodeBlock, CodeExecutor, CodeResult
from ._func_with_reqs import Alias, FunctionWithRequirements, Import, ImportFromModule, with_requirements
from ._impl.azure_container_code_executor import AzureContainerCodeExecutor
from ._impl.command_line_code_result import CommandLineCodeResult
from ._impl.docker_command_line_code_executor import DockerCommandLineCodeExecutor
from ._impl.local_commandline_code_executor import LocalCommandLineCodeExecutor
from ._utils import extract_markdown_code_blocks
@ -18,4 +19,5 @@ __all__ = [
"FunctionWithRequirements",
"with_requirements",
"extract_markdown_code_blocks",
"DockerCommandLineCodeExecutor",
]

View File

@ -49,7 +49,7 @@ class CodeExecutor(Protocol):
"""
...
def restart(self) -> None:
async def restart(self) -> None:
"""Restart the code executor.
This method should be implemented by the code executor.

View File

@ -456,7 +456,7 @@ import pkg_resources\n[d.project_name for d in pkg_resources.working_set]
return CodeResult(exit_code=exitcode, output=logs_all)
def restart(self) -> None:
async def restart(self) -> None:
"""(Experimental) Restart the code executor."""
self._session_id = str(uuid4())
self._setup_functions_complete = False

View File

@ -0,0 +1,341 @@
# File based from: https://github.com/microsoft/autogen/blob/main/autogen/coding/docker_commandline_code_executor.py
# Credit to original authors
from __future__ import annotations
import asyncio
import atexit
import logging
import shlex
import sys
import uuid
from collections.abc import Sequence
from hashlib import md5
from pathlib import Path
from types import TracebackType
from typing import Any, Callable, ClassVar, Dict, List, Optional, ParamSpec, Type, Union
import docker
from docker.errors import ImageNotFound, NotFound
from ....base._cancellation_token import CancellationToken
from ....components.code_executor._base import CodeBlock, CodeExecutor
from ....components.code_executor._func_with_reqs import FunctionWithRequirements, FunctionWithRequirementsStr
from ....components.code_executor._impl.command_line_code_result import CommandLineCodeResult
from .._func_with_reqs import (
build_python_functions_file,
)
from .utils import get_file_name_from_content, lang_to_cmd, silence_pip
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
async def _wait_for_ready(container: Any, timeout: int = 60, stop_time: float = 0.1) -> None:
elapsed_time = 0.0
while container.status != "running" and elapsed_time < timeout:
await asyncio.sleep(stop_time)
elapsed_time += stop_time
await asyncio.to_thread(container.reload)
continue
if container.status != "running":
raise ValueError("Container failed to start")
A = ParamSpec("A")
class DockerCommandLineCodeExecutor(CodeExecutor):
SUPPORTED_LANGUAGES: ClassVar[List[str]] = [
"bash",
"shell",
"sh",
"pwsh",
"powershell",
"ps1",
"python",
]
FUNCTION_PROMPT_TEMPLATE: ClassVar[
str
] = """You have access to the following user defined functions. They can be accessed from the module called `$module_name` by their function names.
For example, if there was a function called `foo` you could import it by writing `from $module_name import foo`
$functions"""
def __init__(
self,
image: str = "python:3-slim",
container_name: Optional[str] = None,
*,
timeout: int = 60,
work_dir: Union[Path, str] = Path("."),
bind_dir: Optional[Union[Path, str]] = None,
auto_remove: bool = True,
stop_container: bool = True,
functions: Sequence[
Union[
FunctionWithRequirements[Any, A],
Callable[..., Any],
FunctionWithRequirementsStr,
]
] = [],
functions_module: str = "functions",
):
"""Executes code through a command line environment in a Docker container.
The executor first saves each code block in a file in the working
directory, and then executes the code file in the container.
The executor executes the code blocks in the order they are received.
Currently, the executor only supports Python and shell scripts.
For Python code, use the language "python" for the code block.
For shell scripts, use the language "bash", "shell", or "sh" for the code
block.
Args:
image (_type_, optional): Docker image to use for code execution.
Defaults to "python:3-slim".
container_name (Optional[str], optional): Name of the Docker container
which is created. If None, will autogenerate a name. Defaults to None.
timeout (int, optional): The timeout for code execution. Defaults to 60.
work_dir (Union[Path, str], optional): The working directory for the code
execution. Defaults to Path(".").
bind_dir (Union[Path, str], optional): The directory that will be bound
to the code executor container. Useful for cases where you want to spawn
the container from within a container. Defaults to work_dir.
auto_remove (bool, optional): If true, will automatically remove the Docker
container when it is stopped. Defaults to True.
stop_container (bool, optional): If true, will automatically stop the
container when stop is called, when the context manager exits or when
the Python process exits with atext. Defaults to True.
functions (List[Union[FunctionWithRequirements[Any, A], Callable[..., Any]]]): A list of functions that are available to the code executor. Default is an empty list.
functions_module (str, optional): The name of the module that will be created to store the functions. Defaults to "functions".
"""
if timeout < 1:
raise ValueError("Timeout must be greater than or equal to 1.")
if isinstance(work_dir, str):
work_dir = Path(work_dir)
work_dir.mkdir(exist_ok=True)
if bind_dir is None:
bind_dir = work_dir
elif isinstance(bind_dir, str):
bind_dir = Path(bind_dir)
if container_name is None:
self.container_name = f"autogen-code-exec-{uuid.uuid4()}"
else:
self.container_name = container_name
self._timeout = timeout
self._work_dir: Path = work_dir
self._bind_dir: Path = bind_dir
self._auto_remove = auto_remove
self._stop_container = stop_container
self._image = image
if not functions_module.isidentifier():
raise ValueError("Module name must be a valid Python identifier")
self._functions_module = functions_module
self._functions = functions
# Setup could take some time so we intentionally wait for the first code block to do it.
if len(functions) > 0:
self._setup_functions_complete = False
else:
self._setup_functions_complete = True
@property
def timeout(self) -> int:
"""(Experimental) The timeout for code execution."""
return self._timeout
@property
def work_dir(self) -> Path:
"""(Experimental) The working directory for the code execution."""
return self._work_dir
@property
def bind_dir(self) -> Path:
"""(Experimental) The binding directory for the code execution container."""
return self._bind_dir
async def _setup_functions(self, cancellation_token: CancellationToken) -> None:
func_file_content = build_python_functions_file(self._functions)
func_file = self._work_dir / f"{self._functions_module}.py"
func_file.write_text(func_file_content)
# Collect requirements
lists_of_packages = [x.python_packages for x in self._functions if isinstance(x, FunctionWithRequirements)]
flattened_packages = [item for sublist in lists_of_packages for item in sublist]
required_packages = list(set(flattened_packages))
if len(required_packages) > 0:
logging.info("Ensuring packages are installed in executor.")
packages = shlex.join(required_packages)
result = await self._execute_code_dont_check_setup(
[CodeBlock(code=f"python -m pip install {packages}", language="sh")], cancellation_token
)
if result.exit_code != 0:
stdout = result.output
stderr = result.output
raise ValueError(f"Pip install failed. {stdout}, {stderr}")
# Attempt to load the function file to check for syntax errors, imports etc.
exec_result = await self._execute_code_dont_check_setup(
[CodeBlock(code=func_file_content, language="python")], cancellation_token
)
if exec_result.exit_code != 0:
raise ValueError(f"Functions failed to load: {exec_result.output}")
self._setup_functions_complete = True
async def _execute_code_dont_check_setup(
self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken
) -> CommandLineCodeResult:
if len(code_blocks) == 0:
raise ValueError("No code blocks to execute.")
outputs = []
files = []
last_exit_code = 0
for code_block in code_blocks:
lang = code_block.language.lower()
code = silence_pip(code_block.code, lang)
# Check if there is a filename comment
try:
filename = get_file_name_from_content(code, self._work_dir)
except ValueError:
outputs.append("Filename is not in the workspace")
last_exit_code = 1
break
if not filename:
filename = f"tmp_code_{md5(code.encode()).hexdigest()}.{lang}"
code_path = self._work_dir / filename
with code_path.open("w", encoding="utf-8") as fout:
fout.write(code)
files.append(code_path)
command = ["timeout", str(self._timeout), lang_to_cmd(lang), filename]
result = await asyncio.to_thread(self._container.exec_run, command)
exit_code = result.exit_code
output = result.output.decode("utf-8")
if exit_code == 124:
output += "\n Timeout"
outputs.append(output)
last_exit_code = exit_code
if exit_code != 0:
break
code_file = str(files[0]) if files else None
return CommandLineCodeResult(exit_code=last_exit_code, output="".join(outputs), code_file=code_file)
async def execute_code_blocks(
self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken
) -> CommandLineCodeResult:
"""(Experimental) Execute the code blocks and return the result.
Args:
code_blocks (List[CodeBlock]): The code blocks to execute.
Returns:
CommandlineCodeResult: The result of the code execution."""
def raise_not_implemented() -> None:
raise NotImplementedError("Cancellation is not yet supported for DockerCommandLineCodeExecutor")
cancellation_token.add_callback(lambda: raise_not_implemented())
if not self._setup_functions_complete:
await self._setup_functions(cancellation_token)
return await self._execute_code_dont_check_setup(code_blocks, cancellation_token)
async def restart(self) -> None:
"""(Experimental) Restart the code executor."""
await asyncio.to_thread(self._container.restart)
if self._container.status != "running":
self._running = False
logs_str = self._container.logs().decode("utf-8")
raise ValueError(f"Failed to restart container. Logs: {logs_str}")
async def stop(self) -> None:
"""(Experimental) Stop the code executor."""
if not self._running:
return
client = docker.from_env()
try:
container = await asyncio.to_thread(client.containers.get, self.container_name)
await asyncio.to_thread(container.stop)
except NotFound:
pass
finally:
self._running = False
async def start(self) -> None:
# Start a container from the image, read to exec commands later
client = docker.from_env()
# Check if the image exists
try:
await asyncio.to_thread(client.images.get, self._image)
except ImageNotFound:
# TODO logger
logging.info(f"Pulling image {self._image}...")
# Let the docker exception escape if this fails.
await asyncio.to_thread(client.images.pull, self._image)
self._container = await asyncio.to_thread(
client.containers.create,
self._image,
name=self.container_name,
entrypoint="/bin/sh",
tty=True,
detach=True,
auto_remove=self._auto_remove,
volumes={str(self._bind_dir.resolve()): {"bind": "/workspace", "mode": "rw"}},
working_dir="/workspace",
)
await asyncio.to_thread(self._container.start)
await _wait_for_ready(self._container)
def cleanup() -> None:
loop = asyncio.get_event_loop()
loop.run_until_complete(self.stop())
atexit.unregister(cleanup)
if self._stop_container:
atexit.register(cleanup)
# Check if the container is running
if self._container.status != "running":
logs_str = self._container.logs().decode("utf-8")
raise ValueError(f"Failed to start container from image {self._image}. Logs: {logs_str}")
self._running = True
async def __aenter__(self) -> Self:
await self.start()
return self
async def __aexit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> Optional[bool]:
await self.stop()
return None

View File

@ -81,6 +81,8 @@ $functions"""
a default working directory will be used. The default working
directory is the current directory ".".
functions (List[Union[FunctionWithRequirements[Any, A], Callable[..., Any]]]): A list of functions that are available to the code executor. Default is an empty list.
functions_module (str, optional): The name of the module that will be created to store the functions. Defaults to "functions".
"""
if timeout < 1:
@ -288,7 +290,7 @@ $functions"""
code_file = str(file_names[0]) if len(file_names) > 0 else None
return CommandLineCodeResult(exit_code=exitcode, output=logs_all, code_file=code_file)
def restart(self) -> None:
async def restart(self) -> None:
"""(Experimental) Restart the code executor."""
warnings.warn(
"Restarting local command line code executor is not supported. No action is taken.",

View File

@ -2,74 +2,116 @@
# Credit to original authors
import asyncio
import os
import sys
import tempfile
from pathlib import Path
from typing import AsyncGenerator, TypeAlias
import pytest
from autogen_core.components.code_executor import CodeBlock, LocalCommandLineCodeExecutor
import pytest_asyncio
from autogen_core.base import CancellationToken
from autogen_core.components.code_executor import CodeBlock, DockerCommandLineCodeExecutor, LocalCommandLineCodeExecutor
from aiofiles import open
def docker_tests_enabled() -> bool:
if os.environ.get("SKIP_DOCKER", "unset").lower() == "true":
return False
try:
import docker
from docker.errors import DockerException
except ImportError:
return False
try:
client = docker.from_env()
client.ping() # type: ignore
return True
except DockerException:
return False
@pytest_asyncio.fixture(scope="function")
async def executor_and_temp_dir(
request: pytest.FixtureRequest,
) -> AsyncGenerator[tuple[LocalCommandLineCodeExecutor | DockerCommandLineCodeExecutor, str], None]:
if request.param == "local":
with tempfile.TemporaryDirectory() as temp_dir:
yield LocalCommandLineCodeExecutor(work_dir=temp_dir), temp_dir
elif request.param == "docker":
if not docker_tests_enabled():
pytest.skip("Docker tests are disabled")
with tempfile.TemporaryDirectory() as temp_dir:
async with DockerCommandLineCodeExecutor(work_dir=temp_dir) as executor:
yield executor, temp_dir
ExecutorFixture: TypeAlias = tuple[LocalCommandLineCodeExecutor | DockerCommandLineCodeExecutor, str]
UNIX_SHELLS = ["bash", "sh", "shell"]
WINDOWS_SHELLS = ["ps1", "pwsh", "powershell"]
PYTHON_VARIANTS = ["python", "Python", "py"]
@pytest.mark.asyncio
async def test_execute_code() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
cancellation_token = CancellationToken()
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir)
@pytest.mark.parametrize("executor_and_temp_dir", ["local", "docker"], indirect=True)
async def test_execute_code(executor_and_temp_dir: ExecutorFixture) -> None:
executor, temp_dir = executor_and_temp_dir
cancellation_token = CancellationToken()
# Test single code block.
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
# Test single code block.
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
# Test multiple code blocks.
code_blocks = [
CodeBlock(code="import sys; print('hello world!')", language="python"),
CodeBlock(code="a = 100 + 100; print(a)", language="python"),
]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
and "200" in code_result.output
and code_result.code_file is not None
)
# Test bash script.
if sys.platform not in ["win32"]:
code_blocks = [CodeBlock(code="echo 'hello world!'", language="bash")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
# Test multiple code blocks.
code_blocks = [
CodeBlock(code="import sys; print('hello world!')", language="python"),
CodeBlock(code="a = 100 + 100; print(a)", language="python"),
]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
and "200" in code_result.output
and code_result.code_file is not None
)
# Test running code.
file_lines = ["import sys", "print('hello world!')", "a = 100 + 100", "print(a)"]
code_blocks = [CodeBlock(code="\n".join(file_lines), language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
and "200" in code_result.output
and code_result.code_file is not None
)
# Test bash script.
if sys.platform not in ["win32"]:
code_blocks = [CodeBlock(code="echo 'hello world!'", language="bash")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0 and "hello world!" in code_result.output and code_result.code_file is not None
# Check saved code file.
async with open(code_result.code_file) as f:
code_lines = await f.readlines()
for file_line, code_line in zip(file_lines, code_lines, strict=False):
assert file_line.strip() == code_line.strip()
# Test running code.
file_lines = ["import sys", "print('hello world!')", "a = 100 + 100", "print(a)"]
code_blocks = [CodeBlock(code="\n".join(file_lines), language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert (
code_result.exit_code == 0
and "hello world!" in code_result.output
and "200" in code_result.output
and code_result.code_file is not None
)
# Check saved code file.
with open(code_result.code_file) as f:
code_lines = f.readlines()
for file_line, code_line in zip(file_lines, code_lines):
assert file_line.strip() == code_line.strip()
@pytest.mark.asyncio
async def test_commandline_code_executor_timeout() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
cancellation_token = CancellationToken()
executor = LocalCommandLineCodeExecutor(timeout=1, work_dir=temp_dir)
code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code and "Timeout" in code_result.output
@pytest.mark.parametrize("executor_and_temp_dir", ["local", "docker"], indirect=True)
async def test_commandline_code_executor_timeout(executor_and_temp_dir: ExecutorFixture) -> None:
executor, temp_dir = executor_and_temp_dir
cancellation_token = CancellationToken()
executor = LocalCommandLineCodeExecutor(timeout=1, work_dir=temp_dir)
code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code and "Timeout" in code_result.output
# TODO: add docker when cancellation is supported
@pytest.mark.asyncio
async def test_commandline_code_executor_cancellation() -> None:
with tempfile.TemporaryDirectory() as temp_dir:
@ -85,39 +127,68 @@ async def test_commandline_code_executor_cancellation() -> None:
assert code_result.exit_code and "Cancelled" in code_result.output
def test_local_commandline_code_executor_restart() -> None:
@pytest.mark.asyncio
async def test_local_commandline_code_executor_restart() -> None:
executor = LocalCommandLineCodeExecutor()
with pytest.warns(UserWarning, match=r".*No action is taken."):
executor.restart()
await executor.restart()
@pytest.mark.asyncio
async def test_invalid_relative_path() -> None:
@pytest.mark.parametrize("executor_and_temp_dir", ["local", "docker"], indirect=True)
async def test_invalid_relative_path(executor_and_temp_dir: ExecutorFixture) -> None:
executor, temp_dir = executor_and_temp_dir
cancellation_token = CancellationToken()
executor = LocalCommandLineCodeExecutor()
code = """# filename: /tmp/test.py
print("hello world")
"""
result = await executor.execute_code_blocks([CodeBlock(code=code, language="python")], cancellation_token=cancellation_token)
result = await executor.execute_code_blocks(
[CodeBlock(code=code, language="python")], cancellation_token=cancellation_token
)
assert result.exit_code == 1 and "Filename is not in the workspace" in result.output
@pytest.mark.asyncio
async def test_valid_relative_path() -> None:
with tempfile.TemporaryDirectory() as temp_dir_str:
cancellation_token = CancellationToken()
temp_dir = Path(temp_dir_str)
executor = LocalCommandLineCodeExecutor(work_dir=temp_dir)
code = """# filename: test.py
@pytest.mark.parametrize("executor_and_temp_dir", ["local", "docker"], indirect=True)
async def test_valid_relative_path(executor_and_temp_dir: ExecutorFixture) -> None:
executor, temp_dir_str = executor_and_temp_dir
cancellation_token = CancellationToken()
temp_dir = Path(temp_dir_str)
code = """# filename: test.py
print("hello world")
"""
result = await executor.execute_code_blocks([CodeBlock(code=code, language="python")], cancellation_token=cancellation_token)
assert result.exit_code == 0
assert "hello world" in result.output
assert result.code_file is not None
assert "test.py" in result.code_file
assert (temp_dir / Path("test.py")).resolve() == Path(result.code_file).resolve()
assert (temp_dir / Path("test.py")).exists()
result = await executor.execute_code_blocks(
[CodeBlock(code=code, language="python")], cancellation_token=cancellation_token
)
assert result.exit_code == 0
assert "hello world" in result.output
assert result.code_file is not None
assert "test.py" in result.code_file
assert (temp_dir / Path("test.py")).resolve() == Path(result.code_file).resolve()
assert (temp_dir / Path("test.py")).exists()
@pytest.mark.asyncio
async def test_docker_commandline_code_executor_start_stop() -> None:
if not docker_tests_enabled():
pytest.skip("Docker tests are disabled")
with tempfile.TemporaryDirectory() as temp_dir:
executor = DockerCommandLineCodeExecutor(work_dir=temp_dir)
await executor.start()
await executor.stop()
@pytest.mark.asyncio
async def test_docker_commandline_code_executor_start_stop_context_manager() -> None:
if not docker_tests_enabled():
pytest.skip("Docker tests are disabled")
with tempfile.TemporaryDirectory() as temp_dir:
async with DockerCommandLineCodeExecutor(work_dir=temp_dir) as exec:
pass

View File

@ -279,6 +279,7 @@ source = { editable = "packages/autogen-core" }
dependencies = [
{ name = "aiohttp" },
{ name = "azure-core" },
{ name = "docker" },
{ name = "grpcio" },
{ name = "openai" },
{ name = "opentelemetry-api" },
@ -320,6 +321,7 @@ dev = [
{ name = "textual-dev" },
{ name = "textual-imageview" },
{ name = "types-aiofiles" },
{ name = "types-docker" },
{ name = "types-pillow" },
{ name = "types-protobuf" },
{ name = "types-requests" },
@ -330,6 +332,7 @@ dev = [
requires-dist = [
{ name = "aiohttp" },
{ name = "azure-core" },
{ name = "docker", specifier = "~=7.0" },
{ name = "grpcio", specifier = "~=1.62.0" },
{ name = "openai", specifier = ">=1.3" },
{ name = "opentelemetry-api", specifier = "~=1.27.0" },
@ -371,6 +374,7 @@ dev = [
{ name = "textual-dev" },
{ name = "textual-imageview" },
{ name = "types-aiofiles" },
{ name = "types-docker" },
{ name = "types-pillow" },
{ name = "types-protobuf" },
{ name = "types-requests" },