1
mirror of https://github.com/mvt-project/mvt synced 2025-10-21 22:42:15 +02:00

Compare commits

..

20 Commits

Author SHA1 Message Date
Nex
fbf510567c Bumped version 2022-07-07 13:51:56 +02:00
Nex
94fe98b9ec Removed unused imports 2022-07-07 13:00:38 +02:00
Nex
a328d57551 Added test-upload to Makefile 2022-07-07 12:31:35 +02:00
Nex
a9eabc5d9d Updated dependencies 2022-07-07 12:28:42 +02:00
Nex
1ed6140cb6 Got rid of tqdm in favor of rich progress bar 2022-07-07 12:28:30 +02:00
Nex
efceb777f0 Small clean ups and type hints of mvt-android 2022-07-06 18:38:16 +02:00
Nex
14bbbd9e45 Refactored mvt-android adb Files module in order to keep copy of suspicious payloads 2022-07-06 17:45:21 +02:00
Nex
3cdc6da428 Temporarily removed mvt-ios check-usb command 2022-07-06 13:01:55 +02:00
Nex
459ff8c51c Adding some more checks to bugreport packages module 2022-07-05 18:10:48 +02:00
Nex
88665cf7dd Merge pull request #289 from lorenzo-reho/main
Fixed cmd_download_apks serial connection bug
2022-07-02 18:22:59 +02:00
lorenzo-reho
0a749da85f Fixed cmd_download_apks serial connection bug 2022-07-02 16:14:27 +02:00
Nex
f81604133a Fixed Prompt imports 2022-06-30 11:06:37 +02:00
Nex
cdd9b74cbc Replaced getpass with Prompt 2022-06-30 10:58:50 +02:00
Nex
3fb37b4f30 Added finish() method to Command class 2022-06-30 10:26:33 +02:00
Nex
2fe8b58c09 Removed space 2022-06-30 10:26:30 +02:00
tek
61d0c4134d Fixes a bug in mvt-android download-apks 2022-06-29 23:06:49 +02:00
Nex
6b36fe5fca Re-adding again empty spacing that went missing 2022-06-29 10:35:30 +02:00
Nex
c9f54947e3 Small language and style changes 2022-06-29 01:11:30 +02:00
Nex
ae6fec5ac5 Merge branch 'Te-k-feature/ios-check-usb' 2022-06-29 00:57:32 +02:00
Nex
298726ab2b Minor style fixes 2022-06-29 00:57:25 +02:00
23 changed files with 182 additions and 482 deletions

View File

@@ -8,3 +8,6 @@ dist:
upload: upload:
python3 -m twine upload dist/* python3 -m twine upload dist/*
test-upload:
python3 -m twine upload --repository testpypi dist/*

View File

@@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
import logging import logging
import os
import click import click
from rich.logging import RichHandler from rich.logging import RichHandler
@@ -71,15 +70,7 @@ def download_apks(ctx, all_apks, virustotal, output, from_file, serial):
log.critical("You need to specify an output folder with --output!") log.critical("You need to specify an output folder with --output!")
ctx.exit(1) ctx.exit(1)
if not os.path.exists(output): download = DownloadAPKs(results_path=output, all_apks=all_apks)
try:
os.makedirs(output)
except Exception as e:
log.critical("Unable to create output folder %s: %s", output, e)
ctx.exit(1)
download = DownloadAPKs(output_folder=output, all_apks=all_apks,
log=logging.getLogger(DownloadAPKs.__module__))
if serial: if serial:
download.serial = serial download.serial = serial
download.run() download.run()

View File

@@ -3,13 +3,15 @@
# Use of this software is governed by the MVT License 1.1 that can be found at # Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
import getpass
import io import io
import logging import logging
import os import os
import sys import sys
import tarfile import tarfile
from pathlib import Path from pathlib import Path
from typing import Callable
from rich.prompt import Prompt
from mvt.android.parsers.backup import (AndroidBackupParsingError, from mvt.android.parsers.backup import (AndroidBackupParsingError,
InvalidBackupPassword, parse_ab_header, InvalidBackupPassword, parse_ab_header,
@@ -37,7 +39,7 @@ class CmdAndroidCheckBackup(Command):
self.backup_archive = None self.backup_archive = None
self.backup_files = [] self.backup_files = []
def init(self): def init(self) -> None:
if os.path.isfile(self.target_path): if os.path.isfile(self.target_path):
self.backup_type = "ab" self.backup_type = "ab"
with open(self.target_path, "rb") as handle: with open(self.target_path, "rb") as handle:
@@ -50,7 +52,7 @@ class CmdAndroidCheckBackup(Command):
password = None password = None
if header["encryption"] != "none": if header["encryption"] != "none":
password = getpass.getpass(prompt="Backup Password: ", stream=None) password = Prompt.ask("Enter backup password", password=True)
try: try:
tardata = parse_backup_file(data, password=password) tardata = parse_backup_file(data, password=password)
except InvalidBackupPassword: except InvalidBackupPassword:
@@ -76,7 +78,7 @@ class CmdAndroidCheckBackup(Command):
log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file") log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file")
sys.exit(1) sys.exit(1)
def module_init(self, module): def module_init(self, module: Callable) -> None:
if self.backup_type == "folder": if self.backup_type == "folder":
module.from_folder(self.target_path, self.backup_files) module.from_folder(self.target_path, self.backup_files)
else: else:

View File

@@ -6,6 +6,7 @@
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
from typing import Callable
from zipfile import ZipFile from zipfile import ZipFile
from mvt.common.command import Command from mvt.common.command import Command
@@ -31,7 +32,7 @@ class CmdAndroidCheckBugreport(Command):
self.bugreport_archive = None self.bugreport_archive = None
self.bugreport_files = [] self.bugreport_files = []
def init(self): def init(self) -> None:
if os.path.isfile(self.target_path): if os.path.isfile(self.target_path):
self.bugreport_format = "zip" self.bugreport_format = "zip"
self.bugreport_archive = ZipFile(self.target_path) self.bugreport_archive = ZipFile(self.target_path)
@@ -44,7 +45,7 @@ class CmdAndroidCheckBugreport(Command):
for file_name in subfiles: for file_name in subfiles:
self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path)) self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path))
def module_init(self, module): def module_init(self, module: Callable) -> None:
if self.bugreport_format == "zip": if self.bugreport_format == "zip":
module.from_zip(self.bugreport_archive, self.bugreport_files) module.from_zip(self.bugreport_archive, self.bugreport_files)
else: else:

View File

@@ -6,8 +6,9 @@
import json import json
import logging import logging
import os import os
from typing import Callable
from tqdm import tqdm from rich.progress import track
from mvt.common.module import InsufficientPrivileges from mvt.common.module import InsufficientPrivileges
@@ -17,18 +18,6 @@ from .modules.adb.packages import Packages
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# TODO: Would be better to replace tqdm with rich.progress to reduce
# the number of dependencies. Need to investigate whether
# it's possible to have a similar callback system.
class PullProgress(tqdm):
"""PullProgress is a tqdm update system for APK downloads."""
def update_to(self, file_name, current, total):
if total is not None:
self.total = total
self.update(current - self.n)
class DownloadAPKs(AndroidExtraction): class DownloadAPKs(AndroidExtraction):
"""DownloadAPKs is the main class operating the download of APKs """DownloadAPKs is the main class operating the download of APKs
from the device. from the device.
@@ -36,22 +25,22 @@ class DownloadAPKs(AndroidExtraction):
""" """
def __init__(self, output_folder=None, all_apks=False, log=None, def __init__(self, results_path: str = "", all_apks: bool = False,
packages=None): packages: list = []):
"""Initialize module. """Initialize module.
:param output_folder: Path to the folder where data should be stored :param results_path: Path to the folder where data should be stored
:param all_apks: Boolean indicating whether to download all packages :param all_apks: Boolean indicating whether to download all packages
or filter known-goods or filter known-goods
:param packages: Provided list of packages, typically for JSON checks :param packages: Provided list of packages, typically for JSON checks
""" """
super().__init__(output_folder=output_folder, log=log) super().__init__(results_path=results_path, log=log)
self.packages = packages self.packages = packages
self.all_apks = all_apks self.all_apks = all_apks
self.output_folder_apk = None self.results_path_apks = None
@classmethod @classmethod
def from_json(cls, json_path): def from_json(cls, json_path: str) -> Callable:
"""Initialize this class from an existing apks.json file. """Initialize this class from an existing apks.json file.
:param json_path: Path to the apks.json file to parse. :param json_path: Path to the apks.json file to parse.
@@ -61,7 +50,7 @@ class DownloadAPKs(AndroidExtraction):
packages = json.load(handle) packages = json.load(handle)
return cls(packages=packages) return cls(packages=packages)
def pull_package_file(self, package_name, remote_path): def pull_package_file(self, package_name: str, remote_path: str) -> None:
"""Pull files related to specific package from the device. """Pull files related to specific package from the device.
:param package_name: Name of the package to download :param package_name: Name of the package to download
@@ -75,7 +64,7 @@ class DownloadAPKs(AndroidExtraction):
if "==/" in remote_path: if "==/" in remote_path:
file_name = "_" + remote_path.split("==/")[1].replace(".apk", "") file_name = "_" + remote_path.split("==/")[1].replace(".apk", "")
local_path = os.path.join(self.output_folder_apk, local_path = os.path.join(self.results_path_apks,
f"{package_name}{file_name}.apk") f"{package_name}{file_name}.apk")
name_counter = 0 name_counter = 0
while True: while True:
@@ -83,14 +72,11 @@ class DownloadAPKs(AndroidExtraction):
break break
name_counter += 1 name_counter += 1
local_path = os.path.join(self.output_folder_apk, local_path = os.path.join(self.results_path_apks,
f"{package_name}{file_name}_{name_counter}.apk") f"{package_name}{file_name}_{name_counter}.apk")
try: try:
with PullProgress(unit='B', unit_divisor=1024, unit_scale=True, self._adb_download(remote_path, local_path)
miniters=1) as pp:
self._adb_download(remote_path, local_path,
progress_callback=pp.update_to)
except InsufficientPrivileges: except InsufficientPrivileges:
log.warn("Unable to pull package file from %s: insufficient privileges, it might be a system app", log.warn("Unable to pull package file from %s: insufficient privileges, it might be a system app",
remote_path) remote_path)
@@ -104,26 +90,24 @@ class DownloadAPKs(AndroidExtraction):
return local_path return local_path
def get_packages(self): def get_packages(self) -> None:
"""Use the Packages adb module to retrieve the list of packages. """Use the Packages adb module to retrieve the list of packages.
We reuse the same extraction logic to then download the APKs. We reuse the same extraction logic to then download the APKs.
""" """
self.log.info("Retrieving list of installed packages...") self.log.info("Retrieving list of installed packages...")
m = Packages() m = Packages()
m.log = self.log m.log = self.log
m.serial = self.serial
m.run() m.run()
self.packages = m.results self.packages = m.results
def pull_packages(self): def pull_packages(self) -> None:
"""Download all files of all selected packages from the device.""" """Download all files of all selected packages from the device.
log.info("Starting extraction of installed APKs at folder %s", self.output_folder) """
log.info("Starting extraction of installed APKs at folder %s",
if not os.path.exists(self.output_folder): self.results_path)
os.mkdir(self.output_folder)
# If the user provided the flag --all-apks we select all packages. # If the user provided the flag --all-apks we select all packages.
packages_selection = [] packages_selection = []
@@ -137,7 +121,7 @@ class DownloadAPKs(AndroidExtraction):
if not package.get("system", False): if not package.get("system", False):
packages_selection.append(package) packages_selection.append(package)
log.info("Selected only %d packages which are not marked as system", log.info("Selected only %d packages which are not marked as \"system\"",
len(packages_selection)) len(packages_selection))
if len(packages_selection) == 0: if len(packages_selection) == 0:
@@ -146,15 +130,15 @@ class DownloadAPKs(AndroidExtraction):
log.info("Downloading packages from device. This might take some time ...") log.info("Downloading packages from device. This might take some time ...")
self.output_folder_apk = os.path.join(self.output_folder, "apks") self.results_path_apks = os.path.join(self.results_path, "apks")
if not os.path.exists(self.output_folder_apk): if not os.path.exists(self.results_path_apks):
os.mkdir(self.output_folder_apk) os.makedirs(self.results_path_apks, exist_ok=True)
counter = 0 for i in track(range(len(packages_selection)),
for package in packages_selection: description=f"Downloading {len(packages_selection)} packages..."):
counter += 1 package = packages_selection[i]
log.info("[%d/%d] Package: %s", counter, len(packages_selection), log.info("[%d/%d] Package: %s", i, len(packages_selection),
package["package_name"]) package["package_name"])
# Sometimes the package path contains multiple lines for multiple apks. # Sometimes the package path contains multiple lines for multiple apks.
@@ -170,14 +154,12 @@ class DownloadAPKs(AndroidExtraction):
log.info("Download of selected packages completed") log.info("Download of selected packages completed")
def save_json(self): def save_json(self) -> None:
"""Save the results to the package.json file.""" json_path = os.path.join(self.results_path, "apks.json")
json_path = os.path.join(self.output_folder, "apks.json")
with open(json_path, "w", encoding="utf-8") as handle: with open(json_path, "w", encoding="utf-8") as handle:
json.dump(self.packages, handle, indent=4) json.dump(self.packages, handle, indent=4)
def run(self) -> None: def run(self) -> None:
"""Run all steps of fetch-apk."""
self.get_packages() self.get_packages()
self._adb_connect() self._adb_connect()
self.pull_packages() self.pull_packages()

View File

@@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
import base64 import base64
import getpass
import logging import logging
import os import os
import random import random
@@ -19,6 +18,7 @@ from adb_shell.auth.keygen import keygen, write_public_keyfile
from adb_shell.auth.sign_pythonrsa import PythonRSASigner from adb_shell.auth.sign_pythonrsa import PythonRSASigner
from adb_shell.exceptions import (AdbCommandFailureException, DeviceAuthError, from adb_shell.exceptions import (AdbCommandFailureException, DeviceAuthError,
UsbDeviceNotFoundError, UsbReadFailedError) UsbDeviceNotFoundError, UsbReadFailedError)
from rich.prompt import Prompt
from usb1 import USBErrorAccess, USBErrorBusy from usb1 import USBErrorAccess, USBErrorBusy
from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header, from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header,
@@ -270,7 +270,7 @@ class AndroidExtraction(MVTModule):
return parse_backup_file(backup_output, password=None) return parse_backup_file(backup_output, password=None)
for password_retry in range(0, 3): for password_retry in range(0, 3):
backup_password = getpass.getpass(prompt="Backup password: ", stream=None) backup_password = Prompt.ask("Enter backup password", password=True)
try: try:
decrypted_backup_tar = parse_backup_file(backup_output, backup_password) decrypted_backup_tar = parse_backup_file(backup_output, backup_password)
return decrypted_backup_tar return decrypted_backup_tar

View File

@@ -5,6 +5,7 @@
import datetime import datetime
import logging import logging
import os
import stat import stat
from mvt.common.utils import convert_timestamp_to_iso from mvt.common.utils import convert_timestamp_to_iso
@@ -13,6 +14,15 @@ from .base import AndroidExtraction
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
ANDROID_TMP_FOLDERS = [
"/tmp/",
"/data/local/tmp/",
]
ANDROID_MEDIA_FOLDERS = [
"/data/media/0",
"/sdcard/",
]
class Files(AndroidExtraction): class Files(AndroidExtraction):
"""This module extracts the list of files on the device.""" """This module extracts the list of files on the device."""
@@ -25,13 +35,50 @@ class Files(AndroidExtraction):
log=log, results=results) log=log, results=results)
self.full_find = False self.full_find = False
def serialize(self, record: dict) -> None:
if "modified_time" in record:
return {
"timestamp": record["modified_time"],
"module": self.__class__.__name__,
"event": "file_modified",
"data": record["path"],
}
def check_indicators(self) -> None:
for result in self.results:
if result.get("is_suid"):
self.log.warning("Found an SUID file in a non-standard directory \"%s\".",
result["path"])
if self.indicators and self.indicators.check_file_path(result["path"]):
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
self.detected.append(result)
def backup_file(self, file_path: str) -> None:
local_file_name = file_path.replace("/", "_").replace(" ", "-")
local_files_folder = os.path.join(self.results_path, "files")
if not os.path.exists(local_files_folder):
os.mkdir(local_files_folder)
local_file_path = os.path.join(local_files_folder, local_file_name)
try:
self._adb_download(remote_path=file_path,
local_path=local_file_path)
except Exception:
pass
else:
self.log.info("Downloaded file %s to local copy at %s",
file_path, local_file_path)
def find_files(self, folder: str) -> None: def find_files(self, folder: str) -> None:
if self.full_find: if self.full_find:
output = self._adb_command(f"find '{folder}' -printf '%T@ %m %s %u %g %p\n' 2> /dev/null") output = self._adb_command(f"find '{folder}' -type f -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
for file_line in output.splitlines(): for file_line in output.splitlines():
[unix_timestamp, mode, size, owner, group, full_path] = file_line.rstrip().split(" ", 5) [unix_timestamp, mode, size, owner, group, full_path] = file_line.rstrip().split(" ", 5)
mod_time = convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(float(unix_timestamp)))) mod_time = convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(float(unix_timestamp))))
self.results.append({ self.results.append({
"path": full_path, "path": full_path,
"modified_time": mod_time, "modified_time": mod_time,
@@ -43,39 +90,10 @@ class Files(AndroidExtraction):
"group": group, "group": group,
}) })
else: else:
output = self._adb_command(f"find '{folder}' 2> /dev/null") output = self._adb_command(f"find '{folder}' -type f 2> /dev/null")
for file_line in output.splitlines(): for file_line in output.splitlines():
self.results.append({"path": file_line.rstrip()}) self.results.append({"path": file_line.rstrip()})
def serialize(self, record: dict) -> None:
if "modified_time" in record:
return {
"timestamp": record["modified_time"],
"module": self.__class__.__name__,
"event": "file_modified",
"data": record["path"],
}
def check_suspicious(self) -> None:
"""Check for files with suspicious permissions"""
for result in sorted(self.results, key=lambda item: item["path"]):
if result.get("is_suid"):
self.log.warning("Found an SUID file in a non-standard directory \"%s\".",
result["path"])
self.detected.append(result)
def check_indicators(self) -> None:
"""Check file list for known suspicious files or suspicious properties"""
self.check_suspicious()
if not self.indicators:
return
for result in self.results:
if self.indicators.check_file_path(result["path"]):
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
self.detected.append(result)
def run(self) -> None: def run(self) -> None:
self._adb_connect() self._adb_connect()
@@ -83,16 +101,27 @@ class Files(AndroidExtraction):
if output or output.strip().splitlines(): if output or output.strip().splitlines():
self.full_find = True self.full_find = True
for data_path in ["/data/local/tmp/", "/sdcard/", "/tmp/"]: for tmp_folder in ANDROID_TMP_FOLDERS:
self.find_files(data_path) self.find_files(tmp_folder)
self.log.info("Found %s files in primary Android data directories", len(self.results)) for entry in self.results:
self.log.info("Found file in tmp folder at path %s",
entry.get("path"))
if self.results_path:
self.backup_file(entry.get("path"))
for media_folder in ANDROID_MEDIA_FOLDERS:
self.find_files(media_folder)
self.log.info("Found %s files in primary Android tmp and media folders",
len(self.results))
if self.fast_mode: if self.fast_mode:
self.log.info("Flag --fast was enabled: skipping full file listing") self.log.info("Flag --fast was enabled: skipping full file listing")
else: else:
self.log.info("Processing full file listing. This may take a while...") self.log.info("Processing full file listing. This may take a while...")
self.find_files("/") self.find_files("/")
self.log.info("Found %s total files", len(self.results)) self.log.info("Found %s total files", len(self.results))
self._adb_disconnect() self._adb_disconnect()

View File

@@ -6,6 +6,10 @@
import logging import logging
import re import re
from mvt.android.modules.adb.packages import (DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
ROOT_PACKAGES)
from .base import BugReportModule from .base import BugReportModule
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -41,11 +45,17 @@ class Packages(BugReportModule):
return records return records
def check_indicators(self) -> None: def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results: for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"]) if result["package_name"] in ROOT_PACKAGES:
self.log.warning("Found an installed package related to rooting/jailbreaking: \"%s\"",
result["package_name"])
self.detected.append(result)
continue
if not self.indicators:
continue
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc: if ioc:
result["matched_indicator"] = ioc result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
@@ -165,4 +175,14 @@ class Packages(BugReportModule):
self.results = self.parse_packages_list("\n".join(lines)) self.results = self.parse_packages_list("\n".join(lines))
for result in self.results:
dangerous_permissions_count = 0
for perm in result["requested_permissions"]:
if perm in DANGEROUS_PERMISSIONS:
dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info("Found package \"%s\" requested %d potentially dangerous permissions",
result["package_name"], dangerous_permissions_count)
self.log.info("Extracted details on %d packages", len(self.results)) self.log.info("Extracted details on %d packages", len(self.results))

View File

@@ -136,6 +136,9 @@ class Command(object):
def module_init(self, module: Callable) -> None: def module_init(self, module: Callable) -> None:
raise NotImplementedError raise NotImplementedError
def finish(self) -> None:
raise NotImplementedError
def run(self) -> None: def run(self) -> None:
self._create_storage() self._create_storage()
self._add_log_file_handler(self.log) self._add_log_file_handler(self.log)
@@ -176,3 +179,8 @@ class Command(object):
self._store_timeline() self._store_timeline()
self._store_info() self._store_info()
try:
self.finish()
except NotImplementedError:
pass

View File

@@ -3,4 +3,4 @@
# Use of this software is governed by the MVT License 1.1 that can be found at # Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
MVT_VERSION = "1.6" MVT_VERSION = "2.0"

View File

@@ -13,14 +13,13 @@ from rich.prompt import Prompt
from mvt.common.cmd_check_iocs import CmdCheckIOCS from mvt.common.cmd_check_iocs import CmdCheckIOCS
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC, from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE, HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT, HELP_MSG_SERIAL) HELP_MSG_OUTPUT)
from mvt.common.logo import logo from mvt.common.logo import logo
from mvt.common.options import MutuallyExclusiveOption from mvt.common.options import MutuallyExclusiveOption
from mvt.common.updates import IndicatorsUpdates from mvt.common.updates import IndicatorsUpdates
from .cmd_check_backup import CmdIOSCheckBackup from .cmd_check_backup import CmdIOSCheckBackup
from .cmd_check_fs import CmdIOSCheckFS from .cmd_check_fs import CmdIOSCheckFS
from .cmd_check_usb import CmdIOSCheckUSB
from .decrypt import DecryptBackup from .decrypt import DecryptBackup
from .modules.backup import BACKUP_MODULES from .modules.backup import BACKUP_MODULES
from .modules.fs import FS_MODULES from .modules.fs import FS_MODULES
@@ -216,34 +215,3 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
def download_iocs(): def download_iocs():
ioc_updates = IndicatorsUpdates() ioc_updates = IndicatorsUpdates()
ioc_updates.update() ioc_updates.update()
#==============================================================================
# Command: check-usb
#==============================================================================
@cli.command("check-usb", help="Extract artifacts from a live iPhone through USB / lockdown")
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
# TODO: serial
# @click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
def check_usb(ctx, serial, iocs, output, fast, list_modules, module):
cmd = CmdIOSCheckUSB(results_path=output, ioc_files=iocs,
module_name=module, fast_mode=fast,
serial=serial)
if list_modules:
cmd.list_modules()
return
log.info("Checking iPhone through USB, this may take a while")
cmd.run()
if len(cmd.timeline_detected) > 0:
log.warning("The analysis of the data produced %d detections!",
len(cmd.timeline_detected))

View File

@@ -1,46 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sys
from pymobiledevice3.exceptions import ConnectionFailedError
from pymobiledevice3.lockdown import LockdownClient
from mvt.common.command import Command
from .modules.usb import USB_MODULES
log = logging.getLogger(__name__)
class CmdIOSCheckUSB(Command):
name = "check-usb"
modules = USB_MODULES
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.lockdown = None
def init(self):
try:
if self.serial:
self.lockdown = LockdownClient(udid=self.serial)
else:
self.lockdown = LockdownClient()
except ConnectionRefusedError:
log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.")
sys.exit(-1)
except ConnectionFailedError:
log.error("Unable to connect to the device %s", self.serial)
sys.exit(-1)
def module_init(self, module):
module.lockdown = self.lockdown

View File

@@ -1,10 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .applications import Applications
from .device_info import DeviceInfo
from .processes import Processes
USB_MODULES = [Applications, DeviceInfo, Processes]

View File

@@ -1,44 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.services.installation_proxy import \
InstallationProxyService
from .base import IOSUSBExtraction
class Applications(IOSUSBExtraction):
"""This class extracts all applications installed on the phone"""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["CFBundleIdentifier"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
user_apps = InstallationProxyService(lockdown=self.lockdown).get_apps("User")
for u in user_apps:
u["type"] = "user"
system_apps = InstallationProxyService(lockdown=self.lockdown).get_apps("System")
for s in system_apps:
s["type"] = "system"
self.results = user_apps + system_apps
self.log.info("{} applications identified on the phone".format(len(self.results)))

View File

@@ -1,25 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.module import MVTModule
log = logging.getLogger(__name__)
class IOSUSBExtraction(MVTModule):
"""This class provides a base for all iOS USB extraction modules."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.device = None
self.serial = None
self.lockdown = None

View File

@@ -1,39 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import base64
import logging
from mvt.ios.versions import latest_ios_version
from .base import IOSUSBExtraction
class DeviceInfo(IOSUSBExtraction):
"""This class extracts all processes running on the phone."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self) -> None:
self.results = self.lockdown.all_values
# Base64 encoding of bytes
for entry in self.results:
if isinstance(self.results[entry], bytes):
self.results[entry] = base64.b64encode(self.results[entry])
elif isinstance(self.results[entry], dict):
for second_entry in self.results[entry]:
if isinstance(self.results[entry][second_entry], bytes):
self.results[entry][second_entry] = base64.b64encode(self.results[entry][second_entry])
if "ProductVersion" in self.results:
latest = latest_ios_version()
if self.results["ProductVersion"] != latest["version"]:
self.log.warning("This phone is running an outdated iOS version: %s (latest is %s)",
self.results["ProductVersion"], latest['version'])

View File

@@ -1,40 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.services.os_trace import OsTraceService
from .base import IOSUSBExtraction
class Processes(IOSUSBExtraction):
"""This class extracts all processes running on the phone."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_process(result["name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
processes = OsTraceService(lockdown=self.lockdown).get_pid_list().get('Payload')
for p in processes:
self.results.append({
"pid": p,
"name": processes[p]["ProcessName"]
})
self.log.info("{} running processes identified on the phone".format(len(self.results)))

View File

@@ -21,19 +21,17 @@ package_dir = = ./
include_package_data = True include_package_data = True
python_requires = >= 3.8 python_requires = >= 3.8
install_requires = install_requires =
click >=8.0.3 click >=8.1.3
rich >=10.12.0 rich >=12.4.4
tld >=0.12.6 tld >=0.12.6
tqdm >=4.62.3 requests >=2.28.1
requests >=2.26.0 simplejson >=3.17.6
simplejson >=3.17.5 packaging >=21.3
packaging >=21.0
appdirs >=1.4.4 appdirs >=1.4.4
iOSbackup >=0.9.921 iOSbackup >=0.9.921
adb-shell >=0.4.2 adb-shell >=0.4.3
libusb1 >=2.0.1 libusb1 >=3.0.0
cryptography >=36.0.1 cryptography >=37.0.4
pymobiledevice3 >= 1.23.9
pyyaml >=6.0 pyyaml >=6.0
[options.packages.find] [options.packages.find]

View File

@@ -1,35 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.lockdown import LockdownClient
from mvt.common.module import run_module
from mvt.ios.modules.usb.applications import Applications
class TestUSBApplication:
def test_run(self, mocker):
mocker.patch("pymobiledevice3.lockdown.LockdownClient.start_service")
mocker.patch("pymobiledevice3.usbmux.select_device")
mocker.patch("pymobiledevice3.service_connection.ServiceConnection.create")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.query_type",
return_value="com.apple.mobile.lockdown")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.validate_pairing",
return_value=True)
mocker.patch(
"pymobiledevice3.services.installation_proxy.InstallationProxyService.get_apps",
return_value=[{"CFBundleIdentifier": "com.bad.app"}]
)
lockdown = LockdownClient()
m = Applications(log=logging)
m.lockdown = lockdown
run_module(m)
assert len(m.results) == 2

View File

@@ -1,34 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.lockdown import LockdownClient
from mvt.common.module import run_module
from mvt.ios.modules.usb.device_info import DeviceInfo
class TestUSBDeviceInfo:
def test_run(self, mocker):
mocker.patch("pymobiledevice3.usbmux.select_device")
mocker.patch("pymobiledevice3.service_connection.ServiceConnection.create")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.query_type",
return_value="com.apple.mobile.lockdown")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.validate_pairing",
return_value=True)
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.get_value",
return_value={'DeviceClass': 'iPhone', 'ProductVersion': '14.3'}
)
lockdown = LockdownClient()
m = DeviceInfo(log=logging)
m.lockdown = lockdown
run_module(m)
assert len(m.results) == 2

View File

@@ -1,29 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.indicators import Indicators
from mvt.common.module import run_module
from mvt.ios.modules.usb.processes import Processes
class TestUSBProcesses:
def test_run(self, mocker, indicator_file):
mocker.patch("pymobiledevice3.services.base_service.BaseService.__init__")
mocker.patch(
"pymobiledevice3.services.os_trace.OsTraceService.get_pid_list",
return_value={"Payload": {"1": {"ProcessName": "storebookkeeperd"}, "1854": {"ProcessName": "cfprefssd"}}}
)
ind = Indicators(log=logging)
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["processes"].append("cfprefssd")
m = Processes(log=logging)
m.indicators = ind
run_module(m)
assert len(m.results) == 2
assert len(m.detected) == 1