-
Notifications
You must be signed in to change notification settings - Fork 1.2k
When recovering from a failed stream, attempt to create missing programs/jobs if they are not found #8151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
When recovering from a failed stream, attempt to create missing programs/jobs if they are not found #8151
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |||||||||||||||||
| from __future__ import annotations | ||||||||||||||||||
|
|
||||||||||||||||||
| import datetime | ||||||||||||||||||
| from http import HTTPStatus | ||||||||||||||||||
| from unittest import mock | ||||||||||||||||||
|
|
||||||||||||||||||
| import duet | ||||||||||||||||||
|
|
@@ -26,7 +27,7 @@ | |||||||||||||||||
| import cirq_google as cg | ||||||||||||||||||
| from cirq_google.api import v1, v2 | ||||||||||||||||||
| from cirq_google.cloud import quantum | ||||||||||||||||||
| from cirq_google.engine import util | ||||||||||||||||||
| from cirq_google.engine import EngineException, util | ||||||||||||||||||
| from cirq_google.engine.engine import EngineContext | ||||||||||||||||||
| from cirq_google.engine.stream_manager import StreamError | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -799,6 +800,78 @@ def test_on_stream_failure_retrieves_results_using_get_job_results(get_job_resul | |||||||||||||||||
| get_job_results.assert_called_once_with('a', 'b', 'steve') | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| @mock.patch('cirq_google.engine.engine_client.EngineClient.get_job_async') | ||||||||||||||||||
| @mock.patch('cirq_google.engine.engine_client.EngineClient.get_job_results_async') | ||||||||||||||||||
| def test_recreate_job_if_not_found(get_job_results, get_job): | ||||||||||||||||||
| project_id = 'a' | ||||||||||||||||||
| program_id = 'b' | ||||||||||||||||||
| job_id = 'steve' | ||||||||||||||||||
| context = EngineContext(timeout=60, enable_streaming=False) | ||||||||||||||||||
|
|
||||||||||||||||||
| get_job.side_effect = EngineException(HTTPStatus.NOT_FOUND, 'job not found') | ||||||||||||||||||
|
|
||||||||||||||||||
| async def recreate_job(): | ||||||||||||||||||
| qjob = quantum.QuantumJob( | ||||||||||||||||||
| execution_status=quantum.ExecutionStatus(state=quantum.ExecutionStatus.State.SUCCESS), | ||||||||||||||||||
| update_time=UPDATE_TIME, | ||||||||||||||||||
| ) | ||||||||||||||||||
| get_job.side_effect = None | ||||||||||||||||||
| get_job.return_value = qjob | ||||||||||||||||||
| get_job_results.return_value = RESULTS | ||||||||||||||||||
| return cg.EngineJob( | ||||||||||||||||||
| project_id=project_id, | ||||||||||||||||||
| program_id=program_id, | ||||||||||||||||||
| job_id=job_id, | ||||||||||||||||||
| context=context, | ||||||||||||||||||
| _job=qjob, | ||||||||||||||||||
| job_result_future=None, | ||||||||||||||||||
| recreate_job=None, | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
| job = cg.EngineJob( | ||||||||||||||||||
| project_id=project_id, | ||||||||||||||||||
| program_id=program_id, | ||||||||||||||||||
| job_id=job_id, | ||||||||||||||||||
| context=context, | ||||||||||||||||||
| _job=None, | ||||||||||||||||||
| job_result_future=None, | ||||||||||||||||||
| recreate_job=recreate_job, | ||||||||||||||||||
| ) | ||||||||||||||||||
| data = job.results() | ||||||||||||||||||
|
|
||||||||||||||||||
| assert len(data) == 2 | ||||||||||||||||||
| assert str(data[0]) == 'q=0110' | ||||||||||||||||||
| assert str(data[1]) == 'q=1010' | ||||||||||||||||||
| get_job.assert_has_calls((mock.call(project_id, program_id, job_id, False),)) | ||||||||||||||||||
| get_job_results.assert_called_once_with(project_id, program_id, job_id) | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| @mock.patch('cirq_google.engine.engine_client.EngineClient.get_job_async') | ||||||||||||||||||
| @mock.patch('cirq_google.engine.engine_client.EngineClient.get_job_results_async') | ||||||||||||||||||
| def test_receive_results_get_job_error_propagated(get_job_results, get_job): | ||||||||||||||||||
| project_id = 'a' | ||||||||||||||||||
| program_id = 'b' | ||||||||||||||||||
| job_id = 'steve' | ||||||||||||||||||
| context = EngineContext(timeout=60, enable_streaming=False) | ||||||||||||||||||
|
|
||||||||||||||||||
| get_job.side_effect = EngineException(HTTPStatus.INTERNAL_SERVER_ERROR, 'internal error') | ||||||||||||||||||
|
|
||||||||||||||||||
| job = cg.EngineJob( | ||||||||||||||||||
| project_id=project_id, | ||||||||||||||||||
| program_id=program_id, | ||||||||||||||||||
| job_id=job_id, | ||||||||||||||||||
| context=context, | ||||||||||||||||||
| _job=None, | ||||||||||||||||||
| job_result_future=None, | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
| try: | ||||||||||||||||||
| job.results() | ||||||||||||||||||
| except Exception as e: | ||||||||||||||||||
| assert isinstance(e, EngineException) | ||||||||||||||||||
| assert e.code == HTTPStatus.INTERNAL_SERVER_ERROR | ||||||||||||||||||
|
Comment on lines
+868
to
+872
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In cases like this, this should probably make use of Pytest's more idiomatic
Suggested change
There is more than one try-except case like this in the PR, so if you agree with this change, please also check the other places. |
||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| @mock.patch('cirq_google.engine.engine_client.EngineClient.get_job_results_async') | ||||||||||||||||||
| def test_results_len(get_job_results): | ||||||||||||||||||
| qjob = quantum.QuantumJob( | ||||||||||||||||||
|
|
||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
|
|
||
| import datetime | ||
| import time | ||
| from http import HTTPStatus | ||
| from unittest import mock | ||
|
|
||
| import duet | ||
|
|
@@ -31,7 +32,7 @@ | |
| import cirq_google as cg | ||
| from cirq_google.api import v1, v2 | ||
| from cirq_google.cloud import quantum | ||
| from cirq_google.engine import util | ||
| from cirq_google.engine import EngineException, util | ||
| from cirq_google.engine.engine import EngineContext | ||
| from cirq_google.engine.processor_config import Run, Snapshot | ||
|
|
||
|
|
@@ -573,6 +574,84 @@ def test_run_sweep_params_with_unary_rpcs(client): | |
| client().get_job_results_async.assert_called_once() | ||
|
|
||
|
|
||
| @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) | ||
| def test_run_sweep_program_already_exists(client): | ||
| program_id = 'prog' | ||
| client().create_program_async.side_effect = [ | ||
| EngineException(HTTPStatus.CONFLICT, "program already exists"), | ||
| (program_id, quantum.QuantumProgram(name=f"projects/proj/programs/{program_id}")), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If an exception does occur in |
||
| ] | ||
| client().create_job_async.return_value = ( | ||
| 'job-id', | ||
| quantum.QuantumJob( | ||
| name=f"projects/proj/programs/{program_id}/jobs/job-id", | ||
| execution_status={'state': 'READY'}, | ||
| ), | ||
| ) | ||
| client().get_job_async.return_value = quantum.QuantumJob( | ||
| execution_status={'state': 'SUCCESS'}, update_time=_DT | ||
| ) | ||
| client().get_job_results_async.return_value = quantum.QuantumResult(result=_RESULTS) | ||
|
|
||
| engine = cg.Engine(project_id='proj', context=EngineContext(enable_streaming=False)) | ||
| job = engine.run_sweep( | ||
| program=_CIRCUIT, | ||
| program_id=program_id, | ||
| processor_id='processor0', | ||
| params=[cirq.ParamResolver({'a': 1}), cirq.ParamResolver({'a': 2})], | ||
| ) | ||
| results = job.results() | ||
|
|
||
| assert len(results) == 2 | ||
| for i, v in enumerate([1, 2]): | ||
| assert results[i].repetitions == 1 | ||
| assert results[i].params.param_dict == {'a': v} | ||
| assert results[i].measurements == {'q': np.array([[0]], dtype='uint8')} | ||
|
|
||
| client().create_program_async.assert_called_once() | ||
| client().create_job_async.assert_called_once() | ||
| client().get_job_async.assert_called_once() | ||
| client().get_job_results_async.assert_called_once() | ||
|
|
||
|
|
||
| @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) | ||
| def test_run_sweep_program_with_implicit_id_already_exists(client): | ||
| client().create_program_async.side_effect = EngineException( | ||
| HTTPStatus.CONFLICT, "program already exists" | ||
| ) | ||
| engine = cg.Engine(project_id='proj', context=EngineContext(enable_streaming=False)) | ||
|
|
||
| try: | ||
| engine.run_sweep( | ||
| program=_CIRCUIT, | ||
| processor_id='processor0', | ||
| params=[cirq.ParamResolver({'a': 1}), cirq.ParamResolver({'a': 2})], | ||
| ) | ||
| except Exception as e: | ||
| assert isinstance(e, EngineException) | ||
| assert e.code == HTTPStatus.CONFLICT | ||
|
|
||
|
|
||
| @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) | ||
| def test_run_sweep_unable_to_create_program_raises_error(client): | ||
| program_id = 'prog' | ||
| client().create_program_async.side_effect = EngineException( | ||
| HTTPStatus.INTERNAL_SERVER_ERROR, "internal error" | ||
| ) | ||
| engine = cg.Engine(project_id='proj', context=EngineContext(enable_streaming=False)) | ||
|
|
||
| try: | ||
| engine.run_sweep( | ||
| program=_CIRCUIT, | ||
| program_id=program_id, | ||
| processor_id='processor0', | ||
| params=[cirq.ParamResolver({'a': 1}), cirq.ParamResolver({'a': 2})], | ||
| ) | ||
| except Exception as e: | ||
| assert isinstance(e, EngineException) | ||
| assert e.code == HTTPStatus.INTERNAL_SERVER_ERROR | ||
|
|
||
|
|
||
| @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) | ||
| def test_run_sweep_params_with_stream_rpcs(client): | ||
| setup_run_circuit_with_result_(client, _RESULTS) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there potential for a race condition here? The variable
self._recreate_jobis tested, then a job is started, some assignments happen, and only afterwards is the tested variable set to None. If more than one asynchronous task hits this code at the same time, there seems to be potential for multiple jobs to be created.