Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 32 additions & 33 deletions .github/workflows/ci_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,46 @@ jobs:
strategy:
matrix:
# Only lint using the primary version used for dev
python-version: [3.11]
python-version: ["3.9", "3.10", "3.11"]

steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.8.4
- name: Install dependencies
run: |
poetry install
- name: Run lint command from tox.ini
run: |
poetry run tox -e lint

- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.7.1
- name: Install dependencies
run: |
poetry install
- name: Run lint command from tox.ini
run: |
poetry run tox -e lint
pytest:

runs-on: ubuntu-latest
env:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
strategy:
matrix:
python-version: [3.9, 3.10, 3.11]
python-version: ["3.9", "3.10", "3.11"]

steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.8.4
- name: Install dependencies
run: |
poetry install
- name: Test with pytest
run: |
poetry run pytest --capture=no
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.7.1
- name: Install dependencies
run: |
poetry install
- name: Test with pytest
run: |
poetry run pytest --capture=no
112 changes: 75 additions & 37 deletions tap_sumologic/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import time
from datetime import datetime
from typing import Any, Dict, Iterable, Optional
from typing import Any, Dict, Iterable, List, Mapping, Optional

from tap_sumologic.client import SumoLogicStream

Expand Down Expand Up @@ -58,7 +58,59 @@ def __init__(
self.rollup = rollup
self.timeshift = timeshift

def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
def _wait_for_search_job(self, search_job: dict, delay: int) -> Optional[dict]:
    """Poll until the search job finishes or is cancelled.

    Args:
        search_job: Search job handle passed to ``self.conn.search_job_status``.
        delay: Number of seconds to sleep between two status polls.

    Returns:
        The last status dict once its ``state`` is "DONE GATHERING RESULTS",
        or ``None`` if a polled status reports "CANCELLED".
    """
    status = self.conn.search_job_status(search_job)
    while status["state"] != "DONE GATHERING RESULTS":
        if status["state"] == "CANCELLED":
            return None
        time.sleep(delay)
        self.logger.info("")
        status = self.conn.search_job_status(search_job)
        # Remove histogramBuckets so it is not part of the logged status.
        # pop() with a default tolerates a status that lacks the key, where
        # a plain `del` would raise KeyError and abort the sync.
        status.pop("histogramBuckets", None)
        self.logger.info(f"Query Status: {status}")
    return status

def _fetch_search_job_records(
    self,
    search_job: dict,
    record_count: int,
    custom_columns: Dict[str, Any],
    limit: int,
    pagination_delay: int,
) -> List[Dict[str, Any]]:
    """Fetch the results of a search job page by page.

    Args:
        search_job: Search job handle passed to ``self.conn.search_job_records``.
        record_count: Total number of results expected for the job.
        custom_columns: Extra key/value pairs merged into every returned row
            (they override same-named keys of the row's ``map``).
        limit: Maximum number of results requested per page.
        pagination_delay: Seconds to sleep between two consecutive page
            requests.

    Returns:
        One dict per fetched result, built from the result's ``map`` plus
        ``custom_columns``.
    """
    records: List[Dict[str, Any]] = []
    count = 0
    while count < record_count:
        self.logger.info(
            f"Get {self.query_type} {count} of {record_count}, limit={limit}"
        )
        response = self.conn.search_job_records(
            search_job, self.query_type, limit=limit, offset=count
        )
        self.logger.info(f"Got {self.query_type} {count} of {record_count}")

        recs = response[self.query_type]
        if not recs:
            break  # make sure we exit if nothing comes back

        # Extract the result maps and attach the custom columns to each row.
        records.extend({**rec["map"], **custom_columns} for rec in recs)
        count += len(recs)

        # Only pause when another page is going to be requested.
        if count < record_count:
            self.logger.info(
                f"Waiting {pagination_delay} second(s) before next paginated "
                "API call to avoid rate limit..."
            )
            time.sleep(pagination_delay)
    return records

def get_records(
self, context: Optional[Mapping[str, Any]]
) -> Iterable[Dict[str, Any]]:
"""Return a generator of row-type dictionary objects.

The optional `context` argument is used to identify a specific slice of the
Expand All @@ -67,7 +119,7 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
"""
self.logger.info("Running query in sumologic to get records")

records = []
records: List[Dict[str, Any]] = []
limit = 10000

now_datetime = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")
Expand All @@ -82,6 +134,7 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:

if self.query_type in ["messages", "records"]:
delay = 5
pagination_delay = 1
search_job = self.conn.search_job(
self.query,
self.config["start_date"],
Expand All @@ -92,41 +145,20 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
)
# self.logger.info(search_job)

status = self.conn.search_job_status(search_job)
while status["state"] != "DONE GATHERING RESULTS":
if status["state"] == "CANCELLED":
break
time.sleep(delay)
self.logger.info("")
status = self.conn.search_job_status(search_job)
# remove key histogramBuckets from status
del status["histogramBuckets"]
self.logger.info(f"Query Status: {status}")

self.logger.info(status["state"])

if status["state"] == "DONE GATHERING RESULTS":
record_count = status[f"{self.query_type[:-1]}Count"]
count = 0
while count < record_count:
self.logger.info(
f"Get {self.query_type} {count} of {record_count}, "
f"limit={limit}"
)
response = self.conn.search_job_records(
search_job, self.query_type, limit=limit, offset=count
)
self.logger.info(f"Got {self.query_type} {count} of {record_count}")

recs = response[self.query_type]
# extract the result maps to put them in the list of records
for rec in recs:
records.append({**rec["map"], **custom_columns})
status = self._wait_for_search_job(search_job, delay)
if status is None:
self.logger.info("Search job was cancelled, no records to yield")
else:
self.logger.info(status["state"])

if len(recs) > 0:
count = count + len(recs)
else:
break # make sure we exit if nothing comes back
record_count = status[f"{self.query_type[:-1]}Count"]
records = self._fetch_search_job_records(
search_job=search_job,
record_count=record_count,
custom_columns=custom_columns,
limit=limit,
pagination_delay=pagination_delay,
)

elif self.query_type == "metrics":
response = self.conn.metrics_query(
Expand All @@ -138,6 +170,12 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
self.timeshift,
)
records = response["queryResult"][0]["timeSeriesList"]["timeSeries"]
# Uncomment the lines below to add a delay when metric queries are triggered
# back to back, to avoid hitting the rate limit.
# self.logger.info(
# "Waiting 15 seconds after metrics query to avoid rate limit..."
# )
# time.sleep(15)

for row in records:
yield row