Skip to content
47 changes: 47 additions & 0 deletions api_app/analyzers_manager/file_analyzers/ipqsfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

import logging

from api_app.analyzers_manager.classes import FileAnalyzer
from api_app.mixins import IPQualityScoreMixin

logger = logging.getLogger(__name__)


class IPQSFileScan(FileAnalyzer, IPQualityScoreMixin):
"""
Scan a binary file using IPQualityScore malware detection service.
"""

scan_url = "/malware/scan/"
lookup_url = "/malware/lookup/"
postback_url = "/postback/"

def run(self):
try:
binary = self.read_file_bytes()
files = {"files": (self.filename, binary)}

# lookup endpoint check for cached result
lookup_result = self._make_request(
self.lookup_url, method="POST", _api_key=self._ipqs_api_key, files=files
)
if lookup_result.get("status", False) == "cached":
lookup_result.pop("update_url", None)
return lookup_result
# sending file to ipqs for scan
scan_result = self._make_request(
self.scan_url, method="POST", _api_key=self._ipqs_api_key, files=files
)
# waiting for scan result with help of request id
result = self._poll_for_report(
endpoint=self.postback_url,
_api_key=self._ipqs_api_key,
request_id=scan_result.get("request_id"),
)
result.pop("update_url", None)
return result
except Exception as e:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generic try/except is not required, it is already handled by the framework

logger.error(f"Error occurred while scanning file with IPQS: {e}")
raise
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
)

plugin = {'python_module': {'health_check_schedule': None, 'update_schedule': None, 'module': 'ipqsurl.IPQSUrlScan', 'base_path': 'api_app.analyzers_manager.observable_analyzers'}, 'name': 'IPQS_URL_File_Scanner', 'description': 'Scans files hosted or accessible via a URL using IPQualityScore’s malware detection.', 'disabled': False, 'soft_time_limit': 60, 'routing_key': 'default', 'health_check_status': True, 'type': 'observable', 'docker_based': False, 'maximum_tlp': 'RED', 'observable_supported': ['url'], 'supported_filetypes': [], 'run_hash': False, 'run_hash_type': '', 'not_supported_filetypes': [], 'mapping_data_model': {}, 'model': 'analyzers_manager.AnalyzerConfig'}

params = [{'python_module': {'module': 'ipqsurl.IPQSUrlScan', 'base_path': 'api_app.analyzers_manager.observable_analyzers'}, 'name': 'ipqs_api_key', 'type': 'str', 'description': 'Please provide the IPQS Api key.', 'is_secret': True, 'required': True}]

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [
ForwardManyToOneDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
ForwardOneToOneDescriptor,
]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value

def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True

def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)



def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()



class Migration(migrations.Migration):
atomic = False
dependencies = [
('api_app', '0071_delete_last_elastic_report'),
('analyzers_manager', '0172_analyzer_config_hibppasswords'),
]

operations = [
migrations.RunPython(
migrate, reverse_migrate
)
]


Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
)

plugin = {'python_module': {'health_check_schedule': None, 'update_schedule': None, 'module': 'ipqsfile.IPQSFileScan', 'base_path': 'api_app.analyzers_manager.file_analyzers'}, 'name': 'IPQS_Malware_File_Scanner', 'description': 'Scan files for malware, viruses, and malicious payloads in real-time using IPQualityScore’s advanced file scanning engine.', 'disabled': False, 'soft_time_limit': 60, 'routing_key': 'default', 'health_check_status': True, 'type': 'file', 'docker_based': False, 'maximum_tlp': 'RED', 'observable_supported': [], 'supported_filetypes': ['application/w-script-file', 'application/javascript', 'application/x-javascript', 'text/javascript', 'application/x-vbscript', 'text/x-ms-iqy', 'application/vnd.android.package-archive', 'application/x-dex', 'application/onenote', 'application/zip', 'multipart/x-zip', 'application/java-archive', 'text/rtf', 'application/rtf', 'application/x-sharedlib', 'application/vnd.microsoft.portable-executable', 'application/x-elf', 'application/octet-stream', 'application/vnd.tcpdump.pcap', 'application/pdf', 'text/html', 'application/x-mspublisher', 'application/vnd.ms-excel.addin.macroEnabled', 'application/vnd.ms-excel.sheet.macroEnabled.12', 'application/vnd.ms-excel', 'application/excel', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'application/xml', 'text/xml', 'application/encrypted', 'text/plain', 'text/csv', 'application/vnd.openxmlformats-officedocument.presentationml.presentation', 'application/msword', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'application/vnd.ms-powerpoint', 'application/vnd.ms-office', 'application/x-binary', 'application/x-macbinary', 'application/mac-binary', 'application/x-mach-binary', 'application/x-zip-compressed', 'application/x-compressed', 'application/vnd.ms-outlook', 'message/rfc822', 'application/pkcs7-signature', 'application/x-pkcs7-signature', 'multipart/mixed', 'text/x-shellscript', 'application/x-chrome-extension', 'application/json', 'application/x-executable', 'text/x-java', 'text/x-kotlin', 'text/x-swift', 'text/x-objective-c', 'application/x-ms-shortcut', 'application/gzip'], 'run_hash': False, 'run_hash_type': '', 'not_supported_filetypes': [], 'mapping_data_model': {}, 'model': 'analyzers_manager.AnalyzerConfig'}

params = [{'python_module': {'module': 'ipqsfile.IPQSFileScan', 'base_path': 'api_app.analyzers_manager.file_analyzers'}, 'name': 'ipqs_api_key', 'type': 'str', 'description': 'Please provide the IPQS Api key.', 'is_secret': True, 'required': True}]

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [
ForwardManyToOneDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
ForwardOneToOneDescriptor,
]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value

def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True

def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)



def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()



class Migration(migrations.Migration):
atomic = False
dependencies = [
('api_app', '0071_delete_last_elastic_report'),
('analyzers_manager', '0173_analyzer_config_ipqs_url_file_scanner'),
]

operations = [
migrations.RunPython(
migrate, reverse_migrate
)
]


Loading
Loading