Spaces:
Running
Running
| from __future__ import annotations | |
| from typing import TYPE_CHECKING, Any, cast | |
| from loguru import logger | |
| from pydantic import BaseModel | |
| from langflow.graph.vertex.base import Vertex | |
| from langflow.processing.utils import validate_and_repair_json | |
| from langflow.schema.graph import InputValue, Tweaks | |
| from langflow.schema.schema import INPUT_FIELD_NAME | |
| from langflow.services.deps import get_settings_service | |
| if TYPE_CHECKING: | |
| from langflow.api.v1.schemas import InputValueRequest | |
| from langflow.graph.graph.base import Graph | |
| from langflow.graph.schema import RunOutputs | |
| class Result(BaseModel): | |
| result: Any | |
| session_id: str | |
| async def run_graph_internal( | |
| graph: Graph, | |
| flow_id: str, | |
| *, | |
| stream: bool = False, | |
| session_id: str | None = None, | |
| inputs: list[InputValueRequest] | None = None, | |
| outputs: list[str] | None = None, | |
| ) -> tuple[list[RunOutputs], str]: | |
| """Run the graph and generate the result.""" | |
| inputs = inputs or [] | |
| effective_session_id = session_id or flow_id | |
| components = [] | |
| inputs_list = [] | |
| types = [] | |
| for input_value_request in inputs: | |
| if input_value_request.input_value is None: | |
| logger.warning("InputValueRequest input_value cannot be None, defaulting to an empty string.") | |
| input_value_request.input_value = "" | |
| components.append(input_value_request.components or []) | |
| inputs_list.append({INPUT_FIELD_NAME: input_value_request.input_value}) | |
| types.append(input_value_request.type) | |
| fallback_to_env_vars = get_settings_service().settings.fallback_to_env_var | |
| graph.session_id = effective_session_id | |
| run_outputs = await graph.arun( | |
| inputs=inputs_list, | |
| inputs_components=components, | |
| types=types, | |
| outputs=outputs or [], | |
| stream=stream, | |
| session_id=effective_session_id or "", | |
| fallback_to_env_vars=fallback_to_env_vars, | |
| ) | |
| return run_outputs, effective_session_id | |
| async def run_graph( | |
| graph: Graph, | |
| input_value: str, | |
| input_type: str, | |
| output_type: str, | |
| *, | |
| session_id: str | None = None, | |
| fallback_to_env_vars: bool = False, | |
| output_component: str | None = None, | |
| ) -> list[RunOutputs]: | |
| """Runs the given Langflow Graph with the specified input and returns the outputs. | |
| Args: | |
| graph (Graph): The graph to be executed. | |
| input_value (str): The input value to be passed to the graph. | |
| input_type (str): The type of the input value. | |
| output_type (str): The type of the desired output. | |
| session_id (str | None, optional): The session ID to be used for the flow. Defaults to None. | |
| fallback_to_env_vars (bool, optional): Whether to fallback to environment variables. | |
| Defaults to False. | |
| output_component (Optional[str], optional): The specific output component to retrieve. Defaults to None. | |
| Returns: | |
| List[RunOutputs]: A list of RunOutputs objects representing the outputs of the graph. | |
| """ | |
| inputs = [InputValue(components=[], input_value=input_value, type=input_type)] | |
| if output_component: | |
| outputs = [output_component] | |
| else: | |
| outputs = [ | |
| vertex.id | |
| for vertex in graph.vertices | |
| if output_type == "debug" | |
| or (vertex.is_output and (output_type == "any" or output_type in vertex.id.lower())) | |
| ] | |
| components = [] | |
| inputs_list = [] | |
| types = [] | |
| for input_value_request in inputs: | |
| if input_value_request.input_value is None: | |
| logger.warning("InputValueRequest input_value cannot be None, defaulting to an empty string.") | |
| input_value_request.input_value = "" | |
| components.append(input_value_request.components or []) | |
| inputs_list.append({INPUT_FIELD_NAME: input_value_request.input_value}) | |
| types.append(input_value_request.type) | |
| return await graph.arun( | |
| inputs_list, | |
| inputs_components=components, | |
| types=types, | |
| outputs=outputs or [], | |
| stream=False, | |
| session_id=session_id, | |
| fallback_to_env_vars=fallback_to_env_vars, | |
| ) | |
| def validate_input( | |
| graph_data: dict[str, Any], tweaks: Tweaks | dict[str, str | dict[str, Any]] | |
| ) -> list[dict[str, Any]]: | |
| if not isinstance(graph_data, dict) or not isinstance(tweaks, dict): | |
| msg = "graph_data and tweaks should be dictionaries" | |
| raise TypeError(msg) | |
| nodes = graph_data.get("data", {}).get("nodes") or graph_data.get("nodes") | |
| if not isinstance(nodes, list): | |
| msg = "graph_data should contain a list of nodes under 'data' key or directly under 'nodes' key" | |
| raise TypeError(msg) | |
| return nodes | |
| def apply_tweaks(node: dict[str, Any], node_tweaks: dict[str, Any]) -> None: | |
| template_data = node.get("data", {}).get("node", {}).get("template") | |
| if not isinstance(template_data, dict): | |
| logger.warning(f"Template data for node {node.get('id')} should be a dictionary") | |
| return | |
| for tweak_name, tweak_value in node_tweaks.items(): | |
| if tweak_name not in template_data: | |
| continue | |
| if tweak_name in template_data: | |
| if template_data[tweak_name]["type"] == "NestedDict": | |
| value = validate_and_repair_json(tweak_value) | |
| template_data[tweak_name]["value"] = value | |
| elif isinstance(tweak_value, dict): | |
| for k, v in tweak_value.items(): | |
| k_ = "file_path" if template_data[tweak_name]["type"] == "file" else k | |
| template_data[tweak_name][k_] = v | |
| else: | |
| key = "file_path" if template_data[tweak_name]["type"] == "file" else "value" | |
| template_data[tweak_name][key] = tweak_value | |
| def apply_tweaks_on_vertex(vertex: Vertex, node_tweaks: dict[str, Any]) -> None: | |
| for tweak_name, tweak_value in node_tweaks.items(): | |
| if tweak_name and tweak_value and tweak_name in vertex.params: | |
| vertex.params[tweak_name] = tweak_value | |
| def process_tweaks( | |
| graph_data: dict[str, Any], tweaks: Tweaks | dict[str, dict[str, Any]], *, stream: bool = False | |
| ) -> dict[str, Any]: | |
| """This function is used to tweak the graph data using the node id and the tweaks dict. | |
| :param graph_data: The dictionary containing the graph data. It must contain a 'data' key with | |
| 'nodes' as its child or directly contain 'nodes' key. Each node should have an 'id' and 'data'. | |
| :param tweaks: The dictionary containing the tweaks. The keys can be the node id or the name of the tweak. | |
| The values can be a dictionary containing the tweaks for the node or the value of the tweak. | |
| :param stream: A boolean flag indicating whether streaming should be deactivated across all components or not. | |
| Default is False. | |
| :return: The modified graph_data dictionary. | |
| :raises ValueError: If the input is not in the expected format. | |
| """ | |
| tweaks_dict = cast("dict[str, Any]", tweaks.model_dump()) if not isinstance(tweaks, dict) else tweaks | |
| if "stream" not in tweaks_dict: | |
| tweaks_dict |= {"stream": stream} | |
| nodes = validate_input(graph_data, cast("dict[str, str | dict[str, Any]]", tweaks_dict)) | |
| nodes_map = {node.get("id"): node for node in nodes} | |
| nodes_display_name_map = {node.get("data", {}).get("node", {}).get("display_name"): node for node in nodes} | |
| all_nodes_tweaks = {} | |
| for key, value in tweaks_dict.items(): | |
| if isinstance(value, dict): | |
| if (node := nodes_map.get(key)) or (node := nodes_display_name_map.get(key)): | |
| apply_tweaks(node, value) | |
| else: | |
| all_nodes_tweaks[key] = value | |
| if all_nodes_tweaks: | |
| for node in nodes: | |
| apply_tweaks(node, all_nodes_tweaks) | |
| return graph_data | |
| def process_tweaks_on_graph(graph: Graph, tweaks: dict[str, dict[str, Any]]): | |
| for vertex in graph.vertices: | |
| if isinstance(vertex, Vertex) and isinstance(vertex.id, str): | |
| node_id = vertex.id | |
| if node_tweaks := tweaks.get(node_id): | |
| apply_tweaks_on_vertex(vertex, node_tweaks) | |
| else: | |
| logger.warning("Each node should be a Vertex with an 'id' attribute of type str") | |
| return graph | |