When recovering from a failed stream, attempt to create missing programs/jobs if they are not found#8151
When recovering from a failed stream, attempt to create missing programs/jobs if they are not found#8151hoisinberg wants to merge 1 commit into
Conversation
…ams/jobs if they are not found
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #8151 +/- ##
========================================
Coverage 99.60% 99.60%
========================================
Files 1118 1118
Lines 101048 101157 +109
========================================
+ Hits 100647 100757 +110
+ Misses 401 400 -1 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
| if e.code == HTTPStatus.NOT_FOUND and self._recreate_job: | ||
| # If the program/job was not created successfully, attempt to recreate once. | ||
| new_job = await self._recreate_job() | ||
|
|
||
| self.project_id = new_job.project_id | ||
| self.program_id = new_job.program_id | ||
| self.job_id = new_job.job_id | ||
| self.context = new_job.context | ||
| self._job = new_job._job | ||
| self._results = new_job._results | ||
| self._batched_results = new_job._batched_results | ||
| self._job_result_future = new_job._job_result_future | ||
| self._recreate_job = None |
There was a problem hiding this comment.
Is there potential for a race condition here? The variable self._recreate_job is tested, then a job is started, some assignments happen, and only afterwards is the tested variable set to None. If more than one asynchronous task hits this code at the same time, there seems to be potential for multiple jobs to be created.
| program_id = 'prog' | ||
| client().create_program_async.side_effect = [ | ||
| EngineException(HTTPStatus.CONFLICT, "program already exists"), | ||
| (program_id, quantum.QuantumProgram(name=f"projects/proj/programs/{program_id}")), |
There was a problem hiding this comment.
If an exception does occur in create_program_async, is this line ever executed?
| try: | ||
| job.results() | ||
| except Exception as e: | ||
| assert isinstance(e, EngineException) | ||
| assert e.code == HTTPStatus.INTERNAL_SERVER_ERROR |
There was a problem hiding this comment.
In cases like this, this should probably make use of Pytest's more idiomatic pytest.raises. E.g.,
| try: | |
| job.results() | |
| except Exception as e: | |
| assert isinstance(e, EngineException) | |
| assert e.code == HTTPStatus.INTERNAL_SERVER_ERROR | |
| with pytest.raises(EngineException) as exc_info: | |
| job.results() | |
| assert exc_info.value.code == HTTPStatus.INTERNAL_SERVER_ERROR |
There is more than one try-except case like this in the PR, so if you agree with this change, please also check the other places.
http://b/510016374 surfaced issues where the existing recovery logic from stream failures cannot handle cases where programs/jobs were not successfully created before the stream was terminated. This change addresses those cases by attempting to recreate programs/jobs during the recovery process.