Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jupyter Code Executor in v0.4 (alternative implementation) #4885

Merged
merged 21 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/packages/autogen-ext/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ video-surfer = [
"ffmpeg-python",
"openai-whisper",
]

grpc = [
"grpcio~=1.62.0", # TODO: update this once we have a stable version.
]
jupyter-executor = [
"ipykernel>=6.29.5",
"nbclient>=0.10.2",
]

[tool.hatch.build.targets.wheel]
packages = ["src/autogen_ext"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from ._jupyter_code_executor import JupyterCodeExecutor, JupyterCodeResult

__all__ = [
"JupyterCodeExecutor",
"JupyterCodeResult",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import asyncio
import base64
import json
import re
import sys
import uuid
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self

from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from nbclient import NotebookClient
from nbformat import NotebookNode
from nbformat import v4 as nbformat

from .._common import silence_pip


@dataclass
class JupyterCodeResult(CodeResult):
"""A code result class for Jupyter code executor."""

output_files: list[Path]


class JupyterCodeExecutor(CodeExecutor):
def __init__(
self,
kernel_name: str = "python3",
timeout: int = 60,
output_dir: Path = Path("."),
):
"""A code executor class that executes code statefully using nbclient.
ekzhu marked this conversation as resolved.
Show resolved Hide resolved

Args:
ekzhu marked this conversation as resolved.
Show resolved Hide resolved
kernel_name (str): The kernel name to use. By default, "python3".
timeout (int): The timeout for code execution, by default 60.
output_dir (Path): The directory to save output files, by default ".".
"""
if timeout < 1:
raise ValueError("Timeout must be greater than or equal to 1.")

self._kernel_name = kernel_name
self._timeout = timeout
self._output_dir = output_dir
# TODO: Forward arguments perhaps?
self._client = NotebookClient(
ekzhu marked this conversation as resolved.
Show resolved Hide resolved
nb=nbformat.new_notebook(),
kernel_name=self._kernel_name,
timeout=self._timeout,
allow_errors=True,
)

async def execute_code_blocks(
self, code_blocks: list[CodeBlock], cancellation_token: CancellationToken
) -> JupyterCodeResult:
"""Execute code blocks and return the result.

Args:
code_blocks (list[CodeBlock]): The code blocks to execute.

Returns:
JupyterCodeResult: The result of the code execution.
"""
outputs: list[str] = []
output_files: list[Path] = []
exit_code = 0

for code_block in code_blocks:
result = await self._execute_code_block(code_block, cancellation_token)
exit_code = result.exit_code
outputs.append(result.output)
output_files.extend(result.output_files)

# Stop execution if one code block fails
if exit_code != 0:
break

return JupyterCodeResult(exit_code=exit_code, output="\n".join(outputs), output_files=output_files)

async def _execute_code_block(
self, code_block: CodeBlock, cancellation_token: CancellationToken
) -> JupyterCodeResult:
"""Execute single code block and return the result.

Args:
code_block (CodeBlock): The code block to execute.

Returns:
JupyterCodeResult: The result of the code execution.
"""
execute_task = asyncio.create_task(
Leon0402 marked this conversation as resolved.
Show resolved Hide resolved
self._execute_cell(
nbformat.new_code_cell(silence_pip(code_block.code, code_block.language)) # type: ignore
)
)

cancellation_token.link_future(execute_task)
output_cell = await asyncio.wait_for(asyncio.shield(execute_task), timeout=self._timeout)

outputs: list[str] = []
output_files: list[Path] = []
exit_code = 0

for output in output_cell.get("outputs", []):
match output.get("output_type"):
case "stream":
outputs.append(output.get("text", ""))
case "error":
traceback = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", "\n".join(output["traceback"]))
outputs.append(traceback)
exit_code = 1
case "execute_result" | "display_data":
data = output.get("data", {})
for mime, content in data.items():
match mime:
case "text/plain":
outputs.append(content)
case "image/png":
path = self._save_image(content)
output_files.append(path)
case "image/jpeg":
# TODO: Should this also be encoded? Images are encoded as both png and jpg
pass
case "text/html":
path = self._save_html(content)
output_files.append(path)
case _:
outputs.append(json.dumps(content))

return JupyterCodeResult(exit_code=exit_code, output="\n".join(outputs), output_files=output_files)
ekzhu marked this conversation as resolved.
Show resolved Hide resolved

async def _execute_cell(self, cell: NotebookNode) -> NotebookNode:
# Temporary push cell to nb as async_execute_cell expects it. But then we want to remove it again as cells can take up significant amount of memory (especially with images)
self._client.nb.cells.append(cell)
output = await self._client.async_execute_cell(
cell,
cell_index=0,
)
self._client.nb.cells.pop()
return output

def _save_image(self, image_data_base64: str) -> Path:
"""Save image data to a file."""
image_data = base64.b64decode(image_data_base64)
path = self._output_dir / f"{uuid.uuid4().hex}.png"
path.write_bytes(image_data)
return path.absolute()

def _save_html(self, html_data: str) -> Path:
"""Save HTML data to a file."""
path = self._output_dir / f"{uuid.uuid4().hex}.html"
path.write_text(html_data)
return path.absolute()

async def restart(self) -> None:
"""Restart the code executor."""
await self.stop()
await self.start()

async def start(self) -> None:
self.kernel_context = self._client.async_setup_kernel()
await self.kernel_context.__aenter__()

async def stop(self) -> None:
"""Stop the kernel."""
await self.kernel_context.__aexit__(None, None, None)

async def __aenter__(self) -> Self:
await self.start()
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
await self.stop()
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import asyncio
import inspect
from pathlib import Path

import pytest
from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock
from autogen_ext.code_executors.jupyter import JupyterCodeExecutor, JupyterCodeResult


@pytest.mark.asyncio
async def test_execute_code(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(exit_code=0, output="hello world!\n", output_files=[])


@pytest.mark.asyncio
async def test_execute_code_error(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [CodeBlock(code="print(undefined_variable)", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(
exit_code=1,
output=inspect.cleandoc("""
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[1], line 1
----> 1 print(undefined_variable)

NameError: name 'undefined_variable' is not defined
"""),
output_files=[],
)


@pytest.mark.asyncio
async def test_execute_multiple_code_blocks(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [
CodeBlock(code="import sys; print('hello world!')", language="python"),
CodeBlock(code="a = 100 + 100; print(a)", language="python"),
]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(exit_code=0, output="hello world!\n\n200\n", output_files=[])


@pytest.mark.asyncio
async def test_depedent_executions(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks_1 = [CodeBlock(code="a = 'hello world!'", language="python")]
code_blocks_2 = [
CodeBlock(code="print(a)", language="python"),
]
await executor.execute_code_blocks(code_blocks_1, CancellationToken())
code_result = await executor.execute_code_blocks(code_blocks_2, CancellationToken())
assert code_result == JupyterCodeResult(exit_code=0, output="hello world!\n", output_files=[])


@pytest.mark.asyncio
async def test_execute_multiple_code_blocks_error(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [
CodeBlock(code="import sys; print('hello world!')", language="python"),
CodeBlock(code="a = 100 + 100; print(a); print(undefined_variable)", language="python"),
]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(
exit_code=1,
output=inspect.cleandoc("""
hello world!

200

---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[2], line 1
----> 1 a = 100 + 100; print(a); print(undefined_variable)

NameError: name 'undefined_variable' is not defined
"""),
output_files=[],
)


@pytest.mark.asyncio
async def test_execute_code_after_restart(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
await executor.restart()

code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(exit_code=0, output="hello world!\n", output_files=[])


@pytest.mark.asyncio
async def test_commandline_code_executor_timeout(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path, timeout=2) as executor:
code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")]

with pytest.raises(asyncio.TimeoutError):
await executor.execute_code_blocks(code_blocks, CancellationToken())


@pytest.mark.asyncio
async def test_commandline_code_executor_cancellation(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")]

cancellation_token = CancellationToken()
code_result_coroutine = executor.execute_code_blocks(code_blocks, cancellation_token)

await asyncio.sleep(1)
cancellation_token.cancel()

with pytest.raises(asyncio.CancelledError):
await code_result_coroutine


@pytest.mark.asyncio
async def test_execute_code_with_image_output(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [
CodeBlock(
code=inspect.cleandoc("""
from PIL import Image, ImageDraw
img = Image.new("RGB", (100, 100), color="white")
draw = ImageDraw.Draw(img)
draw.rectangle((10, 10, 90, 90), outline="black", fill="blue")
display(img)
"""),
language="python",
)
]

code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())

assert len(code_result.output_files) == 1
assert code_result == JupyterCodeResult(
exit_code=0,
output="<PIL.Image.Image image mode=RGB size=100x100>",
output_files=code_result.output_files,
)
assert code_result.output_files[0].parent == tmp_path


@pytest.mark.asyncio
async def test_execute_code_with_html_output(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [
CodeBlock(
code=inspect.cleandoc("""
from IPython.core.display import HTML
HTML("<div style='color:blue'>Hello, HTML world!</div>")
"""),
language="python",
)
]

code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())

assert len(code_result.output_files) == 1
assert code_result == JupyterCodeResult(
exit_code=0,
output="<IPython.core.display.HTML object>",
output_files=code_result.output_files,
)
assert code_result.output_files[0].parent == tmp_path
Loading