Spaces:
Running
Running
| from typing import Any | |
| from loguru import logger | |
| from langflow.base.flow_processing.utils import build_data_from_result_data | |
| from langflow.custom import Component | |
| from langflow.graph.graph.base import Graph | |
| from langflow.graph.vertex.base import Vertex | |
| from langflow.helpers.flow import get_flow_inputs | |
| from langflow.io import DropdownInput, Output | |
| from langflow.schema import Data, dotdict | |
class SubFlowComponent(Component):
    """Component that surfaces another flow's inputs and runs that flow.

    Choosing a flow in the ``flow_name`` dropdown rebuilds the build config so
    that every template field of the vertices returned by ``get_flow_inputs``
    appears as a field of this component, keyed ``"<vertex id>|<field name>"``.
    At run time the values of those fields are regrouped per vertex and passed
    to ``run_flow`` as tweaks.
    """

    display_name = "Sub Flow"
    # NOTE(review): this description ends mid-sentence ("..., and "). It is kept
    # byte-for-byte because the intended ending is not visible here.
    description = "Generates a Component from a Flow, with all of its inputs, and "
    name = "SubFlow"
    beta: bool = True
    icon = "Workflow"

    def get_flow_names(self) -> list[str]:
        """Return the ``name`` of every flow reported by ``list_flows``."""
        return [flow.data["name"] for flow in self.list_flows()]

    def get_flow(self, flow_name: str) -> Data | None:
        """Return the first listed flow whose name equals *flow_name*.

        Returns:
            The matching flow record, or ``None`` when no flow has that name.
        """
        for flow in self.list_flows():
            if flow.data["name"] == flow_name:
                return flow
        return None

    def update_build_config(
        self, build_config: dotdict, field_value: Any, field_name: str | None = None
    ) -> dotdict:
        """Refresh the flow dropdown and rebuild the dynamic sub-flow fields.

        Args:
            build_config: The component's current build config; updated in place.
            field_value: New value of the field that triggered the update. When
                the field is ``flow_name`` this is the selected flow's name.
            field_name: Name of the field that triggered the update.

        Returns:
            The updated build config. Lookup and graph-building failures are
            logged rather than raised, in which case the config is returned
            without the selected flow's input fields.
        """
        flow_changed = field_name == "flow_name"
        if flow_changed:
            build_config["flow_name"]["options"] = self.get_flow_names()

        # Drop every key that is neither a declared input nor a reserved key,
        # so fields added for a previously selected flow do not linger.
        # The allow-list is built once instead of once per key.
        keep = {declared.name for declared in self.inputs} | {"code", "_type", "get_final_results_only"}
        for key in list(build_config.keys()):
            if key not in keep:
                del build_config[key]

        if field_value is None or not flow_changed:
            return build_config

        try:
            flow_data = self.get_flow(field_value)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error getting flow {field_value}")
            return build_config

        if not flow_data:
            msg = f"Flow {field_value} not found."
            logger.error(msg)
            return build_config

        try:
            graph = Graph.from_payload(flow_data.data["data"])
            # Collect the flow's input vertices and expose their fields.
            flow_inputs = get_flow_inputs(graph)
            build_config = self.add_inputs_to_build_config(flow_inputs, build_config)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error building graph for flow {field_value}")
        return build_config

    def add_inputs_to_build_config(self, inputs_vertex: list[Vertex], build_config: dotdict) -> dotdict:
        """Add each input vertex's template fields to *build_config*.

        Every template entry except ``code`` and ``_type`` has its display name
        prefixed with the vertex's display name and is renamed to
        ``"<vertex id>|<field name>"``, the key under which it is stored.

        NOTE: the template entries are modified in place on the vertex data.

        Args:
            inputs_vertex: Vertices whose template fields should be exposed.
            build_config: Build config to extend; updated in place.

        Returns:
            The same *build_config* object, with the new fields added.
        """
        new_fields: list[dotdict] = []
        for vertex in inputs_vertex:
            template = vertex.data["node"]["template"]
            for field_key, field in template.items():
                if field_key in {"code", "_type"}:
                    continue
                field["display_name"] = vertex.display_name + " - " + field["display_name"]
                field["name"] = vertex.id + "|" + field_key
                new_fields.append(field)

        # Insert only after every field was prepared, so an error above leaves
        # build_config without a partial set of fields.
        for field in new_fields:
            build_config[field["name"]] = field
        return build_config

    inputs = [
        DropdownInput(
            name="flow_name",
            display_name="Flow Name",
            info="The name of the flow to run.",
            options=[],
            refresh_button=True,
            real_time_refresh=True,
        ),
    ]

    outputs = [Output(name="flow_outputs", display_name="Flow Outputs", method="generate_results")]

    async def generate_results(self) -> list[Data]:
        """Run the selected flow with the configured tweaks and collect its data.

        Attributes named ``"<vertex id>|<field name>"`` are regrouped into a
        ``{vertex id: {field name: value}}`` mapping and passed to ``run_flow``
        as ``tweaks``.

        Returns:
            The data built from each truthy output of the first run result, or
            an empty list when there are no run results or the first is ``None``.
        """
        tweaks: dict[str, dict[str, Any]] = {}
        for field, value in self._attributes.items():
            # Split on the first "|" only, so a field name that itself contains
            # the separator does not raise on unpacking.
            node, separator, name = field.partition("|")
            if not separator:
                continue
            tweaks.setdefault(node, {})[name] = value

        flow_name = self._attributes.get("flow_name")
        run_outputs = await self.run_flow(
            tweaks=tweaks,
            flow_name=flow_name,
            output_type="all",
        )

        data: list[Data] = []
        if not run_outputs:
            return data

        # Only the first run result is converted.
        first_run = run_outputs[0]
        if first_run is None:
            return data

        for output in first_run.outputs:
            if output:
                data.extend(build_data_from_result_data(output))
        return data