1
mirror of https://github.com/mvt-project/mvt synced 2025-10-21 22:42:15 +02:00

Compare commits

..

1 Commits

Author SHA1 Message Date
Donncha Ó Cearbhaill
ac157a4421 WIP: Add inital scoffolding for multiple alerting levels in MVT 2023-11-28 13:38:58 +01:00
30 changed files with 199 additions and 73 deletions

View File

@@ -12,7 +12,7 @@ from .base import AndroidQFModule
class DumpsysAccessibility(DumpsysAccessibilityArtifact, AndroidQFModule):
"""This module analyses dumpsys accessibility"""
"""This module analyse dumpsys accessbility"""
def __init__(
self,

138
mvt/common/alerting.py Normal file
View File

@@ -0,0 +1,138 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from enum import Enum
class AlertLevel(Enum):
"""
informational: Rule is intended for enrichment of events, e.g. by tagging them. No case or alerting should be triggered by such rules because it is expected that a huge amount of events will match these rules.
low: Notable event but rarely an incident. Low rated events can be relevant in high numbers or combination with others. Immediate reaction shouldnt be necessary, but a regular review is recommended.
medium: Relevant event that should be reviewed manually on a more frequent basis.
high: Relevant event that should trigger an internal alert and requires a prompt review.
critical: Highly relevant event that indicates an incident. Critical events should be reviewed immediately.
"""
INFORMATIONAL = 0
LOW = 10
MEDIUM = 20
HIGH = 30
CRITICAL = 40
class AlertStore(object):
"""
Track all of the alerts and detections generated during an analysis.
Results can be logged as log messages or in JSON format for processing by other tools.
"""
def __init__(self) -> None:
self.alerts = []
def add_alert(
self, level, message=None, event_time=None, event=None, ioc=None, detected=True
):
"""
Add an alert to the alert store.
"""
self.alerts.append(
Alert(
level=level,
message=message,
event_time=event_time,
event=event,
ioc=ioc,
detected=detected,
)
)
def informational(
self, message=None, event_time=None, event=None, ioc=None, detected=False
):
self.add_alert(
AlertLevel.INFORMATIONAL,
message=message,
event_time=event_time,
event=event,
ioc=ioc,
detected=detected,
)
def low(self, message=None, event_time=None, event=None, ioc=None, detected=False):
self.add_alert(
AlertLevel.LOW,
message=message,
event_time=event_time,
event=event,
ioc=ioc,
detected=detected,
)
def medium(
self, message=None, event_time=None, event=None, ioc=None, detected=False
):
self.add_alert(
AlertLevel.MEDIUM,
message=message,
event_time=event_time,
event=event,
ioc=ioc,
detected=detected,
)
def high(self, message=None, event_time=None, event=None, ioc=None, detected=False):
self.add_alert(
AlertLevel.HIGH,
message=message,
event_time=event_time,
event=event,
ioc=ioc,
detected=detected,
)
def critical(
self, message=None, event_time=None, event=None, ioc=None, detected=False
):
self.add_alert(
AlertLevel.CRITICAL,
message=message,
event_time=event_time,
event=event,
ioc=ioc,
detected=detected,
)
class Alert(object):
"""
An alert generated by an MVT module.
"""
def __init__(self, level, message, event_time, event, ioc, detected):
self.level = level
self.message = message
self.event_time = event_time
self.event = event
self.ioc = ioc
self.detected = detected
def __repr__(self):
return f"<Alert level={self.level} message={self.message} event_time={self.event_time} event={self.event}>"
def __str__(self):
return f"{self.level} {self.message} {self.event_time} {self.event}"
def to_log(self):
return f"{self.level} {self.message} {self.event_time} {self.event}"
def to_json(self):
return {
"level": self.level,
"message": self.message,
"event_time": self.event_time,
"event": self.event,
"ioc": self.ioc,
"detected": self.detected,
}

View File

@@ -10,7 +10,7 @@ from .version import MVT_VERSION
def check_updates() -> None:
# First we check for MVT version updates.
# First we check for MVT version udpates.
mvt_updates = MVTUpdates()
try:
latest_version = mvt_updates.check()

View File

@@ -3,4 +3,4 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
MVT_VERSION = "2.5.0"
MVT_VERSION = "2.4.3"

View File

@@ -169,7 +169,7 @@
},
{
"identifier": "iPhone14,8",
"description": "iPhone 14 Plus"
"decription": "iPhone 14 Plus"
},
{
"identifier": "iPhone15,2",

View File

@@ -960,14 +960,6 @@
"version": "16.7.2",
"build": "20H115"
},
{
"version": "16.7.3",
"build": "20H232"
},
{
"version": "16.7.4",
"build": "20H240"
},
{
"version": "17.0",
"build": "21A327"
@@ -1003,17 +995,5 @@
{
"version": "17.1.1",
"build": "21B91"
},
{
"version": "17.1.2",
"build": "21B101"
},
{
"version": "17.2",
"build": "21C62"
},
{
"version": "17.2.1",
"build": "21C66"
}
]

View File

@@ -8,6 +8,7 @@ import io
import logging
import os
import plistlib
import sqlite3
from typing import Optional
from mvt.common.module import DatabaseNotFoundError
@@ -123,7 +124,7 @@ class Manifest(IOSExtraction):
self.log.info("Found Manifest.db database at path: %s", manifest_db_path)
conn = self._open_sqlite_db(manifest_db_path)
conn = sqlite3.connect(manifest_db_path)
cur = conn.cursor()
cur.execute("SELECT * FROM Files;")

View File

@@ -49,7 +49,7 @@ class IOSExtraction(MVTModule):
"""
# TODO: Find a better solution.
if not forced:
conn = self._open_sqlite_db(file_path)
conn = sqlite3.connect(file_path)
cur = conn.cursor()
try:
@@ -91,9 +91,6 @@ class IOSExtraction(MVTModule):
self.log.info("Database at path %s recovered successfully!", file_path)
def _open_sqlite_db(self, file_path: str) -> sqlite3.Connection:
return sqlite3.connect(f"file:{file_path}?immutable=1", uri=True)
def _get_backup_files_from_manifest(
self, relative_path: Optional[str] = None, domain: Optional[str] = None
) -> Iterator[dict]:
@@ -112,7 +109,7 @@ class IOSExtraction(MVTModule):
base_sql = "SELECT fileID, domain, relativePath FROM Files WHERE "
try:
conn = self._open_sqlite_db(manifest_db_path)
conn = sqlite3.connect(manifest_db_path)
cur = conn.cursor()
if relative_path and domain:
cur.execute(

View File

@@ -85,7 +85,7 @@ class Analytics(IOSExtraction):
def _extract_analytics_data(self):
artifact = self.file_path.split("/")[-1]
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
try:

View File

@@ -64,7 +64,7 @@ class CacheFiles(IOSExtraction):
def _process_cache_file(self, file_path):
self.log.info("Processing cache file at path: %s", file_path)
conn = self._open_sqlite_db(file_path)
conn = sqlite3.connect(file_path)
cur = conn.cursor()
try:

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from typing import Optional, Union
from mvt.common.utils import convert_mactime_to_iso
@@ -60,7 +61,7 @@ class SafariFavicon(IOSExtraction):
self.detected.append(result)
def _process_favicon_db(self, file_path):
conn = self._open_sqlite_db(file_path)
conn = sqlite3.connect(file_path)
# Fetch valid icon cache.
cur = conn.cursor()

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from typing import Optional, Union
from mvt.common.utils import convert_mactime_to_iso
@@ -81,7 +82,7 @@ class Calendar(IOSExtraction):
"""
Parse the calendar database
"""
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute(

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from typing import Optional, Union
from mvt.common.utils import convert_mactime_to_iso
@@ -52,7 +53,7 @@ class Calls(IOSExtraction):
)
self.log.info("Found Calls database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute(
"""

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from typing import Optional, Union
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
@@ -65,7 +66,7 @@ class ChromeFavicon(IOSExtraction):
)
self.log.info("Found Chrome favicon cache database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
# Fetch icon cache
cur = conn.cursor()

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from typing import Optional, Union
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
@@ -66,7 +67,7 @@ class ChromeHistory(IOSExtraction):
)
self.log.info("Found Chrome history database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute(
"""

View File

@@ -44,7 +44,7 @@ class Contacts(IOSExtraction):
)
self.log.info("Found Contacts database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
try:
cur.execute(

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from typing import Optional, Union
from mvt.common.utils import convert_unix_to_iso
@@ -67,7 +68,7 @@ class FirefoxFavicon(IOSExtraction):
)
self.log.info("Found Firefox favicon database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute(
"""

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from typing import Optional, Union
from mvt.common.utils import convert_unix_to_iso
@@ -67,7 +68,7 @@ class FirefoxHistory(IOSExtraction):
)
self.log.info("Found Firefox history database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute(
"""

View File

@@ -280,7 +280,7 @@ class InteractionC(IOSExtraction):
)
self.log.info("Found InteractionC database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
try:

View File

@@ -76,7 +76,7 @@ class SafariBrowserState(IOSExtraction):
def _process_browser_state_db(self, db_path):
self._recover_sqlite_db_if_needed(db_path)
conn = self._open_sqlite_db(db_path)
conn = sqlite3.connect(db_path)
cur = conn.cursor()
try:

View File

@@ -5,6 +5,7 @@
import logging
import os
import sqlite3
from typing import Optional, Union
from mvt.common.url import URL
@@ -114,7 +115,7 @@ class SafariHistory(IOSExtraction):
def _process_history_db(self, history_path):
self._recover_sqlite_db_if_needed(history_path)
conn = self._open_sqlite_db(history_path)
conn = sqlite3.connect(history_path)
cur = conn.cursor()
cur.execute(
"""

View File

@@ -83,7 +83,7 @@ class Shortcuts(IOSExtraction):
)
self.log.info("Found Shortcuts database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
conn.text_factory = bytes
cur = conn.cursor()
try:

View File

@@ -44,33 +44,29 @@ class SMS(IOSExtraction):
def serialize(self, record: dict) -> Union[dict, list]:
text = record["text"].replace("\n", "\\n")
sms_data = f"{record['service']}: {record['guid']} \"{text}\" from {record['phone_number']} ({record['account']})"
records = [
return [
{
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "sms_received",
"data": sms_data,
},
{
"timestamp": record["isodate_read"],
"module": self.__class__.__name__,
"event": "sms_read",
"data": sms_data,
},
]
# If the message was read, we add an extra event.
if record["isodate_read"]:
records.append(
{
"timestamp": record["isodate_read"],
"module": self.__class__.__name__,
"event": "sms_read",
"data": sms_data,
}
)
return records
def check_indicators(self) -> None:
for message in self.results:
alert = "ALERT: State-sponsored attackers may be targeting your iPhone"
if message.get("text", "").startswith(alert):
self.log.warning(
"Apple warning about state-sponsored attack received on the %s",
message["isodate"],
self.alerts.medium(
f"Apple warning about state-sponsored attack received on the {message['isodate']}",
event_time=message["isodate"],
event=message,
)
if not self.indicators:
@@ -91,7 +87,7 @@ class SMS(IOSExtraction):
self.log.info("Found SMS database at path: %s", self.file_path)
try:
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute(
"""
@@ -108,7 +104,7 @@ class SMS(IOSExtraction):
conn.close()
if "database disk image is malformed" in str(exc):
self._recover_sqlite_db_if_needed(self.file_path, forced=True)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute(
"""

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from base64 import b64encode
from typing import Optional, Union
@@ -55,10 +56,6 @@ class SMSAttachments(IOSExtraction):
def check_indicators(self) -> None:
for attachment in self.results:
# Check for known malicious filenames.
if self.indicators.check_file_path(attachment["filename"]):
self.detected.append(attachment)
if (
attachment["filename"].startswith("/var/tmp/")
and attachment["filename"].endswith("-1")
@@ -75,7 +72,7 @@ class SMSAttachments(IOSExtraction):
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS)
self.log.info("Found SMS database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute(
"""

View File

@@ -95,7 +95,7 @@ class TCC(IOSExtraction):
self.detected.append(result)
def process_db(self, file_path):
conn = self._open_sqlite_db(file_path)
conn = sqlite3.connect(file_path)
cur = conn.cursor()
db_version = "v3"
try:

View File

@@ -73,7 +73,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
self._recover_sqlite_db_if_needed(db_path)
conn = self._open_sqlite_db(db_path)
conn = sqlite3.connect(db_path)
cur = conn.cursor()
try:

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import sqlite3
from typing import Optional, Union
from mvt.common.utils import check_for_links, convert_mactime_to_iso
@@ -68,7 +69,7 @@ class Whatsapp(IOSExtraction):
)
self.log.info("Found WhatsApp database at path: %s", self.file_path)
conn = self._open_sqlite_db(self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
# Query all messages and join tables which can contain media attachments

View File

@@ -17,7 +17,7 @@ class TestSMSModule:
m = SMS(target_path=get_ios_backup_folder())
run_module(m)
assert len(m.results) == 1
assert len(m.timeline) == 2
assert len(m.timeline) == 2 # SMS received and read events.
assert len(m.detected) == 0
def test_detection(self, indicator_file):

View File

@@ -4,23 +4,31 @@
# https://license.mvt.re/1.1/
import logging
import pytest
from mvt.common.indicators import Indicators
from mvt.common.module import run_module
from mvt.ios.modules.fs.filesystem import Filesystem
from ..utils import get_ios_backup_folder
from ..utils import delete_tmp_db_files, get_ios_backup_folder
@pytest.fixture()
def cleanup_tmp_artifacts():
ios_backup_folder = get_ios_backup_folder()
delete_tmp_db_files(ios_backup_folder)
return
class TestFilesystem:
def test_filesystem(self):
def test_filesystem(self, cleanup_tmp_artifacts):
m = Filesystem(target_path=get_ios_backup_folder())
run_module(m)
assert len(m.results) == 15
assert len(m.timeline) == 15
assert len(m.detected) == 0
def test_detection(self, indicator_file):
def test_detection(self, indicator_file, cleanup_tmp_artifacts):
m = Filesystem(target_path=get_ios_backup_folder())
ind = Indicators(log=logging.getLogger())
ind.parse_stix2(indicator_file)