1
mirror of https://github.com/mvt-project/mvt synced 2025-10-21 22:42:15 +02:00

Compare commits

..

33 Commits

Author SHA1 Message Date
Nex
79a01c45cc Bumped version 2022-07-20 14:12:17 +02:00
Nex
a440d12377 Merge branch 'main' of github.com:mvt-project/mvt 2022-07-20 14:12:08 +02:00
Nex
8085888c0c Improved parsing of profile events to support new formats as well 2022-07-20 14:11:36 +02:00
Nex
c2617fe778 Checking profile IDs in profile_events 2022-07-20 13:25:51 +02:00
Nex
2e1243864c Added check_indicators to profile_events 2022-07-20 13:24:20 +02:00
tek
ba5ff9b38c Fixes a minor typing bug 2022-07-18 14:25:01 +02:00
Nex
3fccebe132 Merge branch 'main' of github.com:mvt-project/mvt 2022-07-14 12:06:52 +02:00
Nex
1265b366c1 Added install_non_market_apps to settings warnings 2022-07-14 09:09:01 +02:00
Nex
c944fb3234 Enforcing quotes in timeline csv writing 2022-07-12 12:03:20 +02:00
Nex
e6b4d17027 Using error instead of warning for failed apk download 2022-07-12 11:55:31 +02:00
Nex
f55ac36189 Code style fixes 2022-07-12 11:55:10 +02:00
Nex
550d6037a6 Bumped version 2022-07-08 19:54:46 +02:00
Nex
e875c978c9 Optional address in SMS serialize 2022-07-08 19:54:33 +02:00
Nex
fbf510567c Bumped version 2022-07-07 13:51:56 +02:00
Nex
94fe98b9ec Removed unused imports 2022-07-07 13:00:38 +02:00
Nex
a328d57551 Added test-upload to Makefile 2022-07-07 12:31:35 +02:00
Nex
a9eabc5d9d Updated dependencies 2022-07-07 12:28:42 +02:00
Nex
1ed6140cb6 Got rid of tqdm in favor of rich progress bar 2022-07-07 12:28:30 +02:00
Nex
efceb777f0 Small clean ups and type hints of mvt-android 2022-07-06 18:38:16 +02:00
Nex
14bbbd9e45 Refactored mvt-android adb Files module in order to keep copy of suspicious payloads 2022-07-06 17:45:21 +02:00
Nex
3cdc6da428 Temporarily removed mvt-ios check-usb command 2022-07-06 13:01:55 +02:00
Nex
459ff8c51c Adding some more checks to bugreport packages module 2022-07-05 18:10:48 +02:00
Nex
88665cf7dd Merge pull request #289 from lorenzo-reho/main
Fixed cmd_download_apks serial connection bug
2022-07-02 18:22:59 +02:00
lorenzo-reho
0a749da85f Fixed cmd_download_apks serial connection bug 2022-07-02 16:14:27 +02:00
Nex
f81604133a Fixed Prompt imports 2022-06-30 11:06:37 +02:00
Nex
cdd9b74cbc Replaced getpass with Prompt 2022-06-30 10:58:50 +02:00
Nex
3fb37b4f30 Added finish() method to Command class 2022-06-30 10:26:33 +02:00
Nex
2fe8b58c09 Removed space 2022-06-30 10:26:30 +02:00
tek
61d0c4134d Fixes a bug in mvt-android download-apks 2022-06-29 23:06:49 +02:00
Nex
6b36fe5fca Re-adding again empty spacing that went missing 2022-06-29 10:35:30 +02:00
Nex
c9f54947e3 Small language and style changes 2022-06-29 01:11:30 +02:00
Nex
ae6fec5ac5 Merge branch 'Te-k-feature/ios-check-usb' 2022-06-29 00:57:32 +02:00
Nex
298726ab2b Minor style fixes 2022-06-29 00:57:25 +02:00
28 changed files with 259 additions and 510 deletions

View File

@@ -8,3 +8,6 @@ dist:
upload:
python3 -m twine upload dist/*
test-upload:
python3 -m twine upload --repository testpypi dist/*

File diff suppressed because one or more lines are too long

View File

@@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import logging
import os
import click
from rich.logging import RichHandler
@@ -71,15 +70,7 @@ def download_apks(ctx, all_apks, virustotal, output, from_file, serial):
log.critical("You need to specify an output folder with --output!")
ctx.exit(1)
if not os.path.exists(output):
try:
os.makedirs(output)
except Exception as e:
log.critical("Unable to create output folder %s: %s", output, e)
ctx.exit(1)
download = DownloadAPKs(output_folder=output, all_apks=all_apks,
log=logging.getLogger(DownloadAPKs.__module__))
download = DownloadAPKs(results_path=output, all_apks=all_apks)
if serial:
download.serial = serial
download.run()

View File

@@ -3,13 +3,15 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import getpass
import io
import logging
import os
import sys
import tarfile
from pathlib import Path
from typing import Callable
from rich.prompt import Prompt
from mvt.android.parsers.backup import (AndroidBackupParsingError,
InvalidBackupPassword, parse_ab_header,
@@ -37,7 +39,7 @@ class CmdAndroidCheckBackup(Command):
self.backup_archive = None
self.backup_files = []
def init(self):
def init(self) -> None:
if os.path.isfile(self.target_path):
self.backup_type = "ab"
with open(self.target_path, "rb") as handle:
@@ -50,7 +52,7 @@ class CmdAndroidCheckBackup(Command):
password = None
if header["encryption"] != "none":
password = getpass.getpass(prompt="Backup Password: ", stream=None)
password = Prompt.ask("Enter backup password", password=True)
try:
tardata = parse_backup_file(data, password=password)
except InvalidBackupPassword:
@@ -76,7 +78,7 @@ class CmdAndroidCheckBackup(Command):
log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file")
sys.exit(1)
def module_init(self, module):
def module_init(self, module: Callable) -> None:
if self.backup_type == "folder":
module.from_folder(self.target_path, self.backup_files)
else:

View File

@@ -6,6 +6,7 @@
import logging
import os
from pathlib import Path
from typing import Callable
from zipfile import ZipFile
from mvt.common.command import Command
@@ -31,7 +32,7 @@ class CmdAndroidCheckBugreport(Command):
self.bugreport_archive = None
self.bugreport_files = []
def init(self):
def init(self) -> None:
if os.path.isfile(self.target_path):
self.bugreport_format = "zip"
self.bugreport_archive = ZipFile(self.target_path)
@@ -44,7 +45,7 @@ class CmdAndroidCheckBugreport(Command):
for file_name in subfiles:
self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path))
def module_init(self, module):
def module_init(self, module: Callable) -> None:
if self.bugreport_format == "zip":
module.from_zip(self.bugreport_archive, self.bugreport_files)
else:

View File

@@ -6,8 +6,9 @@
import json
import logging
import os
from typing import Callable
from tqdm import tqdm
from rich.progress import track
from mvt.common.module import InsufficientPrivileges
@@ -17,18 +18,6 @@ from .modules.adb.packages import Packages
log = logging.getLogger(__name__)
# TODO: Would be better to replace tqdm with rich.progress to reduce
# the number of dependencies. Need to investigate whether
# it's possible to have a similar callback system.
class PullProgress(tqdm):
"""PullProgress is a tqdm update system for APK downloads."""
def update_to(self, file_name, current, total):
if total is not None:
self.total = total
self.update(current - self.n)
class DownloadAPKs(AndroidExtraction):
"""DownloadAPKs is the main class operating the download of APKs
from the device.
@@ -36,22 +25,22 @@ class DownloadAPKs(AndroidExtraction):
"""
def __init__(self, output_folder=None, all_apks=False, log=None,
packages=None):
def __init__(self, results_path: str = "", all_apks: bool = False,
packages: list = []):
"""Initialize module.
:param output_folder: Path to the folder where data should be stored
:param results_path: Path to the folder where data should be stored
:param all_apks: Boolean indicating whether to download all packages
or filter known-goods
:param packages: Provided list of packages, typically for JSON checks
"""
super().__init__(output_folder=output_folder, log=log)
super().__init__(results_path=results_path, log=log)
self.packages = packages
self.all_apks = all_apks
self.output_folder_apk = None
self.results_path_apks = None
@classmethod
def from_json(cls, json_path):
def from_json(cls, json_path: str) -> Callable:
"""Initialize this class from an existing apks.json file.
:param json_path: Path to the apks.json file to parse.
@@ -61,7 +50,7 @@ class DownloadAPKs(AndroidExtraction):
packages = json.load(handle)
return cls(packages=packages)
def pull_package_file(self, package_name, remote_path):
def pull_package_file(self, package_name: str, remote_path: str) -> None:
"""Pull files related to specific package from the device.
:param package_name: Name of the package to download
@@ -75,7 +64,7 @@ class DownloadAPKs(AndroidExtraction):
if "==/" in remote_path:
file_name = "_" + remote_path.split("==/")[1].replace(".apk", "")
local_path = os.path.join(self.output_folder_apk,
local_path = os.path.join(self.results_path_apks,
f"{package_name}{file_name}.apk")
name_counter = 0
while True:
@@ -83,17 +72,14 @@ class DownloadAPKs(AndroidExtraction):
break
name_counter += 1
local_path = os.path.join(self.output_folder_apk,
local_path = os.path.join(self.results_path_apks,
f"{package_name}{file_name}_{name_counter}.apk")
try:
with PullProgress(unit='B', unit_divisor=1024, unit_scale=True,
miniters=1) as pp:
self._adb_download(remote_path, local_path,
progress_callback=pp.update_to)
self._adb_download(remote_path, local_path)
except InsufficientPrivileges:
log.warn("Unable to pull package file from %s: insufficient privileges, it might be a system app",
remote_path)
log.error("Unable to pull package file from %s: insufficient privileges, it might be a system app",
remote_path)
self._adb_reconnect()
return None
except Exception as e:
@@ -104,26 +90,24 @@ class DownloadAPKs(AndroidExtraction):
return local_path
def get_packages(self):
def get_packages(self) -> None:
"""Use the Packages adb module to retrieve the list of packages.
We reuse the same extraction logic to then download the APKs.
"""
self.log.info("Retrieving list of installed packages...")
m = Packages()
m.log = self.log
m.serial = self.serial
m.run()
self.packages = m.results
def pull_packages(self):
"""Download all files of all selected packages from the device."""
log.info("Starting extraction of installed APKs at folder %s", self.output_folder)
if not os.path.exists(self.output_folder):
os.mkdir(self.output_folder)
def pull_packages(self) -> None:
"""Download all files of all selected packages from the device.
"""
log.info("Starting extraction of installed APKs at folder %s",
self.results_path)
# If the user provided the flag --all-apks we select all packages.
packages_selection = []
@@ -137,7 +121,7 @@ class DownloadAPKs(AndroidExtraction):
if not package.get("system", False):
packages_selection.append(package)
log.info("Selected only %d packages which are not marked as system",
log.info("Selected only %d packages which are not marked as \"system\"",
len(packages_selection))
if len(packages_selection) == 0:
@@ -146,15 +130,15 @@ class DownloadAPKs(AndroidExtraction):
log.info("Downloading packages from device. This might take some time ...")
self.output_folder_apk = os.path.join(self.output_folder, "apks")
if not os.path.exists(self.output_folder_apk):
os.mkdir(self.output_folder_apk)
self.results_path_apks = os.path.join(self.results_path, "apks")
if not os.path.exists(self.results_path_apks):
os.makedirs(self.results_path_apks, exist_ok=True)
counter = 0
for package in packages_selection:
counter += 1
for i in track(range(len(packages_selection)),
description=f"Downloading {len(packages_selection)} packages..."):
package = packages_selection[i]
log.info("[%d/%d] Package: %s", counter, len(packages_selection),
log.info("[%d/%d] Package: %s", i, len(packages_selection),
package["package_name"])
# Sometimes the package path contains multiple lines for multiple apks.
@@ -170,14 +154,12 @@ class DownloadAPKs(AndroidExtraction):
log.info("Download of selected packages completed")
def save_json(self):
"""Save the results to the package.json file."""
json_path = os.path.join(self.output_folder, "apks.json")
def save_json(self) -> None:
json_path = os.path.join(self.results_path, "apks.json")
with open(json_path, "w", encoding="utf-8") as handle:
json.dump(self.packages, handle, indent=4)
def run(self) -> None:
"""Run all steps of fetch-apk."""
self.get_packages()
self._adb_connect()
self.pull_packages()

View File

@@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import base64
import getpass
import logging
import os
import random
@@ -19,6 +18,7 @@ from adb_shell.auth.keygen import keygen, write_public_keyfile
from adb_shell.auth.sign_pythonrsa import PythonRSASigner
from adb_shell.exceptions import (AdbCommandFailureException, DeviceAuthError,
UsbDeviceNotFoundError, UsbReadFailedError)
from rich.prompt import Prompt
from usb1 import USBErrorAccess, USBErrorBusy
from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header,
@@ -270,7 +270,7 @@ class AndroidExtraction(MVTModule):
return parse_backup_file(backup_output, password=None)
for password_retry in range(0, 3):
backup_password = getpass.getpass(prompt="Backup password: ", stream=None)
backup_password = Prompt.ask("Enter backup password", password=True)
try:
decrypted_backup_tar = parse_backup_file(backup_output, backup_password)
return decrypted_backup_tar

View File

@@ -5,6 +5,7 @@
import datetime
import logging
import os
import stat
from mvt.common.utils import convert_timestamp_to_iso
@@ -13,6 +14,15 @@ from .base import AndroidExtraction
log = logging.getLogger(__name__)
ANDROID_TMP_FOLDERS = [
"/tmp/",
"/data/local/tmp/",
]
ANDROID_MEDIA_FOLDERS = [
"/data/media/0",
"/sdcard/",
]
class Files(AndroidExtraction):
"""This module extracts the list of files on the device."""
@@ -25,13 +35,50 @@ class Files(AndroidExtraction):
log=log, results=results)
self.full_find = False
def serialize(self, record: dict) -> None:
if "modified_time" in record:
return {
"timestamp": record["modified_time"],
"module": self.__class__.__name__,
"event": "file_modified",
"data": record["path"],
}
def check_indicators(self) -> None:
for result in self.results:
if result.get("is_suid"):
self.log.warning("Found an SUID file in a non-standard directory \"%s\".",
result["path"])
if self.indicators and self.indicators.check_file_path(result["path"]):
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
self.detected.append(result)
def backup_file(self, file_path: str) -> None:
local_file_name = file_path.replace("/", "_").replace(" ", "-")
local_files_folder = os.path.join(self.results_path, "files")
if not os.path.exists(local_files_folder):
os.mkdir(local_files_folder)
local_file_path = os.path.join(local_files_folder, local_file_name)
try:
self._adb_download(remote_path=file_path,
local_path=local_file_path)
except Exception:
pass
else:
self.log.info("Downloaded file %s to local copy at %s",
file_path, local_file_path)
def find_files(self, folder: str) -> None:
if self.full_find:
output = self._adb_command(f"find '{folder}' -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
output = self._adb_command(f"find '{folder}' -type f -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
for file_line in output.splitlines():
[unix_timestamp, mode, size, owner, group, full_path] = file_line.rstrip().split(" ", 5)
mod_time = convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(float(unix_timestamp))))
self.results.append({
"path": full_path,
"modified_time": mod_time,
@@ -43,39 +90,10 @@ class Files(AndroidExtraction):
"group": group,
})
else:
output = self._adb_command(f"find '{folder}' 2> /dev/null")
output = self._adb_command(f"find '{folder}' -type f 2> /dev/null")
for file_line in output.splitlines():
self.results.append({"path": file_line.rstrip()})
def serialize(self, record: dict) -> None:
if "modified_time" in record:
return {
"timestamp": record["modified_time"],
"module": self.__class__.__name__,
"event": "file_modified",
"data": record["path"],
}
def check_suspicious(self) -> None:
"""Check for files with suspicious permissions"""
for result in sorted(self.results, key=lambda item: item["path"]):
if result.get("is_suid"):
self.log.warning("Found an SUID file in a non-standard directory \"%s\".",
result["path"])
self.detected.append(result)
def check_indicators(self) -> None:
"""Check file list for known suspicious files or suspicious properties"""
self.check_suspicious()
if not self.indicators:
return
for result in self.results:
if self.indicators.check_file_path(result["path"]):
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
self.detected.append(result)
def run(self) -> None:
self._adb_connect()
@@ -83,16 +101,27 @@ class Files(AndroidExtraction):
if output or output.strip().splitlines():
self.full_find = True
for data_path in ["/data/local/tmp/", "/sdcard/", "/tmp/"]:
self.find_files(data_path)
for tmp_folder in ANDROID_TMP_FOLDERS:
self.find_files(tmp_folder)
self.log.info("Found %s files in primary Android data directories", len(self.results))
for entry in self.results:
self.log.info("Found file in tmp folder at path %s",
entry.get("path"))
if self.results_path:
self.backup_file(entry.get("path"))
for media_folder in ANDROID_MEDIA_FOLDERS:
self.find_files(media_folder)
self.log.info("Found %s files in primary Android tmp and media folders",
len(self.results))
if self.fast_mode:
self.log.info("Flag --fast was enabled: skipping full file listing")
else:
self.log.info("Processing full file listing. This may take a while...")
self.find_files("/")
self.log.info("Found %s total files", len(self.results))
self._adb_disconnect()

View File

@@ -51,6 +51,11 @@ ANDROID_DANGEROUS_SETTINGS = [
"key": "send_action_app_error",
"safe_value": "1",
},
{
"description": "enabled installation of non Google Play apps",
"key": "install_non_market_apps",
"safe_value": "0",
}
]

View File

@@ -59,7 +59,7 @@ class SMS(AndroidExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": f"sms_{record['direction']}",
"data": f"{record['address']}: \"{body}\""
"data": f"{record.get('address', 'unknown source')}: \"{body}\""
}
def check_indicators(self) -> None:
@@ -70,7 +70,7 @@ class SMS(AndroidExtraction):
if "body" not in message:
continue
# FIXME: check links exported from the body previously
# TODO: check links exported from the body previously.
message_links = check_for_links(message["body"])
if self.indicators.check_domains(message_links):
self.detected.append(message)
@@ -110,10 +110,12 @@ class SMS(AndroidExtraction):
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
def _extract_sms_adb(self) -> None:
"""Use the Android backup command to extract SMS data from the native SMS app
"""Use the Android backup command to extract SMS data from the native SMS
app.
It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression
algorithim. This module only supports an unencrypted ADB backup.
It is crucial to use the under-documented "-nocompress" flag to disable
the non-standard Java compression algorithm. This module only supports
an unencrypted ADB backup.
"""
backup_tar = self._generate_backup("com.android.providers.telephony")
if not backup_tar:
@@ -122,7 +124,8 @@ class SMS(AndroidExtraction):
try:
self.results = parse_tar_for_sms(backup_tar)
except AndroidBackupParsingError:
self.log.info("Impossible to read SMS from the Android Backup, please extract the SMS and try extracting it with Android Backup Extractor")
self.log.info("Impossible to read SMS from the Android Backup, please extract "\
"the SMS and try extracting it with Android Backup Extractor")
return
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
@@ -139,5 +142,6 @@ class SMS(AndroidExtraction):
except InsufficientPrivileges:
pass
self.log.warn("No SMS database found. Trying extraction of SMS data using Android backup feature.")
self.log.warn("No SMS database found. Trying extraction of SMS data using " \
"Android backup feature.")
self._extract_sms_adb()

View File

@@ -6,6 +6,10 @@
import logging
import re
from mvt.android.modules.adb.packages import (DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
ROOT_PACKAGES)
from .base import BugReportModule
log = logging.getLogger(__name__)
@@ -41,11 +45,17 @@ class Packages(BugReportModule):
return records
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if result["package_name"] in ROOT_PACKAGES:
self.log.warning("Found an installed package related to rooting/jailbreaking: \"%s\"",
result["package_name"])
self.detected.append(result)
continue
if not self.indicators:
continue
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
@@ -165,4 +175,14 @@ class Packages(BugReportModule):
self.results = self.parse_packages_list("\n".join(lines))
for result in self.results:
dangerous_permissions_count = 0
for perm in result["requested_permissions"]:
if perm in DANGEROUS_PERMISSIONS:
dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info("Found package \"%s\" requested %d potentially dangerous permissions",
result["package_name"], dangerous_permissions_count)
self.log.info("Extracted details on %d packages", len(self.results))

View File

@@ -136,6 +136,9 @@ class Command(object):
def module_init(self, module: Callable) -> None:
raise NotImplementedError
def finish(self) -> None:
raise NotImplementedError
def run(self) -> None:
self._create_storage()
self._add_log_file_handler(self.log)
@@ -176,3 +179,8 @@ class Command(object):
self._store_timeline()
self._store_info()
try:
self.finish()
except NotImplementedError:
pass

View File

@@ -195,7 +195,8 @@ def save_timeline(timeline: list, timeline_path: str) -> None:
"""
with open(timeline_path, "a+", encoding="utf-8") as handle:
csvoutput = csv.writer(handle, delimiter=",", quotechar="\"")
csvoutput = csv.writer(handle, delimiter=",", quotechar="\"",
quoting=csv.QUOTE_ALL)
csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])
for event in sorted(timeline, key=lambda x: x["timestamp"] if x["timestamp"] is not None else ""):
csvoutput.writerow([

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import requests
from typing import Optional
from tld import get_tld
SHORTENER_DOMAINS = [
@@ -308,7 +309,7 @@ class URL:
return self.is_shortened
def unshorten(self) -> None:
def unshorten(self) -> Optional[str]:
"""Unshorten the URL by requesting an HTTP HEAD response."""
res = requests.head(self.url)
if str(res.status_code).startswith("30"):

View File

@@ -3,4 +3,4 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
MVT_VERSION = "1.6"
MVT_VERSION = "2.1"

View File

@@ -13,14 +13,13 @@ from rich.prompt import Prompt
from mvt.common.cmd_check_iocs import CmdCheckIOCS
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT, HELP_MSG_SERIAL)
HELP_MSG_OUTPUT)
from mvt.common.logo import logo
from mvt.common.options import MutuallyExclusiveOption
from mvt.common.updates import IndicatorsUpdates
from .cmd_check_backup import CmdIOSCheckBackup
from .cmd_check_fs import CmdIOSCheckFS
from .cmd_check_usb import CmdIOSCheckUSB
from .decrypt import DecryptBackup
from .modules.backup import BACKUP_MODULES
from .modules.fs import FS_MODULES
@@ -216,34 +215,3 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
def download_iocs():
ioc_updates = IndicatorsUpdates()
ioc_updates.update()
#==============================================================================
# Command: check-usb
#==============================================================================
@cli.command("check-usb", help="Extract artifacts from a live iPhone through USB / lockdown")
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
# TODO: serial
# @click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
def check_usb(ctx, serial, iocs, output, fast, list_modules, module):
cmd = CmdIOSCheckUSB(results_path=output, ioc_files=iocs,
module_name=module, fast_mode=fast,
serial=serial)
if list_modules:
cmd.list_modules()
return
log.info("Checking iPhone through USB, this may take a while")
cmd.run()
if len(cmd.timeline_detected) > 0:
log.warning("The analysis of the data produced %d detections!",
len(cmd.timeline_detected))

View File

@@ -1,46 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sys
from pymobiledevice3.exceptions import ConnectionFailedError
from pymobiledevice3.lockdown import LockdownClient
from mvt.common.command import Command
from .modules.usb import USB_MODULES
log = logging.getLogger(__name__)
class CmdIOSCheckUSB(Command):
name = "check-usb"
modules = USB_MODULES
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.lockdown = None
def init(self):
try:
if self.serial:
self.lockdown = LockdownClient(udid=self.serial)
else:
self.lockdown = LockdownClient()
except ConnectionRefusedError:
log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.")
sys.exit(-1)
except ConnectionFailedError:
log.error("Unable to connect to the device %s", self.serial)
sys.exit(-1)
def module_init(self, module):
module.lockdown = self.lockdown

View File

@@ -31,32 +31,70 @@ class ProfileEvents(IOSExtraction):
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
"event": "profile_operation",
"data": f"Process {record.get('process')} started operation {record.get('operation')} of profile {record.get('profile_id')}"
"data": f"Process {record.get('process')} started operation " \
f"{record.get('operation')} of profile {record.get('profile_id')}"
}
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_process(result.get("process"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc = self.indicators.check_profile(result.get("profile_id"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
@staticmethod
def parse_profile_events(events_file_path) -> list:
results = []
with open(events_file_path, "rb") as handle:
events_plist = plistlib.load(handle)
if "ProfileEvents" not in events_plist:
return results
for event in events_plist["ProfileEvents"]:
key = list(event.keys())[0]
result = {
"profile_id": key,
"timestamp": "",
"operation": "",
"process": "",
}
for key, value in event[key].items():
key = key.lower()
if key == "timestamp":
result["timestamp"] = str(convert_timestamp_to_iso(value))
else:
result[key] = value
results.append(result)
return results
def run(self) -> None:
for events_file in self._get_backup_files_from_manifest(relative_path=CONF_PROFILES_EVENTS_RELPATH):
events_file_path = self._get_backup_file_from_id(events_file["file_id"])
if not events_file_path:
continue
with open(events_file_path, "rb") as handle:
events_plist = plistlib.load(handle)
self.log.info("Found MCProfileEvents.plist file at %s", events_file_path)
if "ProfileEvents" not in events_plist:
continue
self.results.extend(self.parse_profile_events(events_file_path))
for event in events_plist["ProfileEvents"]:
key = list(event.keys())[0]
self.log.info("On %s process \"%s\" started operation \"%s\" of profile \"%s\"",
event[key].get("timestamp"), event[key].get("process"),
event[key].get("operation"), key)
self.results.append({
"profile_id": key,
"timestamp": convert_timestamp_to_iso(event[key].get("timestamp")),
"operation": event[key].get("operation"),
"process": event[key].get("process"),
})
for result in self.results:
self.log.info("On %s process \"%s\" started operation \"%s\" of profile \"%s\"",
result.get("timestamp"), result.get("process"),
result.get("operation"), result.get("profile_id"))
self.log.info("Extracted %d profile events", len(self.results))

View File

@@ -1,10 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .applications import Applications
from .device_info import DeviceInfo
from .processes import Processes
USB_MODULES = [Applications, DeviceInfo, Processes]

View File

@@ -1,44 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.services.installation_proxy import \
InstallationProxyService
from .base import IOSUSBExtraction
class Applications(IOSUSBExtraction):
"""This class extracts all applications installed on the phone"""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["CFBundleIdentifier"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
user_apps = InstallationProxyService(lockdown=self.lockdown).get_apps("User")
for u in user_apps:
u["type"] = "user"
system_apps = InstallationProxyService(lockdown=self.lockdown).get_apps("System")
for s in system_apps:
s["type"] = "system"
self.results = user_apps + system_apps
self.log.info("{} applications identified on the phone".format(len(self.results)))

View File

@@ -1,25 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.module import MVTModule
log = logging.getLogger(__name__)
class IOSUSBExtraction(MVTModule):
"""This class provides a base for all iOS USB extraction modules."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.device = None
self.serial = None
self.lockdown = None

View File

@@ -1,39 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import base64
import logging
from mvt.ios.versions import latest_ios_version
from .base import IOSUSBExtraction
class DeviceInfo(IOSUSBExtraction):
"""This class extracts all processes running on the phone."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self) -> None:
self.results = self.lockdown.all_values
# Base64 encoding of bytes
for entry in self.results:
if isinstance(self.results[entry], bytes):
self.results[entry] = base64.b64encode(self.results[entry])
elif isinstance(self.results[entry], dict):
for second_entry in self.results[entry]:
if isinstance(self.results[entry][second_entry], bytes):
self.results[entry][second_entry] = base64.b64encode(self.results[entry][second_entry])
if "ProductVersion" in self.results:
latest = latest_ios_version()
if self.results["ProductVersion"] != latest["version"]:
self.log.warning("This phone is running an outdated iOS version: %s (latest is %s)",
self.results["ProductVersion"], latest['version'])

View File

@@ -1,40 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.services.os_trace import OsTraceService
from .base import IOSUSBExtraction
class Processes(IOSUSBExtraction):
"""This class extracts all processes running on the phone."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_process(result["name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
processes = OsTraceService(lockdown=self.lockdown).get_pid_list().get('Payload')
for p in processes:
self.results.append({
"pid": p,
"name": processes[p]["ProcessName"]
})
self.log.info("{} running processes identified on the phone".format(len(self.results)))

View File

@@ -21,19 +21,17 @@ package_dir = = ./
include_package_data = True
python_requires = >= 3.8
install_requires =
click >=8.0.3
rich >=10.12.0
click >=8.1.3
rich >=12.4.4
tld >=0.12.6
tqdm >=4.62.3
requests >=2.26.0
simplejson >=3.17.5
packaging >=21.0
requests >=2.28.1
simplejson >=3.17.6
packaging >=21.3
appdirs >=1.4.4
iOSbackup >=0.9.921
adb-shell >=0.4.2
libusb1 >=2.0.1
cryptography >=36.0.1
pymobiledevice3 >= 1.23.9
adb-shell >=0.4.3
libusb1 >=3.0.0
cryptography >=37.0.4
pyyaml >=6.0
[options.packages.find]

View File

@@ -1,35 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.lockdown import LockdownClient
from mvt.common.module import run_module
from mvt.ios.modules.usb.applications import Applications
class TestUSBApplication:
def test_run(self, mocker):
mocker.patch("pymobiledevice3.lockdown.LockdownClient.start_service")
mocker.patch("pymobiledevice3.usbmux.select_device")
mocker.patch("pymobiledevice3.service_connection.ServiceConnection.create")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.query_type",
return_value="com.apple.mobile.lockdown")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.validate_pairing",
return_value=True)
mocker.patch(
"pymobiledevice3.services.installation_proxy.InstallationProxyService.get_apps",
return_value=[{"CFBundleIdentifier": "com.bad.app"}]
)
lockdown = LockdownClient()
m = Applications(log=logging)
m.lockdown = lockdown
run_module(m)
assert len(m.results) == 2

View File

@@ -1,34 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.lockdown import LockdownClient
from mvt.common.module import run_module
from mvt.ios.modules.usb.device_info import DeviceInfo
class TestUSBDeviceInfo:
def test_run(self, mocker):
mocker.patch("pymobiledevice3.usbmux.select_device")
mocker.patch("pymobiledevice3.service_connection.ServiceConnection.create")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.query_type",
return_value="com.apple.mobile.lockdown")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.validate_pairing",
return_value=True)
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.get_value",
return_value={'DeviceClass': 'iPhone', 'ProductVersion': '14.3'}
)
lockdown = LockdownClient()
m = DeviceInfo(log=logging)
m.lockdown = lockdown
run_module(m)
assert len(m.results) == 2

View File

@@ -1,29 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.indicators import Indicators
from mvt.common.module import run_module
from mvt.ios.modules.usb.processes import Processes
class TestUSBProcesses:
def test_run(self, mocker, indicator_file):
mocker.patch("pymobiledevice3.services.base_service.BaseService.__init__")
mocker.patch(
"pymobiledevice3.services.os_trace.OsTraceService.get_pid_list",
return_value={"Payload": {"1": {"ProcessName": "storebookkeeperd"}, "1854": {"ProcessName": "cfprefssd"}}}
)
ind = Indicators(log=logging)
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["processes"].append("cfprefssd")
m = Processes(log=logging)
m.indicators = ind
run_module(m)
assert len(m.results) == 2
assert len(m.detected) == 1