Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion azure-quantum/tests/unit/local/mock_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
from types import SimpleNamespace
from azure.quantum._client import ServicesClient
from azure.quantum._client.models import JobDetails, SessionDetails, ItemDetails
from azure.quantum._workspace_connection_params import WorkspaceConnectionParams
from common import (
SUBSCRIPTION_ID,
RESOURCE_GROUP,
LOCATION,
ENDPOINT_URI,
WORKSPACE,
)


def _paged(items: List, page_size: int = 100) -> ItemPaged:
Expand Down Expand Up @@ -349,6 +357,35 @@ def list(
return _paged(items[skip : skip + top], page_size=top)


class MockWorkspaceMgmtClient:
"""Mock management client that avoids network calls to ARM/ARG."""

def __init__(self, credential: Optional[object] = None, base_url: Optional[str] = None, user_agent: Optional[str] = None) -> None:
self._credential = credential
self._base_url = base_url
self._user_agent = user_agent

def close(self) -> None:
"""No-op close for mock."""
pass

def __enter__(self) -> 'MockWorkspaceMgmtClient':
return self

def __exit__(self, *exc_details) -> None:
pass

def load_workspace_from_arg(self, connection_params: WorkspaceConnectionParams) -> None:
connection_params.subscription_id = SUBSCRIPTION_ID
connection_params.resource_group = RESOURCE_GROUP
connection_params.location = LOCATION
connection_params.quantum_endpoint = ENDPOINT_URI

def load_workspace_from_arm(self, connection_params: WorkspaceConnectionParams) -> None:
connection_params.location = LOCATION
connection_params.quantum_endpoint = ENDPOINT_URI


class MockServicesClient(ServicesClient):
def __init__(self, authentication_policy: Optional[object] = None) -> None:
# in-memory stores
Expand All @@ -363,8 +400,20 @@ def __init__(self, authentication_policy: Optional[object] = None) -> None:
# Mimic ServicesClient config shape for tests that inspect policy
self._config = SimpleNamespace(authentication_policy=authentication_policy)

def __enter__(self) -> 'MockServicesClient':
return self

def __exit__(self, *exc_details) -> None:
pass


class WorkspaceMock(Workspace):
def __init__(self, **kwargs) -> None:
# Create and pass mock management client to prevent network calls
if '_mgmt_client' not in kwargs:
kwargs['_mgmt_client'] = MockWorkspaceMgmtClient()
super().__init__(**kwargs)

def _create_client(self) -> ServicesClient: # type: ignore[override]
# Pass through the Workspace's auth policy to the mock client
auth_policy = self._connection_params.get_auth_policy()
Expand Down Expand Up @@ -466,7 +515,9 @@ def seed_sessions(ws: WorkspaceMock) -> None:

def create_default_workspace() -> WorkspaceMock:
ws = WorkspaceMock(
subscription_id="sub", resource_group="rg", name="ws", location="westus"
subscription_id=SUBSCRIPTION_ID,
resource_group=RESOURCE_GROUP,
name=WORKSPACE
)
seed_jobs(ws)
seed_sessions(ws)
Expand Down
39 changes: 34 additions & 5 deletions azure-quantum/tests/unit/local/test_job_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
# Licensed under the MIT License.
##

import pytest
from unittest.mock import Mock

from azure.quantum import Job, JobDetails


def _mock_job(output_data_format: str, results_as_json_str: str) -> Job:
def _mock_job(output_data_format: str, results_as_json_str: str, status: str = "Succeeded") -> Job:
job_details = JobDetails(
id="",
name="",
Expand All @@ -18,7 +18,7 @@ def _mock_job(output_data_format: str, results_as_json_str: str) -> Job:
input_data_format="",
output_data_format=output_data_format,
)
job_details.status = "Succeeded"
job_details.status = status
job = Job(workspace=None, job_details=job_details)

job.has_completed = Mock(return_value=True)
Expand All @@ -37,8 +37,8 @@ def decode():
return job


def _get_job_results(output_data_format: str, results_as_json_str: str):
job = _mock_job(output_data_format, results_as_json_str)
def _get_job_results(output_data_format: str, results_as_json_str: str, status: str = "Succeeded"):
job = _mock_job(output_data_format, results_as_json_str, status)
return job.get_results()


Expand Down Expand Up @@ -70,6 +70,35 @@ def test_job_for_microsoft_quantum_results_v1_success():
assert job_results["[1]"] == 0.50


def test_job_get_results_with_completed_status():
job_results = _get_job_results(
"microsoft.quantum-results.v1",
'{"Histogram": ["[0]", 0.50, "[1]", 0.50]}',
"Completed",
)
assert len(job_results.keys()) == 2
assert job_results["[0]"] == 0.50
assert job_results["[1]"] == 0.50


def test_job_get_results_with_failed_status_raises_runtime_error():
with pytest.raises(RuntimeError, match="Cannot retrieve results as job execution failed"):
_get_job_results(
"microsoft.quantum-results.v1",
'{"Histogram": ["[0]", 0.50, "[1]", 0.50]}',
"Failed",
)


def test_job_get_results_with_cancelled_status_raises_runtime_error():
with pytest.raises(RuntimeError, match="Cannot retrieve results as job execution failed"):
_get_job_results(
"microsoft.quantum-results.v1",
'{"Histogram": ["[0]", 0.50, "[1]", 0.50]}',
"Cancelled",
)


def test_job_for_microsoft_quantum_results_v1_no_histogram_returns_raw_result():
job_result_raw = '{"NotHistogramProperty": ["[0]", 0.50, "[1]", 0.50]}'
job_result = _get_job_results("microsoft.quantum-results.v1", job_result_raw)
Expand Down
Loading