Skip to content

Container Executor

Classes:

CodeBlock

Bases: BaseModel

A class that represents a code block.

Source code in tapeagents/tools/container_executor.py
307
308
309
310
311
class CodeBlock(BaseModel):
    """A block of source code together with the language it is written in."""

    # Source text of the block.
    code: str = Field(description="The code to execute.")
    # Language tag of the block, e.g. "python" or "sh".
    language: str = Field(description="The language of the code.")

CodeResult

Bases: BaseModel

A class that represents the result of a code execution.

Source code in tapeagents/tools/container_executor.py
314
315
316
317
318
319
class CodeResult(BaseModel):
    """The outcome of executing one or more code blocks."""

    # Exit status reported for the execution.
    exit_code: int = Field(description="The exit code of the code execution.")
    # Text produced by the execution.
    output: str = Field(description="The output of the code execution.")
    # Declared Optional because the default is None, which a bare
    # ``list[str]`` annotation does not admit.
    output_files: Optional[list[str]] = Field(default=None, description="The output files of the code execution.")

CommandLineCodeResult

Bases: CodeResult

(Experimental) A code result class for command line code executor.

Source code in tapeagents/tools/container_executor.py
322
323
324
325
326
327
328
class CommandLineCodeResult(CodeResult):
    """(Experimental) A code result class for command line code executor."""

    # Paths of the files the executed code blocks were saved to. Declared
    # Optional because the default is None, which a bare ``list[str]``
    # annotation does not admit.
    code_files: Optional[list[str]] = Field(
        default=None,
        description="The file that the executed code block was saved to.",
    )

ContainerExecutor

Methods:

  • __init__

    (Experimental) A code executor class that executes code through a command line environment in a Docker container.

  • execute_code_blocks

    (Experimental) Execute the code blocks and return the result.

  • restart

    (Experimental) Restart the code executor.

  • stop

    (Experimental) Stop the code executor.

Attributes:

  • bind_dir (Path) –

    (Experimental) The binding directory for the code execution container.

  • timeout (int) –

    (Experimental) The timeout for code execution.

  • work_dir (Path) –

    (Experimental) The working directory for the code execution.

Source code in tapeagents/tools/container_executor.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
class ContainerExecutor:
    """(Experimental) Run code blocks inside a container that is started once.

    Each code block is written to a file under ``work_dir`` and, when the
    execution policy for its language allows it, run inside the container
    through the ``timeout`` command. The directory ``bind_dir`` is bind-mounted
    into the container at ``/workspace``, which is also the working directory
    of the container.
    """

    # Per-language policy: True means the saved file is executed, False means
    # the code is only written to disk. Languages missing from this table are
    # rejected by execute_code_blocks as unsupported.
    DEFAULT_EXECUTION_POLICY: ClassVar[Dict[str, bool]] = {
        "bash": True,
        "shell": True,
        "sh": True,
        "pwsh": True,
        "powershell": True,
        "ps1": True,
        "python": True,
        "javascript": False,
        "html": False,
        "css": False,
    }
    # Short language tags mapped to the names used in the policy table.
    LANGUAGE_ALIASES: ClassVar[Dict[str, str]] = {"py": "python", "js": "javascript"}

    def __init__(
        self,
        image: str = "python:3-slim",
        container_name: Optional[str] = None,
        timeout: int = 60,
        work_dir: Union[Path, str] = Path("."),
        bind_dir: Optional[Union[Path, str]] = None,
        auto_remove: bool = True,
        stop_container: bool = True,
        execution_policies: Optional[Dict[str, bool]] = None,
    ):
        """(Experimental) A code executor class that executes code through
        a command line environment in a Docker container.

        The executor first saves each code block in a file in the working
        directory, and then executes the code file in the container.
        The executor executes the code blocks in the order they are received.
        Currently, the executor only supports Python and shell scripts.
        For Python code, use the language "python" for the code block.
        For shell scripts, use the language "bash", "shell", or "sh" for the code
        block.

        Args:
            image (str, optional): Docker image to use for code execution.
                Defaults to "python:3-slim".
            container_name (Optional[str], optional): Name of the Docker container
                which is created. If None, will autogenerate a name. Defaults to None.
            timeout (int, optional): The timeout for code execution. Defaults to 60.
            work_dir (Union[Path, str], optional): The working directory for the code
                execution. Defaults to Path(".").
            bind_dir (Union[Path, str], optional): The directory that will be bound
                to the code executor container. Useful for cases where you want to spawn
                the container from within a container. Defaults to work_dir.
            auto_remove (bool, optional): If true, will automatically remove the Docker
                container when it is stopped. Defaults to True.
            stop_container (bool, optional): If true, will automatically stop the
                container when stop is called, when the context manager exits or when
                the Python process exits with atexit. Defaults to True.
            execution_policies (Optional[Dict[str, bool]], optional): Per-language
                overrides merged on top of DEFAULT_EXECUTION_POLICY. Defaults to None.

        Raises:
            ValueError: On argument error, or if the container fails to start.
        """
        if timeout < 1:
            raise ValueError("Timeout must be greater than or equal to 1.")

        if isinstance(work_dir, str):
            work_dir = Path(work_dir)
        work_dir.mkdir(parents=True, exist_ok=True)

        if bind_dir is None:
            bind_dir = work_dir
        elif isinstance(bind_dir, str):
            bind_dir = Path(bind_dir)

        # Imported lazily so the dependency is only needed when an executor is
        # created; podman is bound to the name ``docker`` for the code below.
        import podman as docker

        client = docker.from_env()
        # Check if the image exists
        try:
            client.images.get(image)
        except docker.errors.ImageNotFound:
            logger.info("Pulling image %s...", image)
            # Let the docker exception escape if this fails.
            client.images.pull(image)

        if container_name is None:
            container_name = f"autogen-code-exec-{uuid.uuid4()}"

        # Start a container from the image, ready to exec commands later.
        host_path = str(bind_dir.resolve())
        # A bind mount is used here in place of a ``volumes=`` mapping.
        mounts = [
            {
                "type": "bind",
                "source": host_path,
                "target": "/workspace",
            }
        ]
        self._container = client.containers.create(
            image,
            name=container_name,
            # Podman needs the entrypoint as a list rather than the plain
            # string "/bin/sh".
            entrypoint=["/bin/sh"],
            tty=True,
            auto_remove=auto_remove,
            mounts=mounts,
            working_dir="/workspace",
        )
        self._container.start()

        _wait_for_ready(self._container)

        def cleanup() -> None:
            # Look the container up by name again; it may already be gone.
            try:
                container = client.containers.get(container_name)
                container.stop()
            except docker.errors.NotFound:
                pass
            atexit.unregister(cleanup)

        if stop_container:
            atexit.register(cleanup)

        self._cleanup = cleanup

        # Check if the container is running
        if self._container.status != "running":
            raise ValueError(f"Failed to start container from image {image}. Logs: {self._container.logs()}")

        self._timeout = timeout
        self._work_dir: Path = work_dir
        self._bind_dir: Path = bind_dir
        # Copy so per-instance overrides never touch the class-level table.
        self.execution_policies = self.DEFAULT_EXECUTION_POLICY.copy()
        if execution_policies is not None:
            self.execution_policies.update(execution_policies)

    @property
    def timeout(self) -> int:
        """(Experimental) The timeout for code execution."""
        return self._timeout

    @property
    def work_dir(self) -> Path:
        """(Experimental) The working directory for the code execution."""
        return self._work_dir

    @property
    def bind_dir(self) -> Path:
        """(Experimental) The binding directory for the code execution container."""
        return self._bind_dir

    def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CommandLineCodeResult:
        """(Experimental) Execute the code blocks and return the result.

        Blocks are processed in order. Processing stops at the first block
        whose language is unsupported, whose filename lies outside the
        workspace, or whose execution returns a non-zero exit code.

        Args:
            code_blocks (List[CodeBlock]): The code blocks to execute.

        Returns:
            CommandLineCodeResult: The exit code of the last processed block,
                the concatenated outputs, the output files detected in the
                outputs and the paths of the saved code files.

        Raises:
            ValueError: If ``code_blocks`` is empty.
        """

        if not code_blocks:
            raise ValueError("No code blocks to execute.")

        outputs = []
        output_files = []
        files: list[Path] = []
        last_exit_code = 0
        for code_block in code_blocks:
            lang = self.LANGUAGE_ALIASES.get(code_block.language.lower(), code_block.language.lower())
            if lang not in self.DEFAULT_EXECUTION_POLICY:
                outputs.append(f"Unsupported language {lang}\n")
                last_exit_code = 1
                break

            execute_code = self.execution_policies.get(lang, False)
            code = silence_pip(code_block.code, lang)

            # Check if there is a filename comment
            try:
                filename = _get_file_name_from_content(code, self._work_dir)
            except ValueError:
                outputs.append("Filename is not in the workspace")
                last_exit_code = 1
                break

            if not filename:
                # No filename given: derive a stable name from the code itself.
                filename = f"tmp_code_{md5(code.encode()).hexdigest()}.{lang}"

            code_path = self._work_dir / filename
            with code_path.open("w", encoding="utf-8") as fout:
                fout.write(code)
            files.append(code_path)

            if not execute_code:
                outputs.append(f"Code saved to {str(code_path)}\n")
                continue

            command = ["timeout", str(self._timeout), _cmd(lang), filename]
            exit_code, output = self._container.exec_run(command, tty=True)
            logger.info(f"Command: {command}, Exit code: {exit_code}\n Output: {output}")
            assert isinstance(output, bytes)
            # Executed code may print bytes that are not valid UTF-8; replace
            # them instead of letting UnicodeDecodeError abort the whole run.
            output = output.decode("utf-8", errors="replace")
            if exit_code == 124:
                # 124 is the status `timeout` reports when the limit was hit.
                output += "\n" + "Timeout"
            outputs.append(output)
            if file_output := _get_file_name_from_output(output, self._work_dir):
                output_files.append(file_output)

            last_exit_code = exit_code
            if exit_code != 0:
                break

        return CommandLineCodeResult(
            exit_code=last_exit_code,
            output="".join(outputs),
            output_files=output_files,
            code_files=[str(file) for file in files],
        )

    def restart(self) -> None:
        """(Experimental) Restart the code executor.

        Raises:
            ValueError: If the container status is not "running" afterwards.
        """
        self._container.restart()
        if self._container.status != "running":
            raise ValueError(f"Failed to restart container. Logs: {self._container.logs()}")

    def stop(self) -> None:
        """(Experimental) Stop the code executor."""
        self._cleanup()

    def __enter__(self) -> Self:
        """Return the executor itself for use in a ``with`` statement."""
        return self

    def __exit__(
        self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
    ) -> None:
        """Stop the executor when the ``with`` block is left."""
        self.stop()

bind_dir: Path property

(Experimental) The binding directory for the code execution container.

timeout: int property

(Experimental) The timeout for code execution.

work_dir: Path property

(Experimental) The working directory for the code execution.

__init__(image='python:3-slim', container_name=None, timeout=60, work_dir=Path('.'), bind_dir=None, auto_remove=True, stop_container=True, execution_policies=None)

(Experimental) A code executor class that executes code through a command line environment in a Docker container.

The executor first saves each code block in a file in the working directory, and then executes the code file in the container. The executor executes the code blocks in the order they are received. Currently, the executor only supports Python and shell scripts. For Python code, use the language "python" for the code block. For shell scripts, use the language "bash", "shell", or "sh" for the code block.

Parameters:

  • image (str, default: 'python:3-slim' ) –

    Docker image to use for code execution. Defaults to "python:3-slim".

  • container_name (Optional[str], default: None ) –

    Name of the Docker container which is created. If None, will autogenerate a name. Defaults to None.

  • timeout (int, default: 60 ) –

    The timeout for code execution. Defaults to 60.

  • work_dir (Union[Path, str], default: Path('.') ) –

    The working directory for the code execution. Defaults to Path(".").

  • bind_dir (Union[Path, str], default: None ) –

    The directory that will be bound to the code executor container. Useful for cases where you want to spawn the container from within a container. Defaults to work_dir.

  • auto_remove (bool, default: True ) –

    If true, will automatically remove the Docker container when it is stopped. Defaults to True.

  • stop_container (bool, default: True ) –

    If true, will automatically stop the container when stop is called, when the context manager exits or when the Python process exits with atexit. Defaults to True.

Raises:

  • ValueError

    On argument error, or if the container fails to start.

Source code in tapeagents/tools/container_executor.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def __init__(
    self,
    image: str = "python:3-slim",
    container_name: Optional[str] = None,
    timeout: int = 60,
    work_dir: Union[Path, str] = Path("."),
    bind_dir: Optional[Union[Path, str]] = None,
    auto_remove: bool = True,
    stop_container: bool = True,
    execution_policies: Optional[Dict[str, bool]] = None,
):
    """(Experimental) A code executor class that executes code through
    a command line environment in a Docker container.

    The executor first saves each code block in a file in the working
    directory, and then executes the code file in the container.
    The executor executes the code blocks in the order they are received.
    Currently, the executor only supports Python and shell scripts.
    For Python code, use the language "python" for the code block.
    For shell scripts, use the language "bash", "shell", or "sh" for the code
    block.

    Args:
        image (str, optional): Docker image to use for code execution.
            Defaults to "python:3-slim".
        container_name (Optional[str], optional): Name of the Docker container
            which is created. If None, will autogenerate a name. Defaults to None.
        timeout (int, optional): The timeout for code execution. Defaults to 60.
        work_dir (Union[Path, str], optional): The working directory for the code
            execution. Defaults to Path(".").
        bind_dir (Union[Path, str], optional): The directory that will be bound
            to the code executor container. Useful for cases where you want to spawn
            the container from within a container. Defaults to work_dir.
        auto_remove (bool, optional): If true, will automatically remove the Docker
            container when it is stopped. Defaults to True.
        stop_container (bool, optional): If true, will automatically stop the
            container when stop is called, when the context manager exits or when
            the Python process exits with atexit. Defaults to True.
        execution_policies (Optional[Dict[str, bool]], optional): Per-language
            overrides merged on top of DEFAULT_EXECUTION_POLICY. Defaults to None.

    Raises:
        ValueError: On argument error, or if the container fails to start.
    """
    if timeout < 1:
        raise ValueError("Timeout must be greater than or equal to 1.")

    if isinstance(work_dir, str):
        work_dir = Path(work_dir)
    work_dir.mkdir(parents=True, exist_ok=True)

    if bind_dir is None:
        bind_dir = work_dir
    elif isinstance(bind_dir, str):
        bind_dir = Path(bind_dir)

    # Imported lazily so the dependency is only needed when an executor is
    # created; podman is bound to the name ``docker`` for the code below.
    import podman as docker

    client = docker.from_env()
    # Check if the image exists
    try:
        client.images.get(image)
    except docker.errors.ImageNotFound:
        # Use the module logger, as the rest of this file does, rather than
        # the root ``logging`` functions.
        logger.info("Pulling image %s...", image)
        # Let the docker exception escape if this fails.
        client.images.pull(image)

    if container_name is None:
        container_name = f"autogen-code-exec-{uuid.uuid4()}"

    # Start a container from the image, ready to exec commands later.
    host_path = str(bind_dir.resolve())
    # A bind mount is used here in place of a ``volumes=`` mapping.
    mounts = [
        {
            "type": "bind",
            "source": host_path,
            "target": "/workspace",
        }
    ]
    self._container = client.containers.create(
        image,
        name=container_name,
        # Podman needs the entrypoint as a list rather than the plain
        # string "/bin/sh".
        entrypoint=["/bin/sh"],
        tty=True,
        auto_remove=auto_remove,
        mounts=mounts,
        working_dir="/workspace",
    )
    self._container.start()

    _wait_for_ready(self._container)

    def cleanup() -> None:
        # Look the container up by name again; it may already be gone.
        try:
            container = client.containers.get(container_name)
            container.stop()
        except docker.errors.NotFound:
            pass
        atexit.unregister(cleanup)

    if stop_container:
        atexit.register(cleanup)

    self._cleanup = cleanup

    # Check if the container is running
    if self._container.status != "running":
        raise ValueError(f"Failed to start container from image {image}. Logs: {self._container.logs()}")

    self._timeout = timeout
    self._work_dir: Path = work_dir
    self._bind_dir: Path = bind_dir
    # Copy so per-instance overrides never touch the class-level table.
    self.execution_policies = self.DEFAULT_EXECUTION_POLICY.copy()
    if execution_policies is not None:
        self.execution_policies.update(execution_policies)

execute_code_blocks(code_blocks)

(Experimental) Execute the code blocks and return the result.

Parameters:

  • code_blocks (List[CodeBlock]) –

    The code blocks to execute.

Returns:

  • CommandLineCodeResult

    CommandLineCodeResult: The result of the code execution.

Source code in tapeagents/tools/container_executor.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CommandLineCodeResult:
    """(Experimental) Execute the code blocks and return the result.

    Blocks are processed in order. Processing stops at the first block
    whose language is unsupported, whose filename lies outside the
    workspace, or whose execution returns a non-zero exit code.

    Args:
        code_blocks (List[CodeBlock]): The code blocks to execute.

    Returns:
        CommandLineCodeResult: The exit code of the last processed block,
            the concatenated outputs, the output files detected in the
            outputs and the paths of the saved code files.

    Raises:
        ValueError: If ``code_blocks`` is empty.
    """

    if not code_blocks:
        raise ValueError("No code blocks to execute.")

    outputs = []
    output_files = []
    files: list[Path] = []
    last_exit_code = 0
    for code_block in code_blocks:
        lang = self.LANGUAGE_ALIASES.get(code_block.language.lower(), code_block.language.lower())
        if lang not in self.DEFAULT_EXECUTION_POLICY:
            outputs.append(f"Unsupported language {lang}\n")
            last_exit_code = 1
            break

        execute_code = self.execution_policies.get(lang, False)
        code = silence_pip(code_block.code, lang)

        # Check if there is a filename comment
        try:
            filename = _get_file_name_from_content(code, self._work_dir)
        except ValueError:
            outputs.append("Filename is not in the workspace")
            last_exit_code = 1
            break

        if not filename:
            # No filename given: derive a stable name from the code itself.
            filename = f"tmp_code_{md5(code.encode()).hexdigest()}.{lang}"

        code_path = self._work_dir / filename
        with code_path.open("w", encoding="utf-8") as fout:
            fout.write(code)
        files.append(code_path)

        if not execute_code:
            outputs.append(f"Code saved to {str(code_path)}\n")
            continue

        command = ["timeout", str(self._timeout), _cmd(lang), filename]
        exit_code, output = self._container.exec_run(command, tty=True)
        logger.info(f"Command: {command}, Exit code: {exit_code}\n Output: {output}")
        assert isinstance(output, bytes)
        # Executed code may print bytes that are not valid UTF-8; replace
        # them instead of letting UnicodeDecodeError abort the whole run.
        output = output.decode("utf-8", errors="replace")
        if exit_code == 124:
            # 124 is the status `timeout` reports when the limit was hit.
            output += "\n" + "Timeout"
        outputs.append(output)
        if file_output := _get_file_name_from_output(output, self._work_dir):
            output_files.append(file_output)

        last_exit_code = exit_code
        if exit_code != 0:
            break

    return CommandLineCodeResult(
        exit_code=last_exit_code,
        output="".join(outputs),
        output_files=output_files,
        code_files=[str(file) for file in files],
    )

restart()

(Experimental) Restart the code executor.

Source code in tapeagents/tools/container_executor.py
266
267
268
269
270
def restart(self) -> None:
    """(Experimental) Restart the container behind this executor.

    Raises:
        ValueError: If the container status is not "running" afterwards.
    """
    container = self._container
    container.restart()
    if container.status == "running":
        return
    raise ValueError(f"Failed to restart container. Logs: {container.logs()}")

stop()

(Experimental) Stop the code executor.

Source code in tapeagents/tools/container_executor.py
272
273
274
def stop(self) -> None:
    """(Experimental) Stop the code executor.

    Delegates to the cleanup callback stored on the instance as ``_cleanup``.
    """
    self._cleanup()

extract_code_blocks(message)

(Experimental) Extract code blocks from a message. If no code blocks are found, return an empty list.

Parameters:

  • message (str) –

    The message to extract code blocks from.

Returns:

  • List[CodeBlock]

    List[CodeBlock]: The extracted code blocks or an empty list.

Source code in tapeagents/tools/container_executor.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def extract_code_blocks(message: str) -> List[CodeBlock]:
    """(Experimental) Collect the code blocks found in a message.

    Every match of CODE_BLOCK_PATTERN yields one block. When a match carries
    no language tag the language is guessed with infer_lang; a language equal
    to UNKNOWN is stored as the empty string.

    Args:
        message (str): The message to extract code blocks from.

    Returns:
        List[CodeBlock]: The extracted code blocks or an empty list.
    """
    extracted: List[CodeBlock] = []
    for declared_lang, body in re.findall(CODE_BLOCK_PATTERN, message, flags=re.DOTALL):
        language = infer_lang(body) if declared_lang == "" else declared_lang
        if language == UNKNOWN:
            language = ""
        extracted.append(CodeBlock(code=body, language=language))
    return extracted

infer_lang(code)

Infer the language of the code. TODO: make it robust.

Source code in tapeagents/tools/container_executor.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def infer_lang(code: str) -> str:
    """Infer the language of a code snippet that has no language tag.

    TODO: make it robust.

    Args:
        code (str): The code to classify.

    Returns:
        str: "sh" when the code starts with "python ", "python3 " or "pip",
            "python" when it compiles as Python source, UNKNOWN otherwise.
    """
    # A snippet that begins by invoking the interpreter or pip is treated as
    # a shell command line.
    if code.startswith(("python ", "pip", "python3 ")):
        return "sh"

    # check if code is a valid python code
    try:
        compile(code, "test", "exec")
    except (SyntaxError, ValueError):
        # SyntaxError: not valid Python. ValueError: compile() rejects source
        # containing null bytes on some Python versions.
        return UNKNOWN
    return "python"

silence_pip(code, lang)

Apply -qqq flag to pip install commands.

Source code in tapeagents/tools/container_executor.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
def silence_pip(code: str, lang: str) -> str:
    """Add the ``-qqq`` flag to pip install commands found in the code.

    For Python code a line must start with ``!pip install`` or
    ``! pip install``; for shell-like languages it must start with
    ``pip install``. Lines that already contain ``-qqq`` and code in any other
    language are returned unchanged.
    """
    shell_like = ["bash", "shell", "sh", "pwsh", "powershell", "ps1"]
    if lang == "python":
        pattern = r"^! ?pip install"
    elif lang in shell_like:
        pattern = r"^pip install"
    else:
        return code

    def quieten(line: str) -> str:
        # Only lines that begin with the install command are touched.
        found = re.search(pattern, line)
        if found is None or "-qqq" in line:
            return line
        command = found.group(0)
        return line.replace(command, command + " -qqq")

    return "\n".join(quieten(line) for line in code.split("\n"))