mirror of
https://github.com/mvt-project/mvt
synced 2025-10-21 22:42:15 +02:00
Compare commits
1 Commits
v2.1
...
ios_lockdo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a30d7b2871 |
3
Makefile
3
Makefile
@@ -8,6 +8,3 @@ dist:
|
||||
|
||||
upload:
|
||||
python3 -m twine upload dist/*
|
||||
|
||||
test-upload:
|
||||
python3 -m twine upload --repository testpypi dist/*
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
import click
|
||||
from rich.logging import RichHandler
|
||||
@@ -70,7 +71,15 @@ def download_apks(ctx, all_apks, virustotal, output, from_file, serial):
|
||||
log.critical("You need to specify an output folder with --output!")
|
||||
ctx.exit(1)
|
||||
|
||||
download = DownloadAPKs(results_path=output, all_apks=all_apks)
|
||||
if not os.path.exists(output):
|
||||
try:
|
||||
os.makedirs(output)
|
||||
except Exception as e:
|
||||
log.critical("Unable to create output folder %s: %s", output, e)
|
||||
ctx.exit(1)
|
||||
|
||||
download = DownloadAPKs(output_folder=output, all_apks=all_apks,
|
||||
log=logging.getLogger(DownloadAPKs.__module__))
|
||||
if serial:
|
||||
download.serial = serial
|
||||
download.run()
|
||||
|
||||
@@ -9,7 +9,6 @@ import os
|
||||
import sys
|
||||
import tarfile
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
from rich.prompt import Prompt
|
||||
|
||||
@@ -39,7 +38,7 @@ class CmdAndroidCheckBackup(Command):
|
||||
self.backup_archive = None
|
||||
self.backup_files = []
|
||||
|
||||
def init(self) -> None:
|
||||
def init(self):
|
||||
if os.path.isfile(self.target_path):
|
||||
self.backup_type = "ab"
|
||||
with open(self.target_path, "rb") as handle:
|
||||
@@ -78,7 +77,7 @@ class CmdAndroidCheckBackup(Command):
|
||||
log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file")
|
||||
sys.exit(1)
|
||||
|
||||
def module_init(self, module: Callable) -> None:
|
||||
def module_init(self, module):
|
||||
if self.backup_type == "folder":
|
||||
module.from_folder(self.target_path, self.backup_files)
|
||||
else:
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
from zipfile import ZipFile
|
||||
|
||||
from mvt.common.command import Command
|
||||
@@ -32,7 +31,7 @@ class CmdAndroidCheckBugreport(Command):
|
||||
self.bugreport_archive = None
|
||||
self.bugreport_files = []
|
||||
|
||||
def init(self) -> None:
|
||||
def init(self):
|
||||
if os.path.isfile(self.target_path):
|
||||
self.bugreport_format = "zip"
|
||||
self.bugreport_archive = ZipFile(self.target_path)
|
||||
@@ -45,7 +44,7 @@ class CmdAndroidCheckBugreport(Command):
|
||||
for file_name in subfiles:
|
||||
self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path))
|
||||
|
||||
def module_init(self, module: Callable) -> None:
|
||||
def module_init(self, module):
|
||||
if self.bugreport_format == "zip":
|
||||
module.from_zip(self.bugreport_archive, self.bugreport_files)
|
||||
else:
|
||||
|
||||
@@ -6,9 +6,8 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Callable
|
||||
|
||||
from rich.progress import track
|
||||
from tqdm import tqdm
|
||||
|
||||
from mvt.common.module import InsufficientPrivileges
|
||||
|
||||
@@ -18,6 +17,18 @@ from .modules.adb.packages import Packages
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# TODO: Would be better to replace tqdm with rich.progress to reduce
|
||||
# the number of dependencies. Need to investigate whether
|
||||
# it's possible to have a similar callback system.
|
||||
class PullProgress(tqdm):
|
||||
"""PullProgress is a tqdm update system for APK downloads."""
|
||||
|
||||
def update_to(self, file_name, current, total):
|
||||
if total is not None:
|
||||
self.total = total
|
||||
self.update(current - self.n)
|
||||
|
||||
|
||||
class DownloadAPKs(AndroidExtraction):
|
||||
"""DownloadAPKs is the main class operating the download of APKs
|
||||
from the device.
|
||||
@@ -25,22 +36,23 @@ class DownloadAPKs(AndroidExtraction):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, results_path: str = "", all_apks: bool = False,
|
||||
packages: list = []):
|
||||
def __init__(self, output_folder=None, all_apks=False, log=None,
|
||||
packages=None):
|
||||
"""Initialize module.
|
||||
:param results_path: Path to the folder where data should be stored
|
||||
:param output_folder: Path to the folder where data should be stored
|
||||
:param all_apks: Boolean indicating whether to download all packages
|
||||
or filter known-goods
|
||||
:param packages: Provided list of packages, typically for JSON checks
|
||||
"""
|
||||
super().__init__(results_path=results_path, log=log)
|
||||
super().__init__(log=log)
|
||||
|
||||
self.packages = packages
|
||||
self.all_apks = all_apks
|
||||
self.results_path_apks = None
|
||||
self.output_folder_apk = None
|
||||
self.output_folder = output_folder
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_path: str) -> Callable:
|
||||
def from_json(cls, json_path):
|
||||
"""Initialize this class from an existing apks.json file.
|
||||
|
||||
:param json_path: Path to the apks.json file to parse.
|
||||
@@ -50,7 +62,7 @@ class DownloadAPKs(AndroidExtraction):
|
||||
packages = json.load(handle)
|
||||
return cls(packages=packages)
|
||||
|
||||
def pull_package_file(self, package_name: str, remote_path: str) -> None:
|
||||
def pull_package_file(self, package_name, remote_path):
|
||||
"""Pull files related to specific package from the device.
|
||||
|
||||
:param package_name: Name of the package to download
|
||||
@@ -64,7 +76,7 @@ class DownloadAPKs(AndroidExtraction):
|
||||
if "==/" in remote_path:
|
||||
file_name = "_" + remote_path.split("==/")[1].replace(".apk", "")
|
||||
|
||||
local_path = os.path.join(self.results_path_apks,
|
||||
local_path = os.path.join(self.output_folder_apk,
|
||||
f"{package_name}{file_name}.apk")
|
||||
name_counter = 0
|
||||
while True:
|
||||
@@ -72,14 +84,17 @@ class DownloadAPKs(AndroidExtraction):
|
||||
break
|
||||
|
||||
name_counter += 1
|
||||
local_path = os.path.join(self.results_path_apks,
|
||||
local_path = os.path.join(self.output_folder_apk,
|
||||
f"{package_name}{file_name}_{name_counter}.apk")
|
||||
|
||||
try:
|
||||
self._adb_download(remote_path, local_path)
|
||||
with PullProgress(unit='B', unit_divisor=1024, unit_scale=True,
|
||||
miniters=1) as pp:
|
||||
self._adb_download(remote_path, local_path,
|
||||
progress_callback=pp.update_to)
|
||||
except InsufficientPrivileges:
|
||||
log.error("Unable to pull package file from %s: insufficient privileges, it might be a system app",
|
||||
remote_path)
|
||||
log.warn("Unable to pull package file from %s: insufficient privileges, it might be a system app",
|
||||
remote_path)
|
||||
self._adb_reconnect()
|
||||
return None
|
||||
except Exception as e:
|
||||
@@ -90,9 +105,11 @@ class DownloadAPKs(AndroidExtraction):
|
||||
|
||||
return local_path
|
||||
|
||||
def get_packages(self) -> None:
|
||||
def get_packages(self):
|
||||
"""Use the Packages adb module to retrieve the list of packages.
|
||||
We reuse the same extraction logic to then download the APKs.
|
||||
|
||||
|
||||
"""
|
||||
self.log.info("Retrieving list of installed packages...")
|
||||
|
||||
@@ -103,11 +120,12 @@ class DownloadAPKs(AndroidExtraction):
|
||||
|
||||
self.packages = m.results
|
||||
|
||||
def pull_packages(self) -> None:
|
||||
"""Download all files of all selected packages from the device.
|
||||
"""
|
||||
log.info("Starting extraction of installed APKs at folder %s",
|
||||
self.results_path)
|
||||
def pull_packages(self):
|
||||
"""Download all files of all selected packages from the device."""
|
||||
log.info("Starting extraction of installed APKs at folder %s", self.output_folder)
|
||||
|
||||
if not os.path.exists(self.output_folder):
|
||||
os.mkdir(self.output_folder)
|
||||
|
||||
# If the user provided the flag --all-apks we select all packages.
|
||||
packages_selection = []
|
||||
@@ -121,7 +139,7 @@ class DownloadAPKs(AndroidExtraction):
|
||||
if not package.get("system", False):
|
||||
packages_selection.append(package)
|
||||
|
||||
log.info("Selected only %d packages which are not marked as \"system\"",
|
||||
log.info("Selected only %d packages which are not marked as system",
|
||||
len(packages_selection))
|
||||
|
||||
if len(packages_selection) == 0:
|
||||
@@ -130,15 +148,15 @@ class DownloadAPKs(AndroidExtraction):
|
||||
|
||||
log.info("Downloading packages from device. This might take some time ...")
|
||||
|
||||
self.results_path_apks = os.path.join(self.results_path, "apks")
|
||||
if not os.path.exists(self.results_path_apks):
|
||||
os.makedirs(self.results_path_apks, exist_ok=True)
|
||||
self.output_folder_apk = os.path.join(self.output_folder, "apks")
|
||||
if not os.path.exists(self.output_folder_apk):
|
||||
os.mkdir(self.output_folder_apk)
|
||||
|
||||
for i in track(range(len(packages_selection)),
|
||||
description=f"Downloading {len(packages_selection)} packages..."):
|
||||
package = packages_selection[i]
|
||||
counter = 0
|
||||
for package in packages_selection:
|
||||
counter += 1
|
||||
|
||||
log.info("[%d/%d] Package: %s", i, len(packages_selection),
|
||||
log.info("[%d/%d] Package: %s", counter, len(packages_selection),
|
||||
package["package_name"])
|
||||
|
||||
# Sometimes the package path contains multiple lines for multiple apks.
|
||||
@@ -154,12 +172,14 @@ class DownloadAPKs(AndroidExtraction):
|
||||
|
||||
log.info("Download of selected packages completed")
|
||||
|
||||
def save_json(self) -> None:
|
||||
json_path = os.path.join(self.results_path, "apks.json")
|
||||
def save_json(self):
|
||||
"""Save the results to the package.json file."""
|
||||
json_path = os.path.join(self.output_folder, "apks.json")
|
||||
with open(json_path, "w", encoding="utf-8") as handle:
|
||||
json.dump(self.packages, handle, indent=4)
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run all steps of fetch-apk."""
|
||||
self.get_packages()
|
||||
self._adb_connect()
|
||||
self.pull_packages()
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import stat
|
||||
|
||||
from mvt.common.utils import convert_timestamp_to_iso
|
||||
@@ -14,15 +13,6 @@ from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
ANDROID_TMP_FOLDERS = [
|
||||
"/tmp/",
|
||||
"/data/local/tmp/",
|
||||
]
|
||||
ANDROID_MEDIA_FOLDERS = [
|
||||
"/data/media/0",
|
||||
"/sdcard/",
|
||||
]
|
||||
|
||||
|
||||
class Files(AndroidExtraction):
|
||||
"""This module extracts the list of files on the device."""
|
||||
@@ -35,50 +25,13 @@ class Files(AndroidExtraction):
|
||||
log=log, results=results)
|
||||
self.full_find = False
|
||||
|
||||
def serialize(self, record: dict) -> None:
|
||||
if "modified_time" in record:
|
||||
return {
|
||||
"timestamp": record["modified_time"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": "file_modified",
|
||||
"data": record["path"],
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
if result.get("is_suid"):
|
||||
self.log.warning("Found an SUID file in a non-standard directory \"%s\".",
|
||||
result["path"])
|
||||
|
||||
if self.indicators and self.indicators.check_file_path(result["path"]):
|
||||
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
|
||||
self.detected.append(result)
|
||||
|
||||
def backup_file(self, file_path: str) -> None:
|
||||
local_file_name = file_path.replace("/", "_").replace(" ", "-")
|
||||
local_files_folder = os.path.join(self.results_path, "files")
|
||||
if not os.path.exists(local_files_folder):
|
||||
os.mkdir(local_files_folder)
|
||||
|
||||
local_file_path = os.path.join(local_files_folder, local_file_name)
|
||||
|
||||
try:
|
||||
self._adb_download(remote_path=file_path,
|
||||
local_path=local_file_path)
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
self.log.info("Downloaded file %s to local copy at %s",
|
||||
file_path, local_file_path)
|
||||
|
||||
def find_files(self, folder: str) -> None:
|
||||
if self.full_find:
|
||||
output = self._adb_command(f"find '{folder}' -type f -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
|
||||
output = self._adb_command(f"find '{folder}' -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
|
||||
|
||||
for file_line in output.splitlines():
|
||||
[unix_timestamp, mode, size, owner, group, full_path] = file_line.rstrip().split(" ", 5)
|
||||
mod_time = convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(float(unix_timestamp))))
|
||||
|
||||
self.results.append({
|
||||
"path": full_path,
|
||||
"modified_time": mod_time,
|
||||
@@ -90,10 +43,39 @@ class Files(AndroidExtraction):
|
||||
"group": group,
|
||||
})
|
||||
else:
|
||||
output = self._adb_command(f"find '{folder}' -type f 2> /dev/null")
|
||||
output = self._adb_command(f"find '{folder}' 2> /dev/null")
|
||||
for file_line in output.splitlines():
|
||||
self.results.append({"path": file_line.rstrip()})
|
||||
|
||||
def serialize(self, record: dict) -> None:
|
||||
if "modified_time" in record:
|
||||
return {
|
||||
"timestamp": record["modified_time"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": "file_modified",
|
||||
"data": record["path"],
|
||||
}
|
||||
|
||||
def check_suspicious(self) -> None:
|
||||
"""Check for files with suspicious permissions"""
|
||||
for result in sorted(self.results, key=lambda item: item["path"]):
|
||||
if result.get("is_suid"):
|
||||
self.log.warning("Found an SUID file in a non-standard directory \"%s\".",
|
||||
result["path"])
|
||||
self.detected.append(result)
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""Check file list for known suspicious files or suspicious properties"""
|
||||
self.check_suspicious()
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if self.indicators.check_file_path(result["path"]):
|
||||
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
@@ -101,27 +83,16 @@ class Files(AndroidExtraction):
|
||||
if output or output.strip().splitlines():
|
||||
self.full_find = True
|
||||
|
||||
for tmp_folder in ANDROID_TMP_FOLDERS:
|
||||
self.find_files(tmp_folder)
|
||||
for data_path in ["/data/local/tmp/", "/sdcard/", "/tmp/"]:
|
||||
self.find_files(data_path)
|
||||
|
||||
for entry in self.results:
|
||||
self.log.info("Found file in tmp folder at path %s",
|
||||
entry.get("path"))
|
||||
if self.results_path:
|
||||
self.backup_file(entry.get("path"))
|
||||
|
||||
for media_folder in ANDROID_MEDIA_FOLDERS:
|
||||
self.find_files(media_folder)
|
||||
|
||||
self.log.info("Found %s files in primary Android tmp and media folders",
|
||||
len(self.results))
|
||||
self.log.info("Found %s files in primary Android data directories", len(self.results))
|
||||
|
||||
if self.fast_mode:
|
||||
self.log.info("Flag --fast was enabled: skipping full file listing")
|
||||
else:
|
||||
self.log.info("Processing full file listing. This may take a while...")
|
||||
self.find_files("/")
|
||||
|
||||
self.log.info("Found %s total files", len(self.results))
|
||||
|
||||
self._adb_disconnect()
|
||||
|
||||
@@ -51,11 +51,6 @@ ANDROID_DANGEROUS_SETTINGS = [
|
||||
"key": "send_action_app_error",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "enabled installation of non Google Play apps",
|
||||
"key": "install_non_market_apps",
|
||||
"safe_value": "0",
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ class SMS(AndroidExtraction):
|
||||
"timestamp": record["isodate"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": f"sms_{record['direction']}",
|
||||
"data": f"{record.get('address', 'unknown source')}: \"{body}\""
|
||||
"data": f"{record['address']}: \"{body}\""
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
@@ -70,7 +70,7 @@ class SMS(AndroidExtraction):
|
||||
if "body" not in message:
|
||||
continue
|
||||
|
||||
# TODO: check links exported from the body previously.
|
||||
# FIXME: check links exported from the body previously
|
||||
message_links = check_for_links(message["body"])
|
||||
if self.indicators.check_domains(message_links):
|
||||
self.detected.append(message)
|
||||
@@ -110,12 +110,10 @@ class SMS(AndroidExtraction):
|
||||
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
|
||||
|
||||
def _extract_sms_adb(self) -> None:
|
||||
"""Use the Android backup command to extract SMS data from the native SMS
|
||||
app.
|
||||
"""Use the Android backup command to extract SMS data from the native SMS app
|
||||
|
||||
It is crucial to use the under-documented "-nocompress" flag to disable
|
||||
the non-standard Java compression algorithm. This module only supports
|
||||
an unencrypted ADB backup.
|
||||
It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression
|
||||
algorithim. This module only supports an unencrypted ADB backup.
|
||||
"""
|
||||
backup_tar = self._generate_backup("com.android.providers.telephony")
|
||||
if not backup_tar:
|
||||
@@ -124,8 +122,7 @@ class SMS(AndroidExtraction):
|
||||
try:
|
||||
self.results = parse_tar_for_sms(backup_tar)
|
||||
except AndroidBackupParsingError:
|
||||
self.log.info("Impossible to read SMS from the Android Backup, please extract "\
|
||||
"the SMS and try extracting it with Android Backup Extractor")
|
||||
self.log.info("Impossible to read SMS from the Android Backup, please extract the SMS and try extracting it with Android Backup Extractor")
|
||||
return
|
||||
|
||||
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
|
||||
@@ -142,6 +139,5 @@ class SMS(AndroidExtraction):
|
||||
except InsufficientPrivileges:
|
||||
pass
|
||||
|
||||
self.log.warn("No SMS database found. Trying extraction of SMS data using " \
|
||||
"Android backup feature.")
|
||||
self.log.warn("No SMS database found. Trying extraction of SMS data using Android backup feature.")
|
||||
self._extract_sms_adb()
|
||||
|
||||
@@ -195,8 +195,7 @@ def save_timeline(timeline: list, timeline_path: str) -> None:
|
||||
|
||||
"""
|
||||
with open(timeline_path, "a+", encoding="utf-8") as handle:
|
||||
csvoutput = csv.writer(handle, delimiter=",", quotechar="\"",
|
||||
quoting=csv.QUOTE_ALL)
|
||||
csvoutput = csv.writer(handle, delimiter=",", quotechar="\"")
|
||||
csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])
|
||||
for event in sorted(timeline, key=lambda x: x["timestamp"] if x["timestamp"] is not None else ""):
|
||||
csvoutput.writerow([
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import requests
|
||||
from typing import Optional
|
||||
from tld import get_tld
|
||||
|
||||
SHORTENER_DOMAINS = [
|
||||
@@ -309,7 +308,7 @@ class URL:
|
||||
|
||||
return self.is_shortened
|
||||
|
||||
def unshorten(self) -> Optional[str]:
|
||||
def unshorten(self) -> None:
|
||||
"""Unshorten the URL by requesting an HTTP HEAD response."""
|
||||
res = requests.head(self.url)
|
||||
if str(res.status_code).startswith("30"):
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
import datetime
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
|
||||
|
||||
@@ -119,3 +120,14 @@ def keys_bytes_to_string(obj) -> str:
|
||||
new_obj[key] = value
|
||||
|
||||
return new_obj
|
||||
|
||||
|
||||
def secure_delete(file_path, rounds=10):
|
||||
file_size = os.path.getsize(file_path)
|
||||
|
||||
with open(file_path, "br+", buffering=-1) as handle:
|
||||
for i in range(rounds):
|
||||
handle.seek(0)
|
||||
handle.write(os.urandom(file_size))
|
||||
|
||||
os.remove(file_path)
|
||||
|
||||
@@ -3,4 +3,4 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
MVT_VERSION = "2.1"
|
||||
MVT_VERSION = "1.6"
|
||||
|
||||
@@ -8,19 +8,22 @@ import os
|
||||
|
||||
import click
|
||||
from rich.logging import RichHandler
|
||||
from rich.prompt import Prompt
|
||||
from rich.prompt import Confirm, Prompt
|
||||
from simple_term_menu import TerminalMenu
|
||||
|
||||
from mvt.common.cmd_check_iocs import CmdCheckIOCS
|
||||
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC,
|
||||
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
|
||||
HELP_MSG_OUTPUT)
|
||||
HELP_MSG_OUTPUT, HELP_MSG_SERIAL)
|
||||
from mvt.common.logo import logo
|
||||
from mvt.common.options import MutuallyExclusiveOption
|
||||
from mvt.common.updates import IndicatorsUpdates
|
||||
|
||||
from .cmd_check_backup import CmdIOSCheckBackup
|
||||
from .cmd_check_fs import CmdIOSCheckFS
|
||||
from .cmd_check_usb import CmdIOSCheckUSB
|
||||
from .decrypt import DecryptBackup
|
||||
from .lockdown import Lockdown
|
||||
from .modules.backup import BACKUP_MODULES
|
||||
from .modules.fs import FS_MODULES
|
||||
from .modules.mixed import MIXED_MODULES
|
||||
@@ -187,6 +190,83 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, dump_path):
|
||||
len(cmd.timeline_detected))
|
||||
|
||||
|
||||
#==============================================================================
|
||||
# Command: check-usb
|
||||
#==============================================================================
|
||||
@cli.command("check-usb", help="Extract artifacts from a live iPhone through USB")
|
||||
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
|
||||
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
|
||||
default=[], help=HELP_MSG_IOC)
|
||||
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
|
||||
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.pass_context
|
||||
def check_usb(ctx, serial, iocs, output, fast, list_modules, module):
|
||||
cmd = CmdIOSCheckUSB(results_path=output, ioc_files=iocs,
|
||||
module_name=module, fast_mode=fast,
|
||||
serial=serial)
|
||||
|
||||
if list_modules:
|
||||
cmd.list_modules()
|
||||
return
|
||||
|
||||
log.info("Checking iPhone through USB, this may take a while")
|
||||
cmd.run()
|
||||
|
||||
if len(cmd.timeline_detected) > 0:
|
||||
log.warning("The analysis of the data produced %d detections!",
|
||||
len(cmd.timeline_detected))
|
||||
|
||||
|
||||
#==============================================================================
|
||||
# Command: clear-certs
|
||||
#==============================================================================
|
||||
@cli.command("clear-certs", help="Clear iOS lockdown certificates")
|
||||
@click.pass_context
|
||||
def clear_certs(ctx):
|
||||
lock = Lockdown()
|
||||
certs = lock.find_certs()
|
||||
|
||||
if not certs:
|
||||
log.info("No iOS lockdown certificates found")
|
||||
return
|
||||
|
||||
choices = []
|
||||
for cert in certs:
|
||||
choices.append(os.path.basename(cert))
|
||||
log.info("Found lockdown certificate at %s", cert)
|
||||
|
||||
choices.append("Cancel")
|
||||
|
||||
terminal_menu = TerminalMenu(
|
||||
choices,
|
||||
title="Select which certificates to delete:",
|
||||
multi_select=True,
|
||||
show_multi_select_hint=True,
|
||||
)
|
||||
terminal_menu.show()
|
||||
|
||||
if "Cancel" in terminal_menu.chosen_menu_entries:
|
||||
log.info("Cancel, not proceeding")
|
||||
return
|
||||
|
||||
confirmed = Confirm.ask(f"You have selected {', '.join(terminal_menu.chosen_menu_entries)}. "
|
||||
"Are you sure you want to proceed deleting them?")
|
||||
if not confirmed:
|
||||
log.info("Not proceeding")
|
||||
return
|
||||
|
||||
for choice in terminal_menu.chosen_menu_entries:
|
||||
try:
|
||||
lock.delete_cert(choice)
|
||||
except PermissionError:
|
||||
log.error("Not enough permissions to delete certificate at \"%s\": "
|
||||
"try launching this command with sudo", choice)
|
||||
else:
|
||||
log.info("Deleted lockdown certificate \"%s\"", choice)
|
||||
|
||||
|
||||
#==============================================================================
|
||||
# Command: check-iocs
|
||||
#==============================================================================
|
||||
|
||||
47
mvt/ios/cmd_check_usb.py
Normal file
47
mvt/ios/cmd_check_usb.py
Normal file
@@ -0,0 +1,47 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from pymobiledevice3.exceptions import (ConnectionFailedError,
|
||||
FatalPairingError, NotTrustedError)
|
||||
from pymobiledevice3.lockdown import LockdownClient
|
||||
|
||||
from mvt.common.command import Command
|
||||
|
||||
from .modules.usb import USB_MODULES
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CmdIOSCheckUSB(Command):
|
||||
|
||||
name = "check-usb"
|
||||
modules = USB_MODULES
|
||||
|
||||
def __init__(self, target_path: str = None, results_path: str = None,
|
||||
ioc_files: list = [], module_name: str = None, serial: str = None,
|
||||
fast_mode: bool = False):
|
||||
super().__init__(target_path=target_path, results_path=results_path,
|
||||
ioc_files=ioc_files, module_name=module_name,
|
||||
serial=serial, fast_mode=fast_mode, log=log)
|
||||
self.lockdown = None
|
||||
|
||||
def init(self):
|
||||
try:
|
||||
if self.serial:
|
||||
self.lockdown = LockdownClient(udid=self.serial)
|
||||
else:
|
||||
self.lockdown = LockdownClient()
|
||||
except NotTrustedError:
|
||||
log.error("Trust this computer from the prompt appearing on the iOS device and try again")
|
||||
sys.exit(-1)
|
||||
except (ConnectionRefusedError, ConnectionFailedError, FatalPairingError):
|
||||
log.error("Unable to connect to the device over USB: try to unplug, plug the device and start again")
|
||||
sys.exit(-1)
|
||||
|
||||
def module_init(self, module):
|
||||
module.lockdown = self.lockdown
|
||||
58
mvt/ios/lockdown.py
Normal file
58
mvt/ios/lockdown.py
Normal file
@@ -0,0 +1,58 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import os
|
||||
import platform
|
||||
|
||||
from mvt.common.utils import secure_delete
|
||||
|
||||
|
||||
class Lockdown:
|
||||
|
||||
def __init__(self, uuids: list = []) -> None:
|
||||
self.uuids = uuids
|
||||
self.lockdown_folder = self._get_lockdown_folder()
|
||||
|
||||
@staticmethod
|
||||
def _get_lockdown_folder():
|
||||
system = platform.system()
|
||||
if system == "Linux":
|
||||
return "/var/lib/lockdown/"
|
||||
elif system == "Darwin":
|
||||
return "/var/db/lockdown/"
|
||||
elif system == "Windows":
|
||||
return os.path.join(os.environ.get("ALLUSERSPROFILE", ""),
|
||||
"Apple", "Lockdown")
|
||||
|
||||
@staticmethod
|
||||
def _get_pymobiledevice_folder():
|
||||
return os.path.expanduser("~/.pymobiledevice3")
|
||||
|
||||
def delete_cert(self, cert_file) -> None:
|
||||
if not self.lockdown_folder:
|
||||
return
|
||||
|
||||
cert_path = os.path.join(self.lockdown_folder, cert_file)
|
||||
if not os.path.exists(cert_path):
|
||||
return
|
||||
|
||||
secure_delete(cert_path)
|
||||
|
||||
def find_certs(self) -> list:
|
||||
if not self.lockdown_folder or not os.path.exists(self.lockdown_folder):
|
||||
return []
|
||||
|
||||
lockdown_certs = []
|
||||
for file_name in os.listdir(self.lockdown_folder):
|
||||
if not file_name.endswith(".plist"):
|
||||
continue
|
||||
|
||||
if file_name == "SystemConfiguration.plist":
|
||||
continue
|
||||
|
||||
file_path = os.path.join(self.lockdown_folder, file_name)
|
||||
lockdown_certs.append(file_path)
|
||||
|
||||
return sorted(lockdown_certs)
|
||||
@@ -31,70 +31,32 @@ class ProfileEvents(IOSExtraction):
|
||||
"timestamp": record.get("timestamp"),
|
||||
"module": self.__class__.__name__,
|
||||
"event": "profile_operation",
|
||||
"data": f"Process {record.get('process')} started operation " \
|
||||
f"{record.get('operation')} of profile {record.get('profile_id')}"
|
||||
"data": f"Process {record.get('process')} started operation {record.get('operation')} of profile {record.get('profile_id')}"
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc = self.indicators.check_process(result.get("process"))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
ioc = self.indicators.check_profile(result.get("profile_id"))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
@staticmethod
|
||||
def parse_profile_events(events_file_path) -> list:
|
||||
results = []
|
||||
|
||||
with open(events_file_path, "rb") as handle:
|
||||
events_plist = plistlib.load(handle)
|
||||
|
||||
if "ProfileEvents" not in events_plist:
|
||||
return results
|
||||
|
||||
for event in events_plist["ProfileEvents"]:
|
||||
key = list(event.keys())[0]
|
||||
|
||||
result = {
|
||||
"profile_id": key,
|
||||
"timestamp": "",
|
||||
"operation": "",
|
||||
"process": "",
|
||||
}
|
||||
|
||||
for key, value in event[key].items():
|
||||
key = key.lower()
|
||||
if key == "timestamp":
|
||||
result["timestamp"] = str(convert_timestamp_to_iso(value))
|
||||
else:
|
||||
result[key] = value
|
||||
|
||||
results.append(result)
|
||||
|
||||
return results
|
||||
|
||||
def run(self) -> None:
|
||||
for events_file in self._get_backup_files_from_manifest(relative_path=CONF_PROFILES_EVENTS_RELPATH):
|
||||
events_file_path = self._get_backup_file_from_id(events_file["file_id"])
|
||||
if not events_file_path:
|
||||
continue
|
||||
|
||||
self.log.info("Found MCProfileEvents.plist file at %s", events_file_path)
|
||||
with open(events_file_path, "rb") as handle:
|
||||
events_plist = plistlib.load(handle)
|
||||
|
||||
self.results.extend(self.parse_profile_events(events_file_path))
|
||||
if "ProfileEvents" not in events_plist:
|
||||
continue
|
||||
|
||||
for result in self.results:
|
||||
self.log.info("On %s process \"%s\" started operation \"%s\" of profile \"%s\"",
|
||||
result.get("timestamp"), result.get("process"),
|
||||
result.get("operation"), result.get("profile_id"))
|
||||
for event in events_plist["ProfileEvents"]:
|
||||
key = list(event.keys())[0]
|
||||
self.log.info("On %s process \"%s\" started operation \"%s\" of profile \"%s\"",
|
||||
event[key].get("timestamp"), event[key].get("process"),
|
||||
event[key].get("operation"), key)
|
||||
|
||||
self.results.append({
|
||||
"profile_id": key,
|
||||
"timestamp": convert_timestamp_to_iso(event[key].get("timestamp")),
|
||||
"operation": event[key].get("operation"),
|
||||
"process": event[key].get("process"),
|
||||
})
|
||||
|
||||
self.log.info("Extracted %d profile events", len(self.results))
|
||||
|
||||
10
mvt/ios/modules/usb/__init__.py
Normal file
10
mvt/ios/modules/usb/__init__.py
Normal file
@@ -0,0 +1,10 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from .applications import Applications
|
||||
from .device_info import DeviceInfo
|
||||
from .processes import Processes
|
||||
|
||||
USB_MODULES = [Applications, DeviceInfo, Processes]
|
||||
46
mvt/ios/modules/usb/applications.py
Normal file
46
mvt/ios/modules/usb/applications.py
Normal file
@@ -0,0 +1,46 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
|
||||
from pymobiledevice3.services.installation_proxy import \
|
||||
InstallationProxyService
|
||||
|
||||
from .base import IOSUSBExtraction
|
||||
|
||||
|
||||
class Applications(IOSUSBExtraction):
|
||||
"""This class extracts all applications installed on the phone"""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc = self.indicators.check_app_id(result["CFBundleIdentifier"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self) -> None:
|
||||
user_apps = InstallationProxyService(lockdown=self.lockdown).get_apps("User")
|
||||
for user_app in user_apps:
|
||||
user_app["type"] = "user"
|
||||
|
||||
system_apps = InstallationProxyService(lockdown=self.lockdown).get_apps("System")
|
||||
for system_app in system_apps:
|
||||
system_app["type"] = "system"
|
||||
|
||||
self.results = user_apps + system_apps
|
||||
|
||||
self.log.info("Identified %d applications installed on the device",
|
||||
len(self.results))
|
||||
25
mvt/ios/modules/usb/base.py
Normal file
25
mvt/ios/modules/usb/base.py
Normal file
@@ -0,0 +1,25 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class IOSUSBExtraction(MVTModule):
|
||||
"""This class provides a base for all iOS USB extraction modules."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
self.device = None
|
||||
self.serial = None
|
||||
self.lockdown = None
|
||||
39
mvt/ios/modules/usb/device_info.py
Normal file
39
mvt/ios/modules/usb/device_info.py
Normal file
@@ -0,0 +1,39 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import base64
|
||||
import logging
|
||||
|
||||
from mvt.ios.versions import latest_ios_version
|
||||
|
||||
from .base import IOSUSBExtraction
|
||||
|
||||
|
||||
class DeviceInfo(IOSUSBExtraction):
|
||||
"""This class extracts all processes running on the phone."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def run(self) -> None:
|
||||
self.results = self.lockdown.all_values
|
||||
|
||||
for entry in self.results:
|
||||
if isinstance(self.results[entry], bytes):
|
||||
self.results[entry] = base64.b64encode(self.results[entry])
|
||||
elif isinstance(self.results[entry], dict):
|
||||
for second_entry in self.results[entry]:
|
||||
if isinstance(self.results[entry][second_entry], bytes):
|
||||
self.results[entry][second_entry] = base64.b64encode(self.results[entry][second_entry])
|
||||
|
||||
if "ProductVersion" in self.results:
|
||||
latest = latest_ios_version()
|
||||
if self.results["ProductVersion"] != latest["version"]:
|
||||
self.log.warning("This phone is running an outdated iOS version: %s (latest is %s)",
|
||||
self.results["ProductVersion"], latest['version'])
|
||||
42
mvt/ios/modules/usb/processes.py
Normal file
42
mvt/ios/modules/usb/processes.py
Normal file
@@ -0,0 +1,42 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
|
||||
from pymobiledevice3.services.os_trace import OsTraceService
|
||||
|
||||
from .base import IOSUSBExtraction
|
||||
|
||||
|
||||
class Processes(IOSUSBExtraction):
|
||||
"""This class extracts all processes running on the phone."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc = self.indicators.check_process(result["name"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self) -> None:
|
||||
processes = OsTraceService(lockdown=self.lockdown).get_pid_list().get("Payload")
|
||||
for pid in processes:
|
||||
self.results.append({
|
||||
"pid": pid,
|
||||
"name": processes[pid]["ProcessName"]
|
||||
})
|
||||
|
||||
self.log.info("Identified %d processes running on the device",
|
||||
len(self.results))
|
||||
18
setup.cfg
18
setup.cfg
@@ -21,17 +21,19 @@ package_dir = = ./
|
||||
include_package_data = True
|
||||
python_requires = >= 3.8
|
||||
install_requires =
|
||||
click >=8.1.3
|
||||
rich >=12.4.4
|
||||
click >=8.0.3
|
||||
rich >=10.12.0
|
||||
tld >=0.12.6
|
||||
requests >=2.28.1
|
||||
simplejson >=3.17.6
|
||||
packaging >=21.3
|
||||
tqdm >=4.62.3
|
||||
requests >=2.26.0
|
||||
simplejson >=3.17.5
|
||||
packaging >=21.0
|
||||
appdirs >=1.4.4
|
||||
iOSbackup >=0.9.921
|
||||
adb-shell >=0.4.3
|
||||
libusb1 >=3.0.0
|
||||
cryptography >=37.0.4
|
||||
adb-shell >=0.4.2
|
||||
libusb1 >=2.0.1
|
||||
cryptography >=36.0.1
|
||||
pymobiledevice3 >=1.23.9
|
||||
pyyaml >=6.0
|
||||
|
||||
[options.packages.find]
|
||||
|
||||
0
tests/ios_usb/__init__.py
Normal file
0
tests/ios_usb/__init__.py
Normal file
35
tests/ios_usb/test_applications.py
Normal file
35
tests/ios_usb/test_applications.py
Normal file
@@ -0,0 +1,35 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
|
||||
from pymobiledevice3.lockdown import LockdownClient
|
||||
|
||||
from mvt.common.module import run_module
|
||||
from mvt.ios.modules.usb.applications import Applications
|
||||
|
||||
|
||||
class TestUSBApplication:
|
||||
def test_run(self, mocker):
|
||||
mocker.patch("pymobiledevice3.lockdown.LockdownClient.start_service")
|
||||
mocker.patch("pymobiledevice3.usbmux.select_device")
|
||||
mocker.patch("pymobiledevice3.service_connection.ServiceConnection.create")
|
||||
mocker.patch(
|
||||
"pymobiledevice3.lockdown.LockdownClient.query_type",
|
||||
return_value="com.apple.mobile.lockdown")
|
||||
mocker.patch(
|
||||
"pymobiledevice3.lockdown.LockdownClient.validate_pairing",
|
||||
return_value=True)
|
||||
mocker.patch(
|
||||
"pymobiledevice3.services.installation_proxy.InstallationProxyService.get_apps",
|
||||
return_value=[{"CFBundleIdentifier": "com.bad.app"}]
|
||||
)
|
||||
|
||||
lockdown = LockdownClient()
|
||||
|
||||
m = Applications(log=logging)
|
||||
m.lockdown = lockdown
|
||||
run_module(m)
|
||||
assert len(m.results) == 2
|
||||
34
tests/ios_usb/test_device_info.py
Normal file
34
tests/ios_usb/test_device_info.py
Normal file
@@ -0,0 +1,34 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
|
||||
from pymobiledevice3.lockdown import LockdownClient
|
||||
|
||||
from mvt.common.module import run_module
|
||||
from mvt.ios.modules.usb.device_info import DeviceInfo
|
||||
|
||||
|
||||
class TestUSBDeviceInfo:
|
||||
def test_run(self, mocker):
|
||||
mocker.patch("pymobiledevice3.usbmux.select_device")
|
||||
mocker.patch("pymobiledevice3.service_connection.ServiceConnection.create")
|
||||
mocker.patch(
|
||||
"pymobiledevice3.lockdown.LockdownClient.query_type",
|
||||
return_value="com.apple.mobile.lockdown")
|
||||
mocker.patch(
|
||||
"pymobiledevice3.lockdown.LockdownClient.validate_pairing",
|
||||
return_value=True)
|
||||
mocker.patch(
|
||||
"pymobiledevice3.lockdown.LockdownClient.get_value",
|
||||
return_value={'DeviceClass': 'iPhone', 'ProductVersion': '14.3'}
|
||||
)
|
||||
|
||||
lockdown = LockdownClient()
|
||||
|
||||
m = DeviceInfo(log=logging)
|
||||
m.lockdown = lockdown
|
||||
run_module(m)
|
||||
assert len(m.results) == 2
|
||||
29
tests/ios_usb/test_processes.py
Normal file
29
tests/ios_usb/test_processes.py
Normal file
@@ -0,0 +1,29 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import run_module
|
||||
from mvt.ios.modules.usb.processes import Processes
|
||||
|
||||
|
||||
class TestUSBProcesses:
|
||||
def test_run(self, mocker, indicator_file):
|
||||
mocker.patch("pymobiledevice3.services.base_service.BaseService.__init__")
|
||||
mocker.patch(
|
||||
"pymobiledevice3.services.os_trace.OsTraceService.get_pid_list",
|
||||
return_value={"Payload": {"1": {"ProcessName": "storebookkeeperd"}, "1854": {"ProcessName": "cfprefssd"}}}
|
||||
)
|
||||
|
||||
ind = Indicators(log=logging)
|
||||
ind.parse_stix2(indicator_file)
|
||||
ind.ioc_collections[0]["processes"].append("cfprefssd")
|
||||
|
||||
m = Processes(log=logging)
|
||||
m.indicators = ind
|
||||
run_module(m)
|
||||
assert len(m.results) == 2
|
||||
assert len(m.detected) == 1
|
||||
Reference in New Issue
Block a user