mirror of
https://github.com/mvt-project/mvt
synced 2025-10-21 22:42:15 +02:00
Compare commits
74 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4e0a393a02 | ||
|
|
c3dc4174fc | ||
|
|
e1d1b6c5de | ||
|
|
d0a893841b | ||
|
|
d4e99661c7 | ||
|
|
6a00d3a14d | ||
|
|
a863209abb | ||
|
|
4c7db02da4 | ||
|
|
92dfefbdeb | ||
|
|
8988adcf77 | ||
|
|
91667b0ded | ||
|
|
2365175dbd | ||
|
|
528d43b914 | ||
|
|
f952ba5119 | ||
|
|
b4ed2c6ed4 | ||
|
|
3eed1d6edf | ||
|
|
83ef545cd1 | ||
|
|
5d4fbec62b | ||
|
|
fa7d6166f4 | ||
|
|
429b223555 | ||
|
|
e4b9a9652a | ||
|
|
134581c000 | ||
|
|
5356a399c9 | ||
|
|
e0f563596d | ||
|
|
ea5de0203a | ||
|
|
ace965ee8a | ||
|
|
ad8f455209 | ||
|
|
ae67b41374 | ||
|
|
5fe88098b9 | ||
|
|
d578c240f9 | ||
|
|
427a29c2b6 | ||
|
|
5e6f6faa9c | ||
|
|
74a3ecaa4e | ||
|
|
f536af1124 | ||
|
|
631354c131 | ||
|
|
7ad7782b51 | ||
|
|
f04f91e1e3 | ||
|
|
6936908f86 | ||
|
|
f3e5763c6a | ||
|
|
f438f7b1fb | ||
|
|
66a157868f | ||
|
|
a966b694ea | ||
|
|
c9dd3af278 | ||
|
|
82a60ee07c | ||
|
|
8bc5113bd2 | ||
|
|
00d82f7f00 | ||
|
|
2781f33fb5 | ||
|
|
271fe5fbee | ||
|
|
0f503f72b5 | ||
|
|
424b86a261 | ||
|
|
1fe595f4cc | ||
|
|
b8c59f1183 | ||
|
|
a935347aed | ||
|
|
661d0a8669 | ||
|
|
63ff5fd334 | ||
|
|
146b9245ab | ||
|
|
99d33922be | ||
|
|
c42634af3f | ||
|
|
6cb59cc3ab | ||
|
|
e0481686b7 | ||
|
|
804ade3a40 | ||
|
|
c5ccaef0c4 | ||
|
|
c4416d406a | ||
|
|
6b8a23ae10 | ||
|
|
872d5d766e | ||
|
|
f5abd0719c | ||
|
|
6462ffc15d | ||
|
|
6333cafd38 | ||
|
|
03c59811a3 | ||
|
|
cfd3b5bbcb | ||
|
|
97ab67240f | ||
|
|
7fc664185c | ||
|
|
93094367c7 | ||
|
|
e8fa9c6eea |
19
CONTRIBUTING.md
Normal file
19
CONTRIBUTING.md
Normal file
@@ -0,0 +1,19 @@
|
||||
# Contributing
|
||||
|
||||
Thank you for your interest in contributing to Mobile Verification Toolkit (MVT)! Your help is very much appreciated.
|
||||
|
||||
|
||||
## Where to start
|
||||
|
||||
Starting to contribute to a somewhat complex project like MVT might seem intimidating. Unless you have specific ideas of new functionality you would like to submit, some good starting points are searching for `TODO:` and `FIXME:` comments throughout the code. Alternatively you can check if any GitHub issues existed marked with the ["help wanted"](https://github.com/mvt-project/mvt/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22) tag.
|
||||
|
||||
|
||||
## Code style
|
||||
|
||||
When contributing code to
|
||||
|
||||
- **Indentation**: we use 4-spaces tabs.
|
||||
|
||||
- **Quotes**: we use double quotes (`"`) as a default. Single quotes (`'`) can be favored with nested strings instead of escaping (`\"`), or when using f-formatting.
|
||||
|
||||
- **Maximum line length**: we strongly encourage to respect a 80 characters long lines and to follow [PEP8 indentation guidelines](https://peps.python.org/pep-0008/#indentation) when having to wrap. However, if breaking at 80 is not possible or is detrimental to the readability of the code, exceptions are tolerated. For example, long log lines, or long strings can be extended to 100 characters long. Please hard wrap anything beyond 100 characters.
|
||||
3
Makefile
3
Makefile
@@ -11,3 +11,6 @@ upload:
|
||||
|
||||
test-upload:
|
||||
python3 -m twine upload --repository testpypi dist/*
|
||||
|
||||
pylint:
|
||||
pylint --rcfile=setup.cfg mvt
|
||||
|
||||
5
SECURITY.md
Normal file
5
SECURITY.md
Normal file
@@ -0,0 +1,5 @@
|
||||
# Reporting security issues
|
||||
|
||||
Thank you for your interest in reporting security issues and vulnerabilities! Security research is of utmost importance and we take all reports seriously. If you discover an issue please report it to us right away!
|
||||
|
||||
Please DO NOT file a public issue, instead send your report privately to *nex [at] nex [dot] sx*. You can also write PGP-encrypted emails to [this key](https://keybase.io/nex/pgp_keys.asc?fingerprint=05216f3b86848a303c2fe37dd166f1667359d880).
|
||||
@@ -3,10 +3,10 @@
|
||||
If you have correctly [installed libimobiledevice](../install.md) you can easily generate an iTunes backup using the `idevicebackup2` tool included in the suite. First, you might want to ensure that backup encryption is enabled (**note: encrypted backup contain more data than unencrypted backups**):
|
||||
|
||||
```bash
|
||||
idevicebackup2 -i backup encryption on
|
||||
idevicebackup2 -i encryption on
|
||||
```
|
||||
|
||||
Note that if a backup password was previously set on this device, you might need to use the same or change it. You can try changing password using `idevicebackup2 -i backup changepw`, or by turning off encryption (`idevicebackup2 -i backup encryption off`) and turning it back on again.
|
||||
Note that if a backup password was previously set on this device, you might need to use the same or change it. You can try changing password using `idevicebackup2 -i changepw`, or by turning off encryption (`idevicebackup2 -i encryption off`) and turning it back on again.
|
||||
|
||||
If you are not able to recover or change the password, you should try to disable encryption and obtain an unencrypted backup.
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ from mvt.common.logo import logo
|
||||
from mvt.common.updates import IndicatorsUpdates
|
||||
|
||||
from .cmd_check_adb import CmdAndroidCheckADB
|
||||
from .cmd_check_androidqf import CmdAndroidCheckAndroidQF
|
||||
from .cmd_check_backup import CmdAndroidCheckBackup
|
||||
from .cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from .cmd_download_apks import DownloadAPKs
|
||||
@@ -58,14 +59,16 @@ def version():
|
||||
@click.option("--output", "-o", type=click.Path(exists=False),
|
||||
help="Specify a path to a folder where you want to store the APKs")
|
||||
@click.option("--from-file", "-f", type=click.Path(exists=True),
|
||||
help="Instead of acquiring from phone, load an existing packages.json file for lookups (mainly for debug purposes)")
|
||||
help="Instead of acquiring from phone, load an existing packages.json file for "
|
||||
"lookups (mainly for debug purposes)")
|
||||
@click.pass_context
|
||||
def download_apks(ctx, all_apks, virustotal, output, from_file, serial):
|
||||
try:
|
||||
if from_file:
|
||||
download = DownloadAPKs.from_json(from_file)
|
||||
else:
|
||||
# TODO: Do we actually want to be able to run without storing any file?
|
||||
# TODO: Do we actually want to be able to run without storing any
|
||||
# file?
|
||||
if not output:
|
||||
log.critical("You need to specify an output folder with --output!")
|
||||
ctx.exit(1)
|
||||
@@ -119,9 +122,9 @@ def check_adb(ctx, serial, iocs, output, fast, list_modules, module):
|
||||
|
||||
cmd.run()
|
||||
|
||||
if len(cmd.timeline_detected) > 0:
|
||||
if cmd.detected_count > 0:
|
||||
log.warning("The analysis of the Android device produced %d detections!",
|
||||
len(cmd.timeline_detected))
|
||||
cmd.detected_count)
|
||||
|
||||
|
||||
#==============================================================================
|
||||
@@ -130,14 +133,16 @@ def check_adb(ctx, serial, iocs, output, fast, list_modules, module):
|
||||
@cli.command("check-bugreport", help="Check an Android Bug Report")
|
||||
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
|
||||
default=[], help=HELP_MSG_IOC)
|
||||
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
|
||||
@click.option("--output", "-o", type=click.Path(exists=False),
|
||||
help=HELP_MSG_OUTPUT)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.argument("BUGREPORT_PATH", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path):
|
||||
cmd = CmdAndroidCheckBugreport(target_path=bugreport_path, results_path=output,
|
||||
ioc_files=iocs, module_name=module)
|
||||
cmd = CmdAndroidCheckBugreport(target_path=bugreport_path,
|
||||
results_path=output, ioc_files=iocs,
|
||||
module_name=module)
|
||||
|
||||
if list_modules:
|
||||
cmd.list_modules()
|
||||
@@ -147,23 +152,23 @@ def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path):
|
||||
|
||||
cmd.run()
|
||||
|
||||
if len(cmd.timeline_detected) > 0:
|
||||
if cmd.detected_count > 0:
|
||||
log.warning("The analysis of the Android bug report produced %d detections!",
|
||||
len(cmd.timeline_detected))
|
||||
cmd.detected_count)
|
||||
|
||||
|
||||
#==============================================================================
|
||||
# Command: check-backup
|
||||
#==============================================================================
|
||||
@cli.command("check-backup", help="Check an Android Backup")
|
||||
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
|
||||
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
|
||||
default=[], help=HELP_MSG_IOC)
|
||||
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
|
||||
@click.option("--output", "-o", type=click.Path(exists=False),
|
||||
help=HELP_MSG_OUTPUT)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_backup(ctx, serial, iocs, output, list_modules, backup_path):
|
||||
def check_backup(ctx, iocs, output, list_modules, backup_path):
|
||||
cmd = CmdAndroidCheckBackup(target_path=backup_path, results_path=output,
|
||||
ioc_files=iocs)
|
||||
|
||||
@@ -175,9 +180,39 @@ def check_backup(ctx, serial, iocs, output, list_modules, backup_path):
|
||||
|
||||
cmd.run()
|
||||
|
||||
if len(cmd.timeline_detected) > 0:
|
||||
if cmd.detected_count > 0:
|
||||
log.warning("The analysis of the Android backup produced %d detections!",
|
||||
len(cmd.timeline_detected))
|
||||
cmd.detected_count)
|
||||
|
||||
|
||||
#==============================================================================
|
||||
# Command: check-androidqf
|
||||
#==============================================================================
|
||||
@cli.command("check-androidqf", help="Check data collected with AndroidQF")
|
||||
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
|
||||
default=[], help=HELP_MSG_IOC)
|
||||
@click.option("--output", "-o", type=click.Path(exists=False),
|
||||
help=HELP_MSG_OUTPUT)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.argument("ANDROIDQF_PATH", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_androidqf(ctx, iocs, output, list_modules, module, androidqf_path):
|
||||
cmd = CmdAndroidCheckAndroidQF(target_path=androidqf_path,
|
||||
results_path=output, ioc_files=iocs,
|
||||
module_name=module)
|
||||
|
||||
if list_modules:
|
||||
cmd.list_modules()
|
||||
return
|
||||
|
||||
log.info("Checking AndroidQF acquisition at path: %s", androidqf_path)
|
||||
|
||||
cmd.run()
|
||||
|
||||
if cmd.detected_count > 0:
|
||||
log.warning("The analysis of the AndroidQF acquisition produced %d detections!",
|
||||
cmd.detected_count)
|
||||
|
||||
|
||||
#==============================================================================
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
|
||||
@@ -14,12 +15,18 @@ log = logging.getLogger(__name__)
|
||||
|
||||
class CmdAndroidCheckADB(Command):
|
||||
|
||||
name = "check-adb"
|
||||
modules = ADB_MODULES
|
||||
|
||||
def __init__(self, target_path: str = None, results_path: str = None,
|
||||
ioc_files: list = [], module_name: str = None, serial: str = None,
|
||||
fast_mode: bool = False):
|
||||
def __init__(
|
||||
self,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
) -> None:
|
||||
super().__init__(target_path=target_path, results_path=results_path,
|
||||
ioc_files=ioc_files, module_name=module_name,
|
||||
serial=serial, fast_mode=fast_mode, log=log)
|
||||
|
||||
self.name = "check-adb"
|
||||
self.modules = ADB_MODULES
|
||||
|
||||
32
mvt/android/cmd_check_androidqf.py
Normal file
32
mvt/android/cmd_check_androidqf.py
Normal file
@@ -0,0 +1,32 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
|
||||
from .modules.androidqf import ANDROIDQF_MODULES
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CmdAndroidCheckAndroidQF(Command):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
) -> None:
|
||||
super().__init__(target_path=target_path, results_path=results_path,
|
||||
ioc_files=ioc_files, module_name=module_name,
|
||||
serial=serial, fast_mode=fast_mode, log=log)
|
||||
|
||||
self.name = "check-androidqf"
|
||||
self.modules = ANDROIDQF_MODULES
|
||||
@@ -9,7 +9,7 @@ import os
|
||||
import sys
|
||||
import tarfile
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
from typing import Callable, Optional
|
||||
|
||||
from rich.prompt import Prompt
|
||||
|
||||
@@ -25,16 +25,22 @@ log = logging.getLogger(__name__)
|
||||
|
||||
class CmdAndroidCheckBackup(Command):
|
||||
|
||||
name = "check-backup"
|
||||
modules = BACKUP_MODULES
|
||||
|
||||
def __init__(self, target_path: str = None, results_path: str = None,
|
||||
ioc_files: list = [], module_name: str = None, serial: str = None,
|
||||
fast_mode: bool = False):
|
||||
def __init__(
|
||||
self,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
) -> None:
|
||||
super().__init__(target_path=target_path, results_path=results_path,
|
||||
ioc_files=ioc_files, module_name=module_name,
|
||||
serial=serial, fast_mode=fast_mode, log=log)
|
||||
|
||||
self.name = "check-backup"
|
||||
self.modules = BACKUP_MODULES
|
||||
|
||||
self.backup_type = None
|
||||
self.backup_archive = None
|
||||
self.backup_files = []
|
||||
@@ -58,8 +64,8 @@ class CmdAndroidCheckBackup(Command):
|
||||
except InvalidBackupPassword:
|
||||
log.critical("Invalid backup password")
|
||||
sys.exit(1)
|
||||
except AndroidBackupParsingError as e:
|
||||
log.critical("Impossible to parse this backup file: %s", e)
|
||||
except AndroidBackupParsingError as exc:
|
||||
log.critical("Impossible to parse this backup file: %s", exc)
|
||||
log.critical("Please use Android Backup Extractor (ABE) instead")
|
||||
sys.exit(1)
|
||||
|
||||
@@ -73,13 +79,16 @@ class CmdAndroidCheckBackup(Command):
|
||||
self.target_path = Path(self.target_path).absolute().as_posix()
|
||||
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
|
||||
for fname in subfiles:
|
||||
self.backup_files.append(os.path.relpath(os.path.join(root, fname), self.target_path))
|
||||
self.backup_files.append(os.path.relpath(os.path.join(root, fname),
|
||||
self.target_path))
|
||||
else:
|
||||
log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file")
|
||||
log.critical("Invalid backup path, path should be a folder or an "
|
||||
"Android Backup (.ab) file")
|
||||
sys.exit(1)
|
||||
|
||||
def module_init(self, module: Callable) -> None:
|
||||
if self.backup_type == "folder":
|
||||
module.from_folder(self.target_path, self.backup_files)
|
||||
else:
|
||||
module.from_ab(self.target_path, self.backup_archive, self.backup_files)
|
||||
module.from_ab(self.target_path, self.backup_archive,
|
||||
self.backup_files)
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
from typing import Callable, Optional
|
||||
from zipfile import ZipFile
|
||||
|
||||
from mvt.common.command import Command
|
||||
@@ -18,16 +18,22 @@ log = logging.getLogger(__name__)
|
||||
|
||||
class CmdAndroidCheckBugreport(Command):
|
||||
|
||||
name = "check-bugreport"
|
||||
modules = BUGREPORT_MODULES
|
||||
|
||||
def __init__(self, target_path: str = None, results_path: str = None,
|
||||
ioc_files: list = [], module_name: str = None, serial: str = None,
|
||||
fast_mode: bool = False):
|
||||
def __init__(
|
||||
self,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
) -> None:
|
||||
super().__init__(target_path=target_path, results_path=results_path,
|
||||
ioc_files=ioc_files, module_name=module_name,
|
||||
serial=serial, fast_mode=fast_mode, log=log)
|
||||
|
||||
self.name = "check-bugreport"
|
||||
self.modules = BUGREPORT_MODULES
|
||||
|
||||
self.bugreport_format = None
|
||||
self.bugreport_archive = None
|
||||
self.bugreport_files = []
|
||||
@@ -41,12 +47,18 @@ class CmdAndroidCheckBugreport(Command):
|
||||
elif os.path.isdir(self.target_path):
|
||||
self.bugreport_format = "dir"
|
||||
parent_path = Path(self.target_path).absolute().as_posix()
|
||||
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
|
||||
for root, _, subfiles in os.walk(os.path.abspath(self.target_path)):
|
||||
for file_name in subfiles:
|
||||
self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path))
|
||||
file_path = os.path.relpath(os.path.join(root, file_name),
|
||||
parent_path)
|
||||
self.bugreport_files.append(file_path)
|
||||
|
||||
def module_init(self, module: Callable) -> None:
|
||||
if self.bugreport_format == "zip":
|
||||
module.from_zip(self.bugreport_archive, self.bugreport_files)
|
||||
else:
|
||||
module.from_folder(self.target_path, self.bugreport_files)
|
||||
|
||||
def finish(self) -> None:
|
||||
if self.bugreport_archive:
|
||||
self.bugreport_archive.close()
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Callable
|
||||
from typing import Callable, Optional
|
||||
|
||||
from rich.progress import track
|
||||
|
||||
@@ -25,8 +25,12 @@ class DownloadAPKs(AndroidExtraction):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, results_path: str = "", all_apks: bool = False,
|
||||
packages: list = []):
|
||||
def __init__(
|
||||
self,
|
||||
results_path: Optional[str] = None,
|
||||
all_apks: Optional[bool] = False,
|
||||
packages: Optional[list] = None
|
||||
) -> None:
|
||||
"""Initialize module.
|
||||
:param results_path: Path to the folder where data should be stored
|
||||
:param all_apks: Boolean indicating whether to download all packages
|
||||
@@ -78,13 +82,13 @@ class DownloadAPKs(AndroidExtraction):
|
||||
try:
|
||||
self._adb_download(remote_path, local_path)
|
||||
except InsufficientPrivileges:
|
||||
log.error("Unable to pull package file from %s: insufficient privileges, it might be a system app",
|
||||
remote_path)
|
||||
log.error("Unable to pull package file from %s: insufficient privileges, "
|
||||
"it might be a system app", remote_path)
|
||||
self._adb_reconnect()
|
||||
return None
|
||||
except Exception as e:
|
||||
except Exception as exc:
|
||||
log.exception("Failed to pull package file from %s: %s",
|
||||
remote_path, e)
|
||||
remote_path, exc)
|
||||
self._adb_reconnect()
|
||||
return None
|
||||
|
||||
@@ -141,8 +145,8 @@ class DownloadAPKs(AndroidExtraction):
|
||||
log.info("[%d/%d] Package: %s", i, len(packages_selection),
|
||||
package["package_name"])
|
||||
|
||||
# Sometimes the package path contains multiple lines for multiple apks.
|
||||
# We loop through each line and download each file.
|
||||
# Sometimes the package path contains multiple lines for multiple
|
||||
# apks. We loop through each line and download each file.
|
||||
for package_file in package["files"]:
|
||||
device_path = package_file["path"]
|
||||
local_path = self.pull_package_file(package["package_name"],
|
||||
|
||||
@@ -11,7 +11,7 @@ import string
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from typing import Callable
|
||||
from typing import Callable, Optional
|
||||
|
||||
from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb
|
||||
from adb_shell.auth.keygen import keygen, write_public_keyfile
|
||||
@@ -25,8 +25,6 @@ from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header,
|
||||
parse_backup_file)
|
||||
from mvt.common.module import InsufficientPrivileges, MVTModule
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey")
|
||||
ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub")
|
||||
|
||||
@@ -34,9 +32,15 @@ ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub")
|
||||
class AndroidExtraction(MVTModule):
|
||||
"""This class provides a base for all Android extraction modules."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
@@ -74,7 +78,7 @@ class AndroidExtraction(MVTModule):
|
||||
try:
|
||||
self.device = AdbDeviceUsb(serial=self.serial)
|
||||
except UsbDeviceNotFoundError:
|
||||
log.critical("No device found. Make sure it is connected and unlocked.")
|
||||
self.log.critical("No device found. Make sure it is connected and unlocked.")
|
||||
sys.exit(-1)
|
||||
# Otherwise we try to use the TCP transport.
|
||||
else:
|
||||
@@ -89,18 +93,21 @@ class AndroidExtraction(MVTModule):
|
||||
try:
|
||||
self.device.connect(rsa_keys=[signer], auth_timeout_s=5)
|
||||
except (USBErrorBusy, USBErrorAccess):
|
||||
log.critical("Device is busy, maybe run `adb kill-server` and try again.")
|
||||
self.log.critical("Device is busy, maybe run `adb kill-server` and try again.")
|
||||
sys.exit(-1)
|
||||
except DeviceAuthError:
|
||||
log.error("You need to authorize this computer on the Android device. Retrying in 5 seconds...")
|
||||
self.log.error("You need to authorize this computer on the Android device. "
|
||||
"Retrying in 5 seconds...")
|
||||
time.sleep(5)
|
||||
except UsbReadFailedError:
|
||||
log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.")
|
||||
self.log.error("Unable to connect to the device over USB. "
|
||||
"Try to unplug, plug the device and start again.")
|
||||
sys.exit(-1)
|
||||
except OSError as e:
|
||||
if e.errno == 113 and self.serial:
|
||||
log.critical("Unable to connect to the device %s: did you specify the correct IP addres?",
|
||||
self.serial)
|
||||
except OSError as exc:
|
||||
if exc.errno == 113 and self.serial:
|
||||
self.log.critical("Unable to connect to the device %s: "
|
||||
"did you specify the correct IP address?",
|
||||
self.serial)
|
||||
sys.exit(-1)
|
||||
else:
|
||||
break
|
||||
@@ -111,7 +118,7 @@ class AndroidExtraction(MVTModule):
|
||||
|
||||
def _adb_reconnect(self) -> None:
|
||||
"""Reconnect to device using adb."""
|
||||
log.info("Reconnecting ...")
|
||||
self.log.info("Reconnecting ...")
|
||||
self._adb_disconnect()
|
||||
self._adb_connect()
|
||||
|
||||
@@ -136,7 +143,9 @@ class AndroidExtraction(MVTModule):
|
||||
def _adb_root_or_die(self) -> None:
|
||||
"""Check if we have a `su` binary, otherwise raise an Exception."""
|
||||
if not self._adb_check_if_root():
|
||||
raise InsufficientPrivileges("This module is optionally available in case the device is already rooted. Do NOT root your own device!")
|
||||
raise InsufficientPrivileges("This module is optionally available "
|
||||
"in case the device is already rooted."
|
||||
" Do NOT root your own device!")
|
||||
|
||||
def _adb_command_as_root(self, command):
|
||||
"""Execute an adb shell command.
|
||||
@@ -157,60 +166,72 @@ class AndroidExtraction(MVTModule):
|
||||
|
||||
# TODO: Need to support checking files without root privileges as well.
|
||||
|
||||
# Connect to the device over adb.
|
||||
self._adb_connect()
|
||||
# Check if we have root, if not raise an Exception.
|
||||
self._adb_root_or_die()
|
||||
|
||||
return bool(self._adb_command_as_root(f"[ ! -f {file} ] || echo 1"))
|
||||
|
||||
def _adb_download(self, remote_path: str, local_path: str,
|
||||
progress_callback: Callable = None,
|
||||
retry_root: bool = True) -> None:
|
||||
def _adb_download(
|
||||
self,
|
||||
remote_path: str,
|
||||
local_path: str,
|
||||
progress_callback: Optional[Callable] = None,
|
||||
retry_root: Optional[bool] = True
|
||||
) -> None:
|
||||
"""Download a file form the device.
|
||||
|
||||
:param remote_path: Path to download from the device
|
||||
:param local_path: Path to where to locally store the copy of the file
|
||||
:param progress_callback: Callback for download progress bar (Default value = None)
|
||||
:param progress_callback: Callback for download progress bar
|
||||
(Default value = None)
|
||||
:param retry_root: Default value = True)
|
||||
|
||||
"""
|
||||
try:
|
||||
self.device.pull(remote_path, local_path, progress_callback)
|
||||
except AdbCommandFailureException as e:
|
||||
except AdbCommandFailureException as exc:
|
||||
if retry_root:
|
||||
self._adb_download_root(remote_path, local_path, progress_callback)
|
||||
self._adb_download_root(remote_path, local_path,
|
||||
progress_callback)
|
||||
else:
|
||||
raise Exception(f"Unable to download file {remote_path}: {e}")
|
||||
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
|
||||
|
||||
def _adb_download_root(self, remote_path: str, local_path: str,
|
||||
progress_callback: Callable = None) -> None:
|
||||
def _adb_download_root(
|
||||
self,
|
||||
remote_path: str,
|
||||
local_path: str,
|
||||
progress_callback: Optional[Callable] = None
|
||||
) -> None:
|
||||
try:
|
||||
# Check if we have root, if not raise an Exception.
|
||||
self._adb_root_or_die()
|
||||
|
||||
# We generate a random temporary filename.
|
||||
tmp_filename = "tmp_" + ''.join(random.choices(string.ascii_uppercase + string.ascii_lowercase + string.digits, k=10))
|
||||
allowed_chars = (string.ascii_uppercase
|
||||
+ string.ascii_lowercase
|
||||
+ string.digits)
|
||||
tmp_filename = "tmp_" + ''.join(random.choices(allowed_chars, k=10))
|
||||
|
||||
# We create a temporary local file.
|
||||
new_remote_path = f"/sdcard/{tmp_filename}"
|
||||
|
||||
# We copy the file from the data folder to /sdcard/.
|
||||
cp = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
|
||||
if cp.startswith("cp: ") and "No such file or directory" in cp:
|
||||
cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
|
||||
if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
|
||||
raise Exception(f"Unable to process file {remote_path}: File not found")
|
||||
elif cp.startswith("cp: ") and "Permission denied" in cp:
|
||||
if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
|
||||
raise Exception(f"Unable to process file {remote_path}: Permission denied")
|
||||
|
||||
# We download from /sdcard/ to the local temporary file.
|
||||
# If it doesn't work now, don't try again (retry_root=False)
|
||||
self._adb_download(new_remote_path, local_path, retry_root=False)
|
||||
self._adb_download(new_remote_path, local_path, progress_callback,
|
||||
retry_root=False)
|
||||
|
||||
# Delete the copy on /sdcard/.
|
||||
self._adb_command(f"rm -rf {new_remote_path}")
|
||||
|
||||
except AdbCommandFailureException as e:
|
||||
raise Exception(f"Unable to download file {remote_path}: {e}")
|
||||
except AdbCommandFailureException as exc:
|
||||
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
|
||||
|
||||
def _adb_process_file(self, remote_path: str,
|
||||
process_routine: Callable) -> None:
|
||||
@@ -223,7 +244,6 @@ class AndroidExtraction(MVTModule):
|
||||
|
||||
"""
|
||||
# Connect to the device over adb.
|
||||
self._adb_connect()
|
||||
# Check if we have root, if not raise an Exception.
|
||||
self._adb_root_or_die()
|
||||
|
||||
@@ -234,10 +254,10 @@ class AndroidExtraction(MVTModule):
|
||||
new_remote_path = f"/sdcard/Download/{local_name}"
|
||||
|
||||
# We copy the file from the data folder to /sdcard/.
|
||||
cp = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
|
||||
if cp.startswith("cp: ") and "No such file or directory" in cp:
|
||||
cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
|
||||
if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
|
||||
raise Exception(f"Unable to process file {remote_path}: File not found")
|
||||
elif cp.startswith("cp: ") and "Permission denied" in cp:
|
||||
if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
|
||||
raise Exception(f"Unable to process file {remote_path}: Permission denied")
|
||||
|
||||
# We download from /sdcard/ to the local temporary file.
|
||||
@@ -250,34 +270,39 @@ class AndroidExtraction(MVTModule):
|
||||
tmp.close()
|
||||
# Delete the copy on /sdcard/.
|
||||
self._adb_command(f"rm -f {new_remote_path}")
|
||||
# Disconnect from the device.
|
||||
self._adb_disconnect()
|
||||
|
||||
def _generate_backup(self, package_name: str) -> bytes:
|
||||
self.log.warning("Please check phone and accept Android backup prompt. You may need to set a backup password. \a")
|
||||
self.log.info("Please check phone and accept Android backup prompt. "
|
||||
"You may need to set a backup password. \a")
|
||||
|
||||
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport...
|
||||
backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress '{}' | base64".format(
|
||||
package_name))
|
||||
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over
|
||||
# the shell transport...
|
||||
cmd = f"/system/bin/bu backup -nocompress '{package_name}' | base64"
|
||||
backup_output_b64 = self._adb_command(cmd)
|
||||
backup_output = base64.b64decode(backup_output_b64)
|
||||
header = parse_ab_header(backup_output)
|
||||
|
||||
if not header["backup"]:
|
||||
self.log.error("Extracting SMS via Android backup failed. No valid backup data found.")
|
||||
return
|
||||
self.log.error("Extracting SMS via Android backup failed. "
|
||||
"No valid backup data found.")
|
||||
return None
|
||||
|
||||
if header["encryption"] == "none":
|
||||
return parse_backup_file(backup_output, password=None)
|
||||
|
||||
for password_retry in range(0, 3):
|
||||
backup_password = Prompt.ask("Enter backup password", password=True)
|
||||
for _ in range(0, 3):
|
||||
backup_password = Prompt.ask("Enter backup password",
|
||||
password=True)
|
||||
try:
|
||||
decrypted_backup_tar = parse_backup_file(backup_output, backup_password)
|
||||
decrypted_backup_tar = parse_backup_file(backup_output,
|
||||
backup_password)
|
||||
return decrypted_backup_tar
|
||||
except InvalidBackupPassword:
|
||||
self.log.error("You provided the wrong password! Please try again...")
|
||||
|
||||
self.log.warn("All attempts to decrypt backup with password failed!")
|
||||
self.log.error("All attempts to decrypt backup with password failed!")
|
||||
|
||||
return None
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the main procedure."""
|
||||
|
||||
@@ -6,33 +6,39 @@
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import (convert_chrometime_to_unix,
|
||||
convert_timestamp_to_iso)
|
||||
from mvt.common.utils import (convert_chrometime_to_datetime,
|
||||
convert_datetime_to_iso)
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
CHROME_HISTORY_PATH = "data/data/com.android.chrome/app_chrome/Default/History"
|
||||
|
||||
|
||||
class ChromeHistory(AndroidExtraction):
|
||||
"""This module extracts records from Android's Chrome browsing history."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def serialize(self, record: dict) -> None:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
return {
|
||||
"timestamp": record["isodate"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": "visit",
|
||||
"data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, redirect source: {record['redirect_source']})"
|
||||
"data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, "
|
||||
f"redirect source: {record['redirect_source']})"
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
@@ -69,18 +75,24 @@ class ChromeHistory(AndroidExtraction):
|
||||
"url": item[1],
|
||||
"visit_id": item[2],
|
||||
"timestamp": item[3],
|
||||
"isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(item[3])),
|
||||
"isodate": convert_datetime_to_iso(
|
||||
convert_chrometime_to_datetime(item[3])),
|
||||
"redirect_source": item[4],
|
||||
})
|
||||
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
log.info("Extracted a total of %d history items", len(self.results))
|
||||
self.log.info("Extracted a total of %d history items",
|
||||
len(self.results))
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
try:
|
||||
self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH),
|
||||
self._parse_db)
|
||||
except Exception as e:
|
||||
self.log.error(e)
|
||||
except Exception as exc:
|
||||
self.log.error(exc)
|
||||
|
||||
self._adb_disconnect()
|
||||
|
||||
@@ -4,20 +4,25 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.parsers import parse_dumpsys_accessibility
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DumpsysAccessibility(AndroidExtraction):
|
||||
"""This module extracts stats on accessibility."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
@@ -41,6 +46,8 @@ class DumpsysAccessibility(AndroidExtraction):
|
||||
self.results = parse_dumpsys_accessibility(output)
|
||||
|
||||
for result in self.results:
|
||||
log.info("Found installed accessibility service \"%s\"", result.get("service"))
|
||||
self.log.info("Found installed accessibility service \"%s\"",
|
||||
result.get("service"))
|
||||
|
||||
self.log.info("Identified a total of %d accessibility services", len(self.results))
|
||||
self.log.info("Identified a total of %d accessibility services",
|
||||
len(self.results))
|
||||
|
||||
@@ -4,20 +4,25 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.parsers import parse_dumpsys_activity_resolver_table
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DumpsysActivities(AndroidExtraction):
|
||||
"""This module extracts details on receivers for risky activities."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
@@ -4,27 +4,32 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.android.parsers.dumpsys import parse_dumpsys_appops
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DumpsysAppOps(AndroidExtraction):
|
||||
"""This module extracts records from App-op Manager."""
|
||||
|
||||
slug = "dumpsys_appops"
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def serialize(self, record: dict) -> None:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
records = []
|
||||
for perm in record["permissions"]:
|
||||
if "entries" not in perm:
|
||||
@@ -36,7 +41,8 @@ class DumpsysAppOps(AndroidExtraction):
|
||||
"timestamp": entry["timestamp"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": entry["access"],
|
||||
"data": f"{record['package_name']} access to {perm['name']}: {entry['access']}",
|
||||
"data": f"{record['package_name']} access to "
|
||||
f"{perm['name']}: {entry['access']}",
|
||||
})
|
||||
|
||||
return records
|
||||
@@ -51,9 +57,10 @@ class DumpsysAppOps(AndroidExtraction):
|
||||
continue
|
||||
|
||||
for perm in result["permissions"]:
|
||||
if perm["name"] == "REQUEST_INSTALL_PACKAGES" and perm["access"] == "allow":
|
||||
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission",
|
||||
result["package_name"])
|
||||
if (perm["name"] == "REQUEST_INSTALL_PACKAGES"
|
||||
and perm["access"] == "allow"):
|
||||
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES "
|
||||
"permission", result["package_name"])
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
@@ -4,30 +4,36 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.android.parsers import parse_dumpsys_battery_daily
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DumpsysBatteryDaily(AndroidExtraction):
|
||||
"""This module extracts records from battery daily updates."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def serialize(self, record: dict) -> None:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
return {
|
||||
"timestamp": record["from"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": "battery_daily",
|
||||
"data": f"Recorded update of package {record['package_name']} with vers {record['vers']}"
|
||||
"data": f"Recorded update of package {record['package_name']} "
|
||||
f"with vers {record['vers']}"
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
@@ -48,4 +54,5 @@ class DumpsysBatteryDaily(AndroidExtraction):
|
||||
|
||||
self.results = parse_dumpsys_battery_daily(output)
|
||||
|
||||
self.log.info("Extracted %d records from battery daily stats", len(self.results))
|
||||
self.log.info("Extracted %d records from battery daily stats",
|
||||
len(self.results))
|
||||
|
||||
@@ -4,20 +4,25 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.parsers import parse_dumpsys_battery_history
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DumpsysBatteryHistory(AndroidExtraction):
|
||||
"""This module extracts records from battery history events."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
@@ -40,4 +45,5 @@ class DumpsysBatteryHistory(AndroidExtraction):
|
||||
|
||||
self.results = parse_dumpsys_battery_history(output)
|
||||
|
||||
self.log.info("Extracted %d records from battery history", len(self.results))
|
||||
self.log.info("Extracted %d records from battery history",
|
||||
len(self.results))
|
||||
|
||||
@@ -4,22 +4,27 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.parsers import parse_dumpsys_dbinfo
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DumpsysDBInfo(AndroidExtraction):
|
||||
"""This module extracts records from battery daily updates."""
|
||||
|
||||
slug = "dumpsys_dbinfo"
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
@@ -5,18 +5,23 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DumpsysFull(AndroidExtraction):
|
||||
"""This module extracts stats on battery consumption by processes."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
@@ -30,6 +35,6 @@ class DumpsysFull(AndroidExtraction):
|
||||
with open(output_path, "w", encoding="utf-8") as handle:
|
||||
handle.write(output)
|
||||
|
||||
log.info("Full dumpsys output stored at %s", output_path)
|
||||
self.log.info("Full dumpsys output stored at %s", output_path)
|
||||
|
||||
self._adb_disconnect()
|
||||
|
||||
@@ -4,13 +4,12 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.parsers import parse_dumpsys_receiver_resolver_table
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
INTENT_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS"
|
||||
INTENT_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED"
|
||||
INTENT_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED"
|
||||
@@ -21,9 +20,15 @@ INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL"
|
||||
class DumpsysReceivers(AndroidExtraction):
|
||||
"""This module extracts details on receivers for risky activities."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
@@ -46,17 +51,18 @@ class DumpsysReceivers(AndroidExtraction):
|
||||
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
|
||||
receiver["receiver"])
|
||||
elif intent == INTENT_PHONE_STATE:
|
||||
self.log.info("Found a receiver monitoring telephony state/incoming calls: \"%s\"",
|
||||
self.log.info("Found a receiver monitoring "
|
||||
"telephony state/incoming calls: \"%s\"",
|
||||
receiver["receiver"])
|
||||
elif intent == INTENT_NEW_OUTGOING_CALL:
|
||||
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
|
||||
receiver["receiver"])
|
||||
|
||||
ioc = self.indicators.check_app_id(receiver["package_name"])
|
||||
if ioc:
|
||||
receiver["matched_indicator"] = ioc
|
||||
self.detected.append({intent: receiver})
|
||||
continue
|
||||
ioc = self.indicators.check_app_id(receiver["package_name"])
|
||||
if ioc:
|
||||
receiver["matched_indicator"] = ioc
|
||||
self.detected.append({intent: receiver})
|
||||
continue
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
@@ -3,17 +3,15 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import stat
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import convert_timestamp_to_iso
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
ANDROID_TMP_FOLDERS = [
|
||||
"/tmp/",
|
||||
"/data/local/tmp/",
|
||||
@@ -27,15 +25,21 @@ ANDROID_MEDIA_FOLDERS = [
|
||||
class Files(AndroidExtraction):
|
||||
"""This module extracts the list of files on the device."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
self.full_find = False
|
||||
|
||||
def serialize(self, record: dict) -> None:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
if "modified_time" in record:
|
||||
return {
|
||||
"timestamp": record["modified_time"],
|
||||
@@ -44,6 +48,8 @@ class Files(AndroidExtraction):
|
||||
"data": record["path"],
|
||||
}
|
||||
|
||||
return None
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
if result.get("is_suid"):
|
||||
@@ -51,7 +57,8 @@ class Files(AndroidExtraction):
|
||||
result["path"])
|
||||
|
||||
if self.indicators and self.indicators.check_file_path(result["path"]):
|
||||
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
|
||||
self.log.warning("Found a known suspicous file at path: \"%s\"",
|
||||
result["path"])
|
||||
self.detected.append(result)
|
||||
|
||||
def backup_file(self, file_path: str) -> None:
|
||||
@@ -73,11 +80,13 @@ class Files(AndroidExtraction):
|
||||
|
||||
def find_files(self, folder: str) -> None:
|
||||
if self.full_find:
|
||||
output = self._adb_command(f"find '{folder}' -type f -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
|
||||
cmd = f"find '{folder}' -type f -printf '%T@ %m %s %u %g %p\n' 2> /dev/null"
|
||||
output = self._adb_command(cmd)
|
||||
|
||||
for file_line in output.splitlines():
|
||||
[unix_timestamp, mode, size, owner, group, full_path] = file_line.rstrip().split(" ", 5)
|
||||
mod_time = convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(float(unix_timestamp))))
|
||||
[unix_timestamp, mode, size,
|
||||
owner, group, full_path] = file_line.rstrip().split(" ", 5)
|
||||
mod_time = convert_unix_to_iso(unix_timestamp)
|
||||
|
||||
self.results.append({
|
||||
"path": full_path,
|
||||
@@ -97,7 +106,8 @@ class Files(AndroidExtraction):
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
output = self._adb_command("find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
|
||||
cmd = "find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null"
|
||||
output = self._adb_command(cmd)
|
||||
if output or output.strip().splitlines():
|
||||
self.full_find = True
|
||||
|
||||
|
||||
@@ -5,20 +5,25 @@
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.parsers import parse_getprop
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Getprop(AndroidExtraction):
|
||||
"""This module extracts device properties from getprop command."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
@@ -37,7 +42,9 @@ class Getprop(AndroidExtraction):
|
||||
if security_patch:
|
||||
patch_date = datetime.strptime(security_patch, "%Y-%m-%d")
|
||||
if (datetime.now() - patch_date) > timedelta(days=6*30):
|
||||
self.log.warning("This phone has not received security updates for more than "
|
||||
"six months (last update: %s)", security_patch)
|
||||
self.log.warning("This phone has not received security updates "
|
||||
"for more than six months (last update: %s)",
|
||||
security_patch)
|
||||
|
||||
self.log.info("Extracted %d Android system properties", len(self.results))
|
||||
self.log.info("Extracted %d Android system properties",
|
||||
len(self.results))
|
||||
|
||||
@@ -5,18 +5,23 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Logcat(AndroidExtraction):
|
||||
"""This module extracts details on installed packages."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
@@ -35,15 +40,15 @@ class Logcat(AndroidExtraction):
|
||||
with open(logcat_path, "w", encoding="utf-8") as handle:
|
||||
handle.write(output)
|
||||
|
||||
log.info("Current logcat logs stored at %s",
|
||||
logcat_path)
|
||||
self.log.info("Current logcat logs stored at %s",
|
||||
logcat_path)
|
||||
|
||||
logcat_last_path = os.path.join(self.results_path,
|
||||
"logcat_last.txt")
|
||||
with open(logcat_last_path, "w", encoding="utf-8") as handle:
|
||||
handle.write(last_output)
|
||||
|
||||
log.info("Logcat logs prior to last reboot stored at %s",
|
||||
logcat_last_path)
|
||||
self.log.info("Logcat logs prior to last reboot stored at %s",
|
||||
logcat_last_path)
|
||||
|
||||
self._adb_disconnect()
|
||||
|
||||
@@ -4,18 +4,18 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional, Union
|
||||
|
||||
from rich.console import Console
|
||||
from rich.progress import track
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
|
||||
from mvt.android.parsers.dumpsys import parse_dumpsys_package_for_details
|
||||
from mvt.common.virustotal import VTNoKey, VTQuotaExceeded, virustotal_lookup
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
DANGEROUS_PERMISSIONS_THRESHOLD = 10
|
||||
DANGEROUS_PERMISSIONS = [
|
||||
"android.permission.ACCESS_COARSE_LOCATION",
|
||||
@@ -39,7 +39,6 @@ DANGEROUS_PERMISSIONS = [
|
||||
"android.permission.USE_SIP",
|
||||
"com.android.browser.permission.READ_HISTORY_BOOKMARKS",
|
||||
]
|
||||
|
||||
ROOT_PACKAGES = [
|
||||
"com.noshufou.android.su",
|
||||
"com.noshufou.android.su.elite",
|
||||
@@ -67,33 +66,56 @@ ROOT_PACKAGES = [
|
||||
"com.kingouser.com",
|
||||
"com.topjohnwu.magisk",
|
||||
]
|
||||
SECURITY_PACKAGES = [
|
||||
"com.policydm",
|
||||
"com.samsung.android.app.omcagent",
|
||||
"com.samsung.android.securitylogagent",
|
||||
"com.sec.android.soagent",
|
||||
"com.wssyncmldm",
|
||||
]
|
||||
|
||||
|
||||
class Packages(AndroidExtraction):
|
||||
"""This module extracts the list of installed packages."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def serialize(self, record: dict) -> None:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
records = []
|
||||
|
||||
timestamps = [
|
||||
{"event": "package_install", "timestamp": record["timestamp"]},
|
||||
{"event": "package_first_install", "timestamp": record["first_install_time"]},
|
||||
{"event": "package_last_update", "timestamp": record["last_update_time"]},
|
||||
{
|
||||
"event": "package_install",
|
||||
"timestamp": record["timestamp"]
|
||||
},
|
||||
{
|
||||
"event": "package_first_install",
|
||||
"timestamp": record["first_install_time"]
|
||||
},
|
||||
{
|
||||
"event": "package_last_update",
|
||||
"timestamp": record["last_update_time"]
|
||||
},
|
||||
]
|
||||
|
||||
for ts in timestamps:
|
||||
for timestamp in timestamps:
|
||||
records.append({
|
||||
"timestamp": ts["timestamp"],
|
||||
"timestamp": timestamp["timestamp"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": ts["event"],
|
||||
"data": f"{record['package_name']} (system: {record['system']}, third party: {record['third_party']})",
|
||||
"event": timestamp["event"],
|
||||
"data": f"{record['package_name']} (system: {record['system']},"
|
||||
f" third party: {record['third_party']})",
|
||||
})
|
||||
|
||||
return records
|
||||
@@ -101,11 +123,16 @@ class Packages(AndroidExtraction):
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
if result["package_name"] in ROOT_PACKAGES:
|
||||
self.log.warning("Found an installed package related to rooting/jailbreaking: \"%s\"",
|
||||
self.log.warning("Found an installed package related to "
|
||||
"rooting/jailbreaking: \"%s\"",
|
||||
result["package_name"])
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
if result["package_name"] in SECURITY_PACKAGES and result["disabled"]:
|
||||
self.log.warning("Found a security package disabled: \"%s\"",
|
||||
result["package_name"])
|
||||
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
@@ -132,14 +159,14 @@ class Packages(AndroidExtraction):
|
||||
total_hashes = len(hashes)
|
||||
detections = {}
|
||||
|
||||
for i in track(range(total_hashes), description=f"Looking up {total_hashes} files..."):
|
||||
progress_desc = f"Looking up {total_hashes} files..."
|
||||
for i in track(range(total_hashes), description=progress_desc):
|
||||
try:
|
||||
results = virustotal_lookup(hashes[i])
|
||||
except VTNoKey as e:
|
||||
log.info(e)
|
||||
except VTNoKey:
|
||||
return
|
||||
except VTQuotaExceeded as e:
|
||||
log.error("Unable to continue: %s", e)
|
||||
except VTQuotaExceeded as exc:
|
||||
print("Unable to continue: %s", exc)
|
||||
break
|
||||
|
||||
if not results:
|
||||
@@ -176,43 +203,17 @@ class Packages(AndroidExtraction):
|
||||
|
||||
@staticmethod
|
||||
def parse_package_for_details(output: str) -> dict:
|
||||
details = {
|
||||
"uid": "",
|
||||
"version_name": "",
|
||||
"version_code": "",
|
||||
"timestamp": "",
|
||||
"first_install_time": "",
|
||||
"last_update_time": "",
|
||||
"requested_permissions": [],
|
||||
}
|
||||
|
||||
in_permissions = False
|
||||
lines = []
|
||||
in_packages = False
|
||||
for line in output.splitlines():
|
||||
if in_permissions:
|
||||
if line.startswith(" " * 4) and not line.startswith(" " * 6):
|
||||
in_permissions = False
|
||||
continue
|
||||
if in_packages:
|
||||
if line.strip() == "":
|
||||
break
|
||||
lines.append(line)
|
||||
if line.strip() == "Packages:":
|
||||
in_packages = True
|
||||
|
||||
permission = line.strip().split(":")[0]
|
||||
details["requested_permissions"].append(permission)
|
||||
|
||||
if line.strip().startswith("userId="):
|
||||
details["uid"] = line.split("=")[1].strip()
|
||||
elif line.strip().startswith("versionName="):
|
||||
details["version_name"] = line.split("=")[1].strip()
|
||||
elif line.strip().startswith("versionCode="):
|
||||
details["version_code"] = line.split("=", 1)[1].strip()
|
||||
elif line.strip().startswith("timeStamp="):
|
||||
details["timestamp"] = line.split("=")[1].strip()
|
||||
elif line.strip().startswith("firstInstallTime="):
|
||||
details["first_install_time"] = line.split("=")[1].strip()
|
||||
elif line.strip().startswith("lastUpdateTime="):
|
||||
details["last_update_time"] = line.split("=")[1].strip()
|
||||
elif line.strip() == "requested permissions:":
|
||||
in_permissions = True
|
||||
continue
|
||||
|
||||
return details
|
||||
return parse_dumpsys_package_for_details("\n".join(lines))
|
||||
|
||||
def _get_files_for_package(self, package_name: str) -> list:
|
||||
output = self._adb_command(f"pm path {package_name}")
|
||||
@@ -224,10 +225,14 @@ class Packages(AndroidExtraction):
|
||||
for file_path in output.splitlines():
|
||||
file_path = file_path.strip()
|
||||
|
||||
md5 = self._adb_command(f"md5sum {file_path}").split(" ")[0]
|
||||
sha1 = self._adb_command(f"sha1sum {file_path}").split(" ")[0]
|
||||
sha256 = self._adb_command(f"sha256sum {file_path}").split(" ")[0]
|
||||
sha512 = self._adb_command(f"sha512sum {file_path}").split(" ")[0]
|
||||
md5 = self._adb_command(
|
||||
f"md5sum {file_path}").split(" ", maxsplit=1)[0]
|
||||
sha1 = self._adb_command(
|
||||
f"sha1sum {file_path}").split(" ", maxsplit=1)[0]
|
||||
sha256 = self._adb_command(
|
||||
f"sha256sum {file_path}").split(" ", maxsplit=1)[0]
|
||||
sha512 = self._adb_command(
|
||||
f"sha512sum {file_path}").split(" ", maxsplit=1)[0]
|
||||
|
||||
package_files.append({
|
||||
"path": file_path,
|
||||
@@ -271,7 +276,8 @@ class Packages(AndroidExtraction):
|
||||
"files": package_files,
|
||||
}
|
||||
|
||||
dumpsys_package = self._adb_command(f"dumpsys package {package_name}")
|
||||
dumpsys_package = self._adb_command(
|
||||
f"dumpsys package {package_name}")
|
||||
package_details = self.parse_package_for_details(dumpsys_package)
|
||||
new_package.update(package_details)
|
||||
|
||||
@@ -304,8 +310,10 @@ class Packages(AndroidExtraction):
|
||||
dangerous_permissions_count += 1
|
||||
|
||||
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
|
||||
self.log.info("Third-party package \"%s\" requested %d potentially dangerous permissions",
|
||||
result["package_name"], dangerous_permissions_count)
|
||||
self.log.info("Third-party package \"%s\" requested %d "
|
||||
"potentially dangerous permissions",
|
||||
result["package_name"],
|
||||
dangerous_permissions_count)
|
||||
|
||||
packages_to_lookup = []
|
||||
for result in self.results:
|
||||
@@ -314,7 +322,8 @@ class Packages(AndroidExtraction):
|
||||
|
||||
packages_to_lookup.append(result)
|
||||
self.log.info("Found non-system package with name \"%s\" installed by \"%s\" on %s",
|
||||
result["package_name"], result["installer"], result["timestamp"])
|
||||
result["package_name"], result["installer"],
|
||||
result["timestamp"])
|
||||
|
||||
if not self.fast_mode:
|
||||
self.check_virustotal(packages_to_lookup)
|
||||
|
||||
@@ -4,18 +4,23 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Processes(AndroidExtraction):
|
||||
"""This module extracts details on running processes."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
@@ -25,7 +30,21 @@ class Processes(AndroidExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc = self.indicators.check_app_id(result.get("name", ""))
|
||||
proc_name = result.get("proc_name", "")
|
||||
if not proc_name:
|
||||
continue
|
||||
|
||||
# Skipping this process because of false positives.
|
||||
if result["proc_name"] == "gatekeeperd":
|
||||
continue
|
||||
|
||||
ioc = self.indicators.check_app_id(proc_name)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
ioc = self.indicators.check_process(proc_name)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
@@ -33,7 +52,7 @@ class Processes(AndroidExtraction):
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
output = self._adb_command("ps -e")
|
||||
output = self._adb_command("ps -A")
|
||||
|
||||
for line in output.splitlines()[1:]:
|
||||
line = line.strip()
|
||||
@@ -63,4 +82,5 @@ class Processes(AndroidExtraction):
|
||||
|
||||
self._adb_disconnect()
|
||||
|
||||
log.info("Extracted records on a total of %d processes", len(self.results))
|
||||
self.log.info("Extracted records on a total of %d processes",
|
||||
len(self.results))
|
||||
|
||||
@@ -4,18 +4,23 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RootBinaries(AndroidExtraction):
|
||||
"""This module extracts the list of installed packages."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
@@ -4,20 +4,25 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SELinuxStatus(AndroidExtraction):
|
||||
"""This module checks if SELinux is being enforced."""
|
||||
|
||||
slug = "selinux_status"
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
@@ -4,12 +4,10 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
ANDROID_DANGEROUS_SETTINGS = [
|
||||
{
|
||||
"description": "disabled Google Play Services apps verification",
|
||||
@@ -62,9 +60,15 @@ ANDROID_DANGEROUS_SETTINGS = [
|
||||
class Settings(AndroidExtraction):
|
||||
"""This module extracts Android system settings."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
@@ -72,7 +76,7 @@ class Settings(AndroidExtraction):
|
||||
self.results = {} if not results else results
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
for namespace, settings in self.results.items():
|
||||
for _, settings in self.results.items():
|
||||
for key, value in settings.items():
|
||||
for danger in ANDROID_DANGEROUS_SETTINGS:
|
||||
# Check if one of the dangerous settings is using an unsafe
|
||||
|
||||
@@ -6,16 +6,15 @@
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.android.parsers.backup import (AndroidBackupParsingError,
|
||||
parse_tar_for_sms)
|
||||
from mvt.common.module import InsufficientPrivileges
|
||||
from mvt.common.utils import check_for_links, convert_timestamp_to_iso
|
||||
from mvt.common.utils import check_for_links, convert_unix_to_iso
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
SMS_BUGLE_PATH = "data/data/com.google.android.apps.messaging/databases/bugle_db"
|
||||
SMS_BUGLE_QUERY = """
|
||||
SELECT
|
||||
@@ -46,14 +45,22 @@ FROM sms;
|
||||
class SMS(AndroidExtraction):
|
||||
"""This module extracts all SMS messages containing links."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def serialize(self, record: dict) -> None:
|
||||
self.sms_db_type = 0
|
||||
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
body = record["body"].replace("\n", "\\n")
|
||||
return {
|
||||
"timestamp": record["isodate"],
|
||||
@@ -84,9 +91,9 @@ class SMS(AndroidExtraction):
|
||||
conn = sqlite3.connect(db_path)
|
||||
cur = conn.cursor()
|
||||
|
||||
if self.SMS_DB_TYPE == 1:
|
||||
if self.sms_db_type == 1:
|
||||
cur.execute(SMS_BUGLE_QUERY)
|
||||
elif self.SMS_DB_TYPE == 2:
|
||||
elif self.sms_db_type == 2:
|
||||
cur.execute(SMS_MMSMS_QUERY)
|
||||
|
||||
names = [description[0] for description in cur.description]
|
||||
@@ -97,7 +104,7 @@ class SMS(AndroidExtraction):
|
||||
message[names[index]] = value
|
||||
|
||||
message["direction"] = ("received" if message["incoming"] == 1 else "sent")
|
||||
message["isodate"] = convert_timestamp_to_iso(message["timestamp"])
|
||||
message["isodate"] = convert_unix_to_iso(message["timestamp"])
|
||||
|
||||
# If we find links in the messages or if they are empty we add
|
||||
# them to the list of results.
|
||||
@@ -107,11 +114,12 @@ class SMS(AndroidExtraction):
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
|
||||
self.log.info("Extracted a total of %d SMS messages containing links",
|
||||
len(self.results))
|
||||
|
||||
def _extract_sms_adb(self) -> None:
|
||||
"""Use the Android backup command to extract SMS data from the native SMS
|
||||
app.
|
||||
"""Use the Android backup command to extract SMS data from the native
|
||||
SMS app.
|
||||
|
||||
It is crucial to use the under-documented "-nocompress" flag to disable
|
||||
the non-standard Java compression algorithm. This module only supports
|
||||
@@ -124,24 +132,34 @@ class SMS(AndroidExtraction):
|
||||
try:
|
||||
self.results = parse_tar_for_sms(backup_tar)
|
||||
except AndroidBackupParsingError:
|
||||
self.log.info("Impossible to read SMS from the Android Backup, please extract "\
|
||||
"the SMS and try extracting it with Android Backup Extractor")
|
||||
self.log.info("Impossible to read SMS from the Android Backup, "
|
||||
"please extract the SMS and try extracting it with "
|
||||
"Android Backup Extractor")
|
||||
return
|
||||
|
||||
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
|
||||
self.log.info("Extracted a total of %d SMS messages containing links",
|
||||
len(self.results))
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
try:
|
||||
if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))):
|
||||
self.SMS_DB_TYPE = 1
|
||||
self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db)
|
||||
elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))):
|
||||
self.SMS_DB_TYPE = 2
|
||||
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db)
|
||||
if self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH)):
|
||||
self.sms_db_type = 1
|
||||
self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH),
|
||||
self._parse_db)
|
||||
elif self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH)):
|
||||
self.sms_db_type = 2
|
||||
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH),
|
||||
self._parse_db)
|
||||
|
||||
self._adb_disconnect()
|
||||
return
|
||||
except InsufficientPrivileges:
|
||||
pass
|
||||
|
||||
self.log.warn("No SMS database found. Trying extraction of SMS data using " \
|
||||
"Android backup feature.")
|
||||
self.log.info("No SMS database found. Trying extraction of SMS data "
|
||||
"using Android backup feature.")
|
||||
self._extract_sms_adb()
|
||||
|
||||
self._adb_disconnect()
|
||||
|
||||
@@ -7,27 +7,32 @@ import base64
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import check_for_links, convert_timestamp_to_iso
|
||||
from mvt.common.utils import check_for_links, convert_unix_to_iso
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db"
|
||||
|
||||
|
||||
class Whatsapp(AndroidExtraction):
|
||||
"""This module extracts all WhatsApp messages containing links."""
|
||||
|
||||
def __init__(self, file_path: str = None, target_path: str = None,
|
||||
results_path: str = None, fast_mode: bool = False,
|
||||
log: logging.Logger = None, results: list = []) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def serialize(self, record: dict) -> None:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
text = record["data"].replace("\n", "\\n")
|
||||
return {
|
||||
"timestamp": record["isodate"],
|
||||
@@ -71,22 +76,32 @@ class Whatsapp(AndroidExtraction):
|
||||
continue
|
||||
|
||||
message["direction"] = ("send" if message["key_from_me"] == 1 else "received")
|
||||
message["isodate"] = convert_timestamp_to_iso(message["timestamp"])
|
||||
message["isodate"] = convert_unix_to_iso(message["timestamp"])
|
||||
|
||||
# If we find links in the messages or if they are empty we add them
|
||||
# to the list.
|
||||
if (check_for_links(message["data"])
|
||||
or message["data"].strip() == ""):
|
||||
if message.get("thumb_image"):
|
||||
message["thumb_image"] = base64.b64encode(
|
||||
message["thumb_image"])
|
||||
|
||||
# If we find links in the messages or if they are empty we add them to the list.
|
||||
if check_for_links(message["data"]) or message["data"].strip() == "":
|
||||
if (message.get('thumb_image') is not None):
|
||||
message['thumb_image'] = base64.b64encode(message['thumb_image'])
|
||||
messages.append(message)
|
||||
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
log.info("Extracted a total of %d WhatsApp messages containing links", len(messages))
|
||||
self.log.info("Extracted a total of %d WhatsApp messages containing links",
|
||||
len(messages))
|
||||
self.results = messages
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
try:
|
||||
self._adb_process_file(os.path.join("/", WHATSAPP_PATH), self._parse_db)
|
||||
except Exception as e:
|
||||
self.log.error(e)
|
||||
self._adb_process_file(os.path.join("/", WHATSAPP_PATH),
|
||||
self._parse_db)
|
||||
except Exception as exc:
|
||||
self.log.error(exc)
|
||||
|
||||
self._adb_disconnect()
|
||||
|
||||
15
mvt/android/modules/androidqf/__init__.py
Normal file
15
mvt/android/modules/androidqf/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from .dumpsys_accessibility import DumpsysAccessibility
|
||||
from .dumpsys_activities import DumpsysActivities
|
||||
from .dumpsys_appops import DumpsysAppops
|
||||
from .dumpsys_receivers import DumpsysReceivers
|
||||
from .getprop import Getprop
|
||||
from .processes import Processes
|
||||
from .settings import Settings
|
||||
|
||||
ANDROIDQF_MODULES = [DumpsysActivities, DumpsysReceivers, DumpsysAccessibility,
|
||||
DumpsysAppops, Processes, Getprop, Settings]
|
||||
38
mvt/android/modules/androidqf/base.py
Normal file
38
mvt/android/modules/androidqf/base.py
Normal file
@@ -0,0 +1,38 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2022 Claudio Guarnieri.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import fnmatch
|
||||
import logging
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
|
||||
class AndroidQFModule(MVTModule):
|
||||
"""This class provides a base for all Android Data analysis modules."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
fast_mode: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None
|
||||
) -> None:
|
||||
super().__init__(file_path=file_path, target_path=target_path,
|
||||
results_path=results_path, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
self._path = target_path
|
||||
self._files = []
|
||||
|
||||
for root, dirs, files in os.walk(target_path):
|
||||
for name in files:
|
||||
self._files.append(os.path.join(root, name))
|
||||
|
||||
def _get_files_by_pattern(self, pattern):
|
||||
return fnmatch.filter(self._files, pattern)
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user