14 changes: 7 additions & 7 deletions cortexutils/analyzer.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# encoding: utf-8
# -*- coding: utf-8 -*-

import os
import tempfile
Expand All @@ -11,7 +11,7 @@

class Analyzer(Worker):
def __init__(self, job_directory=None, secret_phrases=None):
Worker.__init__(self, job_directory, secret_phrases)
super().__init__(job_directory, secret_phrases)

# Not breaking compatibility
self.artifact = self._input
Expand All @@ -31,15 +31,15 @@ def get_data(self):
return self.get_param("data", None, "Missing data field")

def get_param(self, name, default=None, message=None):
data = super(Analyzer, self).get_param(name, default, message)
data = super().get_param(name, default, message)
if (
name == "file"
and self.data_type == "file"
and self.job_directory is not None
):
path = "%s/input/%s" % (self.job_directory, data)
if os.path.isfile(path):
return path
input_path = os.path.join(self.job_directory, "input", data)
if os.path.isfile(input_path):
return input_path
else:
return data

Expand Down Expand Up @@ -117,7 +117,7 @@ def report(self, full_report, ensure_ascii=False):
operation_list = self.operations(full_report)
except Exception:
pass # nosec B110
super(Analyzer, self).report(
super().report(
{
"success": True,
"summary": summary,
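Review bot: In get_param, `os.path.join(self.job_directory, "input", data)` discards the job directory entirely when `data` is an absolute path, and a relative value containing `..` resolves outside the input directory; the `"%s/input/%s"` form it replaces at least kept the prefix for absolute values, so this is behaviour the commit introduces rather than inherits. `data` comes from the job's input JSON through `super().get_param`, and how far that file can be trusted is not visible in this hunk, so the cheap defensive move is to resolve the candidate and only return it when it stays under `<job_directory>/input`. The fall-through for a non-existent file (implicit None) and the `else: return data` branch are kept as they are. get_param's body is entirely within the hunk; the rest of the class is not.
```python
    def get_param(self, name, default=None, message=None):
        data = super().get_param(name, default, message)
        if (
            name == "file"
            and self.data_type == "file"
            and self.job_directory is not None
        ):
            input_dir = os.path.abspath(os.path.join(self.job_directory, "input"))
            input_path = os.path.abspath(os.path.join(input_dir, data))
            # Only hand back paths that stay inside <job_directory>/input.
            if (
                os.path.commonpath([input_dir, input_path]) == input_dir
                and os.path.isfile(input_path)
            ):
                return input_path
        else:
            return data
```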
13 changes: 7 additions & 6 deletions cortexutils/extractor.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
from builtins import str as unicode


class ExtractionError(Exception):
Expand Down Expand Up @@ -67,7 +68,7 @@ def __init_regex():
+ "(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])"
+ ")"
)
regex.append({"type": "ip", "regex": re.compile(r"{}".format(r))})
regex.append({"type": "ip", "regex": re.compile(r)})
JuanTecedor marked this conversation as resolved.

# URL
regex.append({"type": "url", "regex": re.compile(r"^(http://|https://)")})
Expand Down Expand Up @@ -95,7 +96,7 @@ def __init_regex():
{
"type": "user-agent",
"regex": re.compile(
r"^(Mozilla/[45]\.0 |AppleWebKit/[0-9]{3}\.[0-9]{2} |Chrome/[0-9]{2}\.[0-9]\." # noqa
r"^(Mozilla/[45]\.0 |AppleWebKit/[0-9]{3}\.[0-9]{2} |Chrome/[0-9]{2}\.[0-9]\." # noqa: E501
r"[0-9]{4}\.[0-9]{3} |Safari/[0-9]{3}\.[0-9]{2} ).*?$"
),
}
Expand All @@ -115,7 +116,7 @@ def __init_regex():
"type": "registry",
"regex": re.compile(
r"^(HKEY|HKLM|HKCU|HKCR|HKCC)"
r"(_LOCAL_MACHINE|_CURRENT_USER|_CURRENT_CONFIG|_CLASSES_ROOT|)[\\a-zA-Z0-9]+$" # noqa
r"(_LOCAL_MACHINE|_CURRENT_USER|_CURRENT_CONFIG|_CLASSES_ROOT|)[\\a-zA-Z0-9]+$" # noqa: E501
),
}
)
Expand Down Expand Up @@ -149,7 +150,7 @@ def __checktype(self, value):
if self.ignore == value:
return ""

if isinstance(value, (str, unicode)):
if isinstance(value, str):
for r in self.regex:
if r.get("regex").match(value):
return r.get("type")
Expand Down Expand Up @@ -179,7 +180,7 @@ def check_iterable(self, iterable):
"""
results = []
# Only the string left
if isinstance(iterable, (str, unicode)):
if isinstance(iterable, str):
dt = self.__checktype(iterable)
if len(dt) > 0:
results.append({"dataType": dt, "data": iterable})
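Review bot: The alias import `from builtins import str as unicode` near the top of the file appears to survive this commit (the first hunk grows from 6 to 7 lines with no visible deletion, so it reads as a context line), while its only visible uses, the `(str, unicode)` tuples in __checktype and check_iterable, are replaced by plain `str`. If that reading is right, the import is now dead Python 2 compatibility and should be deleted in the same change so F401-style lint does not flag it. The `re.compile(r)` change is equivalent to the `r"{}".format(r)` it replaces and needs no further work. The enclosing __init_regex, __checktype and check_iterable bodies all extend beyond their hunks.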
6 changes: 3 additions & 3 deletions cortexutils/responder.py
@@ -1,12 +1,12 @@
#!/usr/bin/env python
# encoding: utf-8
# -*- coding: utf-8 -*-

from cortexutils.worker import Worker


class Responder(Worker):
def __init__(self, job_directory=None, secret_phrases=None):
Worker.__init__(self, job_directory, secret_phrases)
super().__init__(job_directory, secret_phrases)

# Not breaking compatibility
self.artifact = self._input
Expand All @@ -28,7 +28,7 @@ def report(self, full_report, ensure_ascii=False):
operation_list = self.operations(full_report)
except Exception:
pass # nosec B110
super(Responder, self).report(
super().report(
{"success": True, "full": full_report, "operations": operation_list},
ensure_ascii,
)
30 changes: 11 additions & 19 deletions cortexutils/worker.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# encoding: utf-8
# -*- coding: utf-8 -*-

import codecs
import json
Expand All @@ -9,7 +9,7 @@
DEFAULT_SECRET_PHRASES = ("key", "password", "secret")


class Worker(object):
class Worker:
READ_TIMEOUT = 3 # seconds

def __init__(self, job_directory, secret_phrases):
Expand All @@ -25,8 +25,9 @@ def __init__(self, job_directory, secret_phrases):
self.secret_phrases = secret_phrases
# Load input
self._input = {}
if os.path.isfile("%s/input/input.json" % self.job_directory):
with open("%s/input/input.json" % self.job_directory) as f_input:
input_path = os.path.join(self.job_directory, "input", "input.json")
if os.path.isfile(input_path):
with open(input_path) as f_input:
self._input = json.load(f_input)
else:
# If input file doesn't exist,
Expand Down Expand Up @@ -72,15 +73,9 @@ def __set_proxies(self):
def __set_encoding():
try:
if sys.stdout.encoding != "UTF-8":
if sys.version_info[0] == 3:
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.buffer, "strict")
else:
sys.stdout = codecs.getwriter("utf-8")(sys.stdout, "strict")
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.buffer, "strict")
if sys.stderr.encoding != "UTF-8":
if sys.version_info[0] == 3:
sys.stderr = codecs.getwriter("utf-8")(sys.stderr.buffer, "strict")
else:
sys.stderr = codecs.getwriter("utf-8")(sys.stderr, "strict")
sys.stderr = codecs.getwriter("utf-8")(sys.stderr.buffer, "strict")
except Exception:
pass # nosec B110

Expand Down Expand Up @@ -123,13 +118,10 @@ def __write_output(self, data, ensure_ascii=False):
if self.job_directory is None:
json.dump(data, sys.stdout, ensure_ascii=ensure_ascii)
else:
try:
os.makedirs("%s/output" % self.job_directory)
except Exception:
pass # nosec B110
with open(
"%s/output/output.json" % self.job_directory, mode="w"
) as f_output:
output_dir = os.path.join(self.job_directory, "output")
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "output.json")
with open(output_path, mode="w") as f_output:
json.dump(data, f_output, ensure_ascii=ensure_ascii)

def get_data(self):
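Review bot: Both file handles this commit touches, `open(input_path)` in __init__ and `open(output_path, mode="w")` in __write_output, are opened without an explicit encoding and therefore use the locale's preferred encoding; with `ensure_ascii=False` a non-ASCII value in the report raises UnicodeEncodeError on a non-UTF-8 locale, and input.json is decoded with that same locale rather than as UTF-8. The module already forces UTF-8 on stdout and stderr in __set_encoding, so the file paths deserve the same treatment: `with open(input_path, encoding="utf-8") as f_input:` and `with open(output_path, mode="w", encoding="utf-8") as f_output:`. The `os.makedirs(output_dir, exist_ok=True)` replacement for the try/except is an improvement; a permission failure now surfaces instead of being hidden until the open() call. Both enclosing methods start outside their hunks.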
32 changes: 16 additions & 16 deletions tests/test_suite_analyzer.py
@@ -1,33 +1,29 @@
#!/usr/bin/env python
# coding: utf-8
# -*- coding: utf-8 -*-

import os
import sys
import json
import unittest

from io import open
from cortexutils.analyzer import Analyzer

# Different lib when using python3 or 2
if sys.version_info >= (3, 0):
from io import StringIO
else:
from StringIO import StringIO
from io import StringIO


def load_test_fixture(fixture_path):
path = os.path.dirname(os.path.abspath(__file__))
fixture_file = open(path + "/" + fixture_path)
input = fixture_file.read()
fixture_file.close()
tests_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(tests_dir, fixture_path)
with open(file_path) as fixture_file:
input = fixture_file.read()
sys.stdin = StringIO(input)
sys.stdout = StringIO()


class TestMinimalConfig(unittest.TestCase):
def setUp(self):
load_test_fixture("fixtures/test-minimal-config.json")
fixture_path = os.path.join("fixtures", "test-minimal-config.json")
load_test_fixture(fixture_path)
self.analyzer = Analyzer()

def test_default_config(self):
Expand All @@ -49,7 +45,8 @@ def test_params_data(self):

class TestProxyConfig(unittest.TestCase):
def setUp(self):
load_test_fixture("fixtures/test-proxy-config.json")
fixture_path = os.path.join("fixtures", "test-proxy-config.json")
load_test_fixture(fixture_path)
self.analyzer = Analyzer()

def test_proxy_config(self):
Expand All @@ -64,7 +61,8 @@ def test_proxy_config(self):

class TestTlpConfig(unittest.TestCase):
def setUp(self):
load_test_fixture("fixtures/test-tlp-config.json")
fixture_path = os.path.join("fixtures", "test-tlp-config.json")
load_test_fixture(fixture_path)
self.analyzer = Analyzer()

def test_check_tlp_disabled(self):
Expand Down Expand Up @@ -95,7 +93,8 @@ def test_check_tlp_ok(self):

class TestErrorResponse(unittest.TestCase):
def setUp(self):
load_test_fixture("fixtures/test-error-response.json")
fixture_path = os.path.join("fixtures", "test-error-response.json")
load_test_fixture(fixture_path)
self.analyzer = Analyzer()

def test_error_response(self):
Expand Down Expand Up @@ -130,7 +129,8 @@ def test_error_response(self):

class TestReportResponse(unittest.TestCase):
def setUp(self):
load_test_fixture("fixtures/test-report-response.json")
fixture_path = os.path.join("fixtures", "test-report-response.json")
load_test_fixture(fixture_path)
self.analyzer = Analyzer()

def test_report_response(self):
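Review bot: In load_test_fixture the new `with` block keeps the local named `input`, which shadows the builtin; `fixture_text = fixture_file.read()` followed by `sys.stdin = StringIO(fixture_text)` reads better and avoids the A001 lint warning. The fixture is also opened without an explicit encoding, so `with open(file_path, encoding="utf-8") as fixture_file:` would make the read independent of the test machine's locale. Replacing `sys.stdin` and `sys.stdout` without restoring them is pre-existing behaviour and not something this commit changes. load_test_fixture's body is entirely within the first hunk.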
9 changes: 2 additions & 7 deletions tests/test_suite_extractor.py
@@ -1,4 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
This contains the unit tests for the extractor.
"""
Expand All @@ -21,13 +23,6 @@ def test_single_fqdn(self):
"FQDN single string: wrong data type.",
)

def test_single_fqdn_as_unicode(self):
self.assertEqual(
self.extractor.check_string(value="www.google.de"),
"fqdn",
"FQDN single string: wrong data type.",
)

def test_single_domain(self):
self.assertEqual(
self.extractor.check_string(value="google.de"),
9 changes: 3 additions & 6 deletions tests/test_suite_integration.py
@@ -1,16 +1,13 @@
#!/usr/bin/env python
# coding: utf-8
# -*- coding: utf-8 -*-

import json
import unittest
import sys

from cortexutils.analyzer import Analyzer

# Different lib when using python3 or 2
if sys.version_info >= (3, 0):
from io import StringIO
else:
from StringIO import StringIO
from io import StringIO


class AnalyzerExtractorOutputTest(unittest.TestCase):