diff --git a/.github/workflows/ci_workflow.yml b/.github/workflows/ci_workflow.yml
index cf56aa6..70ca77d 100644
--- a/.github/workflows/ci_workflow.yml
+++ b/.github/workflows/ci_workflow.yml
@@ -14,25 +14,24 @@ jobs:
     strategy:
       matrix:
-        # Only lint using the primary version used for dev
-        python-version: [3.11]
+        # Lint on every supported Python version
+        python-version: ["3.9", "3.10", "3.11"]
 
     steps:
-    - uses: actions/checkout@v2
-    - name: Set up Python ${{ matrix.python-version }}
-      uses: actions/setup-python@v2
-      with:
-        python-version: ${{ matrix.python-version }}
-    - name: Install Poetry
-      uses: snok/install-poetry@v1
-      with:
-        version: 1.8.4
-    - name: Install dependencies
-      run: |
-        poetry install
-    - name: Run lint command from tox.ini
-      run: |
-        poetry run tox -e lint
-
+      - uses: actions/checkout@v4
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Install Poetry
+        uses: snok/install-poetry@v1
+        with:
+          version: 1.7.1
+      - name: Install dependencies
+        run: |
+          poetry install
+      - name: Run lint command from tox.ini
+        run: |
+          poetry run tox -e lint
   pytest:
 
     runs-on: ubuntu-latest
@@ -40,21 +39,21 @@ jobs:
       GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
     strategy:
       matrix:
-        python-version: [3.9, 3.10, 3.11]
+        python-version: ["3.9", "3.10", "3.11"]
 
     steps:
-    - uses: actions/checkout@v2
-    - name: Set up Python ${{ matrix.python-version }}
-      uses: actions/setup-python@v2
-      with:
-        python-version: ${{ matrix.python-version }}
-    - name: Install Poetry
-      uses: snok/install-poetry@v1
-      with:
-        version: 1.8.4
-    - name: Install dependencies
-      run: |
-        poetry install
-    - name: Test with pytest
-      run: |
-        poetry run pytest --capture=no
+      - uses: actions/checkout@v4
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Install Poetry
+        uses: snok/install-poetry@v1
+        with:
+          version: 1.7.1
+      - name: Install dependencies
+        run: |
+          poetry install
+      - name: Test with pytest
+        run: |
+          poetry run pytest --capture=no
diff --git a/tap_sumologic/streams.py b/tap_sumologic/streams.py
index 585b5a2..eaa24e4 100644
--- a/tap_sumologic/streams.py
+++ b/tap_sumologic/streams.py
@@ -2,7 +2,7 @@
 
 import time
 from datetime import datetime
-from typing import Any, Dict, Iterable, Optional
+from typing import Any, Dict, Iterable, List, Mapping, Optional
 
 from tap_sumologic.client import SumoLogicStream
 
@@ -58,7 +58,80 @@ def __init__(
         self.rollup = rollup
         self.timeshift = timeshift
 
-    def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
+    def _wait_for_search_job(self, search_job: dict, delay: int) -> Optional[dict]:
+        """Poll until the search job finishes or is cancelled.
+
+        Args:
+            search_job: Search job handle returned by `self.conn.search_job`.
+            delay: Seconds to sleep between status polls.
+
+        Returns:
+            The status once its state is "DONE GATHERING RESULTS", or
+            None if the job reports "CANCELLED".
+        """
+        status = self.conn.search_job_status(search_job)
+        while status["state"] != "DONE GATHERING RESULTS":
+            if status["state"] == "CANCELLED":
+                return None
+            time.sleep(delay)
+            self.logger.info("")
+            status = self.conn.search_job_status(search_job)
+            # remove key histogramBuckets from status (tolerate its absence)
+            status.pop("histogramBuckets", None)
+            self.logger.info(f"Query Status: {status}")
+        return status
+
+    def _fetch_search_job_records(
+        self,
+        search_job: dict,
+        record_count: int,
+        custom_columns: Dict[str, Any],
+        limit: int,
+        pagination_delay: int,
+    ) -> List[Dict[str, Any]]:
+        """Fetch all search job results, pausing between paginated requests.
+
+        Args:
+            search_job: Search job handle to read results from.
+            record_count: Total number of results reported by the job status.
+            custom_columns: Extra key/value pairs merged into every record.
+            limit: Maximum number of results requested per page.
+            pagination_delay: Seconds to sleep between consecutive page requests.
+
+        Returns:
+            One dictionary per result: the result's "map" merged with
+            `custom_columns`.
+        """
+        records: List[Dict[str, Any]] = []
+        count = 0
+        while count < record_count:
+            self.logger.info(
+                f"Get {self.query_type} {count} of {record_count}, limit={limit}"
+            )
+            response = self.conn.search_job_records(
+                search_job, self.query_type, limit=limit, offset=count
+            )
+            self.logger.info(f"Got {self.query_type} {count} of {record_count}")
+
+            recs = response[self.query_type]
+            for rec in recs:
+                records.append({**rec["map"], **custom_columns})
+
+            if len(recs) > 0:
+                count = count + len(recs)
+                if count < record_count:
+                    self.logger.info(
+                        f"Waiting {pagination_delay} second(s) before next "
+                        "paginated API call to avoid rate limit..."
+                    )
+                    time.sleep(pagination_delay)
+            else:
+                break  # make sure we exit if nothing comes back
+        return records
+
+    def get_records(
+        self, context: Optional[Mapping[str, Any]]
+    ) -> Iterable[Dict[str, Any]]:
         """Return a generator of row-type dictionary objects.
 
         The optional `context` argument is used to identify a specific slice of the
@@ -67,7 +140,7 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
         """
         self.logger.info("Running query in sumologic to get records")
 
-        records = []
+        records: List[Dict[str, Any]] = []
         limit = 10000
 
         now_datetime = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")
@@ -82,6 +155,7 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
 
         if self.query_type in ["messages", "records"]:
             delay = 5
+            pagination_delay = 1
             search_job = self.conn.search_job(
                 self.query,
                 self.config["start_date"],
@@ -92,41 +166,20 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
             )
             # self.logger.info(search_job)
 
-            status = self.conn.search_job_status(search_job)
-            while status["state"] != "DONE GATHERING RESULTS":
-                if status["state"] == "CANCELLED":
-                    break
-                time.sleep(delay)
-                self.logger.info("")
-                status = self.conn.search_job_status(search_job)
-                # remove key histogramBuckets from status
-                del status["histogramBuckets"]
-                self.logger.info(f"Query Status: {status}")
-
-            self.logger.info(status["state"])
-
-            if status["state"] == "DONE GATHERING RESULTS":
-                record_count = status[f"{self.query_type[:-1]}Count"]
-                count = 0
-                while count < record_count:
-                    self.logger.info(
-                        f"Get {self.query_type} {count} of {record_count}, "
-                        f"limit={limit}"
-                    )
-                    response = self.conn.search_job_records(
-                        search_job, self.query_type, limit=limit, offset=count
-                    )
-                    self.logger.info(f"Got {self.query_type} {count} of {record_count}")
-
-                    recs = response[self.query_type]
-                    # extract the result maps to put them in the list of records
-                    for rec in recs:
-                        records.append({**rec["map"], **custom_columns})
+            status = self._wait_for_search_job(search_job, delay)
+            if status is None:
+                self.logger.info("Search job was cancelled, no records to yield")
+            else:
+                self.logger.info(status["state"])
 
-                    if len(recs) > 0:
-                        count = count + len(recs)
-                    else:
-                        break  # make sure we exit if nothing comes back
+                record_count = status[f"{self.query_type[:-1]}Count"]
+                records = self._fetch_search_job_records(
+                    search_job=search_job,
+                    record_count=record_count,
+                    custom_columns=custom_columns,
+                    limit=limit,
+                    pagination_delay=pagination_delay,
+                )
 
         elif self.query_type == "metrics":
             response = self.conn.metrics_query(
@@ -138,6 +191,12 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
                 self.timeshift,
             )
             records = response["queryResult"][0]["timeSeriesList"]["timeSeries"]
+            # Uncomment the lines below to add a delay when back-to-back metric
+            # queries are triggered, to avoid hitting the rate limit.
+            # self.logger.info(
+            #     "Waiting 15 seconds after metrics query to avoid rate limit..."
+            # )
+            # time.sleep(15)
 
         for row in records:
             yield row