Integrate large payload support for SQS #2116
Conversation
Hello, do you have a roadmap or due date for integrating this feature to support large messages (more than 256 KB) with the SQS message broker?
force-pushed from ca4eb4b to 2d1f691
force-pushed from f9a0f71 to 2fa40f4
Codecov Report ❌

@@            Coverage Diff             @@
##             main    #2116      +/-   ##
==========================================
- Coverage   81.50%   79.34%   -2.17%
==========================================
  Files          77       77
  Lines        9524     9555      +31
  Branches     1152     1155       +3
==========================================
- Hits         7763     7581     -182
- Misses       1569     1806     +237
+ Partials      192      168      -24
Coverage needs to be improved.
force-pushed from 39844fc to 6eb59bb
This adds support for handling large payloads in SQS. The sqs_extended_client is imported and used to fetch a file from S3 as the payload when necessary. Because Kombu fetches new messages from the queue asynchronously, not through the standard boto3 APIs, we have to fetch the S3 file manually rather than rely on sqs_extended_client to perform that action. Relates to: celery#279
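As a rough illustration of that manual fetch (not the PR's actual code): the extended client replaces a large message body with a small JSON pointer to an S3 object, so the consumer side has to recognise the pointer and download the real payload itself. The pointer format below (a two-element JSON array whose second element carries `s3BucketName` and `s3Key`) is an assumption to be verified against the installed sqs_extended_client version, and `parse_s3_pointer`/`fetch_large_payload` are hypothetical helper names.

```python
import json


def parse_s3_pointer(body):
    """Return (bucket, key) if the SQS body looks like an S3 pointer, else None.

    Assumes the extended-client convention of a two-element JSON array
    whose second element holds 's3BucketName' and 's3Key'.
    """
    try:
        payload = json.loads(body)
    except (TypeError, ValueError):
        return None
    if (isinstance(payload, list) and len(payload) == 2
            and isinstance(payload[1], dict)
            and 's3BucketName' in payload[1]
            and 's3Key' in payload[1]):
        return payload[1]['s3BucketName'], payload[1]['s3Key']
    return None


def fetch_large_payload(s3_client, body):
    """Replace a pointer body with the real payload fetched from S3."""
    pointer = parse_s3_pointer(body)
    if pointer is None:
        return body  # ordinary message, leave untouched
    bucket, key = pointer
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return obj['Body'].read().decode('utf-8')
```

Keeping the pointer check separate from the download keeps ordinary (small) messages on the fast path with a single failed JSON probe.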
The try/except block was triggered when sqs_extended_client isn't installed, which resulted in boto being overwritten with None.
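A minimal sketch of the corrected guard: the except clause should only null out the optional dependency itself, never the boto name imported elsewhere in the module (the exact module layout here is an assumption):

```python
# Optional dependency: only sqs_extended_client may be absent.
# A missing import must not clobber any other name (e.g. boto3).
try:
    import sqs_extended_client  # noqa: F401  (imported for its side effects)
except ImportError:
    sqs_extended_client = None
```

Code that depends on the extended client can then test `if sqs_extended_client is not None:` before enabling large-payload support.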
Introduce two tests to verify S3 client creation behavior: one for insecure connections and another for custom endpoint usage. This ensures proper configuration of boto3 client initialization in these scenarios.
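Sketched with stdlib mocks only, such tests might look like the following. The `new_s3_client` stand-in and the `use_ssl`/`endpoint_url` keyword names are assumptions to check against the actual implementation in kombu/transport/SQS.py:

```python
from unittest import mock


def new_s3_client(session, is_secure=True, endpoint_url=None):
    # Hypothetical stand-in for the transport's helper: builds an S3
    # client from a boto3-style session, honouring TLS and endpoint opts.
    return session.client('s3', use_ssl=is_secure, endpoint_url=endpoint_url)


def test_new_s3_client_with_is_secure_false():
    session = mock.MagicMock()
    client = new_s3_client(session, is_secure=False)
    session.client.assert_called_once_with(
        's3', use_ssl=False, endpoint_url=None)
    assert client is session.client.return_value


def test_new_s3_client_with_custom_endpoint():
    session = mock.MagicMock()
    new_s3_client(session, endpoint_url='http://localhost:9000')
    session.client.assert_called_once_with(
        's3', use_ssl=True, endpoint_url='http://localhost:9000')
```

Mocking the session keeps the tests offline while still pinning down exactly how the boto3 client is configured in each scenario.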
force-pushed from 6eb59bb to 2ffdb7a
auvipy
left a comment
this seems to need more test coverage
@auvipy I was wondering if I could help get this PR across the finish line. It looks like we're trying to add more tests, IIUC? Anyway, I don't want to take this over if someone else is working on it, but I figured I would offer.
Yes, you can pull from their branch, update it, and come back with a new PR. There might be some merge conflicts as well.
Pull Request Overview
This PR adds support for processing large SQS message payloads stored in S3 using the Amazon SQS Extended Client.
- Imports and wires up `sqs_extended_client` in the SQS transport to detect pointer messages and fetch full payloads from S3.
- Introduces `new_s3_client` and `s3` helper methods and updates `new_sqs_client` to enable large payload support.
- Adds unit tests for S3 client configuration and end-to-end extended-client message retrieval, and updates requirements to include the extended client library.
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| t/unit/transport/test_SQS.py | Added tests for S3 client setup and fetching extended payloads from S3. |
| requirements/extras/sqs.txt | Added amazon-sqs-extended-client>=1.0.1 to extras requirements. |
| kombu/transport/SQS.py | Integrated sqs_extended_client logic, large-payload handling, and added S3 helpers. |
| kombu/asynchronous/aws/sqs/ext.py | Imported sqs_extended_client to conditionally enable extended-client support. |
| kombu/asynchronous/aws/ext.py | Imported sqs_extended_client in the AWS ext utilities module. |
Comments suppressed due to low confidence (4)
t/unit/transport/test_SQS.py:371
- [nitpick] In `test_new_s3_client_with_is_secure_false`, consider adding an assertion on the return value of `new_s3_client` to verify it returns the expected client instance.
def test_new_s3_client_with_is_secure_false(self, mock_session):
t/unit/transport/test_SQS.py:1106
- Consider asserting that `get_object` was called with the correct `Bucket` and `Key` parameters to ensure accurate S3 invocation.
assert s3_client.get_object.called
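A hedged sketch of that stricter assertion, using a stdlib mock; the bucket and key values here are placeholders, not the ones in the real test:

```python
from unittest import mock

s3_client = mock.MagicMock()

# In the real test this call happens inside the transport code under test;
# 'example-bucket' / 'example-key' are illustrative placeholder values.
s3_client.get_object(Bucket='example-bucket', Key='example-key')

# Stricter than `assert s3_client.get_object.called`: pins the arguments too,
# so a regression that fetches the wrong object fails the test.
s3_client.get_object.assert_called_once_with(
    Bucket='example-bucket', Key='example-key')
```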
kombu/transport/SQS.py:735
- [nitpick] Add a docstring to `new_s3_client` explaining its parameters, return type, and behavior, including how `is_secure` and `endpoint_url` are used.
def new_s3_client(
kombu/asynchronous/aws/ext.py:26
- [nitpick] The import of `sqs_extended_client` in this module isn't used elsewhere; consider removing it to reduce redundancy.
import sqs_extended_client
[nitpick] Consider extracting the S3 payload fetching logic into a dedicated helper method to improve readability and facilitate unit testing.
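One way to act on that suggestion, sketched with illustrative method and parameter names rather than the PR's actual ones:

```python
class Channel:
    """Fragment sketching the extracted helper (names are hypothetical)."""

    def _fetch_s3_payload(self, s3_client, bucket, key):
        # The S3 download lives in one small method, so unit tests can
        # mock s3_client and exercise this path in isolation.
        obj = s3_client.get_object(Bucket=bucket, Key=key)
        return obj['Body'].read().decode('utf-8')
```

The message-receive path would then call `self._fetch_s3_payload(...)` when it detects a pointer body, keeping the dispatch logic readable.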