1
mirror of https://github.com/mvt-project/mvt synced 2025-10-21 22:42:15 +02:00

Compare commits

..

45 Commits

Author SHA1 Message Date
Nex
f1d039346d Bumped version 2021-09-14 14:33:17 +02:00
Nex
ccdfd92d4a Merge branch 'dozenfossil-main' 2021-09-14 14:29:21 +02:00
Nex
032b229eb8 Minor changes for consistency 2021-09-14 14:29:04 +02:00
Nex
93936976c7 Merge branch 'main' of https://github.com/dozenfossil/mvt into dozenfossil-main 2021-09-14 14:26:37 +02:00
Nex
f3a4e9d108 Merge pull request #186 from beneficentboast/main
fix error for manipulated entries in DataUsage/NetUsage
2021-09-14 14:26:00 +02:00
Nex
93a9735b5e Reordering 2021-09-14 14:21:54 +02:00
Nex
7b0e2d4564 Added version 2021-09-14 14:20:54 +02:00
beneficentboast
725a99bcd5 fix error for manipulated entries in DataUsage 2021-09-13 20:13:43 +02:00
dozenfossil
35a6f6ec9a fix multi path/file issue 2021-09-13 20:02:48 +02:00
Nex
3f9809f36c Formatting docstrings 2021-09-11 02:39:33 +02:00
Nex
6da6595108 More docstrings 2021-09-10 20:09:37 +02:00
Nex
35dfeaccee Re-ordered list of shortener domains 2021-09-10 15:21:02 +02:00
Nex
e5f2aa3c3d Standardizing reST docstrings 2021-09-10 15:18:13 +02:00
Nex
3236c1b390 Added new TCC module 2021-09-09 12:00:48 +02:00
Nex
80a670273d Added additional locationd path 2021-09-07 15:18:00 +02:00
Nex
969b5cc506 Fixed bug in locationd module 2021-09-07 15:06:19 +02:00
Nex
ef8622d4c3 Changed event name 2021-09-03 14:49:04 +02:00
Nex
e39e9e6f92 Cleaned up and simplified module 2021-09-03 14:48:24 +02:00
Nex
7b32ed3179 Compacted record data 2021-09-03 14:41:55 +02:00
Nex
315317863e Fixed documentation 2021-09-03 14:06:01 +02:00
Nex
08d35b056a Merge branch 'guitarsinger-main' 2021-09-03 13:35:59 +02:00
Nex
3e679312d1 Renamed module 2021-09-03 13:35:27 +02:00
guitarsinger
be4f1afed6 add OSAnalyticsADDAILY 2021-09-03 11:59:44 +02:00
Nex
0dea25d86e Reverted version number to minor 2021-09-02 15:33:36 +02:00
Nex
505d3c7e60 Bumped version 2021-09-02 15:31:25 +02:00
Nex
8f04c09b75 Removed duplicate 2021-09-02 15:28:17 +02:00
Nex
595b7e2066 Fixed typo 2021-09-02 15:27:00 +02:00
Nex
d3941bb5d3 Merge pull request #177 from harsaphes/main
Checking idstatuscache.plist in a dump for iOS>14.7
2021-09-01 22:00:51 +02:00
Nex
194c8a0ac1 Using new function to retrieve local db path 2021-09-01 21:59:12 +02:00
Nex
bef190fe50 Merge pull request #178 from mvt-project/webkit_error
Fixes a bug in retrieving the backup file path in webkit session resource log
2021-09-01 21:57:49 +02:00
tek
cacf027051 Fixes a bug in retrieving the backup file path in webkit session resource logs 2021-09-01 15:49:23 -04:00
tek
da97f5ca30 Add db recovery to Safari history module 2021-09-01 15:40:45 -04:00
Nex
a774577940 Handling some exceptions more gracefully 2021-09-01 13:41:21 +02:00
Nex
7252cc82a7 Added module to dump full output of dumpsys 2021-08-30 22:20:05 +02:00
Nex
b34d80fd11 Logging module completed 2021-08-30 22:19:28 +02:00
Nex
0347dfa3c9 Added module Files to pull list of visible file pathso 2021-08-30 22:11:07 +02:00
Nex
28647b8493 Fixed is_dir() to isdir() 2021-08-30 22:08:29 +02:00
harsaphes
c2ec26fd75 Checking idstatuscache.plist in a dump for iOS>14.7 2021-08-30 21:01:59 +02:00
Nex
856a6fb895 Cleaning up some classes 2021-08-28 12:33:27 +02:00
Nex
62f3c535df Merge pull request #176 from JeffLIrion/patch-1
Fix `_adb_check_keys` method
2021-08-28 12:25:52 +02:00
Jeff Irion
34c64af815 Fix _adb_check_keys method 2021-08-27 23:26:50 -07:00
Nex
ea4da71277 Creating android home folder if missing 2021-08-27 19:12:09 +02:00
Nex
94fe3c90e0 Added logcat modules 2021-08-26 15:23:54 +02:00
Nex
f78332aa71 Split receivers into a new package 2021-08-26 14:51:56 +02:00
Nex
0c4eb0bb34 Added discovery of Android packages with potentially abusive receivers 2021-08-26 14:08:39 +02:00
42 changed files with 788 additions and 201 deletions

View File

@@ -148,6 +148,18 @@ If indicators are provided through the command-line, they are checked against th
---
### `os_analytics_ad_daily.json`
!!! info "Availability"
Backup: :material-check:
Full filesystem dump: :material-check:
This JSON file is created by mvt-ios' `OSAnalyticsADDaily` module. The module extracts records from a plist located *private/var/mobile/Library/Preferences/com.apple.osanalytics.addaily.plist*, which contains a history of data usage by processes running on the system. Besides the network statistics, these records are particularly important because they might show traces of malicious process executions and the relevant timeframe.
If indicators are provided through the command-line, they are checked against the process names. Any matches are stored in *os_analytics_ad_daily_detected.json*.
---
### `datausage.json`
!!! info "Availability"
@@ -240,6 +252,16 @@ This JSON file is created by mvt-ios' `SMSAttachments` module. The module extrac
---
### `tcc.json`
!!! info "Availability"
Backup: :material-check:
Full filesystem dump: :material-check:
This JSON file is created by mvt-ios' `TCC` module. The module extracts records from a SQLite database located at */private/var/mobile/Library/TCC/TCC.db*, which contains a list of which services such as microphone, camera, or location, apps have been granted or denied access to.
---
### `version_history.json`
!!! info "Availability"

View File

@@ -32,7 +32,10 @@ class PullProgress(tqdm):
class DownloadAPKs(AndroidExtraction):
"""DownloadAPKs is the main class operating the download of APKs
from the device."""
from the device.
"""
def __init__(self, output_folder=None, all_apks=False, log=None,
packages=None):
@@ -51,7 +54,9 @@ class DownloadAPKs(AndroidExtraction):
@classmethod
def from_json(cls, json_path):
"""Initialize this class from an existing apks.json file.
:param json_path: Path to the apks.json file to parse.
"""
with open(json_path, "r") as handle:
packages = json.load(handle)
@@ -59,9 +64,11 @@ class DownloadAPKs(AndroidExtraction):
def pull_package_file(self, package_name, remote_path):
"""Pull files related to specific package from the device.
:param package_name: Name of the package to download
:param remote_path: Path to the file to download
:returns: Path to the local copy
"""
log.info("Downloading %s ...", remote_path)
@@ -101,6 +108,8 @@ class DownloadAPKs(AndroidExtraction):
def get_packages(self):
"""Use the Packages adb module to retrieve the list of packages.
We reuse the same extraction logic to then download the APKs.
"""
self.log.info("Retrieving list of installed packages...")
@@ -111,8 +120,7 @@ class DownloadAPKs(AndroidExtraction):
self.packages = m.results
def pull_packages(self):
"""Download all files of all selected packages from the device.
"""
"""Download all files of all selected packages from the device."""
log.info("Starting extraction of installed APKs at folder %s", self.output_folder)
if not os.path.exists(self.output_folder):
@@ -185,15 +193,13 @@ class DownloadAPKs(AndroidExtraction):
log.info("Download of selected packages completed")
def save_json(self):
"""Save the results to the package.json file.
"""
"""Save the results to the package.json file."""
json_path = os.path.join(self.output_folder, "apks.json")
with open(json_path, "w") as handle:
json.dump(self.packages, handle, indent=4)
def run(self):
"""Run all steps of fetch-apk.
"""
"""Run all steps of fetch-apk."""
self.get_packages()
self._adb_connect()
self.pull_packages()

View File

@@ -5,8 +5,12 @@
from .chrome_history import ChromeHistory
from .dumpsys_batterystats import DumpsysBatterystats
from .dumpsys_full import DumpsysFull
from .dumpsys_packages import DumpsysPackages
from .dumpsys_procstats import DumpsysProcstats
from .dumpsys_receivers import DumpsysReceivers
from .files import Files
from .logcat import Logcat
from .packages import Packages
from .processes import Processes
from .rootbinaries import RootBinaries
@@ -15,4 +19,5 @@ from .whatsapp import Whatsapp
ADB_MODULES = [ChromeHistory, SMS, Whatsapp, Processes,
DumpsysBatterystats, DumpsysProcstats,
DumpsysPackages, Packages, RootBinaries]
DumpsysPackages, DumpsysReceivers, DumpsysFull,
Packages, RootBinaries, Logcat, Files]

View File

@@ -37,9 +37,12 @@ class AndroidExtraction(MVTModule):
self.device = None
self.serial = None
def _adb_check_keys(self):
"""Make sure Android adb keys exist.
"""
@staticmethod
def _adb_check_keys():
"""Make sure Android adb keys exist."""
if not os.path.isdir(os.path.dirname(ADB_KEY_PATH)):
os.path.makedirs(os.path.dirname(ADB_KEY_PATH))
if not os.path.exists(ADB_KEY_PATH):
keygen(ADB_KEY_PATH)
@@ -47,8 +50,7 @@ class AndroidExtraction(MVTModule):
write_public_keyfile(ADB_KEY_PATH, ADB_PUB_KEY_PATH)
def _adb_connect(self):
"""Connect to the device over adb.
"""
"""Connect to the device over adb."""
self._adb_check_keys()
with open(ADB_KEY_PATH, "rb") as handle:
@@ -90,47 +92,53 @@ class AndroidExtraction(MVTModule):
break
def _adb_disconnect(self):
"""Close adb connection to the device.
"""
"""Close adb connection to the device."""
self.device.close()
def _adb_reconnect(self):
"""Reconnect to device using adb.
"""
"""Reconnect to device using adb."""
log.info("Reconnecting ...")
self._adb_disconnect()
self._adb_connect()
def _adb_command(self, command):
"""Execute an adb shell command.
:param command: Shell command to execute
:returns: Output of command
"""
return self.device.shell(command)
def _adb_check_if_root(self):
"""Check if we have a `su` binary on the Android device.
:returns: Boolean indicating whether a `su` binary is present or not
"""
return bool(self._adb_command("command -v su"))
def _adb_root_or_die(self):
"""Check if we have a `su` binary, otherwise raise an Exception.
"""
"""Check if we have a `su` binary, otherwise raise an Exception."""
if not self._adb_check_if_root():
raise InsufficientPrivileges("This module is optionally available in case the device is already rooted. Do NOT root your own device!")
def _adb_command_as_root(self, command):
"""Execute an adb shell command.
:param command: Shell command to execute as root
:returns: Output of command
"""
return self._adb_command(f"su -c {command}")
def _adb_check_file_exists(self, file):
"""Verify that a file exists.
:param file: Path of the file
:returns: Boolean indicating whether the file exists or not
"""
# TODO: Need to support checking files without root privileges as well.
@@ -144,9 +152,12 @@ class AndroidExtraction(MVTModule):
def _adb_download(self, remote_path, local_path, progress_callback=None, retry_root=True):
"""Download a file form the device.
:param remote_path: Path to download from the device
:param local_path: Path to where to locally store the copy of the file
:param progress_callback: Callback for download progress bar
:param progress_callback: Callback for download progress bar (Default value = None)
:param retry_root: Default value = True)
"""
try:
self.device.pull(remote_path, local_path, progress_callback)
@@ -187,9 +198,11 @@ class AndroidExtraction(MVTModule):
def _adb_process_file(self, remote_path, process_routine):
"""Download a local copy of a file which is only accessible as root.
This is a wrapper around process_routine.
:param remote_path: Path of the file on the device to process
:param process_routine: Function to be called on the local copy of the
downloaded file
"""
# Connect to the device over adb.
self._adb_connect()
@@ -223,6 +236,5 @@ class AndroidExtraction(MVTModule):
self._adb_disconnect()
def run(self):
"""Run the main procedure.
"""
"""Run the main procedure."""
raise NotImplementedError

View File

@@ -35,7 +35,9 @@ class ChromeHistory(AndroidExtraction):
def _parse_db(self, db_path):
"""Parse a Chrome History database file.
:param db_path: Path to the History database to process.
"""
conn = sqlite3.connect(db_path)
cur = conn.cursor()

View File

@@ -0,0 +1,35 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysFull(AndroidExtraction):
"""This module extracts stats on battery consumption by processes."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def run(self):
self._adb_connect()
stats = self._adb_command("dumpsys")
if self.output_folder:
stats_path = os.path.join(self.output_folder,
"dumpsys.txt")
with open(stats_path, "w") as handle:
handle.write(stats)
log.info("Full dumpsys output stored at %s",
stats_path)
self._adb_disconnect()

View File

@@ -10,8 +10,9 @@ from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysPackages(AndroidExtraction):
"""This module extracts stats on installed packages."""
"""This module extracts details on installed packages."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
@@ -23,6 +24,7 @@ class DumpsysPackages(AndroidExtraction):
self._adb_connect()
output = self._adb_command("dumpsys package")
if self.output_folder:
packages_path = os.path.join(self.output_folder,
"dumpsys_packages.txt")

View File

@@ -0,0 +1,87 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from .base import AndroidExtraction
log = logging.getLogger(__name__)
ACTION_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS"
ACTION_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED"
ACTION_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED"
ACTION_PHONE_STATE = "android.intent.action.PHONE_STATE"
class DumpsysReceivers(AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def run(self):
self._adb_connect()
output = self._adb_command("dumpsys package")
if not output:
return
activity = None
for line in output.split("\n"):
# Find activity block markers.
if line.strip().startswith(ACTION_NEW_OUTGOING_SMS):
activity = ACTION_NEW_OUTGOING_SMS
continue
elif line.strip().startswith(ACTION_SMS_RECEIVED):
activity = ACTION_SMS_RECEIVED
continue
elif line.strip().startswith(ACTION_PHONE_STATE):
activity = ACTION_PHONE_STATE
continue
elif line.strip().startswith(ACTION_DATA_SMS_RECEIVED):
activity = ACTION_DATA_SMS_RECEIVED
continue
# If we are not in an activity block yet, skip.
if not activity:
continue
# If we are in a block but the line does not start with 8 spaces
# it means the block ended a new one started, so we reset and
# continue.
if not line.startswith(" " * 8):
activity = None
continue
# If we got this far, we are processing receivers for the
# activities we are interested in.
receiver = line.strip().split(" ")[1]
package_name = receiver.split("/")[0]
if package_name == "com.google.android.gms":
continue
if activity == ACTION_NEW_OUTGOING_SMS:
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
receiver)
elif activity == ACTION_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
receiver)
elif activity == ACTION_DATA_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
receiver)
elif activity == ACTION_PHONE_STATE:
self.log.info("Found a receiver monitoring telephony state: \"%s\"",
receiver)
self.results.append({
"activity": activity,
"package_name": package_name,
"receiver": receiver,
})
self._adb_disconnect()

View File

@@ -0,0 +1,33 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from .base import AndroidExtraction
log = logging.getLogger(__name__)
class Files(AndroidExtraction):
"""This module extracts the list of installed packages."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def run(self):
self._adb_connect()
output = self._adb_command("find / -type f 2> /dev/null")
if output and self.output_folder:
files_txt_path = os.path.join(self.output_folder, "files.txt")
with open(files_txt_path, "w") as handle:
handle.write(output)
log.info("List of visible files stored at %s", files_txt_path)
self._adb_disconnect()

View File

@@ -0,0 +1,48 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from .base import AndroidExtraction
log = logging.getLogger(__name__)
class Logcat(AndroidExtraction):
"""This module extracts details on installed packages."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def run(self):
self._adb_connect()
# Get the current logcat.
output = self._adb_command("logcat -d")
# Get the locat prior to last reboot.
last_output = self._adb_command("logcat -L")
if self.output_folder:
logcat_path = os.path.join(self.output_folder,
"logcat.txt")
with open(logcat_path, "w") as handle:
handle.write(output)
log.info("Current logcat logs stored at %s",
logcat_path)
logcat_last_path = os.path.join(self.output_folder,
"logcat_last.txt")
with open(logcat_last_path, "w") as handle:
handle.write(last_output)
log.info("Logcat logs prior to last reboot stored at %s",
logcat_last_path)
self._adb_disconnect()

View File

@@ -71,7 +71,9 @@ class SMS(AndroidExtraction):
def _parse_db(self, db_path):
"""Parse an Android bugle_db SMS database file.
:param db_path: Path to the Android SMS database file to process
"""
conn = sqlite3.connect(db_path)
cur = conn.cursor()

View File

@@ -48,7 +48,9 @@ class Whatsapp(AndroidExtraction):
def _parse_db(self, db_path):
"""Parse an Android msgstore.db WhatsApp database file.
:param db_path: Path to the Android WhatsApp database file to process
"""
conn = sqlite3.connect(db_path)
cur = conn.cursor()

View File

@@ -32,6 +32,9 @@ class Indicators:
def parse_stix2(self, file_path):
"""Extract indicators from a STIX2 file.
:param file_path: Path to the STIX2 file to parse
:type file_path: str
"""
self.log.info("Parsing STIX2 indicators file at path %s",
file_path)
@@ -64,9 +67,18 @@ class Indicators:
self._add_indicator(ioc=value,
iocs_list=self.ioc_files)
def check_domain(self, url):
def check_domain(self, url) -> bool:
"""Check if a given URL matches any of the provided domain indicators.
:param url: URL to match against domain indicators
:type url: str
:returns: True if the URL matched an indicator, otherwise False
:rtype: bool
"""
# TODO: If the IOC domain contains a subdomain, it is not currently
# being matched.
if not url:
return False
try:
# First we use the provided URL.
@@ -124,18 +136,33 @@ class Indicators:
return True
def check_domains(self, urls):
"""Check the provided list of (suspicious) domains against a list of URLs.
:param urls: List of URLs to check
return False
def check_domains(self, urls) -> bool:
"""Check a list of URLs against the provided list of domain indicators.
:param urls: List of URLs to check against domain indicators
:type urls: list
:returns: True if any URL matched an indicator, otherwise False
:rtype: bool
"""
if not urls:
return False
for url in urls:
if self.check_domain(url):
return True
def check_process(self, process):
return False
def check_process(self, process) -> bool:
"""Check the provided process name against the list of process
indicators.
:param process: Process name to check
:param process: Process name to check against process indicators
:type process: str
:returns: True if process matched an indicator, otherwise False
:rtype: bool
"""
if not process:
return False
@@ -151,18 +178,33 @@ class Indicators:
self.log.warning("Found a truncated known suspicious process name \"%s\"", process)
return True
def check_processes(self, processes):
return False
def check_processes(self, processes) -> bool:
"""Check the provided list of processes against the list of
process indicators.
:param processes: List of processes to check
:param processes: List of processes to check against process indicators
:type processes: list
:returns: True if process matched an indicator, otherwise False
:rtype: bool
"""
if not processes:
return False
for process in processes:
if self.check_process(process):
return True
def check_email(self, email):
return False
def check_email(self, email) -> bool:
"""Check the provided email against the list of email indicators.
:param email: Suspicious email to check
:param email: Email address to check against email indicators
:type email: str
:returns: True if email address matched an indicator, otherwise False
:rtype: bool
"""
if not email:
return False
@@ -171,9 +213,16 @@ class Indicators:
self.log.warning("Found a known suspicious email address: \"%s\"", email)
return True
def check_file(self, file_path):
return False
def check_file(self, file_path) -> bool:
"""Check the provided file path against the list of file indicators.
:param file_path: Path or name of the file to check
:param file_path: File path or file name to check against file
indicators
:type file_path: str
:returns: True if the file path matched an indicator, otherwise False
:rtype: bool
"""
if not file_path:
return False
@@ -182,3 +231,5 @@ class Indicators:
if file_name in self.ioc_files:
self.log.warning("Found a known suspicious file: \"%s\"", file_path)
return True
return False

View File

@@ -23,7 +23,8 @@ class InsufficientPrivileges(Exception):
pass
class MVTModule(object):
"""This class provides a base for all extraction modules."""
"""This class provides a base for all extraction modules.
"""
enabled = True
slug = None
@@ -31,12 +32,18 @@ class MVTModule(object):
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):
"""Initialize module.
:param file_path: Path to the module's database file, if there is any.
:param file_path: Path to the module's database file, if there is any
:type file_path: str
:param base_folder: Path to the base folder (backup or filesystem dump)
:type file_path: str
:param output_folder: Folder where results will be stored
:type output_folder: str
:param fast_mode: Flag to enable or disable slow modules
:type fast_mode: bool
:param log: Handle to logger
:param results: Provided list of results entries
:type results: list
"""
self.file_path = file_path
self.base_folder = base_folder
@@ -59,18 +66,18 @@ class MVTModule(object):
return cls(results=results, log=log)
def get_slug(self):
"""Use the module's class name to retrieve a slug
"""
if self.slug:
return self.slug
sub = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", self.__class__.__name__)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower()
def load_indicators(self, file_path):
self.indicators = Indicators(file_path, self.log)
def check_indicators(self):
"""Check the results of this module against a provided list of
indicators."""
indicators.
"""
raise NotImplementedError
def save_to_json(self):
@@ -100,9 +107,19 @@ class MVTModule(object):
def serialize(self, record):
raise NotImplementedError
def to_timeline(self):
"""Convert results into a timeline.
@staticmethod
def _deduplicate_timeline(timeline):
"""Serialize entry as JSON to deduplicate repeated entries
:param timeline: List of entries from timeline to deduplicate
"""
timeline_set = set()
for record in timeline:
timeline_set.add(json.dumps(record, sort_keys=True))
return [json.loads(record) for record in timeline_set]
def to_timeline(self):
"""Convert results into a timeline."""
for result in self.results:
record = self.serialize(result)
if record:
@@ -120,15 +137,8 @@ class MVTModule(object):
self.timeline_detected.append(record)
# De-duplicate timeline entries.
self.timeline = self.timeline_deduplicate(self.timeline)
self.timeline_detected = self.timeline_deduplicate(self.timeline_detected)
def timeline_deduplicate(self, timeline):
"""Serialize entry as JSON to deduplicate repeated entries"""
timeline_set = set()
for record in timeline:
timeline_set.add(json.dumps(record, sort_keys=True))
return [json.loads(record) for record in timeline_set]
self.timeline = self._deduplicate_timeline(self.timeline)
self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)
def run(self):
"""Run the main module procedure.
@@ -150,7 +160,7 @@ def run_module(module):
module.log.info("There might be no data to extract by module %s: %s",
module.__class__.__name__, e)
except DatabaseCorruptedError as e:
module.log.error("The %s module database seems to be corrupted and recovery failed: %s",
module.log.error("The %s module database seems to be corrupted: %s",
module.__class__.__name__, e)
except Exception as e:
module.log.exception("Error in running extraction from module %s: %s",
@@ -177,8 +187,9 @@ def run_module(module):
def save_timeline(timeline, timeline_path):
"""Save the timeline in a csv file.
:param timeline: List of records to order and store.
:param timeline_path: Path to the csv file to store the timeline to.
:param timeline: List of records to order and store
:param timeline_path: Path to the csv file to store the timeline to
"""
with io.open(timeline_path, "a+", encoding="utf-8") as handle:
csvoutput = csv.writer(handle, delimiter=",", quotechar="\"")

View File

@@ -9,8 +9,7 @@ from click import Option, UsageError
class MutuallyExclusiveOption(Option):
"""This class extends click to support mutually exclusive options.
"""
"""This class extends click to support mutually exclusive options."""
def __init__(self, *args, **kwargs):
self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", []))

File diff suppressed because it is too large Load Diff

View File

@@ -10,8 +10,12 @@ import re
def convert_mactime_to_unix(timestamp, from_2001=True):
"""Converts Mac Standard Time to a Unix timestamp.
:param timestamp: MacTime timestamp (either int or float)
:returns: Unix epoch timestamp
:param timestamp: MacTime timestamp (either int or float).
:type timestamp: int
:param from_2001: bool: Whether to (Default value = True)
:param from_2001: Default value = True)
:returns: Unix epoch timestamp.
"""
if not timestamp:
return None
@@ -34,8 +38,10 @@ def convert_mactime_to_unix(timestamp, from_2001=True):
def convert_chrometime_to_unix(timestamp):
"""Converts Chrome timestamp to a Unix timestamp.
:param timestamp: Chrome timestamp as int
:returns: Unix epoch timestamp
:param timestamp: Chrome timestamp as int.
:type timestamp: int
:returns: Unix epoch timestamp.
"""
epoch_start = datetime.datetime(1601, 1 , 1)
delta = datetime.timedelta(microseconds=timestamp)
@@ -44,8 +50,11 @@ def convert_chrometime_to_unix(timestamp):
def convert_timestamp_to_iso(timestamp):
"""Converts Unix timestamp to ISO string.
:param timestamp: Unix timestamp
:returns: ISO timestamp string in YYYY-mm-dd HH:MM:SS.ms format
:param timestamp: Unix timestamp.
:type timestamp: int
:returns: ISO timestamp string in YYYY-mm-dd HH:MM:SS.ms format.
:rtype: str
"""
try:
return timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
@@ -54,15 +63,19 @@ def convert_timestamp_to_iso(timestamp):
def check_for_links(text):
"""Checks if a given text contains HTTP links.
:param text: Any provided text
:returns: Search results
:param text: Any provided text.
:type text: str
:returns: Search results.
"""
return re.findall("(?P<url>https?://[^\s]+)", text, re.IGNORECASE)
def get_sha256_from_file_path(file_path):
"""Calculate the SHA256 hash of a file from a file path.
:param file_path: Path to the file to hash
:returns: The SHA256 hash string
"""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as handle:
@@ -75,8 +88,10 @@ def get_sha256_from_file_path(file_path):
# https://stackoverflow.com/questions/57014259/json-dumps-on-dictionary-with-bytes-for-keys
def keys_bytes_to_string(obj):
"""Convert object keys from bytes to string.
:param obj: Object to convert from bytes to string.
:returns: Converted object.
:returns: Object converted to string.
:rtype: str
"""
new_obj = {}
if not isinstance(obj, dict):

View File

@@ -6,7 +6,7 @@
import requests
from packaging import version
MVT_VERSION = "1.2.5"
MVT_VERSION = "1.2.7"
def check_for_updates():
res = requests.get("https://pypi.org/pypi/mvt/json")

View File

@@ -17,6 +17,8 @@ log = logging.getLogger(__name__)
class DecryptBackup:
"""This class provides functions to decrypt an encrypted iTunes backup
using either a password or a key file.
"""
def __init__(self, backup_path, dest_path=None):
@@ -32,9 +34,12 @@ class DecryptBackup:
def can_process(self) -> bool:
return self._backup is not None
def is_encrypted(self, backup_path) -> bool:
@staticmethod
def is_encrypted(backup_path) -> bool:
"""Query Manifest.db file to see if it's encrypted or not.
:param backup_path: Path to the backup to decrypt
"""
conn = sqlite3.connect(os.path.join(backup_path, "Manifest.db"))
cur = conn.cursor()
@@ -94,7 +99,9 @@ class DecryptBackup:
def decrypt_with_password(self, password):
"""Decrypts an encrypted iOS backup.
:param password: Password to use to decrypt the original backup
"""
log.info("Decrypting iOS backup at path %s with password", self.backup_path)
@@ -130,7 +137,9 @@ class DecryptBackup:
def decrypt_with_key_file(self, key_file):
"""Decrypts an encrypted iOS backup using a key file.
:param key_file: File to read the key bytes to decrypt the backup
"""
log.info("Decrypting iOS backup at path %s with key file %s",
self.backup_path, key_file)
@@ -157,8 +166,7 @@ class DecryptBackup:
log.critical("Failed to decrypt backup. Did you provide the correct key file?")
def get_key(self):
"""Retrieve and prints the encryption key.
"""
"""Retrieve and prints the encryption key."""
if not self._backup:
return
@@ -168,7 +176,9 @@ class DecryptBackup:
def write_key(self, key_path):
"""Save extracted key to file.
:param key_path: Path to the file where to write the derived decryption key.
"""
if not self._decryption_key:
return

View File

@@ -30,9 +30,9 @@ class BackupInfo(IOSExtraction):
with open(info_path, "rb") as handle:
info = plistlib.load(handle)
fields = ["Build Version", "Device Name", "Display Name", "GUID",
fields = ["Build Version", "Device Name", "Display Name",
"GUID", "ICCID", "IMEI", "MEID", "Installed Applications",
"Last Backup Data", "Phone Number", "Product Name",
"Last Backup Date", "Phone Number", "Product Name",
"Product Type", "Product Version", "Serial Number",
"Target Identifier", "Target Type", "Unique Identifier",
"iTunes Version"]

View File

@@ -11,8 +11,7 @@ from ..base import IOSExtraction
CONF_PROFILES_DOMAIN = "SysSharedContainerDomain-systemgroup.com.apple.configurationprofiles"
class ConfigurationProfiles(IOSExtraction):
"""This module extracts the full plist data from configuration profiles.
"""
"""This module extracts the full plist data from configuration profiles."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):

View File

@@ -27,11 +27,19 @@ class Manifest(IOSExtraction):
def _get_key(self, dictionary, key):
"""Unserialized plist objects can have keys which are str or byte types
This is a helper to try fetch a key as both a byte or string type.
:param dictionary: param key:
:param key:
"""
return dictionary.get(key.encode("utf-8"), None) or dictionary.get(key, None)
def _convert_timestamp(self, timestamp_or_unix_time_int):
@staticmethod
def _convert_timestamp(timestamp_or_unix_time_int):
"""Older iOS versions stored the manifest times as unix timestamps.
:param timestamp_or_unix_time_int:
"""
if isinstance(timestamp_or_unix_time_int, datetime.datetime):
return convert_timestamp_to_iso(timestamp_or_unix_time_int)
@@ -90,7 +98,7 @@ class Manifest(IOSExtraction):
def run(self):
manifest_db_path = os.path.join(self.base_folder, "Manifest.db")
if not os.path.isfile(manifest_db_path):
raise DatabaseNotFoundError("Impossible to find the module's database file")
raise DatabaseNotFoundError("unable to find backup's Manifest.db")
self.log.info("Found Manifest.db database at path: %s", manifest_db_path)

View File

@@ -14,6 +14,8 @@ CONF_PROFILES_EVENTS_RELPATH = "Library/ConfigurationProfiles/MCProfileEvents.pl
class ProfileEvents(IOSExtraction):
"""This module extracts events related to the installation of configuration
profiles.
"""
def __init__(self, file_path=None, base_folder=None, output_folder=None,

View File

@@ -28,7 +28,9 @@ class IOSExtraction(MVTModule):
def _recover_sqlite_db_if_needed(self, file_path):
"""Tries to recover a malformed database by running a .clone command.
:param file_path: Path to the malformed database file.
"""
# TODO: Find a better solution.
conn = sqlite3.connect(file_path)
@@ -49,9 +51,9 @@ class IOSExtraction(MVTModule):
self.log.info("Database at path %s is malformed. Trying to recover...", file_path)
if not shutil.which("sqlite3"):
raise DatabaseCorruptedError("Unable to recover without sqlite3 binary. Please install sqlite3!")
raise DatabaseCorruptedError("failed to recover without sqlite3 binary: please install sqlite3!")
if '"' in file_path:
raise DatabaseCorruptedError(f"Database at path '{file_path}' is corrupted. unable to recover because it has a quotation mark (\") in its name.")
raise DatabaseCorruptedError(f"database at path '{file_path}' is corrupted. unable to recover because it has a quotation mark (\") in its name")
bak_path = f"{file_path}.bak"
shutil.move(file_path, bak_path)
@@ -59,18 +61,20 @@ class IOSExtraction(MVTModule):
ret = subprocess.call(["sqlite3", bak_path, f".clone \"{file_path}\""],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if ret != 0:
raise DatabaseCorruptedError("Recovery of database failed")
raise DatabaseCorruptedError("failed to recover database")
self.log.info("Database at path %s recovered successfully!", file_path)
def _get_backup_files_from_manifest(self, relative_path=None, domain=None):
"""Locate files from Manifest.db.
:param relative_path: Relative path to use as filter from Manifest.db.
:param domain: Domain to use as filter from Manifest.db.
:param relative_path: Relative path to use as filter from Manifest.db. (Default value = None)
:param domain: Domain to use as filter from Manifest.db. (Default value = None)
"""
manifest_db_path = os.path.join(self.base_folder, "Manifest.db")
if not os.path.exists(manifest_db_path):
raise Exception("Unable to find backup's Manifest.db")
raise DatabaseNotFoundError("unable to find backup's Manifest.db")
base_sql = "SELECT fileID, domain, relativePath FROM Files WHERE "
@@ -86,7 +90,7 @@ class IOSExtraction(MVTModule):
elif domain:
cur.execute(f"{base_sql} domain = ?;", (domain,))
except Exception as e:
raise Exception("Query to Manifest.db failed: %s", e)
raise DatabaseCorruptedError("failed to query Manifest.db: %s", e)
for row in cur:
yield {
@@ -116,8 +120,11 @@ class IOSExtraction(MVTModule):
modules that expect to work with a single SQLite database.
If a module requires to process multiple databases or files,
you should use the helper functions above.
:param backup_id: iTunes backup database file's ID (or hash).
:param root_paths: Glob patterns for files to seek in filesystem dump.
:param root_paths: Glob patterns for files to seek in filesystem dump. (Default value = [])
:param backup_ids: Default value = None)
"""
file_path = None
# First we check if the was an explicit file path specified.
@@ -144,6 +151,6 @@ class IOSExtraction(MVTModule):
if file_path:
self.file_path = file_path
else:
raise DatabaseNotFoundError("Unable to find the module's database file")
raise DatabaseNotFoundError("unable to find the module's database file")
self._recover_sqlite_db_if_needed(self.file_path)

View File

@@ -13,7 +13,10 @@ from ..base import IOSExtraction
class Filesystem(IOSExtraction):
"""This module extracts creation and modification date of files from a
full file-system dump."""
full file-system dump.
"""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):

View File

@@ -14,7 +14,10 @@ NETUSAGE_ROOT_PATHS = [
class Netusage(NetBase):
"""This class extracts data from netusage.sqlite and attempts to identify
any suspicious processes if running on a full filesystem dump."""
any suspicious processes if running on a full filesystem dump.
"""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):

View File

@@ -11,7 +11,10 @@ WEBKIT_INDEXEDDB_ROOT_PATHS = [
class WebkitIndexedDB(WebkitBase):
"""This module looks extracts records from WebKit IndexedDB folders,
and checks them against any provided list of suspicious domains."""
and checks them against any provided list of suspicious domains.
"""
slug = "webkit_indexeddb"

View File

@@ -11,7 +11,10 @@ WEBKIT_LOCALSTORAGE_ROOT_PATHS = [
class WebkitLocalStorage(WebkitBase):
"""This module looks extracts records from WebKit LocalStorage folders,
and checks them against any provided list of suspicious domains."""
and checks them against any provided list of suspicious domains.
"""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):

View File

@@ -11,7 +11,10 @@ WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS = [
class WebkitSafariViewService(WebkitBase):
"""This module looks extracts records from WebKit LocalStorage folders,
and checks them against any provided list of suspicious domains."""
and checks them against any provided list of suspicious domains.
"""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):

View File

@@ -13,15 +13,18 @@ from .idstatuscache import IDStatusCache
from .interactionc import InteractionC
from .locationd import LocationdClients
from .net_datausage import Datausage
from .osanalytics_addaily import OSAnalyticsADDaily
from .safari_browserstate import SafariBrowserState
from .safari_history import SafariHistory
from .sms import SMS
from .sms_attachments import SMSAttachments
from .tcc import TCC
from .webkit_resource_load_statistics import WebkitResourceLoadStatistics
from .webkit_session_resource_log import WebkitSessionResourceLog
from .whatsapp import Whatsapp
MIXED_MODULES = [Calls, ChromeFavicon, ChromeHistory, Contacts, FirefoxFavicon,
FirefoxHistory, IDStatusCache, InteractionC, LocationdClients,
Datausage, SafariBrowserState, SafariHistory, SMS, SMSAttachments,
WebkitResourceLoadStatistics, WebkitSessionResourceLog, Whatsapp,]
OSAnalyticsADDaily, Datausage, SafariBrowserState, SafariHistory,
TCC, SMS, SMSAttachments, WebkitResourceLoadStatistics,
WebkitSessionResourceLog, Whatsapp,]

View File

@@ -19,7 +19,10 @@ FIREFOX_HISTORY_ROOT_PATHS = [
class FirefoxHistory(IOSExtraction):
"""This module extracts all Firefox visits and tries to detect potential
network injection attacks."""
network injection attacks.
"""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):

View File

@@ -15,6 +15,7 @@ IDSTATUSCACHE_BACKUP_IDS = [
]
IDSTATUSCACHE_ROOT_PATHS = [
"private/var/mobile/Library/Preferences/com.apple.identityservices.idstatuscache.plist",
"private/var/mobile/Library/IdentityServices/idstatuscache.plist",
]
class IDStatusCache(IOSExtraction):
@@ -50,12 +51,8 @@ class IDStatusCache(IOSExtraction):
result.get("user"))
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=IDSTATUSCACHE_BACKUP_IDS,
root_paths=IDSTATUSCACHE_ROOT_PATHS)
self.log.info("Found IDStatusCache plist at path: %s", self.file_path)
with open(self.file_path, "rb") as handle:
def _extract_idstatuscache_entries(self, file_path):
with open(file_path, "rb") as handle:
file_plist = plistlib.load(handle)
id_status_cache_entries = []
@@ -83,4 +80,16 @@ class IDStatusCache(IOSExtraction):
entry["occurrences"] = entry_counter[entry["user"]]
self.results.append(entry)
def run(self):
if self.is_backup:
self._find_ios_database(backup_ids=IDSTATUSCACHE_BACKUP_IDS)
self.log.info("Found IDStatusCache plist at path: %s", self.file_path)
self._extract_idstatuscache_entries(self.file_path)
elif self.is_fs_dump:
for idstatuscache_path in self._get_fs_files_from_patterns(IDSTATUSCACHE_ROOT_PATHS):
self.file_path = idstatuscache_path
self.log.info("Found IDStatusCache plist at path: %s", self.file_path)
self._extract_idstatuscache_entries(self.file_path)
self.log.info("Extracted a total of %d ID Status Cache entries", len(self.results))

Some files were not shown because too many files have changed in this diff Show More