diff --git a/api_app/analyzers_manager/file_analyzers/ipqsfile.py b/api_app/analyzers_manager/file_analyzers/ipqsfile.py new file mode 100644 index 0000000000..d87968e7bd --- /dev/null +++ b/api_app/analyzers_manager/file_analyzers/ipqsfile.py @@ -0,0 +1,54 @@ +# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl +# See the file 'LICENSE' for copying permission. + +"""IPQualityScore file analyzer. + +Provides `IPQSFileScan`, a file-scanning analyzer that uploads files to +IPQualityScore for malware detection and polls for results. +""" + +import logging + +from api_app.analyzers_manager.classes import FileAnalyzer +from api_app.mixins import IPQualityScoreMixin + +logger = logging.getLogger(__name__) + + +class IPQSFileScan(FileAnalyzer, IPQualityScoreMixin): + """ + Scan a binary file using IPQualityScore malware detection service. + """ + + @classmethod + def update(cls): + pass + + def run(self): + binary = self.read_file_bytes() + files = {"files": (self.filename, binary)} + # lookup endpoint check for cached result + lookup_result = self._make_request( + self.lookup_endpoint, + method="POST", + _api_key=self._ipqs_api_key, + files=files, + ) + if lookup_result.get("status", False) == "cached": + lookup_result.pop("update_url", None) + return lookup_result + # sending file to ipqs for scan + scan_result = self._make_request( + self.scan_endpoint, + method="POST", + _api_key=self._ipqs_api_key, + files=files, + ) + # waiting for scan result with help of request id + result = self._poll_for_report( + endpoint=self.postback_endpoint, + _api_key=self._ipqs_api_key, + request_id=scan_result.get("request_id"), + ) + result.pop("update_url", None) + return result diff --git a/api_app/analyzers_manager/migrations/0178_analyzer_config_ipqs_file_url_scanner.py b/api_app/analyzers_manager/migrations/0178_analyzer_config_ipqs_file_url_scanner.py new file mode 100644 index 0000000000..3141a29a7b --- /dev/null +++ 
b/api_app/analyzers_manager/migrations/0178_analyzer_config_ipqs_file_url_scanner.py @@ -0,0 +1,163 @@ +from django.db import migrations +from django.db.models.fields.related_descriptors import ( + ForwardManyToOneDescriptor, + ForwardOneToOneDescriptor, + ManyToManyDescriptor, + ReverseManyToOneDescriptor, + ReverseOneToOneDescriptor, +) + +plugin = { + "python_module": { + "health_check_schedule": None, + "update_schedule": None, + "module": "ipqsurl.IPQSUrlScan", + "base_path": "api_app.analyzers_manager.observable_analyzers", + }, + "name": "IPQS_File_URL_Scanner", + "description": ( + "Scans files hosted or accessible via a URL using " + "[IPQualityScore](https://www.ipqualityscore.com/) malware " + "detection API." + ), + "disabled": False, + "soft_time_limit": 60, + "routing_key": "default", + "health_check_status": True, + "type": "observable", + "docker_based": False, + "maximum_tlp": "AMBER", + "observable_supported": ["url"], + "supported_filetypes": [], + "run_hash": False, + "run_hash_type": "", + "not_supported_filetypes": [], + "mapping_data_model": {}, + "model": "analyzers_manager.AnalyzerConfig", +} + +params = [ + { + "python_module": { + "module": "ipqsurl.IPQSUrlScan", + "base_path": "api_app.analyzers_manager.observable_analyzers", + }, + "name": "ipqs_api_key", + "type": "str", + "description": "Please provide the IPQS API key.", + "is_secret": True, + "required": True, + }, + { + "python_module": { + "module": "ipqsurl.IPQSUrlScan", + "base_path": "api_app.analyzers_manager.observable_analyzers", + }, + "name": "polling_interval", + "type": "int", + "description": "Please provide the polling interval.", + "is_secret": False, + "required": True, + }, + { + "python_module": { + "module": "ipqsurl.IPQSUrlScan", + "base_path": "api_app.analyzers_manager.observable_analyzers", + }, + "name": "max_retries", + "type": "int", + "description": "Please provide the maximum number of retries.", + "is_secret": False, + "required": True, + }, +] + +values 
= [] + + +def _get_real_obj(Model, field, value): + def _get_obj(Model, other_model, value): + if isinstance(value, dict): + real_vals = {} + for key, real_val in value.items(): + real_vals[key] = _get_real_obj(other_model, key, real_val) + value = other_model.objects.get_or_create(**real_vals)[0] + # it is just the primary key serialized + else: + if isinstance(value, int): + if Model.__name__ == "PluginConfig": + value = other_model.objects.get(name=plugin["name"]) + else: + value = other_model.objects.get(pk=value) + else: + value = other_model.objects.get(name=value) + return value + + if ( + type(getattr(Model, field)) + in [ + ForwardManyToOneDescriptor, + ReverseManyToOneDescriptor, + ReverseOneToOneDescriptor, + ForwardOneToOneDescriptor, + ] + and value + ): + other_model = getattr(Model, field).get_queryset().model + value = _get_obj(Model, other_model, value) + elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: + other_model = getattr(Model, field).rel.model + value = [_get_obj(Model, other_model, val) for val in value] + return value + + +def _create_object(Model, data): + mtm, no_mtm = {}, {} + for field, value in data.items(): + value = _get_real_obj(Model, field, value) + if type(getattr(Model, field)) is ManyToManyDescriptor: + mtm[field] = value + else: + no_mtm[field] = value + try: + o = Model.objects.get(**no_mtm) + except Model.DoesNotExist: + o = Model(**no_mtm) + o.full_clean() + o.save() + for field, value in mtm.items(): + attribute = getattr(o, field) + if value is not None: + attribute.set(value) + return False + return True + + +def migrate(apps, schema_editor): + Parameter = apps.get_model("api_app", "Parameter") + PluginConfig = apps.get_model("api_app", "PluginConfig") + python_path = plugin.pop("model") + Model = apps.get_model(*python_path.split(".")) + if not Model.objects.filter(name=plugin["name"]).exists(): + exists = _create_object(Model, plugin) + if not exists: + for param in params: + 
_create_object(Parameter, param) + for value in values: + _create_object(PluginConfig, value) + + +def reverse_migrate(apps, schema_editor): + python_path = plugin.pop("model") + Model = apps.get_model(*python_path.split(".")) + Model.objects.get(name=plugin["name"]).delete() + + +class Migration(migrations.Migration): + atomic = False + dependencies = [ + ("api_app", "0072_update_check_system"), + ("analyzers_manager", "0177_update_urlscan_observable_supported"), + ] + + operations = [migrations.RunPython(migrate, reverse_migrate)] diff --git a/api_app/analyzers_manager/migrations/0179_analyzer_config_ipqs_malware_file_scanner.py b/api_app/analyzers_manager/migrations/0179_analyzer_config_ipqs_malware_file_scanner.py new file mode 100644 index 0000000000..f1d92902b8 --- /dev/null +++ b/api_app/analyzers_manager/migrations/0179_analyzer_config_ipqs_malware_file_scanner.py @@ -0,0 +1,222 @@ +from django.db import migrations +from django.db.models.fields.related_descriptors import ( + ForwardManyToOneDescriptor, + ForwardOneToOneDescriptor, + ManyToManyDescriptor, + ReverseManyToOneDescriptor, + ReverseOneToOneDescriptor, +) + +plugin = { + "python_module": { + "health_check_schedule": None, + "update_schedule": None, + "module": "ipqsfile.IPQSFileScan", + "base_path": "api_app.analyzers_manager.file_analyzers", + }, + "name": "IPQS_Malware_File_Scanner", + "description": ( + "Scan files for malware, viruses, and malicious payloads in real-time " + "using [IPQualityScore](https://www.ipqualityscore.com/)'s advanced " + "file scanning engine." 
+ ), + "disabled": False, + "soft_time_limit": 60, + "routing_key": "default", + "health_check_status": True, + "type": "file", + "docker_based": False, + "maximum_tlp": "AMBER", + "observable_supported": [], + "supported_filetypes": [ + "application/w-script-file", + "application/javascript", + "application/x-javascript", + "text/javascript", + "application/x-vbscript", + "text/x-ms-iqy", + "application/vnd.android.package-archive", + "application/x-dex", + "application/onenote", + "application/zip", + "multipart/x-zip", + "application/java-archive", + "text/rtf", + "application/rtf", + "application/x-sharedlib", + "application/vnd.microsoft.portable-executable", + "application/x-elf", + "application/octet-stream", + "application/vnd.tcpdump.pcap", + "application/pdf", + "text/html", + "application/x-mspublisher", + "application/vnd.ms-excel.addin.macroEnabled", + "application/vnd.ms-excel.sheet.macroEnabled.12", + "application/vnd.ms-excel", + "application/excel", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/xml", + "text/xml", + "application/encrypted", + "text/plain", + "text/csv", + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/vnd.ms-powerpoint", + "application/vnd.ms-office", + "application/x-binary", + "application/x-macbinary", + "application/mac-binary", + "application/x-mach-binary", + "application/x-zip-compressed", + "application/x-compressed", + "application/vnd.ms-outlook", + "message/rfc822", + "application/pkcs7-signature", + "application/x-pkcs7-signature", + "multipart/mixed", + "text/x-shellscript", + "application/x-chrome-extension", + "application/json", + "application/x-executable", + "text/x-java", + "text/x-kotlin", + "text/x-swift", + "text/x-objective-c", + "application/x-ms-shortcut", + "application/gzip", + ], + "run_hash": False, + "run_hash_type": 
"", + "not_supported_filetypes": [], + "mapping_data_model": {}, + "model": "analyzers_manager.AnalyzerConfig", +} + +params = [ + { + "python_module": { + "module": "ipqsfile.IPQSFileScan", + "base_path": "api_app.analyzers_manager.file_analyzers", + }, + "name": "ipqs_api_key", + "type": "str", + "description": "Please provide the IPQS API key.", + "is_secret": True, + "required": True, + }, + { + "python_module": { + "module": "ipqsfile.IPQSFileScan", + "base_path": "api_app.analyzers_manager.file_analyzers", + }, + "name": "polling_interval", + "type": "int", + "description": "Please provide the polling interval.", + "is_secret": False, + "required": True, + }, + { + "python_module": { + "module": "ipqsfile.IPQSFileScan", + "base_path": "api_app.analyzers_manager.file_analyzers", + }, + "name": "max_retries", + "type": "int", + "description": "Please provide the maximum number of retries.", + "is_secret": False, + "required": True, + }, +] + +values = [] + + +def _get_real_obj(Model, field, value): + def _get_obj(Model, other_model, value): + if isinstance(value, dict): + real_vals = {} + for key, real_val in value.items(): + real_vals[key] = _get_real_obj(other_model, key, real_val) + value = other_model.objects.get_or_create(**real_vals)[0] + # it is just the primary key serialized + else: + if isinstance(value, int): + if Model.__name__ == "PluginConfig": + value = other_model.objects.get(name=plugin["name"]) + else: + value = other_model.objects.get(pk=value) + else: + value = other_model.objects.get(name=value) + return value + + if ( + type(getattr(Model, field)) + in [ + ForwardManyToOneDescriptor, + ReverseManyToOneDescriptor, + ReverseOneToOneDescriptor, + ForwardOneToOneDescriptor, + ] + and value + ): + other_model = getattr(Model, field).get_queryset().model + value = _get_obj(Model, other_model, value) + elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: + other_model = getattr(Model, field).rel.model + value = [_get_obj(Model, 
other_model, val) for val in value] + return value + + +def _create_object(Model, data): + mtm, no_mtm = {}, {} + for field, value in data.items(): + value = _get_real_obj(Model, field, value) + if type(getattr(Model, field)) is ManyToManyDescriptor: + mtm[field] = value + else: + no_mtm[field] = value + try: + o = Model.objects.get(**no_mtm) + except Model.DoesNotExist: + o = Model(**no_mtm) + o.full_clean() + o.save() + for field, value in mtm.items(): + attribute = getattr(o, field) + if value is not None: + attribute.set(value) + return False + return True + + +def migrate(apps, schema_editor): + Parameter = apps.get_model("api_app", "Parameter") + PluginConfig = apps.get_model("api_app", "PluginConfig") + python_path = plugin.pop("model") + Model = apps.get_model(*python_path.split(".")) + if not Model.objects.filter(name=plugin["name"]).exists(): + exists = _create_object(Model, plugin) + if not exists: + for param in params: + _create_object(Parameter, param) + for value in values: + _create_object(PluginConfig, value) + + +def reverse_migrate(apps, schema_editor): + python_path = plugin.pop("model") + Model = apps.get_model(*python_path.split(".")) + Model.objects.get(name=plugin["name"]).delete() + + +class Migration(migrations.Migration): + atomic = False + dependencies = [ + ("api_app", "0072_update_check_system"), + ("analyzers_manager", "0178_analyzer_config_ipqs_file_url_scanner"), + ] + + operations = [migrations.RunPython(migrate, reverse_migrate)] diff --git a/api_app/analyzers_manager/observable_analyzers/ipqs.py b/api_app/analyzers_manager/observable_analyzers/ipqs.py index d80ebb5a65..e8db4c6a35 100644 --- a/api_app/analyzers_manager/observable_analyzers/ipqs.py +++ b/api_app/analyzers_manager/observable_analyzers/ipqs.py @@ -1,3 +1,9 @@ +"""IPQualityScore observable analyzer using the IPQualityScore API. 
+ +This module implements the `IPQualityScore` analyzer which queries the +IPQualityScore service for URLs, IPs, emails, phones and credential leaks. +""" + import logging import re @@ -8,9 +14,12 @@ logger = logging.getLogger(__name__) +IP_REG = ( + r"^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}" + r"(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$" +) -IP_REG = "^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$" -IPv6_REG = ( +IPV6_REG = ( r"\b(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|" r"(?:[0-9a-fA-F]{1,4}:){1,7}:|" r"(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|" @@ -28,20 +37,34 @@ r"((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}" r"(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))\b" ) -EMAIL_REG = "[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}" + +EMAIL_REG = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$" + DOMAIN_REG = re.compile( - r"^(?:[a-zA-Z0-9]" # First character of the domain - r"(?:[a-zA-Z0-9-_]{0,61}[A-Za-z0-9])?\.)" # Sub domain + hostname - r"+[A-Za-z0-9][A-Za-z0-9-_]{0,61}" # First 61 characters of the gTLD - r"[A-Za-z]$" # Last character of the gTLD + r"^(?:[a-zA-Z0-9]" + r"(?:[a-zA-Z0-9-_]{0,61}[A-Za-z0-9])?\.)" + r"+[A-Za-z0-9][A-Za-z0-9-_]{0,61}" + r"[A-Za-z]$" ) -PHONE_REG = "^\+?[1-9]\d{1,14}$" + +PHONE_REG = r"^\+?[0-9(). -]{7,20}$" + URL_REG = ( - "((http|https)://)(www.)?[a-zA-Z0-9@:%._\\+~#?&//=]{2,256}\\.[a-z]{2,6}\\b([-a-zA-Z0-9@:%._\\+~#?&//=]*)" + r"((http|https)://)" + r"(www\.)?" + r"[a-zA-Z0-9@:%._\\+~#?&//=]{2,256}" + r"\.[a-z]{2,6}\b" + r"([-a-zA-Z0-9@:%._\\+~#?&//=]*)" ) class IPQualityScore(classes.ObservableAnalyzer): + """Observable analyzer for IPQualityScore service. + + Uses regex-based detection of the observable type and queries the + corresponding IPQualityScore endpoint. 
+ """ + _ipqs_api_key: str url_timeout: int = 2 url_strictness: int = 0 @@ -69,6 +92,13 @@ class IPQualityScore(classes.ObservableAnalyzer): IP_ENDPOINT = IPQS_BASE_URL + "ip?ip=" EMAIL_ENDPOINT = IPQS_BASE_URL + "email?email=" PHONE_ENDPOINT = IPQS_BASE_URL + "phone?phone=" + USERNAME_ENDPOINT = IPQS_BASE_URL + "leaked/username?username=" + PASSWORD_ENDPOINT = IPQS_BASE_URL + "leaked/password?password=" + LEAKED_EMAILENDPOINT = IPQS_BASE_URL + "leaked/email?email=" + + @classmethod + def update(cls) -> bool: + pass def _get_url_payload(self): return { @@ -80,9 +110,9 @@ def _get_url_payload(self): def _get_ip_payload(self): payload = { "strictness": self.ip_strictness, - "allow_public_access_points": str(self.allow_public_access_points).lower(), + "allow_public_access_points": (str(self.allow_public_access_points).lower()), "fast": str(self.ip_fast).lower(), - "lighter_penalties": str(self.lighter_penalties).lower(), + "lighter_penalties": (str(self.lighter_penalties).lower()), "mobile": str(self.mobile).lower(), "transaction_strictness": self.transaction_strictness, } @@ -110,33 +140,114 @@ def _get_phone_payload(self): } def _get_calling_endpoint(self): - if re.match(IP_REG, self.observable_name) or re.match(IPv6_REG, self.observable_name): - return self.IP_ENDPOINT, self._get_ip_payload() - elif re.match(DOMAIN_REG, self.observable_name) or re.match(URL_REG, self.observable_name): - return self.URL_ENDPOINT, self._get_url_payload() - elif re.match(EMAIL_REG, self.observable_name): - return self.EMAIL_ENDPOINT, self._get_email_payload() - elif re.match(PHONE_REG, self.observable_name): - return self.PHONE_ENDPOINT, self._get_phone_payload() - else: - return None, None + if re.match(IP_REG, self.observable_name) or re.match(IPV6_REG, self.observable_name): + return { + "type": "ip", + "endpoint": self.IP_ENDPOINT, + "payload": self._get_ip_payload(), + } + if re.match(DOMAIN_REG, self.observable_name) or re.match(URL_REG, self.observable_name): + return { + 
"type": "url", + "endpoint": self.URL_ENDPOINT, + "payload": self._get_url_payload(), + } + if re.match(EMAIL_REG, self.observable_name): + return { + "type": "email", + "leaked_email_endpoint": self.LEAKED_EMAILENDPOINT, + "email_endpoint": self.EMAIL_ENDPOINT, + "payload": self._get_email_payload(), + } + if re.match(PHONE_REG, self.observable_name): + return { + "type": "phone", + "endpoint": self.PHONE_ENDPOINT, + "payload": self._get_phone_payload(), + } + return { + "type": "credentials", + "username_endpoint": self.USERNAME_ENDPOINT, + "password_endpoint": self.PASSWORD_ENDPOINT, + } def run(self): - calling_endpoint, payload = self._get_calling_endpoint() + endpoints = self._get_calling_endpoint() ipqs_headers = {"IPQS-KEY": self._ipqs_api_key} try: - if calling_endpoint and payload is not None: + if endpoints.get("type") == "credentials": + return self._handle_credentials(endpoints, ipqs_headers) + + if endpoints.get("type") == "email": + return self._handle_email(endpoints, ipqs_headers) + + if endpoints.get("type") in ["url", "phone", "ip"]: + calling_endpoint = endpoints.get("endpoint") + payload = endpoints.get("payload") response = requests.get( calling_endpoint + self.observable_name, headers=ipqs_headers, params=payload, + timeout=60, ) response.raise_for_status() - result = response.json() - return result - else: - logger.warning("Invalid or unsupported observable type") - raise AnalyzerRunException("Invalid or unsupported observable type") + return response.json() + + msg = "Invalid or unsupported observable type" + logger.warning(msg) + raise AnalyzerRunException(msg) except requests.RequestException as e: - raise AnalyzerRunException(e) + raise AnalyzerRunException(e) from e + + def _handle_credentials(self, endpoints, headers): + username_endpoint = endpoints.get("username_endpoint") + password_endpoint = endpoints.get("password_endpoint") + + response_username = requests.get( + username_endpoint + self.observable_name, + headers=headers, + 
timeout=60, + ) + response_username.raise_for_status() + result_username = response_username.json() + + response_password = requests.get( + password_endpoint + self.observable_name, + headers=headers, + timeout=60, + ) + response_password.raise_for_status() + result_password = response_password.json() + + return { + "darkweb_leak_username_api_result": result_username, + "darkweb_leak_password_api_result": result_password, + } + + def _handle_email(self, endpoints, headers): + leaked_email_endpoint = endpoints.get("leaked_email_endpoint") + email_endpoint = endpoints.get("email_endpoint") + email_payload = endpoints.get("payload") + + response_leaked = requests.get( + leaked_email_endpoint + self.observable_name, + headers=headers, + timeout=60, + ) + response_leaked.raise_for_status() + result_leaked = response_leaked.json() + + response_email = requests.get( + email_endpoint + self.observable_name, + headers=headers, + params=email_payload, + timeout=60, + ) + response_email.raise_for_status() + result_email = response_email.json() + + return { + "darkweb_leak_email_api_result": result_leaked, + "email_reputation_api_result": result_email, + } diff --git a/api_app/analyzers_manager/observable_analyzers/ipqsurl.py b/api_app/analyzers_manager/observable_analyzers/ipqsurl.py new file mode 100644 index 0000000000..b018127b72 --- /dev/null +++ b/api_app/analyzers_manager/observable_analyzers/ipqsurl.py @@ -0,0 +1,50 @@ +"""IPQualityScore URL analyzer. + +Provides `IPQSUrlScan`, an observable analyzer that scans URLs via +the IPQualityScore service and polls for results. +""" + +import logging + +from api_app.analyzers_manager import classes +from api_app.mixins import IPQualityScoreMixin + +logger = logging.getLogger(__name__) + + +class IPQSUrlScan(classes.ObservableAnalyzer, IPQualityScoreMixin): + """ + Scan a URL using IPQualityScore service. 
+ """ + + @classmethod + def update(cls): + pass + + def run(self): + # lookup check for url results into ipqs database + lookup_result = self._make_request( + endpoint=self.lookup_endpoint, + method="POST", + _api_key=self._ipqs_api_key, + data={"url": self.observable_name}, + ) + if lookup_result.get("status", False) == "cached": + lookup_result.pop("update_url", None) + return lookup_result + + # sending url for scan + scan_result = self._make_request( + endpoint=self.scan_endpoint, + method="POST", + _api_key=self._ipqs_api_key, + data={"url": self.observable_name}, + ) + # waiting for results for with request id of scanned results + result = self._poll_for_report( + endpoint=self.postback_endpoint, + _api_key=self._ipqs_api_key, + request_id=scan_result.get("request_id"), + ) + result.pop("update_url", None) + return result diff --git a/api_app/mixins.py b/api_app/mixins.py index 8b072552b8..506c754e55 100644 --- a/api_app/mixins.py +++ b/api_app/mixins.py @@ -781,7 +781,7 @@ def _unzip(rule_file_path: pathlib.Path): logger.info(f"Extracting rules at {rule_file_path.parent}") with ZipFile(rule_file_path, mode="r") as archive: archive.extractall(rule_file_path.parent) # this will overwrite any existing directory - logger.info("Rules have been successfully extracted") + logger.info("Rules have been succesfully extracted") @staticmethod def _download_rules( @@ -819,3 +819,76 @@ def _download_rules( logger.info( f"Rules with version: {latest_version} have been successfully downloaded at {rule_set_directory}" ) + + +class IPQualityScoreMixin: + base_url: str = "https://www.ipqualityscore.com/api/json" # Ensure correct API base + _ipqs_api_key: str + polling_interval: int + max_retries: int + scan_endpoint: str = "/malware/scan/" + lookup_endpoint: str = "/malware/lookup/" + postback_endpoint: str = "/postback/" + + def _make_request( + self, + endpoint: str, + method: str, + _api_key: str = None, + data: Dict = None, + params: Dict = None, + files: Dict = None, + ) 
-> Dict: + """ + A streamlined request handler with proper timeout management. + """ + url = f"{self.base_url}{endpoint}" + headers = {"IPQS-KEY": _api_key} + + try: + if method.upper() == "POST": + response = requests.post( + url, + headers=headers, + data=data, + files=files, + timeout=60, + ) + else: + response = requests.get(url, headers=headers, params=params, timeout=60) + + response.raise_for_status() + result = response.json() + + # IPQS often returns 200 OK even if the API logic failed + if not result.get("success", True): + raise AnalyzerRunException(f"IPQS API Error: {result.get('message', 'Unknown Error')}") + + return result + + except requests.exceptions.Timeout: + raise AnalyzerRunException("Request timed out after 60s. File might be too large for sync scan.") + except requests.exceptions.JSONDecodeError: + raise AnalyzerRunException(f"Failed to decode JSON. Raw response: {response.text}") + + def _poll_for_report(self, endpoint: str, _api_key: str, request_id: str) -> Dict: + """ + Standardized polling logic for asynchronous retrieval. + """ + if not request_id: + raise AnalyzerRunException("Cannot poll without a valid request_id.") + + params = {"request_id": request_id} + + for attempt in range(self.max_retries): + logger.info(f"Polling attempt {attempt + 1}/{self.max_retries} for ID: {request_id}") + + result = self._make_request(endpoint, "GET", _api_key=_api_key, params=params) + + # Check if processing is finished + if result.get("status") != "pending": + break + + logger.info(f"Report pending.
Retrying in {self.polling_interval}s...") + time.sleep(self.polling_interval) + return result diff --git a/tests/api_app/analyzers_manager/unit_tests/file_analyzers/test_ipqsfile.py b/tests/api_app/analyzers_manager/unit_tests/file_analyzers/test_ipqsfile.py new file mode 100644 index 0000000000..b54793c55a --- /dev/null +++ b/tests/api_app/analyzers_manager/unit_tests/file_analyzers/test_ipqsfile.py @@ -0,0 +1,48 @@ +from unittest.mock import patch + +from api_app.analyzers_manager.file_analyzers.ipqsfile import IPQSFileScan + +from .base_test_class import BaseFileAnalyzerTest + + +class TestIPQSFileScan(BaseFileAnalyzerTest): + analyzer_class = IPQSFileScan + + def get_extra_config(self): + return { + "_ipqs_api_key": "dummy_key", + "polling_interval": 0, + "max_retries": 1, + } + + def get_mocked_response(self): + lookup_response = { + "file_name": "test file.txt", + "success": True, + "message": "Success", + "file_hash": "abc123", + "type": "scan", + "detected": False, + "detected_scans": 0, + "total_scans": 0, + "status": "pending", + "result": [""], + "file_size": 10, + "file_type": "text/plain", + "request_id": "req1", + } + scan_response = {"success": True, "message": "Success", "request_id": "req1"} + final_response = {**lookup_response, "status": "finished"} + + return [ + patch.object( + IPQSFileScan, + "_make_request", + side_effect=[lookup_response, scan_response], + ), + patch.object( + IPQSFileScan, + "_poll_for_report", + return_value=final_response, + ), + ] diff --git a/tests/api_app/analyzers_manager/unit_tests/observable_analyzers/test_ipqs.py b/tests/api_app/analyzers_manager/unit_tests/observable_analyzers/test_ipqs.py index 27d5938560..4e5f366831 100644 --- a/tests/api_app/analyzers_manager/unit_tests/observable_analyzers/test_ipqs.py +++ b/tests/api_app/analyzers_manager/unit_tests/observable_analyzers/test_ipqs.py @@ -1,14 +1,18 @@ +"""Unit tests for the IPQualityScore observable analyzer.""" + from unittest.mock import patch -from 
api_app.analyzers_manager.observable_analyzers.ipqs import IPQualityScore -from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import ( - BaseAnalyzerTest, +from api_app.analyzers_manager.observable_analyzers import ipqs +from tests.api_app.analyzers_manager.unit_tests.observable_analyzers import ( + base_test_class, ) from tests.mock_utils import MockUpResponse -class IPQualityScoreTestCase(BaseAnalyzerTest): - analyzer_class = IPQualityScore +class IPQualityScoreTestCase(base_test_class.BaseAnalyzerTest): + """Tests for the `IPQualityScore` observable analyzer.""" + + analyzer_class = ipqs.IPQualityScore @staticmethod def get_mocked_response(): @@ -19,8 +23,13 @@ def get_mocked_response(): "domain": "test.com", "risk_score": 0, } - return patch("requests.get", return_value=MockUpResponse(mock_response, 200)) + return patch( + "requests.get", + return_value=MockUpResponse(mock_response, 200), + ) @classmethod def get_extra_config(cls) -> dict: - return {"_ipqs_api_key": "dummy_key"} + return { + "_ipqs_api_key": "dummy_key", + } diff --git a/tests/api_app/analyzers_manager/unit_tests/observable_analyzers/test_ipqsurl.py b/tests/api_app/analyzers_manager/unit_tests/observable_analyzers/test_ipqsurl.py new file mode 100644 index 0000000000..3736953940 --- /dev/null +++ b/tests/api_app/analyzers_manager/unit_tests/observable_analyzers/test_ipqsurl.py @@ -0,0 +1,72 @@ +"""Unit tests for the IPQS URL observable analyzer.""" + +from unittest.mock import patch + +from api_app.analyzers_manager.observable_analyzers.ipqsurl import IPQSUrlScan +from tests.api_app.analyzers_manager.unit_tests.observable_analyzers import ( + base_test_class, +) +from tests.mock_utils import MockUpResponse + + +class IPQSUrlScanTestCase(base_test_class.BaseAnalyzerTest): + """Tests for the `IPQSUrlScan` observable analyzer.""" + + analyzer_class = IPQSUrlScan + + @staticmethod + def get_mocked_response(): + # Response for lookup endpoint (non-cached case) + 
lookup_response = { + "file_name": "www.google.com", + "success": True, + "message": "Success", + "status": "not_cached", + "request_id": "dxwrE9RhS3", + } + # Response for scan endpoint + scan_response = { + "file_name": "www.google.com", + "success": True, + "message": "Success", + "request_id": "dxwrE9RhS3", + } + # Final response from poll_for_report + final_response = { + "file_name": "www.google.com", + "success": True, + "message": "Success", + "file_hash": ("86cc9b097d5ea4ec64a086634ef0f57b864770ccd1129d"), + "type": "scan", + "detected": False, + "detected_scans": 0, + "total_scans": 0, + "status": "finished", + "result": [""], + "file_size": 272728, + "file_type": "text/html", + "sha1": "379066b095304b84a0cc53888cda558ef483a4dd", + "md5": "4eb7c45715293e6effd84f4894cff654", + "request_id": "dxwrE9RhS3", + } + return [ + patch( + "requests.post", + side_effect=[ + MockUpResponse(lookup_response, 200), + MockUpResponse(scan_response, 200), + ], + ), + patch( + "requests.get", + return_value=MockUpResponse(final_response, 200), + ), + ] + + @classmethod + def get_extra_config(cls) -> dict: + return { + "_ipqs_api_key": "dummy_key", + "polling_interval": 0, + "max_retries": 1, + }