Implement docker based jupyter executor (#1794)

* Implement docker based jupyter client

* formatting

* skip docker tests when asked

* feedback

* add log

* update build

* formatting

* structural changes

* update setup.py

* update tests

---------

Co-authored-by: Chi Wang <wang.chi@microsoft.com>
This commit is contained in:
Jack Gerrits 2024-02-29 09:54:11 -05:00 committed by GitHub
parent b8ceb866e6
commit 76ef0789c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 268 additions and 41 deletions

View File

@ -46,9 +46,8 @@ jobs:
# code executors auto skip without deps, so only run for python 3.11
if: matrix.python-version == '3.11'
run: |
pip install jupyter-client ipykernel
pip install -e ".[jupyter-executor]"
python -m ipykernel install --user --name python3
pip install -e ".[local-jupyter-exec]"
- name: Set AUTOGEN_USE_DOCKER based on OS
shell: bash
run: |

View File

@ -2,4 +2,11 @@ from .base import CodeBlock, CodeExecutor, CodeExtractor, CodeResult
from .factory import CodeExecutorFactory
from .markdown_code_extractor import MarkdownCodeExtractor
__all__ = ("CodeBlock", "CodeResult", "CodeExtractor", "CodeExecutor", "CodeExecutorFactory", "MarkdownCodeExtractor")
__all__ = (
"CodeBlock",
"CodeResult",
"CodeExtractor",
"CodeExecutor",
"CodeExecutorFactory",
"MarkdownCodeExtractor",
)

View File

@ -30,16 +30,12 @@ class CodeExecutorFactory:
# If the executor is already an instance of CodeExecutor, return it.
return executor
if executor == "ipython-embedded":
from .embedded_ipython_code_executor import EmbeddedIPythonCodeExecutor
from .jupyter.embedded_ipython_code_executor import EmbeddedIPythonCodeExecutor
return EmbeddedIPythonCodeExecutor(**code_execution_config.get("ipython-embedded", {}))
elif executor == "commandline-local":
from .local_commandline_code_executor import LocalCommandlineCodeExecutor
return LocalCommandlineCodeExecutor(**code_execution_config.get("commandline-local", {}))
elif executor == "jupyter-local":
from .jupyter_code_executor import LocalJupyterCodeExecutor
return LocalJupyterCodeExecutor(**code_execution_config.get("jupyter-local", {}))
else:
raise ValueError(f"Unknown code executor {executor}")

View File

@ -1,5 +1,16 @@
from .base import JupyterConnectable, JupyterConnectionInfo
from .jupyter_client import JupyterClient
from .local_jupyter_server import LocalJupyterServer
from .docker_jupyter_server import DockerJupyterServer
from .embedded_ipython_code_executor import EmbeddedIPythonCodeExecutor
from .jupyter_code_executor import JupyterCodeExecutor
__all__ = ["JupyterConnectable", "JupyterConnectionInfo", "JupyterClient", "LocalJupyterServer"]
__all__ = [
"JupyterConnectable",
"JupyterConnectionInfo",
"JupyterClient",
"LocalJupyterServer",
"DockerJupyterServer",
"EmbeddedIPythonCodeExecutor",
"JupyterCodeExecutor",
]

View File

@ -0,0 +1,167 @@
from __future__ import annotations
from pathlib import Path
import sys
from time import sleep
from types import TracebackType
import uuid
from typing import Dict, Optional, Union
import docker
import secrets
import io
import atexit
import logging
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
from .jupyter_client import JupyterClient
from .base import JupyterConnectable, JupyterConnectionInfo
def _wait_for_ready(container: docker.Container, timeout: int = 60, stop_time: int = 0.1) -> None:
elapsed_time = 0
while container.status != "running" and elapsed_time < timeout:
sleep(stop_time)
elapsed_time += stop_time
container.reload()
continue
if container.status != "running":
raise ValueError("Container failed to start")
class DockerJupyterServer(JupyterConnectable):
DEFAULT_DOCKERFILE = """FROM quay.io/jupyter/docker-stacks-foundation
SHELL ["/bin/bash", "-o", "pipefail", "-c"]
USER ${NB_UID}
RUN mamba install --yes jupyter_kernel_gateway ipykernel && \
mamba clean --all -f -y && \
fix-permissions "${CONDA_DIR}" && \
fix-permissions "/home/${NB_USER}"
ENV TOKEN="UNSET"
CMD python -m jupyter kernelgateway --KernelGatewayApp.ip=0.0.0.0 \
--KernelGatewayApp.port=8888 \
--KernelGatewayApp.auth_token="${TOKEN}" \
--JupyterApp.answer_yes=true \
--JupyterWebsocketPersonality.list_kernels=true
EXPOSE 8888
WORKDIR "${HOME}"
"""
class GenerateToken:
pass
def __init__(
self,
*,
custom_image_name: Optional[str] = None,
container_name: Optional[str] = None,
auto_remove: bool = True,
stop_container: bool = True,
docker_env: Dict[str, str] = {},
token: Union[str, GenerateToken] = GenerateToken(),
):
"""Start a Jupyter kernel gateway server in a Docker container.
Args:
custom_image_name (Optional[str], optional): Custom image to use. If this is None,
then the bundled image will be built and used. The default image is based on
quay.io/jupyter/docker-stacks-foundation and extended to include jupyter_kernel_gateway
container_name (Optional[str], optional): Name of the container to start.
A name will be generated if None.
auto_remove (bool, optional): If true the Docker container will be deleted
when it is stopped.
stop_container (bool, optional): If true the container will be stopped,
either by program exit or using the context manager
docker_env (Dict[str, str], optional): Extra environment variables to pass
to the running Docker container.
token (Union[str, GenerateToken], optional): Token to use for authentication.
If GenerateToken is used, a random token will be generated. Empty string
will be unauthenticated.
"""
if container_name is None:
container_name = f"autogen-jupyterkernelgateway-{uuid.uuid4()}"
client = docker.from_env()
if custom_image_name is None:
image_name = "autogen-jupyterkernelgateway"
# Make sure the image exists
try:
client.images.get(image_name)
except docker.errors.ImageNotFound:
# Build the image
# Get this script directory
here = Path(__file__).parent
dockerfile = io.BytesIO(self.DEFAULT_DOCKERFILE.encode("utf-8"))
logging.info(f"Image {image_name} not found. Building it now.")
client.images.build(path=here, fileobj=dockerfile, tag=image_name)
logging.info(f"Image {image_name} built successfully.")
else:
image_name = custom_image_name
# Check if the image exists
try:
client.images.get(image_name)
except docker.errors.ImageNotFound:
raise ValueError(f"Custom image {image_name} does not exist")
if isinstance(token, DockerJupyterServer.GenerateToken):
self._token = secrets.token_hex(32)
else:
self._token = token
# Run the container
env = {"TOKEN": self._token}
env.update(docker_env)
container = client.containers.run(
image_name,
detach=True,
auto_remove=auto_remove,
environment=env,
publish_all_ports=True,
name=container_name,
)
_wait_for_ready(container)
container_ports = container.ports
self._port = int(container_ports["8888/tcp"][0]["HostPort"])
self._container_id = container.id
def cleanup():
try:
inner_container = client.containers.get(container.id)
inner_container.stop()
except docker.errors.NotFound:
pass
atexit.unregister(cleanup)
if stop_container:
atexit.register(cleanup)
self._cleanup_func = cleanup
self._stop_container = stop_container
@property
def connection_info(self) -> JupyterConnectionInfo:
return JupyterConnectionInfo(host="127.0.0.1", use_https=False, port=self._port, token=self._token)
def stop(self):
self._cleanup_func()
def get_client(self) -> JupyterClient:
return JupyterClient(self.connection_info)
def __enter__(self) -> Self:
return self
def __exit__(
self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
self.stop()

View File

@ -11,9 +11,9 @@ from jupyter_client import KernelManager # type: ignore[attr-defined]
from jupyter_client.kernelspec import KernelSpecManager
from pydantic import BaseModel, Field, field_validator
from ..agentchat.agent import LLMAgent
from .base import CodeBlock, CodeExtractor, IPythonCodeResult
from .markdown_code_extractor import MarkdownCodeExtractor
from ...agentchat.agent import LLMAgent
from ..base import CodeBlock, CodeExtractor, IPythonCodeResult
from ..markdown_code_extractor import MarkdownCodeExtractor
__all__ = "EmbeddedIPythonCodeExecutor"

View File

@ -14,6 +14,7 @@ import json
import uuid
import datetime
import requests
from requests.adapters import HTTPAdapter, Retry
import websocket
from websocket import WebSocket
@ -26,6 +27,9 @@ class JupyterClient:
def __init__(self, connection_info: JupyterConnectionInfo):
self._connection_info = connection_info
self._session = requests.Session()
retries = Retry(total=5, backoff_factor=0.1)
self._session.mount("http://", HTTPAdapter(max_retries=retries))
def _get_headers(self) -> Dict[str, str]:
if self._connection_info.token is None:
@ -40,11 +44,11 @@ class JupyterClient:
return f"ws://{self._connection_info.host}:{self._connection_info.port}"
def list_kernel_specs(self) -> Dict[str, Dict[str, str]]:
response = requests.get(f"{self._get_api_base_url()}/api/kernelspecs", headers=self._get_headers())
response = self._session.get(f"{self._get_api_base_url()}/api/kernelspecs", headers=self._get_headers())
return cast(Dict[str, Dict[str, str]], response.json())
def list_kernels(self) -> List[Dict[str, str]]:
response = requests.get(f"{self._get_api_base_url()}/api/kernels", headers=self._get_headers())
response = self._session.get(f"{self._get_api_base_url()}/api/kernels", headers=self._get_headers())
return cast(List[Dict[str, str]], response.json())
def start_kernel(self, kernel_spec_name: str) -> str:
@ -57,15 +61,21 @@ class JupyterClient:
str: ID of the started kernel
"""
response = requests.post(
response = self._session.post(
f"{self._get_api_base_url()}/api/kernels",
headers=self._get_headers(),
json={"name": kernel_spec_name},
)
return cast(str, response.json()["id"])
def delete_kernel(self, kernel_id: str) -> None:
response = self._session.delete(
f"{self._get_api_base_url()}/api/kernels/{kernel_id}", headers=self._get_headers()
)
response.raise_for_status()
def restart_kernel(self, kernel_id: str) -> None:
response = requests.post(
response = self._session.post(
f"{self._get_api_base_url()}/api/kernels/{kernel_id}/restart", headers=self._get_headers()
)
response.raise_for_status()
@ -100,6 +110,9 @@ class JupyterKernelClient:
def __exit__(
self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
self.stop()
def stop(self) -> None:
self._websocket.close()
def _send_message(self, *, content: Dict[str, Any], channel: str, message_type: str) -> str:

View File

@ -3,18 +3,22 @@ import json
import os
from pathlib import Path
import re
from types import TracebackType
import uuid
from typing import Any, ClassVar, List, Union
from typing import Any, ClassVar, List, Optional, Union
import sys
from pydantic import Field
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
from ..agentchat.agent import LLMAgent
from .base import CodeBlock, CodeExecutor, CodeExtractor, CodeResult, IPythonCodeResult
from .markdown_code_extractor import MarkdownCodeExtractor
from .jupyter import JupyterConnectable, JupyterConnectionInfo, LocalJupyterServer, JupyterClient
__all__ = ("JupyterCodeExecutor", "LocalJupyterCodeExecutor")
from ...agentchat.agent import LLMAgent
from ..base import CodeBlock, CodeExecutor, CodeExtractor, IPythonCodeResult
from ..markdown_code_extractor import MarkdownCodeExtractor
from .base import JupyterConnectable, JupyterConnectionInfo
from .jupyter_client import JupyterClient
class JupyterCodeExecutor(CodeExecutor):
@ -214,9 +218,14 @@ the output will be a path to the image instead of the image itself.
lines[i] = line.replace(match.group(0), match.group(0) + " -qqq")
return "\n".join(lines)
def stop(self) -> None:
"""Stop the kernel."""
self._jupyter_client.delete_kernel(self._kernel_id)
class LocalJupyterCodeExecutor(JupyterCodeExecutor):
def __init__(self, **kwargs: Any):
"""Creates a LocalJupyterServer and passes it to JupyterCodeExecutor, see JupyterCodeExecutor for args"""
jupyter_server = LocalJupyterServer()
super().__init__(jupyter_server=jupyter_server, **kwargs)
def __enter__(self) -> Self:
return self
def __exit__(
self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
self.stop()

View File

@ -56,10 +56,15 @@ setuptools.setup(
"websurfer": ["beautifulsoup4", "markdownify", "pdfminer.six", "pathvalidate"],
"redis": ["redis"],
# Dependencies for EmbeddedIPythonExecutor, to be removed once upstream bug fixed
# jupyter-client
# https://github.com/jupyter-server/kernel_gateway/issues/398
"ipython": ["jupyter-client>=8.6.0", "ipykernel>=6.29.0"],
# Dependencies for LocalJupyterExecutor
"local-jupyter-exec": ["jupyter-kernel-gateway", "websocket-client", "requests", "ipykernel"],
"jupyter-executor": [
"jupyter-kernel-gateway",
"websocket-client",
"requests",
"jupyter-client>=8.6.0",
"ipykernel>=6.29.0",
],
},
classifiers=[
"Programming Language :: Python :: 3",

View File

@ -9,11 +9,25 @@ from autogen.agentchat.conversable_agent import ConversableAgent
from autogen.coding.base import CodeBlock, CodeExecutor
from autogen.coding.factory import CodeExecutorFactory
from autogen.oai.openai_utils import config_list_from_json
from conftest import MOCK_OPEN_AI_API_KEY, skip_openai # noqa: E402
from conftest import MOCK_OPEN_AI_API_KEY, skip_openai, skip_docker # noqa: E402
try:
from autogen.coding.embedded_ipython_code_executor import EmbeddedIPythonCodeExecutor
from autogen.coding.jupyter_code_executor import LocalJupyterCodeExecutor
from autogen.coding.jupyter import (
DockerJupyterServer,
EmbeddedIPythonCodeExecutor,
JupyterCodeExecutor,
LocalJupyterServer,
)
class DockerJupyterExecutor(JupyterCodeExecutor):
def __init__(self, **kwargs):
jupyter_server = DockerJupyterServer()
super().__init__(jupyter_server=jupyter_server, **kwargs)
class LocalJupyterCodeExecutor(JupyterCodeExecutor):
def __init__(self, **kwargs):
jupyter_server = LocalJupyterServer()
super().__init__(jupyter_server=jupyter_server, **kwargs)
# Skip on windows due to kernelgateway bug https://github.com/jupyter-server/kernel_gateway/issues/398
if sys.platform == "win32":
@ -21,21 +35,27 @@ try:
else:
classes_to_test = [EmbeddedIPythonCodeExecutor, LocalJupyterCodeExecutor]
if not skip_docker:
classes_to_test.append(DockerJupyterExecutor)
skip = False
skip_reason = ""
except ImportError:
except ImportError as e:
skip = True
skip_reason = "Dependencies for EmbeddedIPythonCodeExecutor or LocalJupyterCodeExecutor not installed."
skip_reason = "Dependencies for EmbeddedIPythonCodeExecutor or LocalJupyterCodeExecutor not installed. " + e.msg
classes_to_test = []
@pytest.mark.skipif(skip, reason=skip_reason)
def test_create_dict() -> None:
config: Dict[str, Union[str, CodeExecutor]] = {"executor": "ipython-embedded"}
executor = CodeExecutorFactory.create(config)
assert isinstance(executor, EmbeddedIPythonCodeExecutor)
@pytest.mark.skipif(skip, reason=skip_reason)
@pytest.mark.parametrize("cls", classes_to_test)
def test_create(cls) -> None:
config: Dict[str, Union[str, CodeExecutor]] = {"executor": "ipython-embedded"}
executor = CodeExecutorFactory.create(config)
assert isinstance(executor, EmbeddedIPythonCodeExecutor)
config = {"executor": cls()}
executor = CodeExecutorFactory.create(config)
assert executor is config["executor"]