1
mirror of https://github.com/mvt-project/mvt synced 2025-11-13 01:37:36 +01:00

Compare commits

...

50 Commits
v2.0 ... v2.1.4

Author SHA1 Message Date
Nex
6936908f86 Bumped version 2022-08-15 10:27:36 +02:00
Nex
f3e5763c6a Added SECURITY.md 2022-08-14 19:28:30 +02:00
Nex
f438f7b1fb Fixing unix epoch timestamps conversion to float 2022-08-13 23:37:35 +02:00
Nex
66a157868f Ensuring all adb connect/disconnect are happening in modules only 2022-08-13 23:12:43 +02:00
Nex
a966b694ea More line length enforcement 2022-08-13 18:27:54 +02:00
Nex
c9dd3af278 More line length enforcing 2022-08-13 18:24:11 +02:00
Nex
82a60ee07c Enforcing line length 2022-08-13 17:52:56 +02:00
Nex
8bc5113bd2 Enforcing line length 2022-08-13 17:51:06 +02:00
Nex
00d82f7f00 Enforcing line lenght 2022-08-13 17:50:00 +02:00
Nex
2781f33fb5 Added more date conversion wrappers 2022-08-13 14:04:10 +02:00
Nex
271fe5fbee Continuing enforcement of line length and simplifying date conversions 2022-08-13 02:14:24 +02:00
Nex
0f503f72b5 Starting to enforce line lengths on mvt-ios 2022-08-12 19:38:57 +02:00
Nex
424b86a261 Fixed typos 2022-08-12 19:25:56 +02:00
Nex
1fe595f4cc Added CONTRIBUTING.md file 2022-08-12 19:25:11 +02:00
Nex
b8c59f1183 Removed public_indicators.json legacy file 2022-08-12 19:15:17 +02:00
Nex
a935347aed Trying to enforce line lengths at 80/100 2022-08-12 19:14:05 +02:00
Nex
661d0a8669 Using Union type hints in order to support older versions of Python 2022-08-12 16:29:43 +02:00
Nex
63ff5fd334 Started linting the code 2022-08-12 16:20:16 +02:00
Nex
146b9245ab Sorted imports 2022-08-11 16:57:08 +02:00
Nex
99d33922be Conformed ways modules logger is initialized 2022-08-11 16:42:04 +02:00
Nex
c42634af3f Fixed logging in accessibility module 2022-08-11 14:50:25 +02:00
Nex
6cb59cc3ab Trying to tidy up ConfigurationProfiles module 2022-08-10 16:44:43 +02:00
Nex
e0481686b7 Fixed test file 2022-08-08 16:47:01 +02:00
Nex
804ade3a40 Conformed browerstate plugin to others with similar structure 2022-08-08 16:44:54 +02:00
tek
c5ccaef0c4 Fixes a bug in Safari Browser State module 2022-08-08 11:20:05 +02:00
Nex
c4416d406a Avoiding duplicate entries for stix2 files with multiple malware definitions 2022-08-06 14:49:05 +02:00
Nex
6b8a23ae10 Added an attribute list to keep track of executed modules 2022-08-05 13:52:51 +02:00
tek
872d5d766e Adds product name in iOS backup info module 2022-08-03 16:34:39 +02:00
Nex
f5abd0719c Bumped version 2022-08-02 18:26:29 +02:00
Nex
6462ffc15d Added iOS 15.6 2022-08-02 18:26:23 +02:00
Nex
6333cafd38 Bumped version 2022-07-25 17:43:37 +02:00
Nex
03c59811a3 Ordered imports 2022-07-25 17:43:27 +02:00
Nex
cfd3b5bbcb Merge branch 'main' of github.com:mvt-project/mvt 2022-07-25 17:43:08 +02:00
Nex
97ab67240f Creating MVT data folder when missing 2022-07-25 17:42:51 +02:00
Nex
7fc664185c Flake8 fixes 2022-07-20 15:49:51 +02:00
Nex
93094367c7 Bumped version 2022-07-20 15:41:42 +02:00
Nex
e8fa9c6eea Passing binary data to parse rather than a file path 2022-07-20 15:41:07 +02:00
Nex
79a01c45cc Bumped version 2022-07-20 14:12:17 +02:00
Nex
a440d12377 Merge branch 'main' of github.com:mvt-project/mvt 2022-07-20 14:12:08 +02:00
Nex
8085888c0c Improved parsing of profile events to support new formats as well 2022-07-20 14:11:36 +02:00
Nex
c2617fe778 Checking profile IDs in profile_events 2022-07-20 13:25:51 +02:00
Nex
2e1243864c Added check_indicators to profile_events 2022-07-20 13:24:20 +02:00
tek
ba5ff9b38c Fixes a minor typing bug 2022-07-18 14:25:01 +02:00
Nex
3fccebe132 Merge branch 'main' of github.com:mvt-project/mvt 2022-07-14 12:06:52 +02:00
Nex
1265b366c1 Added install_non_market_apps to settings warnings 2022-07-14 09:09:01 +02:00
Nex
c944fb3234 Enforcing quotes in timeline csv writing 2022-07-12 12:03:20 +02:00
Nex
e6b4d17027 Using error instead of warning for failed apk download 2022-07-12 11:55:31 +02:00
Nex
f55ac36189 Code style fixes 2022-07-12 11:55:10 +02:00
Nex
550d6037a6 Bumped version 2022-07-08 19:54:46 +02:00
Nex
e875c978c9 Optional address in SMS serialize 2022-07-08 19:54:33 +02:00
110 changed files with 1666 additions and 1031 deletions

19
CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,19 @@
# Contributing
Thank you for your interest in contributing to Mobile Verification Toolkit (MVT)! Your help is very much appreciated.
## Where to start
Starting to contribute to a somewhat complex project like MVT might seem intimidating. Unless you have specific ideas of new functionality you would like to submit, some good starting points are searching for `TODO:` and `FIXME:` comments throughout the code. Alternatively you can check if any GitHub issues existed marked with the ["help wanted"](https://github.com/mvt-project/mvt/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22) tag.
## Code style
When contributing code to
- **Indentation**: we use 4-spaces tabs.
- **Quotes**: we use double quotes (`"`) as a default. Single quotes (`'`) can be favored with nested strings instead of escaping (`\"`), or when using f-formatting.
- **Maximum line length**: we strongly encourage to respect a 80 characters long lines and to follow [PEP8 indentation guidelines](https://peps.python.org/pep-0008/#indentation) when having to wrap. However, if breaking at 80 is not possible or is detrimental to the readability of the code, exceptions are tolerated so long as they remain within a hard maximum length of 100 characters.

View File

@@ -11,3 +11,6 @@ upload:
test-upload: test-upload:
python3 -m twine upload --repository testpypi dist/* python3 -m twine upload --repository testpypi dist/*
pylint:
pylint --rcfile=setup.cfg mvt

5
SECURITY.md Normal file
View File

@@ -0,0 +1,5 @@
# Reporting security issues
Thank you for your interest in reporting security issues and vulnerabilities! Security research is of utmost importance and we take all reports seriously. If you discover an issue please report it to us right away!
Please DO NOT file a public issue, instead send your report privately to *nex [at] nex [dot] sx*. You can also write PGP-encrypted emails to [this key](https://keybase.io/nex/pgp_keys.asc?fingerprint=05216f3b86848a303c2fe37dd166f1667359d880).

View File

@@ -58,14 +58,16 @@ def version():
@click.option("--output", "-o", type=click.Path(exists=False), @click.option("--output", "-o", type=click.Path(exists=False),
help="Specify a path to a folder where you want to store the APKs") help="Specify a path to a folder where you want to store the APKs")
@click.option("--from-file", "-f", type=click.Path(exists=True), @click.option("--from-file", "-f", type=click.Path(exists=True),
help="Instead of acquiring from phone, load an existing packages.json file for lookups (mainly for debug purposes)") help="Instead of acquiring from phone, load an existing packages.json file for "
"lookups (mainly for debug purposes)")
@click.pass_context @click.pass_context
def download_apks(ctx, all_apks, virustotal, output, from_file, serial): def download_apks(ctx, all_apks, virustotal, output, from_file, serial):
try: try:
if from_file: if from_file:
download = DownloadAPKs.from_json(from_file) download = DownloadAPKs.from_json(from_file)
else: else:
# TODO: Do we actually want to be able to run without storing any file? # TODO: Do we actually want to be able to run without storing any
# file?
if not output: if not output:
log.critical("You need to specify an output folder with --output!") log.critical("You need to specify an output folder with --output!")
ctx.exit(1) ctx.exit(1)
@@ -130,14 +132,16 @@ def check_adb(ctx, serial, iocs, output, fast, list_modules, module):
@cli.command("check-bugreport", help="Check an Android Bug Report") @cli.command("check-bugreport", help="Check an Android Bug Report")
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, @click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC) default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) @click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE) @click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.argument("BUGREPORT_PATH", type=click.Path(exists=True)) @click.argument("BUGREPORT_PATH", type=click.Path(exists=True))
@click.pass_context @click.pass_context
def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path): def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path):
cmd = CmdAndroidCheckBugreport(target_path=bugreport_path, results_path=output, cmd = CmdAndroidCheckBugreport(target_path=bugreport_path,
ioc_files=iocs, module_name=module) results_path=output, ioc_files=iocs,
module_name=module)
if list_modules: if list_modules:
cmd.list_modules() cmd.list_modules()
@@ -156,14 +160,14 @@ def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path):
# Command: check-backup # Command: check-backup
#============================================================================== #==============================================================================
@cli.command("check-backup", help="Check an Android Backup") @cli.command("check-backup", help="Check an Android Backup")
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, @click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC) default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) @click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.argument("BACKUP_PATH", type=click.Path(exists=True)) @click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context @click.pass_context
def check_backup(ctx, serial, iocs, output, list_modules, backup_path): def check_backup(ctx, iocs, output, list_modules, backup_path):
cmd = CmdAndroidCheckBackup(target_path=backup_path, results_path=output, cmd = CmdAndroidCheckBackup(target_path=backup_path, results_path=output,
ioc_files=iocs) ioc_files=iocs)

View File

@@ -14,12 +14,12 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckADB(Command): class CmdAndroidCheckADB(Command):
name = "check-adb"
modules = ADB_MODULES
def __init__(self, target_path: str = None, results_path: str = None, def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None, ioc_files: list = [], module_name: str = None,
fast_mode: bool = False): serial: str = None, fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path, super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name, ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log) serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-adb"
self.modules = ADB_MODULES

View File

@@ -25,16 +25,16 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBackup(Command): class CmdAndroidCheckBackup(Command):
name = "check-backup"
modules = BACKUP_MODULES
def __init__(self, target_path: str = None, results_path: str = None, def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None, ioc_files: list = [], module_name: str = None,
fast_mode: bool = False): serial: str = None, fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path, super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name, ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log) serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-backup"
self.modules = BACKUP_MODULES
self.backup_type = None self.backup_type = None
self.backup_archive = None self.backup_archive = None
self.backup_files = [] self.backup_files = []
@@ -58,8 +58,8 @@ class CmdAndroidCheckBackup(Command):
except InvalidBackupPassword: except InvalidBackupPassword:
log.critical("Invalid backup password") log.critical("Invalid backup password")
sys.exit(1) sys.exit(1)
except AndroidBackupParsingError as e: except AndroidBackupParsingError as exc:
log.critical("Impossible to parse this backup file: %s", e) log.critical("Impossible to parse this backup file: %s", exc)
log.critical("Please use Android Backup Extractor (ABE) instead") log.critical("Please use Android Backup Extractor (ABE) instead")
sys.exit(1) sys.exit(1)
@@ -73,13 +73,16 @@ class CmdAndroidCheckBackup(Command):
self.target_path = Path(self.target_path).absolute().as_posix() self.target_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)): for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
for fname in subfiles: for fname in subfiles:
self.backup_files.append(os.path.relpath(os.path.join(root, fname), self.target_path)) self.backup_files.append(os.path.relpath(os.path.join(root, fname),
self.target_path))
else: else:
log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file") log.critical("Invalid backup path, path should be a folder or an "
"Android Backup (.ab) file")
sys.exit(1) sys.exit(1)
def module_init(self, module: Callable) -> None: def module_init(self, module: Callable) -> None:
if self.backup_type == "folder": if self.backup_type == "folder":
module.from_folder(self.target_path, self.backup_files) module.from_folder(self.target_path, self.backup_files)
else: else:
module.from_ab(self.target_path, self.backup_archive, self.backup_files) module.from_ab(self.target_path, self.backup_archive,
self.backup_files)

View File

@@ -18,16 +18,16 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBugreport(Command): class CmdAndroidCheckBugreport(Command):
name = "check-bugreport"
modules = BUGREPORT_MODULES
def __init__(self, target_path: str = None, results_path: str = None, def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None, ioc_files: list = [], module_name: str = None,
fast_mode: bool = False): serial: str = None, fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path, super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name, ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log) serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-bugreport"
self.modules = BUGREPORT_MODULES
self.bugreport_format = None self.bugreport_format = None
self.bugreport_archive = None self.bugreport_archive = None
self.bugreport_files = [] self.bugreport_files = []
@@ -41,9 +41,11 @@ class CmdAndroidCheckBugreport(Command):
elif os.path.isdir(self.target_path): elif os.path.isdir(self.target_path):
self.bugreport_format = "dir" self.bugreport_format = "dir"
parent_path = Path(self.target_path).absolute().as_posix() parent_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)): for root, _, subfiles in os.walk(os.path.abspath(self.target_path)):
for file_name in subfiles: for file_name in subfiles:
self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path)) file_path = os.path.relpath(os.path.join(root, file_name),
parent_path)
self.bugreport_files.append(file_path)
def module_init(self, module: Callable) -> None: def module_init(self, module: Callable) -> None:
if self.bugreport_format == "zip": if self.bugreport_format == "zip":

View File

@@ -78,13 +78,14 @@ class DownloadAPKs(AndroidExtraction):
try: try:
self._adb_download(remote_path, local_path) self._adb_download(remote_path, local_path)
except InsufficientPrivileges: except InsufficientPrivileges:
log.warn("Unable to pull package file from %s: insufficient privileges, it might be a system app", log.error("Unable to pull package file from %s: insufficient "
remote_path) "privileges, it might be a system app",
remote_path)
self._adb_reconnect() self._adb_reconnect()
return None return None
except Exception as e: except Exception as exc:
log.exception("Failed to pull package file from %s: %s", log.exception("Failed to pull package file from %s: %s",
remote_path, e) remote_path, exc)
self._adb_reconnect() self._adb_reconnect()
return None return None
@@ -121,8 +122,8 @@ class DownloadAPKs(AndroidExtraction):
if not package.get("system", False): if not package.get("system", False):
packages_selection.append(package) packages_selection.append(package)
log.info("Selected only %d packages which are not marked as \"system\"", log.info("Selected only %d packages which are not marked as "
len(packages_selection)) "\"system\"", len(packages_selection))
if len(packages_selection) == 0: if len(packages_selection) == 0:
log.info("No packages were selected for download") log.info("No packages were selected for download")
@@ -141,8 +142,8 @@ class DownloadAPKs(AndroidExtraction):
log.info("[%d/%d] Package: %s", i, len(packages_selection), log.info("[%d/%d] Package: %s", i, len(packages_selection),
package["package_name"]) package["package_name"])
# Sometimes the package path contains multiple lines for multiple apks. # Sometimes the package path contains multiple lines for multiple
# We loop through each line and download each file. # apks. We loop through each line and download each file.
for package_file in package["files"]: for package_file in package["files"]:
device_path = package_file["path"] device_path = package_file["path"]
local_path = self.pull_package_file(package["package_name"], local_path = self.pull_package_file(package["package_name"],

View File

@@ -25,8 +25,6 @@ from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header,
parse_backup_file) parse_backup_file)
from mvt.common.module import InsufficientPrivileges, MVTModule from mvt.common.module import InsufficientPrivileges, MVTModule
log = logging.getLogger(__name__)
ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey") ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey")
ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub") ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub")
@@ -36,7 +34,8 @@ class AndroidExtraction(MVTModule):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -74,13 +73,15 @@ class AndroidExtraction(MVTModule):
try: try:
self.device = AdbDeviceUsb(serial=self.serial) self.device = AdbDeviceUsb(serial=self.serial)
except UsbDeviceNotFoundError: except UsbDeviceNotFoundError:
log.critical("No device found. Make sure it is connected and unlocked.") self.log.critical("No device found. Make sure it is connected "
"and unlocked.")
sys.exit(-1) sys.exit(-1)
# Otherwise we try to use the TCP transport. # Otherwise we try to use the TCP transport.
else: else:
addr = self.serial.split(":") addr = self.serial.split(":")
if len(addr) < 2: if len(addr) < 2:
raise ValueError("TCP serial number must follow the format: `address:port`") raise ValueError("TCP serial number must follow the format: "
"`address:port`")
self.device = AdbDeviceTcp(addr[0], int(addr[1]), self.device = AdbDeviceTcp(addr[0], int(addr[1]),
default_transport_timeout_s=30.) default_transport_timeout_s=30.)
@@ -89,18 +90,22 @@ class AndroidExtraction(MVTModule):
try: try:
self.device.connect(rsa_keys=[signer], auth_timeout_s=5) self.device.connect(rsa_keys=[signer], auth_timeout_s=5)
except (USBErrorBusy, USBErrorAccess): except (USBErrorBusy, USBErrorAccess):
log.critical("Device is busy, maybe run `adb kill-server` and try again.") self.log.critical("Device is busy, maybe run `adb kill-server` "
"and try again.")
sys.exit(-1) sys.exit(-1)
except DeviceAuthError: except DeviceAuthError:
log.error("You need to authorize this computer on the Android device. Retrying in 5 seconds...") self.log.error("You need to authorize this computer on the "
"Android device. Retrying in 5 seconds...")
time.sleep(5) time.sleep(5)
except UsbReadFailedError: except UsbReadFailedError:
log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.") self.log.error("Unable to connect to the device over USB. "
"Try to unplug, plug the device and start again.")
sys.exit(-1) sys.exit(-1)
except OSError as e: except OSError as exc:
if e.errno == 113 and self.serial: if exc.errno == 113 and self.serial:
log.critical("Unable to connect to the device %s: did you specify the correct IP addres?", self.log.critical("Unable to connect to the device %s: "
self.serial) "did you specify the correct IP addres?",
self.serial)
sys.exit(-1) sys.exit(-1)
else: else:
break break
@@ -111,7 +116,7 @@ class AndroidExtraction(MVTModule):
def _adb_reconnect(self) -> None: def _adb_reconnect(self) -> None:
"""Reconnect to device using adb.""" """Reconnect to device using adb."""
log.info("Reconnecting ...") self.log.info("Reconnecting ...")
self._adb_disconnect() self._adb_disconnect()
self._adb_connect() self._adb_connect()
@@ -136,7 +141,9 @@ class AndroidExtraction(MVTModule):
def _adb_root_or_die(self) -> None: def _adb_root_or_die(self) -> None:
"""Check if we have a `su` binary, otherwise raise an Exception.""" """Check if we have a `su` binary, otherwise raise an Exception."""
if not self._adb_check_if_root(): if not self._adb_check_if_root():
raise InsufficientPrivileges("This module is optionally available in case the device is already rooted. Do NOT root your own device!") raise InsufficientPrivileges("This module is optionally available "
"in case the device is already rooted."
" Do NOT root your own device!")
def _adb_command_as_root(self, command): def _adb_command_as_root(self, command):
"""Execute an adb shell command. """Execute an adb shell command.
@@ -157,8 +164,6 @@ class AndroidExtraction(MVTModule):
# TODO: Need to support checking files without root privileges as well. # TODO: Need to support checking files without root privileges as well.
# Connect to the device over adb.
self._adb_connect()
# Check if we have root, if not raise an Exception. # Check if we have root, if not raise an Exception.
self._adb_root_or_die() self._adb_root_or_die()
@@ -171,17 +176,19 @@ class AndroidExtraction(MVTModule):
:param remote_path: Path to download from the device :param remote_path: Path to download from the device
:param local_path: Path to where to locally store the copy of the file :param local_path: Path to where to locally store the copy of the file
:param progress_callback: Callback for download progress bar (Default value = None) :param progress_callback: Callback for download progress bar
(Default value = None)
:param retry_root: Default value = True) :param retry_root: Default value = True)
""" """
try: try:
self.device.pull(remote_path, local_path, progress_callback) self.device.pull(remote_path, local_path, progress_callback)
except AdbCommandFailureException as e: except AdbCommandFailureException as exc:
if retry_root: if retry_root:
self._adb_download_root(remote_path, local_path, progress_callback) self._adb_download_root(remote_path, local_path,
progress_callback)
else: else:
raise Exception(f"Unable to download file {remote_path}: {e}") raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
def _adb_download_root(self, remote_path: str, local_path: str, def _adb_download_root(self, remote_path: str, local_path: str,
progress_callback: Callable = None) -> None: progress_callback: Callable = None) -> None:
@@ -190,27 +197,31 @@ class AndroidExtraction(MVTModule):
self._adb_root_or_die() self._adb_root_or_die()
# We generate a random temporary filename. # We generate a random temporary filename.
tmp_filename = "tmp_" + ''.join(random.choices(string.ascii_uppercase + string.ascii_lowercase + string.digits, k=10)) allowed_chars = (string.ascii_uppercase
+ string.ascii_lowercase
+ string.digits)
tmp_filename = "tmp_" + ''.join(random.choices(allowed_chars, k=10))
# We create a temporary local file. # We create a temporary local file.
new_remote_path = f"/sdcard/{tmp_filename}" new_remote_path = f"/sdcard/{tmp_filename}"
# We copy the file from the data folder to /sdcard/. # We copy the file from the data folder to /sdcard/.
cp = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}") cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
if cp.startswith("cp: ") and "No such file or directory" in cp: if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
raise Exception(f"Unable to process file {remote_path}: File not found") raise Exception(f"Unable to process file {remote_path}: File not found")
elif cp.startswith("cp: ") and "Permission denied" in cp: if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
raise Exception(f"Unable to process file {remote_path}: Permission denied") raise Exception(f"Unable to process file {remote_path}: Permission denied")
# We download from /sdcard/ to the local temporary file. # We download from /sdcard/ to the local temporary file.
# If it doesn't work now, don't try again (retry_root=False) # If it doesn't work now, don't try again (retry_root=False)
self._adb_download(new_remote_path, local_path, retry_root=False) self._adb_download(new_remote_path, local_path, progress_callback,
retry_root=False)
# Delete the copy on /sdcard/. # Delete the copy on /sdcard/.
self._adb_command(f"rm -rf {new_remote_path}") self._adb_command(f"rm -rf {new_remote_path}")
except AdbCommandFailureException as e: except AdbCommandFailureException as exc:
raise Exception(f"Unable to download file {remote_path}: {e}") raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
def _adb_process_file(self, remote_path: str, def _adb_process_file(self, remote_path: str,
process_routine: Callable) -> None: process_routine: Callable) -> None:
@@ -223,7 +234,6 @@ class AndroidExtraction(MVTModule):
""" """
# Connect to the device over adb. # Connect to the device over adb.
self._adb_connect()
# Check if we have root, if not raise an Exception. # Check if we have root, if not raise an Exception.
self._adb_root_or_die() self._adb_root_or_die()
@@ -234,10 +244,10 @@ class AndroidExtraction(MVTModule):
new_remote_path = f"/sdcard/Download/{local_name}" new_remote_path = f"/sdcard/Download/{local_name}"
# We copy the file from the data folder to /sdcard/. # We copy the file from the data folder to /sdcard/.
cp = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}") cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
if cp.startswith("cp: ") and "No such file or directory" in cp: if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
raise Exception(f"Unable to process file {remote_path}: File not found") raise Exception(f"Unable to process file {remote_path}: File not found")
elif cp.startswith("cp: ") and "Permission denied" in cp: if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
raise Exception(f"Unable to process file {remote_path}: Permission denied") raise Exception(f"Unable to process file {remote_path}: Permission denied")
# We download from /sdcard/ to the local temporary file. # We download from /sdcard/ to the local temporary file.
@@ -250,35 +260,41 @@ class AndroidExtraction(MVTModule):
tmp.close() tmp.close()
# Delete the copy on /sdcard/. # Delete the copy on /sdcard/.
self._adb_command(f"rm -f {new_remote_path}") self._adb_command(f"rm -f {new_remote_path}")
# Disconnect from the device.
self._adb_disconnect()
def _generate_backup(self, package_name: str) -> bytes: def _generate_backup(self, package_name: str) -> bytes:
self.log.warning("Please check phone and accept Android backup prompt. You may need to set a backup password. \a") self.log.warning("Please check phone and accept Android backup prompt. "
"You may need to set a backup password. \a")
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... # TODO: Base64 encoding as temporary fix to avoid byte-mangling over
backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress '{}' | base64".format( # the shell transport...
package_name)) cmd = f"/system/bin/bu backup -nocompress '{package_name}' | base64"
backup_output_b64 = self._adb_command(cmd)
backup_output = base64.b64decode(backup_output_b64) backup_output = base64.b64decode(backup_output_b64)
header = parse_ab_header(backup_output) header = parse_ab_header(backup_output)
if not header["backup"]: if not header["backup"]:
self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") self.log.error("Extracting SMS via Android backup failed. "
return "No valid backup data found.")
return None
if header["encryption"] == "none": if header["encryption"] == "none":
return parse_backup_file(backup_output, password=None) return parse_backup_file(backup_output, password=None)
for password_retry in range(0, 3): for _ in range(0, 3):
backup_password = Prompt.ask("Enter backup password", password=True) backup_password = Prompt.ask("Enter backup password",
password=True)
try: try:
decrypted_backup_tar = parse_backup_file(backup_output, backup_password) decrypted_backup_tar = parse_backup_file(backup_output,
backup_password)
return decrypted_backup_tar return decrypted_backup_tar
except InvalidBackupPassword: except InvalidBackupPassword:
self.log.error("You provided the wrong password! Please try again...") self.log.error("You provided the wrong password! "
"Please try again...")
self.log.warn("All attempts to decrypt backup with password failed!") self.log.warn("All attempts to decrypt backup with password failed!")
return None
def run(self) -> None: def run(self) -> None:
"""Run the main procedure.""" """Run the main procedure."""
raise NotImplementedError raise NotImplementedError

View File

@@ -6,14 +6,13 @@
import logging import logging
import os import os
import sqlite3 import sqlite3
from typing import Union
from mvt.common.utils import (convert_chrometime_to_unix, from mvt.common.utils import (convert_chrometime_to_datetime,
convert_timestamp_to_iso) convert_datetime_to_iso)
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
CHROME_HISTORY_PATH = "data/data/com.android.chrome/app_chrome/Default/History" CHROME_HISTORY_PATH = "data/data/com.android.chrome/app_chrome/Default/History"
@@ -22,17 +21,19 @@ class ChromeHistory(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
def serialize(self, record: dict) -> None: def serialize(self, record: dict) -> Union[dict, list]:
return { return {
"timestamp": record["isodate"], "timestamp": record["isodate"],
"module": self.__class__.__name__, "module": self.__class__.__name__,
"event": "visit", "event": "visit",
"data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, redirect source: {record['redirect_source']})" "data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, "
f"redirect source: {record['redirect_source']})"
} }
def check_indicators(self) -> None: def check_indicators(self) -> None:
@@ -69,18 +70,23 @@ class ChromeHistory(AndroidExtraction):
"url": item[1], "url": item[1],
"visit_id": item[2], "visit_id": item[2],
"timestamp": item[3], "timestamp": item[3],
"isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(item[3])), "isodate": convert_datetime_to_iso(convert_chrometime_to_datetime(item[3])),
"redirect_source": item[4], "redirect_source": item[4],
}) })
cur.close() cur.close()
conn.close() conn.close()
log.info("Extracted a total of %d history items", len(self.results)) self.log.info("Extracted a total of %d history items",
len(self.results))
def run(self) -> None: def run(self) -> None:
self._adb_connect()
try: try:
self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH), self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH),
self._parse_db) self._parse_db)
except Exception as e: except Exception as exc:
self.log.error(e) self.log.error(exc)
self._adb_disconnect()

View File

@@ -9,15 +9,14 @@ from mvt.android.parsers import parse_dumpsys_accessibility
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysAccessibility(AndroidExtraction): class DumpsysAccessibility(AndroidExtraction):
"""This module extracts stats on accessibility.""" """This module extracts stats on accessibility."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -41,6 +40,8 @@ class DumpsysAccessibility(AndroidExtraction):
self.results = parse_dumpsys_accessibility(output) self.results = parse_dumpsys_accessibility(output)
for result in self.results: for result in self.results:
log.info("Found installed accessibility service \"%s\"", result.get("service")) self.log.info("Found installed accessibility service \"%s\"",
result.get("service"))
self.log.info("Identified a total of %d accessibility services", len(self.results)) self.log.info("Identified a total of %d accessibility services",
len(self.results))

View File

@@ -9,15 +9,14 @@ from mvt.android.parsers import parse_dumpsys_activity_resolver_table
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysActivities(AndroidExtraction): class DumpsysActivities(AndroidExtraction):
"""This module extracts details on receivers for risky activities.""" """This module extracts details on receivers for risky activities."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)

View File

@@ -4,13 +4,12 @@
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
import logging import logging
from typing import Union
from mvt.android.parsers.dumpsys import parse_dumpsys_appops from mvt.android.parsers.dumpsys import parse_dumpsys_appops
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysAppOps(AndroidExtraction): class DumpsysAppOps(AndroidExtraction):
"""This module extracts records from App-op Manager.""" """This module extracts records from App-op Manager."""
@@ -19,12 +18,13 @@ class DumpsysAppOps(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
def serialize(self, record: dict) -> None: def serialize(self, record: dict) -> Union[dict, list]:
records = [] records = []
for perm in record["permissions"]: for perm in record["permissions"]:
if "entries" not in perm: if "entries" not in perm:
@@ -36,7 +36,8 @@ class DumpsysAppOps(AndroidExtraction):
"timestamp": entry["timestamp"], "timestamp": entry["timestamp"],
"module": self.__class__.__name__, "module": self.__class__.__name__,
"event": entry["access"], "event": entry["access"],
"data": f"{record['package_name']} access to {perm['name']}: {entry['access']}", "data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
}) })
return records return records
@@ -51,9 +52,10 @@ class DumpsysAppOps(AndroidExtraction):
continue continue
for perm in result["permissions"]: for perm in result["permissions"]:
if perm["name"] == "REQUEST_INSTALL_PACKAGES" and perm["access"] == "allow": if (perm["name"] == "REQUEST_INSTALL_PACKAGES"
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission", and perm["access"] == "allow"):
result["package_name"]) self.log.info("Package %s with REQUEST_INSTALL_PACKAGES "
"permission", result["package_name"])
def run(self) -> None: def run(self) -> None:
self._adb_connect() self._adb_connect()

View File

@@ -4,30 +4,31 @@
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
import logging import logging
from typing import Union
from mvt.android.parsers import parse_dumpsys_battery_daily from mvt.android.parsers import parse_dumpsys_battery_daily
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysBatteryDaily(AndroidExtraction): class DumpsysBatteryDaily(AndroidExtraction):
"""This module extracts records from battery daily updates.""" """This module extracts records from battery daily updates."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
def serialize(self, record: dict) -> None: def serialize(self, record: dict) -> Union[dict, list]:
return { return {
"timestamp": record["from"], "timestamp": record["from"],
"module": self.__class__.__name__, "module": self.__class__.__name__,
"event": "battery_daily", "event": "battery_daily",
"data": f"Recorded update of package {record['package_name']} with vers {record['vers']}" "data": f"Recorded update of package {record['package_name']} "
f"with vers {record['vers']}"
} }
def check_indicators(self) -> None: def check_indicators(self) -> None:

View File

@@ -9,15 +9,14 @@ from mvt.android.parsers import parse_dumpsys_battery_history
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysBatteryHistory(AndroidExtraction): class DumpsysBatteryHistory(AndroidExtraction):
"""This module extracts records from battery history events.""" """This module extracts records from battery history events."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -40,4 +39,5 @@ class DumpsysBatteryHistory(AndroidExtraction):
self.results = parse_dumpsys_battery_history(output) self.results = parse_dumpsys_battery_history(output)
self.log.info("Extracted %d records from battery history", len(self.results)) self.log.info("Extracted %d records from battery history",
len(self.results))

View File

@@ -9,8 +9,6 @@ from mvt.android.parsers import parse_dumpsys_dbinfo
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysDBInfo(AndroidExtraction): class DumpsysDBInfo(AndroidExtraction):
"""This module extracts records from battery daily updates.""" """This module extracts records from battery daily updates."""
@@ -19,7 +17,8 @@ class DumpsysDBInfo(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)

View File

@@ -8,15 +8,14 @@ import os
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysFull(AndroidExtraction): class DumpsysFull(AndroidExtraction):
"""This module extracts stats on battery consumption by processes.""" """This module extracts stats on battery consumption by processes."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -30,6 +29,6 @@ class DumpsysFull(AndroidExtraction):
with open(output_path, "w", encoding="utf-8") as handle: with open(output_path, "w", encoding="utf-8") as handle:
handle.write(output) handle.write(output)
log.info("Full dumpsys output stored at %s", output_path) self.log.info("Full dumpsys output stored at %s", output_path)
self._adb_disconnect() self._adb_disconnect()

View File

@@ -9,8 +9,6 @@ from mvt.android.parsers import parse_dumpsys_receiver_resolver_table
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
INTENT_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS" INTENT_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS"
INTENT_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED" INTENT_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED"
INTENT_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED" INTENT_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED"
@@ -23,7 +21,8 @@ class DumpsysReceivers(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -37,26 +36,31 @@ class DumpsysReceivers(AndroidExtraction):
for intent, receivers in self.results.items(): for intent, receivers in self.results.items():
for receiver in receivers: for receiver in receivers:
if intent == INTENT_NEW_OUTGOING_SMS: if intent == INTENT_NEW_OUTGOING_SMS:
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"", self.log.info("Found a receiver to intercept "
"outgoing SMS messages: \"%s\"",
receiver["receiver"]) receiver["receiver"])
elif intent == INTENT_SMS_RECEIVED: elif intent == INTENT_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"", self.log.info("Found a receiver to intercept "
"incoming SMS messages: \"%s\"",
receiver["receiver"]) receiver["receiver"])
elif intent == INTENT_DATA_SMS_RECEIVED: elif intent == INTENT_DATA_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"", self.log.info("Found a receiver to intercept "
"incoming data SMS message: \"%s\"",
receiver["receiver"]) receiver["receiver"])
elif intent == INTENT_PHONE_STATE: elif intent == INTENT_PHONE_STATE:
self.log.info("Found a receiver monitoring telephony state/incoming calls: \"%s\"", self.log.info("Found a receiver monitoring "
"telephony state/incoming calls: \"%s\"",
receiver["receiver"]) receiver["receiver"])
elif intent == INTENT_NEW_OUTGOING_CALL: elif intent == INTENT_NEW_OUTGOING_CALL:
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"", self.log.info("Found a receiver monitoring "
"outgoing calls: \"%s\"",
receiver["receiver"]) receiver["receiver"])
ioc = self.indicators.check_app_id(receiver["package_name"]) ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc: if ioc:
receiver["matched_indicator"] = ioc receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver}) self.detected.append({intent: receiver})
continue continue
def run(self) -> None: def run(self) -> None:
self._adb_connect() self._adb_connect()

View File

@@ -3,17 +3,15 @@
# Use of this software is governed by the MVT License 1.1 that can be found at # Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
import datetime
import logging import logging
import os import os
import stat import stat
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso from mvt.common.utils import convert_unix_to_iso
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
ANDROID_TMP_FOLDERS = [ ANDROID_TMP_FOLDERS = [
"/tmp/", "/tmp/",
"/data/local/tmp/", "/data/local/tmp/",
@@ -29,13 +27,14 @@ class Files(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
self.full_find = False self.full_find = False
def serialize(self, record: dict) -> None: def serialize(self, record: dict) -> Union[dict, list]:
if "modified_time" in record: if "modified_time" in record:
return { return {
"timestamp": record["modified_time"], "timestamp": record["modified_time"],
@@ -44,14 +43,17 @@ class Files(AndroidExtraction):
"data": record["path"], "data": record["path"],
} }
return None
def check_indicators(self) -> None: def check_indicators(self) -> None:
for result in self.results: for result in self.results:
if result.get("is_suid"): if result.get("is_suid"):
self.log.warning("Found an SUID file in a non-standard directory \"%s\".", self.log.warning("Found an SUID file in a non-standard "
result["path"]) "directory \"%s\".", result["path"])
if self.indicators and self.indicators.check_file_path(result["path"]): if self.indicators and self.indicators.check_file_path(result["path"]):
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"]) self.log.warning("Found a known suspicous file at path: \"%s\"",
result["path"])
self.detected.append(result) self.detected.append(result)
def backup_file(self, file_path: str) -> None: def backup_file(self, file_path: str) -> None:
@@ -73,11 +75,13 @@ class Files(AndroidExtraction):
def find_files(self, folder: str) -> None: def find_files(self, folder: str) -> None:
if self.full_find: if self.full_find:
output = self._adb_command(f"find '{folder}' -type f -printf '%T@ %m %s %u %g %p\n' 2> /dev/null") cmd = f"find '{folder}' -type f -printf '%T@ %m %s %u %g %p\n' 2> /dev/null"
output = self._adb_command(cmd)
for file_line in output.splitlines(): for file_line in output.splitlines():
[unix_timestamp, mode, size, owner, group, full_path] = file_line.rstrip().split(" ", 5) [unix_timestamp, mode, size,
mod_time = convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(float(unix_timestamp)))) owner, group, full_path] = file_line.rstrip().split(" ", 5)
mod_time = convert_unix_to_iso(unix_timestamp)
self.results.append({ self.results.append({
"path": full_path, "path": full_path,
@@ -97,7 +101,8 @@ class Files(AndroidExtraction):
def run(self) -> None: def run(self) -> None:
self._adb_connect() self._adb_connect()
output = self._adb_command("find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null") cmd = "find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null"
output = self._adb_command(cmd)
if output or output.strip().splitlines(): if output or output.strip().splitlines():
self.full_find = True self.full_find = True
@@ -119,7 +124,8 @@ class Files(AndroidExtraction):
if self.fast_mode: if self.fast_mode:
self.log.info("Flag --fast was enabled: skipping full file listing") self.log.info("Flag --fast was enabled: skipping full file listing")
else: else:
self.log.info("Processing full file listing. This may take a while...") self.log.info("Processing full file listing. "
"This may take a while...")
self.find_files("/") self.find_files("/")
self.log.info("Found %s total files", len(self.results)) self.log.info("Found %s total files", len(self.results))

View File

@@ -10,15 +10,14 @@ from mvt.android.parsers import parse_getprop
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class Getprop(AndroidExtraction): class Getprop(AndroidExtraction):
"""This module extracts device properties from getprop command.""" """This module extracts device properties from getprop command."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -37,7 +36,9 @@ class Getprop(AndroidExtraction):
if security_patch: if security_patch:
patch_date = datetime.strptime(security_patch, "%Y-%m-%d") patch_date = datetime.strptime(security_patch, "%Y-%m-%d")
if (datetime.now() - patch_date) > timedelta(days=6*30): if (datetime.now() - patch_date) > timedelta(days=6*30):
self.log.warning("This phone has not received security updates for more than " self.log.warning("This phone has not received security updates "
"six months (last update: %s)", security_patch) "for more than six months (last update: %s)",
security_patch)
self.log.info("Extracted %d Android system properties", len(self.results)) self.log.info("Extracted %d Android system properties",
len(self.results))

View File

@@ -8,15 +8,14 @@ import os
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class Logcat(AndroidExtraction): class Logcat(AndroidExtraction):
"""This module extracts details on installed packages.""" """This module extracts details on installed packages."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -35,15 +34,15 @@ class Logcat(AndroidExtraction):
with open(logcat_path, "w", encoding="utf-8") as handle: with open(logcat_path, "w", encoding="utf-8") as handle:
handle.write(output) handle.write(output)
log.info("Current logcat logs stored at %s", self.log.info("Current logcat logs stored at %s",
logcat_path) logcat_path)
logcat_last_path = os.path.join(self.results_path, logcat_last_path = os.path.join(self.results_path,
"logcat_last.txt") "logcat_last.txt")
with open(logcat_last_path, "w", encoding="utf-8") as handle: with open(logcat_last_path, "w", encoding="utf-8") as handle:
handle.write(last_output) handle.write(last_output)
log.info("Logcat logs prior to last reboot stored at %s", self.log.info("Logcat logs prior to last reboot stored at %s",
logcat_last_path) logcat_last_path)
self._adb_disconnect() self._adb_disconnect()

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
import logging import logging
from typing import Union
from rich.console import Console from rich.console import Console
from rich.progress import track from rich.progress import track
@@ -14,8 +15,6 @@ from mvt.common.virustotal import VTNoKey, VTQuotaExceeded, virustotal_lookup
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
DANGEROUS_PERMISSIONS_THRESHOLD = 10 DANGEROUS_PERMISSIONS_THRESHOLD = 10
DANGEROUS_PERMISSIONS = [ DANGEROUS_PERMISSIONS = [
"android.permission.ACCESS_COARSE_LOCATION", "android.permission.ACCESS_COARSE_LOCATION",
@@ -74,26 +73,37 @@ class Packages(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
def serialize(self, record: dict) -> None: def serialize(self, record: dict) -> Union[dict, list]:
records = [] records = []
timestamps = [ timestamps = [
{"event": "package_install", "timestamp": record["timestamp"]}, {
{"event": "package_first_install", "timestamp": record["first_install_time"]}, "event": "package_install",
{"event": "package_last_update", "timestamp": record["last_update_time"]}, "timestamp": record["timestamp"]
},
{
"event": "package_first_install",
"timestamp": record["first_install_time"]
},
{
"event": "package_last_update",
"timestamp": record["last_update_time"]
},
] ]
for ts in timestamps: for timestamp in timestamps:
records.append({ records.append({
"timestamp": ts["timestamp"], "timestamp": timestamp["timestamp"],
"module": self.__class__.__name__, "module": self.__class__.__name__,
"event": ts["event"], "event": timestamp["event"],
"data": f"{record['package_name']} (system: {record['system']}, third party: {record['third_party']})", "data": f"{record['package_name']} (system: {record['system']},"
f" third party: {record['third_party']})",
}) })
return records return records
@@ -101,7 +111,8 @@ class Packages(AndroidExtraction):
def check_indicators(self) -> None: def check_indicators(self) -> None:
for result in self.results: for result in self.results:
if result["package_name"] in ROOT_PACKAGES: if result["package_name"] in ROOT_PACKAGES:
self.log.warning("Found an installed package related to rooting/jailbreaking: \"%s\"", self.log.warning("Found an installed package related to "
"rooting/jailbreaking: \"%s\"",
result["package_name"]) result["package_name"])
self.detected.append(result) self.detected.append(result)
continue continue
@@ -132,14 +143,14 @@ class Packages(AndroidExtraction):
total_hashes = len(hashes) total_hashes = len(hashes)
detections = {} detections = {}
for i in track(range(total_hashes), description=f"Looking up {total_hashes} files..."): progress_desc = f"Looking up {total_hashes} files..."
for i in track(range(total_hashes), description=progress_desc):
try: try:
results = virustotal_lookup(hashes[i]) results = virustotal_lookup(hashes[i])
except VTNoKey as e: except VTNoKey:
log.info(e)
return return
except VTQuotaExceeded as e: except VTQuotaExceeded as exc:
log.error("Unable to continue: %s", e) print("Unable to continue: %s", exc)
break break
if not results: if not results:
@@ -224,10 +235,10 @@ class Packages(AndroidExtraction):
for file_path in output.splitlines(): for file_path in output.splitlines():
file_path = file_path.strip() file_path = file_path.strip()
md5 = self._adb_command(f"md5sum {file_path}").split(" ")[0] md5 = self._adb_command(f"md5sum {file_path}").split(" ", maxsplit=1)[0]
sha1 = self._adb_command(f"sha1sum {file_path}").split(" ")[0] sha1 = self._adb_command(f"sha1sum {file_path}").split(" ", maxsplit=1)[0]
sha256 = self._adb_command(f"sha256sum {file_path}").split(" ")[0] sha256 = self._adb_command(f"sha256sum {file_path}").split(" ", maxsplit=1)[0]
sha512 = self._adb_command(f"sha512sum {file_path}").split(" ")[0] sha512 = self._adb_command(f"sha512sum {file_path}").split(" ", maxsplit=1)[0]
package_files.append({ package_files.append({
"path": file_path, "path": file_path,
@@ -304,8 +315,10 @@ class Packages(AndroidExtraction):
dangerous_permissions_count += 1 dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD: if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info("Third-party package \"%s\" requested %d potentially dangerous permissions", self.log.info("Third-party package \"%s\" requested %d "
result["package_name"], dangerous_permissions_count) "potentially dangerous permissions",
result["package_name"],
dangerous_permissions_count)
packages_to_lookup = [] packages_to_lookup = []
for result in self.results: for result in self.results:
@@ -313,8 +326,9 @@ class Packages(AndroidExtraction):
continue continue
packages_to_lookup.append(result) packages_to_lookup.append(result)
self.log.info("Found non-system package with name \"%s\" installed by \"%s\" on %s", self.log.info("Found non-system package with name \"%s\" installed "
result["package_name"], result["installer"], result["timestamp"]) "by \"%s\" on %s", result["package_name"],
result["installer"], result["timestamp"])
if not self.fast_mode: if not self.fast_mode:
self.check_virustotal(packages_to_lookup) self.check_virustotal(packages_to_lookup)

View File

@@ -7,15 +7,14 @@ import logging
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class Processes(AndroidExtraction): class Processes(AndroidExtraction):
"""This module extracts details on running processes.""" """This module extracts details on running processes."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -63,4 +62,5 @@ class Processes(AndroidExtraction):
self._adb_disconnect() self._adb_disconnect()
log.info("Extracted records on a total of %d processes", len(self.results)) self.log.info("Extracted records on a total of %d processes",
len(self.results))

View File

@@ -7,15 +7,14 @@ import logging
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class RootBinaries(AndroidExtraction): class RootBinaries(AndroidExtraction):
"""This module extracts the list of installed packages.""" """This module extracts the list of installed packages."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)

View File

@@ -7,8 +7,6 @@ import logging
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
class SELinuxStatus(AndroidExtraction): class SELinuxStatus(AndroidExtraction):
"""This module checks if SELinux is being enforced.""" """This module checks if SELinux is being enforced."""
@@ -17,7 +15,8 @@ class SELinuxStatus(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)

View File

@@ -7,9 +7,6 @@ import logging
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
ANDROID_DANGEROUS_SETTINGS = [ ANDROID_DANGEROUS_SETTINGS = [
{ {
"description": "disabled Google Play Services apps verification", "description": "disabled Google Play Services apps verification",
@@ -51,6 +48,11 @@ ANDROID_DANGEROUS_SETTINGS = [
"key": "send_action_app_error", "key": "send_action_app_error",
"safe_value": "1", "safe_value": "1",
}, },
{
"description": "enabled installation of non Google Play apps",
"key": "install_non_market_apps",
"safe_value": "0",
}
] ]
@@ -59,7 +61,8 @@ class Settings(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -67,7 +70,7 @@ class Settings(AndroidExtraction):
self.results = {} if not results else results self.results = {} if not results else results
def check_indicators(self) -> None: def check_indicators(self) -> None:
for namespace, settings in self.results.items(): for _, settings in self.results.items():
for key, value in settings.items(): for key, value in settings.items():
for danger in ANDROID_DANGEROUS_SETTINGS: for danger in ANDROID_DANGEROUS_SETTINGS:
# Check if one of the dangerous settings is using an unsafe # Check if one of the dangerous settings is using an unsafe

View File

@@ -6,16 +6,15 @@
import logging import logging
import os import os
import sqlite3 import sqlite3
from typing import Union
from mvt.android.parsers.backup import (AndroidBackupParsingError, from mvt.android.parsers.backup import (AndroidBackupParsingError,
parse_tar_for_sms) parse_tar_for_sms)
from mvt.common.module import InsufficientPrivileges from mvt.common.module import InsufficientPrivileges
from mvt.common.utils import check_for_links, convert_timestamp_to_iso from mvt.common.utils import check_for_links, convert_unix_to_iso
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
SMS_BUGLE_PATH = "data/data/com.google.android.apps.messaging/databases/bugle_db" SMS_BUGLE_PATH = "data/data/com.google.android.apps.messaging/databases/bugle_db"
SMS_BUGLE_QUERY = """ SMS_BUGLE_QUERY = """
SELECT SELECT
@@ -48,18 +47,21 @@ class SMS(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
def serialize(self, record: dict) -> None: self.sms_db_type = 0
def serialize(self, record: dict) -> Union[dict, list]:
body = record["body"].replace("\n", "\\n") body = record["body"].replace("\n", "\\n")
return { return {
"timestamp": record["isodate"], "timestamp": record["isodate"],
"module": self.__class__.__name__, "module": self.__class__.__name__,
"event": f"sms_{record['direction']}", "event": f"sms_{record['direction']}",
"data": f"{record['address']}: \"{body}\"" "data": f"{record.get('address', 'unknown source')}: \"{body}\""
} }
def check_indicators(self) -> None: def check_indicators(self) -> None:
@@ -70,7 +72,7 @@ class SMS(AndroidExtraction):
if "body" not in message: if "body" not in message:
continue continue
# FIXME: check links exported from the body previously # TODO: check links exported from the body previously.
message_links = check_for_links(message["body"]) message_links = check_for_links(message["body"])
if self.indicators.check_domains(message_links): if self.indicators.check_domains(message_links):
self.detected.append(message) self.detected.append(message)
@@ -84,9 +86,9 @@ class SMS(AndroidExtraction):
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
cur = conn.cursor() cur = conn.cursor()
if self.SMS_DB_TYPE == 1: if self.sms_db_type == 1:
cur.execute(SMS_BUGLE_QUERY) cur.execute(SMS_BUGLE_QUERY)
elif self.SMS_DB_TYPE == 2: elif self.sms_db_type == 2:
cur.execute(SMS_MMSMS_QUERY) cur.execute(SMS_MMSMS_QUERY)
names = [description[0] for description in cur.description] names = [description[0] for description in cur.description]
@@ -97,7 +99,7 @@ class SMS(AndroidExtraction):
message[names[index]] = value message[names[index]] = value
message["direction"] = ("received" if message["incoming"] == 1 else "sent") message["direction"] = ("received" if message["incoming"] == 1 else "sent")
message["isodate"] = convert_timestamp_to_iso(message["timestamp"]) message["isodate"] = convert_unix_to_iso(message["timestamp"])
# If we find links in the messages or if they are empty we add # If we find links in the messages or if they are empty we add
# them to the list of results. # them to the list of results.
@@ -107,13 +109,16 @@ class SMS(AndroidExtraction):
cur.close() cur.close()
conn.close() conn.close()
log.info("Extracted a total of %d SMS messages containing links", len(self.results)) self.log.info("Extracted a total of %d SMS messages containing links",
len(self.results))
def _extract_sms_adb(self) -> None: def _extract_sms_adb(self) -> None:
"""Use the Android backup command to extract SMS data from the native SMS app """Use the Android backup command to extract SMS data from the native
SMS app.
It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression It is crucial to use the under-documented "-nocompress" flag to disable
algorithim. This module only supports an unencrypted ADB backup. the non-standard Java compression algorithm. This module only supports
an unencrypted ADB backup.
""" """
backup_tar = self._generate_backup("com.android.providers.telephony") backup_tar = self._generate_backup("com.android.providers.telephony")
if not backup_tar: if not backup_tar:
@@ -122,22 +127,34 @@ class SMS(AndroidExtraction):
try: try:
self.results = parse_tar_for_sms(backup_tar) self.results = parse_tar_for_sms(backup_tar)
except AndroidBackupParsingError: except AndroidBackupParsingError:
self.log.info("Impossible to read SMS from the Android Backup, please extract the SMS and try extracting it with Android Backup Extractor") self.log.info("Impossible to read SMS from the Android Backup, "
"please extract the SMS and try extracting it with "
"Android Backup Extractor")
return return
log.info("Extracted a total of %d SMS messages containing links", len(self.results)) self.log.info("Extracted a total of %d SMS messages containing links",
len(self.results))
def run(self) -> None: def run(self) -> None:
self._adb_connect()
try: try:
if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))): if self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH)):
self.SMS_DB_TYPE = 1 self.sms_db_type = 1
self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db) self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH),
elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))): self._parse_db)
self.SMS_DB_TYPE = 2 elif self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH)):
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db) self.sms_db_type = 2
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH),
self._parse_db)
self._adb_disconnect()
return return
except InsufficientPrivileges: except InsufficientPrivileges:
pass pass
self.log.warn("No SMS database found. Trying extraction of SMS data using Android backup feature.") self.log.warn("No SMS database found. Trying extraction of SMS data "
"using Android backup feature.")
self._extract_sms_adb() self._extract_sms_adb()
self._adb_disconnect()

View File

@@ -7,13 +7,12 @@ import base64
import logging import logging
import os import os
import sqlite3 import sqlite3
from typing import Union
from mvt.common.utils import check_for_links, convert_timestamp_to_iso from mvt.common.utils import check_for_links, convert_unix_to_iso
from .base import AndroidExtraction from .base import AndroidExtraction
log = logging.getLogger(__name__)
WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db" WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db"
@@ -22,12 +21,13 @@ class Whatsapp(AndroidExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
def serialize(self, record: dict) -> None: def serialize(self, record: dict) -> Union[dict, list]:
text = record["data"].replace("\n", "\\n") text = record["data"].replace("\n", "\\n")
return { return {
"timestamp": record["isodate"], "timestamp": record["isodate"],
@@ -71,22 +71,30 @@ class Whatsapp(AndroidExtraction):
continue continue
message["direction"] = ("send" if message["key_from_me"] == 1 else "received") message["direction"] = ("send" if message["key_from_me"] == 1 else "received")
message["isodate"] = convert_timestamp_to_iso(message["timestamp"]) message["isodate"] = convert_unix_to_iso(message["timestamp"])
# If we find links in the messages or if they are empty we add them to the list. # If we find links in the messages or if they are empty we add them
# to the list.
if check_for_links(message["data"]) or message["data"].strip() == "": if check_for_links(message["data"]) or message["data"].strip() == "":
if (message.get('thumb_image') is not None): if message.get("thumb_image"):
message['thumb_image'] = base64.b64encode(message['thumb_image']) message["thumb_image"] = base64.b64encode(message["thumb_image"])
messages.append(message) messages.append(message)
cur.close() cur.close()
conn.close() conn.close()
log.info("Extracted a total of %d WhatsApp messages containing links", len(messages)) self.log.info("Extracted a total of %d WhatsApp messages "
"containing links", len(messages))
self.results = messages self.results = messages
def run(self) -> None: def run(self) -> None:
self._adb_connect()
try: try:
self._adb_process_file(os.path.join("/", WHATSAPP_PATH), self._parse_db) self._adb_process_file(os.path.join("/", WHATSAPP_PATH),
except Exception as e: self._parse_db)
self.log.error(e) except Exception as exc:
self.log.error(exc)
self._adb_disconnect()

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
import fnmatch import fnmatch
import logging
import os import os
from tarfile import TarFile from tarfile import TarFile
@@ -12,7 +13,19 @@ from mvt.common.module import MVTModule
class BackupExtraction(MVTModule): class BackupExtraction(MVTModule):
"""This class provides a base for all backup extractios modules""" """This class provides a base for all backup extractios modules"""
ab = None
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.ab = None
self.backup_path = None
self.tar = None
self.files = []
def from_folder(self, backup_path: str, files: list) -> None: def from_folder(self, backup_path: str, files: list) -> None:
""" """

View File

@@ -12,7 +12,8 @@ from mvt.android.parsers.backup import parse_sms_file
class SMS(BackupExtraction): class SMS(BackupExtraction):
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -30,15 +31,17 @@ class SMS(BackupExtraction):
self.detected.append(message) self.detected.append(message)
def run(self) -> None: def run(self) -> None:
for file in self._get_files_by_pattern("apps/com.android.providers.telephony/d_f/*_sms_backup"): sms_path = "apps/com.android.providers.telephony/d_f/*_sms_backup"
for file in self._get_files_by_pattern(sms_path):
self.log.info("Processing SMS backup file at %s", file) self.log.info("Processing SMS backup file at %s", file)
data = self._get_file_content(file) data = self._get_file_content(file)
self.results.extend(parse_sms_file(data)) self.results.extend(parse_sms_file(data))
for file in self._get_files_by_pattern("apps/com.android.providers.telephony/d_f/*_mms_backup"): mms_path = "apps/com.android.providers.telephony/d_f/*_mms_backup"
for file in self._get_files_by_pattern(mms_path):
self.log.info("Processing MMS backup file at %s", file) self.log.info("Processing MMS backup file at %s", file)
data = self._get_file_content(file) data = self._get_file_content(file)
self.results.extend(parse_sms_file(data)) self.results.extend(parse_sms_file(data))
self.log.info("Extracted a total of %d SMS & MMS messages containing links", self.log.info("Extracted a total of %d SMS & MMS messages "
len(self.results)) "containing links", len(self.results))

View File

@@ -9,15 +9,14 @@ from mvt.android.parsers import parse_dumpsys_accessibility
from .base import BugReportModule from .base import BugReportModule
log = logging.getLogger(__name__)
class Accessibility(BugReportModule): class Accessibility(BugReportModule):
"""This module extracts stats on accessibility.""" """This module extracts stats on accessibility."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -36,7 +35,8 @@ class Accessibility(BugReportModule):
def run(self) -> None: def run(self) -> None:
content = self._get_dumpstate_file() content = self._get_dumpstate_file()
if not content: if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") self.log.error("Unable to find dumpstate file. Did you provide a "
"valid bug report archive?")
return return
lines = [] lines = []
@@ -56,6 +56,8 @@ class Accessibility(BugReportModule):
self.results = parse_dumpsys_accessibility("\n".join(lines)) self.results = parse_dumpsys_accessibility("\n".join(lines))
for result in self.results: for result in self.results:
log.info("Found installed accessibility service \"%s\"", result.get("service")) self.log.info("Found installed accessibility service \"%s\"",
result.get("service"))
self.log.info("Identified a total of %d accessibility services", len(self.results)) self.log.info("Identified a total of %d accessibility services",
len(self.results))

View File

@@ -9,15 +9,14 @@ from mvt.android.parsers import parse_dumpsys_activity_resolver_table
from .base import BugReportModule from .base import BugReportModule
log = logging.getLogger(__name__)
class Activities(BugReportModule): class Activities(BugReportModule):
"""This module extracts details on receivers for risky activities.""" """This module extracts details on receivers for risky activities."""
def __init__(self, file_path: str = None, target_path: str = None, def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False, results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None: log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path, super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode, results_path=results_path, fast_mode=fast_mode,
log=log, results=results) log=log, results=results)
@@ -39,7 +38,8 @@ class Activities(BugReportModule):
def run(self) -> None: def run(self) -> None:
content = self._get_dumpstate_file() content = self._get_dumpstate_file()
if not content: if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") self.log.error("Unable to find dumpstate file. Did you provide a "
"valid bug report archive?")
return return
lines = [] lines = []

Some files were not shown because too many files have changed in this diff Show More