Spaces:
Running
Running
| from __future__ import annotations | |
| import os | |
| import platform | |
| import shlex | |
| import subprocess | |
| from typing import Annotated | |
| import gradio as gr | |
| from app import _log_call_end, _log_call_start, _truncate_for_log | |
| from ._docstrings import autodoc | |
| from .File_System import _resolve_path, ROOT_DIR, _display_path | |
| import shutil | |
def _detect_shell(prefer_powershell: bool = True) -> tuple[list[str], str]:
    """
    Pick an appropriate shell for the host OS.

    - Windows: use PowerShell by default (``pwsh`` first, then ``powershell`` /
      ``powershell.exe``), falling back to the interpreter named by the
      ``ComSpec`` environment variable (``cmd.exe`` when it is unset or empty).
    - POSIX: use ``bash`` if it is found on ``PATH``, else the shell named by the
      ``SHELL`` environment variable (``/bin/sh`` when it is unset or empty).

    Args:
        prefer_powershell: On Windows, try PowerShell before falling back to cmd.
            Has no effect on other platforms.

    Returns:
        (shell_cmd_prefix, shell_name) where shell_cmd_prefix is the command list
        to launch the shell; the command string is appended as the final argument.
        shell_name is one of "powershell", "cmd", "bash" or "sh". Note that "sh" is
        reported for whatever ``SHELL`` points at, not only for /bin/sh.
    """
    if platform.system().lower() == "windows":
        if prefer_powershell:
            # Probe lazily in priority order; the first hit wins.
            for exe_name in ("pwsh", "powershell", "powershell.exe"):
                found = shutil.which(exe_name)
                if found:
                    return [found, "-NoLogo", "-NoProfile", "-Command"], "powershell"
        # Fallback to cmd. In a raw string every backslash is literal, so the
        # default path must use single separators.
        comspec = os.environ.get("ComSpec") or r"C:\Windows\System32\cmd.exe"
        return [comspec, "/C"], "cmd"

    # POSIX
    bash = shutil.which("bash")
    if bash:
        return [bash, "-lc"], "bash"
    # `or` (rather than a .get default) also covers SHELL being set but empty,
    # which would otherwise yield an empty executable name.
    fallback_shell = os.environ.get("SHELL") or "/bin/sh"
    return [fallback_shell, "-lc"], "sh"
# Detect the shell once at import time so the docs/UI text can name it.
_DETECTED_SHELL_PREFIX, _DETECTED_SHELL_NAME = _detect_shell()

# Tool description passed to the interface: clarifies path semantics and
# reports the detected shell. Adjacent literals are concatenated verbatim, so
# every sentence except the last must end with a trailing space.
TOOL_SUMMARY = (
    "Execute a shell command within a safe working directory under the tool root ('/'). "
    "Paths must be relative to '/'. "
    "Set workdir to '.' to use the root. "
    "Absolute paths are disabled. "
    f"Detected shell: {_DETECTED_SHELL_NAME}."
)
def _run_command(command: str, cwd: str, timeout: int) -> tuple[str, str, int]:
    """Run *command* through the detected shell and capture its output.

    Args:
        command: Command line, passed as one argument after the shell prefix
            returned by ``_detect_shell``.
        cwd: Directory the child process is started in.
        timeout: Seconds to wait for completion; a falsy or non-positive value
            disables the timeout.

    Returns:
        (stdout, stderr, exit_code), with both streams as ``str``.
        On timeout the exit code is 124 and a ``[timeout]`` marker line is
        appended to whatever stderr was captured. If the process could not be
        run at all, stdout is empty, stderr is ``Execution failed: <error>`` and
        the exit code is 1.
    """

    def _as_text(captured: str | bytes | None) -> str:
        # TimeoutExpired exposes captured output as bytes even when text mode
        # was requested, and as None when nothing was captured; normalise both
        # so callers can always concatenate with str.
        if captured is None:
            return ""
        if isinstance(captured, bytes):
            return captured.decode("utf-8", errors="replace")
        return captured

    shell_prefix, _ = _detect_shell()
    try:
        proc = subprocess.run(
            shell_prefix + [command],
            cwd=cwd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout if timeout and timeout > 0 else None,
        )
    except subprocess.TimeoutExpired as exc:
        return _as_text(exc.stdout), _as_text(exc.stderr) + "\n[timeout]", 124
    except Exception as exc:
        # Boundary handler: launch failures (missing shell, bad cwd, ...) are
        # reported to the caller as text instead of propagating.
        return "", f"Execution failed: {exc}", 1
    return proc.stdout, proc.stderr, proc.returncode
def Shell_Command(
    command: Annotated[str, "Shell command to execute. Accepts multi-part pipelines as a single string."],
    workdir: Annotated[str, "Working directory (relative to root unless UNSAFE_ALLOW_ABS_PATHS=1)."] = ".",
    timeout: Annotated[int, "Timeout in seconds (0 = no timeout, be careful on public hosting)."] = 60,
) -> str:
    """Run a shell command in a resolved working directory and return a text report.

    The working directory is resolved with ``_resolve_path``; if that reports an
    error, the error text is returned unchanged. Every call is bracketed by
    ``_log_call_start`` / ``_log_call_end``.

    Args:
        command: Command line handed to the detected shell as a single string.
        workdir: Working directory to resolve and run in.
        timeout: Seconds before the command is abandoned; 0 disables the timeout.

    Returns:
        A report containing the command, display path of the working directory,
        shell name, exit code, and the captured stdout and stderr (``<empty>``
        where a stream produced nothing); or a one-line message when the command
        is blank or the working directory is unusable.
    """

    def _finish(message: str) -> str:
        # Early exits log the same text they return.
        _log_call_end("Shell_Command", _truncate_for_log(message))
        return message

    _log_call_start("Shell_Command", command=command, workdir=workdir, timeout=timeout)

    if not command or not command.strip():
        return _finish("No command provided.")

    abs_cwd, err = _resolve_path(workdir)
    if err:
        return _finish(err)
    if not os.path.exists(abs_cwd):
        return _finish(f"Working directory not found: {abs_cwd}")
    if not os.path.isdir(abs_cwd):
        # An existing regular file would otherwise only fail when the process is
        # launched and surface as an opaque "Execution failed" report.
        return _finish(f"Working directory is not a directory: {abs_cwd}")

    # Capture shell used for transparency
    _, shell_name = _detect_shell()
    stdout, stderr, code = _run_command(command, cwd=abs_cwd, timeout=timeout)

    display_cwd = _display_path(abs_cwd)
    header = (
        f"Command: {command}\n"
        f"CWD: {display_cwd}\n"
        "Root: /\n"
        f"Shell: {shell_name}\n"
        f"Exit code: {code}\n"
        "--- STDOUT ---\n"
    )
    output = header + (stdout or "<empty>") + "\n--- STDERR ---\n" + (stderr or "<empty>")
    # Log only sizes, not content, for the success path.
    _log_call_end("Shell_Command", _truncate_for_log(f"exit={code} stdout={len(stdout)} stderr={len(stderr)}"))
    return output
def build_interface() -> gr.Interface:
    """Build the Gradio interface that exposes ``Shell_Command``.

    Returns:
        A ``gr.Interface`` with command, workdir and timeout inputs and a single
        text output, using ``TOOL_SUMMARY`` as its API description.
    """
    # Components are created in the same order as they appear in the UI.
    command_box = gr.Textbox(
        label="Command",
        placeholder="echo hello || dir",
        lines=2,
        info="Shell command to execute",
    )
    workdir_box = gr.Textbox(
        label="Workdir",
        value=".",
        max_lines=1,
        info="Working directory (relative to root)",
    )
    timeout_slider = gr.Slider(
        minimum=0,
        maximum=600,
        step=5,
        value=60,
        label="Timeout (seconds)",
        info="Timeout in seconds (0 = no timeout)",
    )
    output_box = gr.Textbox(label="Output", lines=20)

    # HTML blurb shown under the title; names the shell detected at import time.
    description_html = (
        "<div style=\"text-align:center; overflow:hidden;\">"
        "Run a shell command under the same safe root as File System. "
        "Absolute paths are disabled, use relative paths. "
        f"Detected shell: {_DETECTED_SHELL_NAME}. "
        "</div>"
    )

    return gr.Interface(
        fn=Shell_Command,
        inputs=[command_box, workdir_box, timeout_slider],
        outputs=output_box,
        title="Shell Command",
        description=description_html,
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
        submit_btn="Run",
    )
# Public API of this module: the tool function and its UI factory.
__all__ = [
    "Shell_Command",
    "build_interface",
]