-
Notifications
You must be signed in to change notification settings - Fork 22
Data Fabric native integration with Agents - Coded and Non-Coded [DS-7139] #1331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2766413
1ee7955
628c29d
beaef0a
ccac3cf
e32369f
5bc48d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,16 @@ | ||
| from typing import Any, List, Optional, Type | ||
| from typing import Any, Dict, List, Optional, Type | ||
|
|
||
| import sqlparse | ||
| from httpx import Response | ||
| from sqlparse.sql import ( | ||
| IdentifierList, | ||
| Parenthesis, | ||
| Statement, | ||
| Token, | ||
| TokenList, | ||
| Where, | ||
| ) | ||
| from sqlparse.tokens import DML, Comment, Keyword, Punctuation, Wildcard | ||
|
|
||
| from ..._utils import Endpoint, RequestSpec | ||
| from ...tracing import traced | ||
|
|
@@ -11,15 +21,39 @@ | |
| EntityRecordsBatchResponse, | ||
| ) | ||
|
|
||
| _FORBIDDEN_SQL_KEYWORDS = { | ||
| "INSERT", | ||
| "UPDATE", | ||
| "DELETE", | ||
| "MERGE", | ||
| "DROP", | ||
| "ALTER", | ||
| "CREATE", | ||
| "TRUNCATE", | ||
| "REPLACE", | ||
| } | ||
| _DISALLOWED_SQL_OPERATORS = { | ||
| "WITH", | ||
| "UNION", | ||
| "INTERSECT", | ||
| "EXCEPT", | ||
| "OVER", | ||
| "ROLLUP", | ||
| "CUBE", | ||
| "GROUPING SETS", | ||
| "PARTITION BY", | ||
| } | ||
|
|
||
|
|
||
| class EntitiesService(BaseService): | ||
| """Service for managing UiPath Data Service entities. | ||
|
|
||
| Entities are database tables in UiPath Data Service that can store | ||
| structured data for automation processes. | ||
| Entities represent business objects that provide structured data storage and access via the Data Service. | ||
| This service allows you to retrieve entity metadata, list entities, and query records using SQL. | ||
|
|
||
| See Also: | ||
| https://docs.uipath.com/data-service/automation-cloud/latest/user-guide/introduction | ||
| !!! warning "Preview Feature" | ||
| This function is currently experimental. | ||
| Behavior and parameters, request and response formats are subject to change in future versions. | ||
| """ | ||
|
|
||
| def __init__( | ||
|
|
@@ -389,6 +423,102 @@ class CustomerRecord: | |
| EntityRecord.from_data(data=record, model=schema) for record in records_data | ||
| ] | ||
|
|
||
@traced(name="query_multiple_entities", run_type="uipath")
def query_multiple_entities(
    self,
    sql_query: str,
    schema: Optional[Type[Any]] = None,
) -> List[Dict[str, Any]]:
    """Query entity records using a SQL query.

    The query is first checked locally by ``_validate_sql_query`` and then
    POSTed to the Data Fabric query endpoint.

    Args:
        sql_query (str): The full SQL query to execute. Must be a single
            SELECT statement that passes ``_validate_sql_query``.
        schema (Optional[Type[Any]]): Accepted for API compatibility.
            It is currently NOT applied to the returned records; callers
            that need validation must perform it themselves.

    Returns:
        List[Dict[str, Any]]: The ``results`` list from the response body.
        An empty list is returned when the body has no ``results`` key or
        when the server answers with a non-error status other than 200.

    Raises:
        ValueError: If ``sql_query`` is rejected by ``_validate_sql_query``.
        Exception: Whatever ``response.raise_for_status()`` raises for an
            error status.

    Examples:
        Basic query::

            records = entities_service.query_multiple_entities(
                "SELECT * FROM Customers WHERE Status = 'Active' LIMIT 100"
            )

        Query with specific fields::

            records = entities_service.query_multiple_entities(
                "SELECT OrderId, CustomerName, Amount FROM Orders WHERE Amount > 1000"
            )
    """
    self._validate_sql_query(sql_query)
    spec = self._query_multiple_entities_spec(sql_query)
    headers = {
        "X-UiPath-Internal-TenantName": self._url.tenant_name,
        "X-UiPath-Internal-AccountName": self._url.org_name,
    }
    # Use absolute URL to bypass scoping since org/tenant are embedded in the path
    full_url = f"{self._url.base_url}{spec.endpoint}"
    response = self.request(spec.method, full_url, json=spec.json, headers=headers)

    if response.status_code != 200:
        # Error statuses raise here. Anything that does not raise (for
        # example 204 No Content) used to fall through and return None,
        # which violated the declared return type.
        response.raise_for_status()
        return []

    return response.json().get("results", [])
|
Comment on lines
+468
to
+472
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
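This branch treats only HTTP 200 as success. For any other status it calls `raise_for_status()`, and when that call does not raise, the method falls through and returns `None` instead of a list. Useful? React with 👍 / 👎. |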
||
|
|
||
|
|
||
@traced(name="query_multiple_entities_async", run_type="uipath")
async def query_multiple_entities_async(
    self,
    sql_query: str,
    schema: Optional[Type[Any]] = None,
) -> List[Dict[str, Any]]:
    """Asynchronously query entity records using a SQL query.

    The query is first checked locally by ``_validate_sql_query`` and then
    POSTed to the Data Fabric query endpoint.

    Args:
        sql_query (str): The full SQL query to execute. Must be a single
            SELECT statement that passes ``_validate_sql_query``.
        schema (Optional[Type[Any]]): Accepted for API compatibility.
            It is currently NOT applied to the returned records; callers
            that need validation must perform it themselves.

    Returns:
        List[Dict[str, Any]]: The ``results`` list from the response body.
        An empty list is returned when the body has no ``results`` key or
        when the server answers with a non-error status other than 200.

    Raises:
        ValueError: If ``sql_query`` is rejected by ``_validate_sql_query``.
        Exception: Whatever ``response.raise_for_status()`` raises for an
            error status.

    Examples:
        Basic query::

            records = await entities_service.query_multiple_entities_async(
                "SELECT * FROM Customers WHERE Status = 'Active' LIMIT 100"
            )

        Query with specific fields::

            records = await entities_service.query_multiple_entities_async(
                "SELECT OrderId, CustomerName, Amount FROM Orders WHERE Amount > 1000"
            )
    """
    self._validate_sql_query(sql_query)
    spec = self._query_multiple_entities_spec(sql_query)
    headers = {
        "X-UiPath-Internal-TenantName": self._url.tenant_name,
        "X-UiPath-Internal-AccountName": self._url.org_name,
    }
    # The endpoint already embeds org/tenant, so it is joined directly onto
    # the base URL.
    full_url = f"{self._url.base_url}{spec.endpoint}"
    response = await self.request_async(
        spec.method, full_url, json=spec.json, headers=headers
    )

    if response.status_code != 200:
        # Error statuses raise here. Anything that does not raise (for
        # example 204 No Content) used to fall through and return None,
        # which violated the declared return type.
        response.raise_for_status()
        return []

    return response.json().get("results", [])
|
|
||
| @traced(name="entity_record_insert_batch", run_type="uipath") | ||
| def insert_records( | ||
| self, | ||
|
|
@@ -872,6 +1002,17 @@ def _list_records_spec( | |
| params=({"start": start, "limit": limit}), | ||
| ) | ||
|
|
||
def _query_multiple_entities_spec(self, sql_query: str) -> RequestSpec:
    """Build the POST request spec for the Data Fabric query-execute endpoint.

    The organization and tenant names from ``self._url`` are embedded in
    the endpoint path; the SQL text is sent as the ``query`` field of the
    JSON body.
    """
    org = self._url.org_name
    tenant = self._url.tenant_name
    path = f"/dataservice_/{org}/{tenant}/datafabric_/api/v1/query/execute"
    body = {"query": sql_query}
    return RequestSpec(method="POST", endpoint=Endpoint(path), json=body)
|
|
||
| def _insert_batch_spec(self, entity_key: str, records: List[Any]) -> RequestSpec: | ||
| return RequestSpec( | ||
| method="POST", | ||
|
|
@@ -900,3 +1041,113 @@ def _delete_batch_spec(self, entity_key: str, record_ids: List[str]) -> RequestS | |
| ), | ||
| json=record_ids, | ||
| ) | ||
|
|
||
def _validate_sql_query(self, sql_query: str) -> None:
    """Reject SQL that is not a single, bounded, read-only SELECT.

    Checks, in order:

    1. the query is non-empty and parses to exactly one statement;
    2. the statement type is SELECT;
    3. no keyword from ``_FORBIDDEN_SQL_KEYWORDS`` and no construct from
       ``_DISALLOWED_SQL_OPERATORS`` appears;
    4. no parenthesised sub-SELECT appears;
    5. a query without WHERE has a LIMIT, does not project a wildcard and
       selects at most 4 columns.

    Args:
        sql_query: The SQL text supplied by the caller.

    Raises:
        ValueError: If any of the checks above fails.
    """
    query = sql_query.strip()
    if not query:
        raise ValueError("SQL query cannot be empty.")

    statements = [stmt for stmt in sqlparse.parse(query) if stmt.tokens]
    if len(statements) != 1:
        raise ValueError("Only a single SELECT statement is allowed.")

    statement = statements[0]
    if statement.get_type() != "SELECT":
        raise ValueError("Only SELECT statements are allowed.")

    # sqlparse lexes some compound keywords as ONE token (e.g. "UNION ALL",
    # "CREATE OR REPLACE") and others as separate tokens (e.g. "PARTITION"
    # followed by "BY"). Comparing whole-token text against the deny lists
    # would therefore miss both shapes, so we collect individual keyword
    # words plus every pair of directly adjacent keyword words.
    # `None` marks a non-keyword token and breaks adjacency.
    stream: List[Optional[str]] = []
    seen_keywords = set()
    for token in statement.flatten():
        if token.is_whitespace or token.ttype in Comment:
            continue
        if token.ttype in Keyword:  # DML/DDL/CTE are sub-types of Keyword
            words = token.normalized.split()
            seen_keywords.add(" ".join(words))
            stream.extend(words)
        else:
            stream.append(None)

    for first, second in zip(stream, stream[1:]):
        if first is not None:
            seen_keywords.add(first)
            if second is not None:
                seen_keywords.add(f"{first} {second}")
    if stream and stream[-1] is not None:
        seen_keywords.add(stream[-1])

    # Sorted iteration keeps the reported keyword deterministic when a
    # query contains several offending keywords.
    for keyword in sorted(_FORBIDDEN_SQL_KEYWORDS):
        if keyword in seen_keywords:
            raise ValueError(f"SQL keyword '{keyword}' is not allowed.")

    for operator in sorted(_DISALLOWED_SQL_OPERATORS):
        if operator in seen_keywords:
            raise ValueError(
                f"SQL construct '{operator}' is not allowed in entity queries."
            )

    if self._contains_subquery(statement):
        raise ValueError("Subqueries are not allowed.")

    has_where = any(isinstance(token, Where) for token in statement.tokens)
    has_limit = any(
        token.ttype in Keyword and token.normalized == "LIMIT"
        for token in statement.flatten()
    )
    if not has_where and not has_limit:
        raise ValueError("Queries without WHERE must include a LIMIT clause.")

    projection_tokens = self._projection_tokens(statement)
    # NOTE(review): any Wildcard token inside the projection counts here,
    # which presumably also covers forms like COUNT(*) - confirm intent.
    has_wildcard_projection = any(
        token.ttype is Wildcard
        for projection_token in projection_tokens
        for token in projection_token.flatten()
    )
    if has_wildcard_projection and not has_where:
        raise ValueError("SELECT * without filtering is not allowed.")
    if not has_where and self._projection_column_count(projection_tokens) > 4:
        raise ValueError(
            "Selecting more than 4 columns without filtering is not allowed."
        )
|
|
||
def _contains_subquery(self, token_list: TokenList) -> bool:
    """Return True if any parenthesised group under *token_list* holds a SELECT.

    Direct children are inspected first; every child that is itself a
    token group is then searched recursively.
    """

    def _has_select(group: Parenthesis) -> bool:
        # Look at every leaf token inside the parenthesis.
        return any(
            leaf.ttype is DML and leaf.normalized == "SELECT"
            for leaf in group.flatten()
        )

    return any(
        (isinstance(child, Parenthesis) and _has_select(child))
        or (isinstance(child, TokenList) and self._contains_subquery(child))
        for child in token_list.tokens
    )
|
|
||
def _projection_tokens(self, statement: Statement) -> List[Token]:
    """Return the top-level tokens between SELECT and FROM.

    Whitespace and comment tokens are ignored. If the statement has no
    top-level SELECT the result is empty; if it has no top-level FROM,
    everything after SELECT is returned.
    """
    significant = (
        tok
        for tok in statement.tokens
        if not (tok.is_whitespace or tok.ttype in Comment)
    )

    # Advance the shared iterator past the SELECT keyword.
    for tok in significant:
        if tok.ttype is DML and tok.normalized == "SELECT":
            break
    else:
        return []

    collected: List[Token] = []
    for tok in significant:
        if tok.ttype in Keyword and tok.normalized == "FROM":
            break
        collected.append(tok)
    return collected
|
|
||
def _projection_column_count(self, projection_tokens: List[Token]) -> int:
    """Count the expressions in a SELECT projection.

    When the projection contains an ``IdentifierList`` the count is the
    number of identifiers in the first such list. Otherwise the tokens are
    treated as comma-separated expressions and the non-empty segments are
    counted, ignoring whitespace and comments.
    """
    for candidate in projection_tokens:
        if isinstance(candidate, IdentifierList):
            return len(list(candidate.get_identifiers()))

    columns = 0
    pending = False  # True while the current segment has at least one token
    for tok in projection_tokens:
        if tok.is_whitespace or tok.ttype in Comment:
            continue
        is_comma = tok.ttype is Punctuation and tok.value == ","
        if is_comma and pending:
            columns += 1
        pending = not is_comma

    return columns + (1 if pending else 0)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new `query_multiple_entities` API accepts `schema` and documents it as validation input, but the method returns raw `response.json()["results"]` without ever applying that schema (and the async variant has the same behavior). In callers that rely on schema validation, malformed or shape-shifted records will pass through silently and fail later in downstream logic instead of failing at the SDK boundary. Useful? React with 👍 / 👎.