-
-
Notifications
You must be signed in to change notification settings - Fork 635
Feature ipqs #3431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature ipqs #3431
Changes from 14 commits
4be648e
3bc5e1d
c43b164
174232f
00b3b5a
5be04c1
24c0e90
d28b642
94d486a
a709d36
ecfbfda
2a3a33f
a6d1cbf
bb82c9e
fd02b47
7e3fc58
4f041a2
13a1606
c20eef8
e6b0c79
e3b2306
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| # This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl | ||
| # See the file 'LICENSE' for copying permission. | ||
|
|
||
| """IPQualityScore file analyzer. | ||
|
|
||
| Provides `IPQSFileScan`, a file-scanning analyzer that uploads files to | ||
| IPQualityScore for malware detection and polls for results. | ||
| """ | ||
|
|
||
| import logging | ||
|
|
||
| from api_app.analyzers_manager.classes import FileAnalyzer | ||
| from api_app.mixins import IPQualityScoreMixin | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class IPQSFileScan(FileAnalyzer, IPQualityScoreMixin): | ||
| """ | ||
| Scan a binary file using IPQualityScore malware detection service. | ||
| """ | ||
|
|
||
| @classmethod | ||
| def update(cls): | ||
| pass | ||
|
|
||
| def run(self): | ||
| binary = self.read_file_bytes() | ||
| files = {"files": (self.filename, binary)} | ||
| # lookup endpoint check for cached result | ||
| lookup_result = self._make_request( | ||
| self.lookup_endpoint, | ||
| method="POST", | ||
| _api_key=self._ipqs_api_key, | ||
| files=files, | ||
| ) | ||
| if lookup_result.get("status", False) == "cached": | ||
| lookup_result.pop("update_url", None) | ||
| return lookup_result | ||
| # sending file to ipqs for scan | ||
| scan_result = self._make_request( | ||
| self.scan_endpoint, | ||
| method="POST", | ||
| _api_key=self._ipqs_api_key, | ||
| files=files, | ||
| ) | ||
| # waiting for scan result with help of request id | ||
| result = self._poll_for_report( | ||
| endpoint=self.postback_endpoint, | ||
| _api_key=self._ipqs_api_key, | ||
| request_id=scan_result.get("request_id"), | ||
| ) | ||
| result.pop("update_url", None) | ||
| return result |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| from django.db import migrations | ||
| from django.db.models.fields.related_descriptors import ( | ||
| ForwardManyToOneDescriptor, | ||
| ForwardOneToOneDescriptor, | ||
| ManyToManyDescriptor, | ||
| ReverseManyToOneDescriptor, | ||
| ReverseOneToOneDescriptor, | ||
| ) | ||
|
|
||
| plugin = {'python_module': {'health_check_schedule': None, 'update_schedule': None, 'module': 'ipqsurl.IPQSUrlScan', 'base_path': 'api_app.analyzers_manager.observable_analyzers'}, 'name': 'IPQS_File_URL_Scanner', 'description': 'Scans files hosted or accessible via a URL using IPQualityScore’s malware detection API.', 'disabled': False, 'soft_time_limit': 60, 'routing_key': 'default', 'health_check_status': True, 'type': 'observable', 'docker_based': False, 'maximum_tlp': 'RED', 'observable_supported': ['url'], 'supported_filetypes': [], 'run_hash': False, 'run_hash_type': '', 'not_supported_filetypes': [], 'mapping_data_model': {}, 'model': 'analyzers_manager.AnalyzerConfig'} | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. max TLP must be AMBER for external services |
||
| params = [{'python_module': {'module': 'ipqsurl.IPQSUrlScan', 'base_path': 'api_app.analyzers_manager.observable_analyzers'}, 'name': 'ipqs_api_key', 'type': 'str', 'description': 'Please provide the IPQS API key.', 'is_secret': True, 'required': True}] | ||
|
||
|
|
||
| values = [] | ||
|
|
||
|
|
||
| def _get_real_obj(Model, field, value): | ||
| def _get_obj(Model, other_model, value): | ||
| if isinstance(value, dict): | ||
| real_vals = {} | ||
| for key, real_val in value.items(): | ||
| real_vals[key] = _get_real_obj(other_model, key, real_val) | ||
| value = other_model.objects.get_or_create(**real_vals)[0] | ||
| # it is just the primary key serialized | ||
| else: | ||
| if isinstance(value, int): | ||
| if Model.__name__ == "PluginConfig": | ||
| value = other_model.objects.get(name=plugin["name"]) | ||
| else: | ||
| value = other_model.objects.get(pk=value) | ||
| else: | ||
| value = other_model.objects.get(name=value) | ||
| return value | ||
|
|
||
| if ( | ||
| type(getattr(Model, field)) | ||
| in [ | ||
| ForwardManyToOneDescriptor, | ||
| ReverseManyToOneDescriptor, | ||
| ReverseOneToOneDescriptor, | ||
| ForwardOneToOneDescriptor, | ||
| ] | ||
| and value | ||
| ): | ||
| other_model = getattr(Model, field).get_queryset().model | ||
| value = _get_obj(Model, other_model, value) | ||
| elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: | ||
| other_model = getattr(Model, field).rel.model | ||
| value = [_get_obj(Model, other_model, val) for val in value] | ||
| return value | ||
|
|
||
| def _create_object(Model, data): | ||
| mtm, no_mtm = {}, {} | ||
| for field, value in data.items(): | ||
| value = _get_real_obj(Model, field, value) | ||
| if type(getattr(Model, field)) is ManyToManyDescriptor: | ||
| mtm[field] = value | ||
| else: | ||
| no_mtm[field] = value | ||
| try: | ||
| o = Model.objects.get(**no_mtm) | ||
| except Model.DoesNotExist: | ||
| o = Model(**no_mtm) | ||
| o.full_clean() | ||
| o.save() | ||
| for field, value in mtm.items(): | ||
| attribute = getattr(o, field) | ||
| if value is not None: | ||
| attribute.set(value) | ||
| return False | ||
| return True | ||
|
|
||
| def migrate(apps, schema_editor): | ||
| Parameter = apps.get_model("api_app", "Parameter") | ||
| PluginConfig = apps.get_model("api_app", "PluginConfig") | ||
| python_path = plugin.pop("model") | ||
| Model = apps.get_model(*python_path.split(".")) | ||
| if not Model.objects.filter(name=plugin["name"]).exists(): | ||
| exists = _create_object(Model, plugin) | ||
| if not exists: | ||
| for param in params: | ||
| _create_object(Parameter, param) | ||
| for value in values: | ||
| _create_object(PluginConfig, value) | ||
|
|
||
|
|
||
|
|
||
| def reverse_migrate(apps, schema_editor): | ||
| python_path = plugin.pop("model") | ||
| Model = apps.get_model(*python_path.split(".")) | ||
| Model.objects.get(name=plugin["name"]).delete() | ||
|
|
||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
| atomic = False | ||
| dependencies = [ | ||
| ('api_app', '0072_update_check_system'), | ||
| ('analyzers_manager', '0177_update_urlscan_observable_supported'), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.RunPython( | ||
| migrate, reverse_migrate | ||
| ) | ||
| ] | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| from django.db import migrations | ||
| from django.db.models.fields.related_descriptors import ( | ||
| ForwardManyToOneDescriptor, | ||
| ForwardOneToOneDescriptor, | ||
| ManyToManyDescriptor, | ||
| ReverseManyToOneDescriptor, | ||
| ReverseOneToOneDescriptor, | ||
| ) | ||
|
|
||
| plugin = {'python_module': {'health_check_schedule': None, 'update_schedule': None, 'module': 'ipqsfile.IPQSFileScan', 'base_path': 'api_app.analyzers_manager.file_analyzers'}, 'name': 'IPQS_Malware_File_Scanner', 'description': 'Scan files for malware, viruses, and malicious payloads in real-time using IPQualityScore’s advanced file scanning engine.', 'disabled': False, 'soft_time_limit': 60, 'routing_key': 'default', 'health_check_status': True, 'type': 'file', 'docker_based': False, 'maximum_tlp': 'RED', 'observable_supported': [], 'supported_filetypes': ['application/w-script-file', 'application/javascript', 'application/x-javascript', 'text/javascript', 'application/x-vbscript', 'text/x-ms-iqy', 'application/vnd.android.package-archive', 'application/x-dex', 'application/onenote', 'application/zip', 'multipart/x-zip', 'application/java-archive', 'text/rtf', 'application/rtf', 'application/x-sharedlib', 'application/vnd.microsoft.portable-executable', 'application/x-elf', 'application/octet-stream', 'application/vnd.tcpdump.pcap', 'application/pdf', 'text/html', 'application/x-mspublisher', 'application/vnd.ms-excel.addin.macroEnabled', 'application/vnd.ms-excel.sheet.macroEnabled.12', 'application/vnd.ms-excel', 'application/excel', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'application/xml', 'text/xml', 'application/encrypted', 'text/plain', 'text/csv', 'application/vnd.openxmlformats-officedocument.presentationml.presentation', 'application/msword', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'application/vnd.ms-powerpoint', 'application/vnd.ms-office', 'application/x-binary', 'application/x-macbinary', 'application/mac-binary', 'application/x-mach-binary', 'application/x-zip-compressed', 'application/x-compressed', 'application/vnd.ms-outlook', 'message/rfc822', 'application/pkcs7-signature', 'application/x-pkcs7-signature', 'multipart/mixed', 'text/x-shellscript', 'application/x-chrome-extension', 'application/json', 'application/x-executable', 'text/x-java', 'text/x-kotlin', 'text/x-swift', 'text/x-objective-c', 'application/x-ms-shortcut', 'application/gzip'], 'run_hash': False, 'run_hash_type': '', 'not_supported_filetypes': [], 'mapping_data_model': {}, 'model': 'analyzers_manager.AnalyzerConfig'} | ||
|
|
||
| params = [{'python_module': {'module': 'ipqsfile.IPQSFileScan', 'base_path': 'api_app.analyzers_manager.file_analyzers'}, 'name': 'ipqs_api_key', 'type': 'str', 'description': 'Please provide the IPQS API key.', 'is_secret': True, 'required': True}] | ||
|
||
|
|
||
| values = [] | ||
|
|
||
|
|
||
| def _get_real_obj(Model, field, value): | ||
| def _get_obj(Model, other_model, value): | ||
| if isinstance(value, dict): | ||
| real_vals = {} | ||
| for key, real_val in value.items(): | ||
| real_vals[key] = _get_real_obj(other_model, key, real_val) | ||
| value = other_model.objects.get_or_create(**real_vals)[0] | ||
| # it is just the primary key serialized | ||
| else: | ||
| if isinstance(value, int): | ||
| if Model.__name__ == "PluginConfig": | ||
| value = other_model.objects.get(name=plugin["name"]) | ||
| else: | ||
| value = other_model.objects.get(pk=value) | ||
| else: | ||
| value = other_model.objects.get(name=value) | ||
| return value | ||
|
|
||
| if ( | ||
| type(getattr(Model, field)) | ||
| in [ | ||
| ForwardManyToOneDescriptor, | ||
| ReverseManyToOneDescriptor, | ||
| ReverseOneToOneDescriptor, | ||
| ForwardOneToOneDescriptor, | ||
| ] | ||
| and value | ||
| ): | ||
| other_model = getattr(Model, field).get_queryset().model | ||
| value = _get_obj(Model, other_model, value) | ||
| elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: | ||
| other_model = getattr(Model, field).rel.model | ||
| value = [_get_obj(Model, other_model, val) for val in value] | ||
| return value | ||
|
|
||
| def _create_object(Model, data): | ||
| mtm, no_mtm = {}, {} | ||
| for field, value in data.items(): | ||
| value = _get_real_obj(Model, field, value) | ||
| if type(getattr(Model, field)) is ManyToManyDescriptor: | ||
| mtm[field] = value | ||
| else: | ||
| no_mtm[field] = value | ||
| try: | ||
| o = Model.objects.get(**no_mtm) | ||
| except Model.DoesNotExist: | ||
| o = Model(**no_mtm) | ||
| o.full_clean() | ||
| o.save() | ||
| for field, value in mtm.items(): | ||
| attribute = getattr(o, field) | ||
| if value is not None: | ||
| attribute.set(value) | ||
| return False | ||
| return True | ||
|
|
||
| def migrate(apps, schema_editor): | ||
| Parameter = apps.get_model("api_app", "Parameter") | ||
| PluginConfig = apps.get_model("api_app", "PluginConfig") | ||
| python_path = plugin.pop("model") | ||
| Model = apps.get_model(*python_path.split(".")) | ||
| if not Model.objects.filter(name=plugin["name"]).exists(): | ||
| exists = _create_object(Model, plugin) | ||
| if not exists: | ||
| for param in params: | ||
| _create_object(Parameter, param) | ||
| for value in values: | ||
| _create_object(PluginConfig, value) | ||
|
|
||
|
|
||
|
|
||
| def reverse_migrate(apps, schema_editor): | ||
| python_path = plugin.pop("model") | ||
| Model = apps.get_model(*python_path.split(".")) | ||
| Model.objects.get(name=plugin["name"]).delete() | ||
|
|
||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
| atomic = False | ||
| dependencies = [ | ||
| ('api_app', '0072_update_check_system'), | ||
| ('analyzers_manager', '0178_analyzer_config_ipqs_file_url_scanner'), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.RunPython( | ||
| migrate, reverse_migrate | ||
| ) | ||
| ] | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the description use markdown to add link to the service, this will be rendered in the GUI