1
mirror of https://github.com/mvt-project/mvt synced 2025-11-13 01:37:36 +01:00

Compare commits

..

1 Commits

Author SHA1 Message Date
Janik Besendorf
a5ae729b65 Fix tests on windows 2025-08-19 15:49:47 +02:00
192 changed files with 3399 additions and 3169 deletions

View File

@@ -23,7 +23,7 @@ install:
python3 -m pip install --upgrade -e .
test-requirements:
python3 -m pip install --upgrade --group dev
python3 -m pip install --upgrade -r test-requirements.txt
generate-proto-parsers:
# Generate python parsers for protobuf files

View File

@@ -1,26 +1,42 @@
# Deprecation of ADB command in MVT
# Check over ADB
In order to check an Android device over the [Android Debug Bridge (adb)](https://developer.android.com/studio/command-line/adb) you will first need to install [Android SDK Platform Tools](https://developer.android.com/studio/releases/platform-tools). If you have installed [Android Studio](https://developer.android.com/studio/) you should already have access to `adb` and other utilities.
While many Linux distributions already package Android Platform Tools (for example `android-platform-tools-base` on Debian), it is preferable to install the most recent version from the official website. Packaged versions might be outdated and incompatible with most recent Android handsets.
Next you will need to enable debugging on the Android device you are testing. [Please follow the official instructions on how to do so.](https://developer.android.com/studio/command-line/adb)
## Connecting over USB
The easiest way to check the device is over a USB transport. You will need to have USB debugging enabled and the device plugged into your computer. If everything is configured appropriately you should see your device when launching the command `adb devices`.
Now you can try launching MVT with:
```bash
mvt-android check-adb --output /path/to/results
```
If you have previously started an adb daemon MVT will alert you and require you to kill it with `adb kill-server` and relaunch the command.
!!! warning
MVT relies on the Python library [adb-shell](https://pypi.org/project/adb-shell/) to connect to an Android device, which relies on libusb for the USB transport. Because of known driver issues, Windows users [are recommended](https://github.com/JeffLIrion/adb_shell/issues/118) to install appropriate drivers using [Zadig](https://zadig.akeo.ie/). Alternatively, an easier option might be to use the TCP transport and connect over Wi-Fi as describe next.
The `mvt-android check-adb` command has been deprecated and removed from MVT.
## Connecting over Wi-FI
The ability to analyze Android devices over ADB (`mvt-android check-adb`) has been removed from MVT due to several technical and forensic limitations.
When connecting to the device over USB is not possible or not working properly, an alternative option is to connect over the network. In order to do so, first launch an adb daemon at a fixed port number:
## Reasons for Deprecation
```bash
adb tcpip 5555
```
1. **Inconsistent Data Collection Across Devices**
Android devices vary significantly in their system architecture, security policies, and available diagnostic logs. This inconsistency makes it difficult to ensure that MVT can reliably collect necessary forensic data across all devices.
Then you can specify the IP address of the phone with the adb port number to MVT like so:
2. **Incomplete Forensic Data Acquisition**
The `check-adb` command did not retrieve a full forensic snapshot of all available data on the device. For example, critical logs such as the **full bugreport** were not systematically collected, leading to potential gaps in forensic analysis. This can be a serious problem in scenarios where the analyst only had one time access to the Android device.
```bash
mvt-android check-adb --serial 192.168.1.20:5555 --output /path/to/results
```
4. **Code Duplication and Difficulty Ensuring Consistent Behavior Across Sources**
Similar forensic data such as "dumpsys" logs were being loaded and parsed by MVT's ADB, AndroidQF and Bugreport commands. Multiple modules were needed to handle each source format which created duplication leading to inconsistent
behavior and difficulties in maintaining the code base.
Where `192.168.1.20` is the correct IP address of your device.
5. **Alignment with iOS Workflow**
MVTs forensic workflow for iOS relies on pre-extracted artifacts, such as iTunes backups or filesystem dumps, rather than preforming commands or interactions directly on a live device. Removing the ADB functionality ensures a more consistent methodology across both Android and iOS mobile forensic.
## MVT modules requiring root privileges
## Alternative: Using AndroidQF for Forensic Data Collection
To replace the deprecated ADB-based approach, forensic analysts should use [AndroidQF](https://github.com/mvt-project/androidqf) for comprehensive data collection, followed by MVT for forensic analysis. The workflow is outlined in the MVT [Android methodology](./methodology.md)
Of the currently available `mvt-android check-adb` modules a handful require root privileges to function correctly. This is because certain files, such as browser history and SMS messages databases are not accessible with user privileges through adb. These modules are to be considered OPTIONALLY available in case the device was already jailbroken. **Do NOT jailbreak your own device unless you are sure of what you are doing!** Jailbreaking your phone exposes it to considerable security risks!

View File

@@ -1,53 +1,23 @@
# Methodology for Android forensic
Unfortunately Android devices provide fewer complete forensically useful datasources than their iOS cousins. Unlike iOS, the Android backup feature only provides a limited about of relevant data.
Android diagnostic logs such as *bugreport files* can be inconsistent in format and structure across different Android versions and device vendors. The limited diagnostic information available makes it difficult to triage potential compromises, and because of this `mvt-android` capabilities are limited as well.
Unfortunately Android devices provide much less observability than their iOS cousins. Android stores very little diagnostic information useful to triage potential compromises, and because of this `mvt-android` capabilities are limited as well.
However, not all is lost.
## Check Android devices with AndroidQF and MVT
## Check installed Apps
The [AndroidQF](https://github.com/mvt-project/androidqf) tool can be used to collect a wide range of forensic artifacts from an Android device including an Android backup, a bugreport file, and a range of system logs. MVT natively supports analyzing the generated AndroidQF output for signs of device compromise.
Because malware attacks over Android typically take the form of malicious or backdoored apps, the very first thing you might want to do is to extract and verify all installed Android packages and triage quickly if there are any which stand out as malicious or which might be atypical.
### Why Use AndroidQF?
While it is out of the scope of this documentation to dwell into details on how to analyze Android apps, MVT does allow to easily and automatically extract information about installed apps, download copies of them, and quickly look them up on services such as [VirusTotal](https://www.virustotal.com).
- **Complete and raw data extraction**
AndroidQF collects full forensic artifacts using an on-device forensic collection agent, ensuring that no crucial data is overlooked. The data collection does not depended on the shell environment or utilities available on the device.
- **Consistent and standardized output**
By collecting a predefined and complete set of forensic files, AndroidQF ensures consistency in data acquisition across different Android devices.
- **Future-proof analysis**
Since the full forensic artifacts are preserved, analysts can extract new evidence or apply updated analysis techniques without requiring access to the original device.
- **Cross-platform tool without dependencies**
AndroidQF is a standalone Go binary which can be used to remotely collect data from an Android device without the device owner needing to install MVT or a Python environment.
### Workflow for Android Forensic Analysis with AndroidQF
With AndroidQF the analysis process is split into a separate data collection and data analysis stages.
1. **Extract Data Using AndroidQF**
Deploy the AndroidQF forensic collector to acquire all relevant forensic artifacts from the Android device.
2. **Analyze Extracted Data with MVT**
Use the `mvt-android check-androidqf` command to perform forensic analysis on the extracted artifacts.
By separating artifact collection from forensic analysis, this approach ensures a more reliable and scalable methodology for Android forensic investigations.
For more information, refer to the [AndroidQF project documentation](https://github.com/mvt-project/androidqf).
!!! info "Using VirusTotal"
Please note that in order to use VirusTotal lookups you are required to provide your own API key through the `MVT_VT_API_KEY` environment variable. You should also note that VirusTotal enforces strict API usage. Be mindful that MVT might consume your hourly search quota.
## Check the device over Android Debug Bridge
The ability to analyze Android devices over ADB (`mvt-android check-adb`) has been removed from MVT.
Some additional diagnostic information can be extracted from the phone using the [Android Debug Bridge (adb)](https://developer.android.com/studio/command-line/adb). `mvt-android` allows to automatically extract information including [dumpsys](https://developer.android.com/studio/command-line/dumpsys) results, details on installed packages (without download), running processes, presence of root binaries and packages, and more.
See the [Android ADB documentation](./adb.md) for more information.
## Check an Android Backup (SMS messages)
Although Android backups are becoming deprecated, it is still possible to generate one. Unfortunately, because apps these days typically favor backup over the cloud, the amount of data available is limited.
The `mvt-android check-androidqf` command will automatically check an Android backup and SMS messages if an SMS backup is included in the AndroidQF extraction.
The `mvt-android check-backup` command can also be used directly with an Android backup file.
Although Android backups are becoming deprecated, it is still possible to generate one. Unfortunately, because apps these days typically favor backup over the cloud, the amount of data available is limited. Currently, `mvt-android check-backup` only supports checking SMS messages containing links.

View File

@@ -31,4 +31,21 @@ Test if the image was created successfully:
docker run -it mvt
```
If a prompt is spawned successfully, you can close it with `exit`.
If a prompt is spawned successfully, you can close it with `exit`.
## Docker usage with Android devices
If you wish to use MVT to test an Android device you will need to enable the container's access to the host's USB devices. You can do so by enabling the `--privileged` flag and mounting the USB bus device as a volume:
```bash
docker run -it --privileged -v /dev/bus/usb:/dev/bus/usb mvt
```
**Please note:** the `--privileged` parameter is generally regarded as a security risk. If you want to learn more about this check out [this explainer on container escapes](https://blog.trailofbits.com/2019/07/19/understanding-docker-container-escapes/) as it gives access to the whole system.
Recent versions of Docker provide a `--device` parameter allowing to specify a precise USB device without enabling `--privileged`:
```bash
docker run -it --device=/dev/<your_usb_port> mvt
```

View File

@@ -1,5 +1,5 @@
mkdocs==1.6.1
mkdocs-autorefs==1.4.3
mkdocs-material==9.6.20
mkdocs-autorefs==1.4.2
mkdocs-material==9.6.16
mkdocs-material-extensions==1.3.1
mkdocstrings==0.30.1
mkdocstrings==0.30.0

View File

@@ -1,11 +1,13 @@
[project]
name = "mvt"
dynamic = ["version"]
authors = [{ name = "Claudio Guarnieri", email = "nex@nex.sx" }]
authors = [
{name = "Claudio Guarnieri", email = "nex@nex.sx"}
]
maintainers = [
{ name = "Etienne Maynier", email = "tek@randhome.io" },
{ name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org" },
{ name = "Rory Flynn", email = "rory.flynn@amnesty.org" },
{name = "Etienne Maynier", email = "tek@randhome.io"},
{name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org"},
{name = "Rory Flynn", email = "rory.flynn@amnesty.org"}
]
description = "Mobile Verification Toolkit"
readme = "README.md"
@@ -14,7 +16,7 @@ classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Information Technology",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python"
]
dependencies = [
"click==8.2.1",
@@ -35,7 +37,6 @@ dependencies = [
"pydantic-settings==2.10.1",
"NSKeyedUnArchiver==1.5.2",
"python-dateutil==2.9.0.post0",
"tzdata==2025.2",
]
requires-python = ">= 3.10"
@@ -44,31 +45,20 @@ homepage = "https://docs.mvt.re/en/latest/"
repository = "https://github.com/mvt-project/mvt"
[project.scripts]
mvt-ios = "mvt.ios:cli"
mvt-android = "mvt.android:cli"
[dependency-groups]
dev = [
"requests>=2.31.0",
"pytest>=7.4.3",
"pytest-cov>=4.1.0",
"pytest-github-actions-annotate-failures>=0.2.0",
"pytest-mock>=3.14.0",
"stix2>=3.0.1",
"ruff>=0.1.6",
"mypy>=1.7.1",
"betterproto[compiler]",
]
mvt-ios = "mvt.ios:cli"
mvt-android = "mvt.android:cli"
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[tool.coverage.run]
omit = ["tests/*"]
omit = [
"tests/*",
]
[tool.coverage.html]
directory = "htmlcov"
directory= "htmlcov"
[tool.mypy]
install_types = true
@@ -78,13 +68,15 @@ packages = "src"
[tool.pytest.ini_options]
addopts = "-ra -q --cov=mvt --cov-report html --junitxml=pytest.xml --cov-report=term-missing:skip-covered"
testpaths = ["tests"]
testpaths = [
"tests"
]
[tool.ruff.lint]
select = ["C90", "E", "F", "W"] # flake8 default set
select = ["C90", "E", "F", "W"] # flake8 default set
ignore = [
"E501", # don't enforce line length violations
"C901", # complex-structure
"E501", # don't enforce line length violations
"C901", # complex-structure
# These were previously ignored but don't seem to be required:
# "E265", # no-space-after-block-comment
@@ -96,14 +88,14 @@ ignore = [
]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"] # unused-import
"__init__.py" = ["F401"] # unused-import
[tool.ruff.lint.mccabe]
max-complexity = 10
[tool.setuptools]
include-package-data = true
package-dir = { "" = "src" }
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]
@@ -112,4 +104,4 @@ where = ["src"]
mvt = ["ios/data/*.json"]
[tool.setuptools.dynamic]
version = { attr = "mvt.common.version.MVT_VERSION" }
version = {attr = "mvt.common.version.MVT_VERSION"}

View File

@@ -14,11 +14,10 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
return
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def parse(self, content: str) -> None:

View File

@@ -21,12 +21,22 @@ class DumpsysADBArtifact(AndroidArtifact):
stack = [res]
cur_indent = 0
in_multiline = False
for line in dump_data.strip(b"\n").split(b"\n"):
# Normalize line endings to handle both Unix (\n) and Windows (\r\n)
normalized_data = dump_data.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for line in normalized_data.strip(b"\n").split(b"\n"):
# Skip completely empty lines
if not line.strip():
continue
# Track the level of indentation
indent = len(line) - len(line.lstrip())
if indent < cur_indent:
# If the current line is less indented than the previous one, back out
stack.pop()
while len(stack) > 1 and indent < cur_indent:
stack.pop()
# Check if we were in multiline mode and need to exit it
if in_multiline and not isinstance(stack[-1], list):
in_multiline = False
cur_indent = indent
else:
cur_indent = indent
@@ -38,12 +48,30 @@ class DumpsysADBArtifact(AndroidArtifact):
# Annoyingly, some values are multiline and don't have a key on each line
if in_multiline:
if key == "":
if key == "" and len(vals) < 2:
# If the line is empty, it's the terminator for the multiline value
in_multiline = False
stack.pop()
current_dict = stack[-1]
elif len(vals) >= 2 and (key in self.multiline_fields or key == "}" or vals[1] == b"{"):
# If we encounter a new field while in multiline mode, exit multiline mode
# and process this line as a new field
in_multiline = False
stack.pop()
current_dict = stack[-1]
# Don't continue here - let the line be processed as a new field
else:
current_dict.append(line.lstrip())
# When in multiline mode, the top of stack should be a list
if isinstance(stack[-1], list):
stack[-1].append(line.lstrip())
else:
# Something went wrong with the stack, exit multiline mode
in_multiline = False
current_dict = stack[-1]
continue
# Skip lines that don't have a value after '='
if len(vals) < 2:
continue
if key == "}":
@@ -133,7 +161,16 @@ class DumpsysADBArtifact(AndroidArtifact):
# TODO: Parse AdbDebuggingManager line in output.
start_of_json = content.find(b"\n{") + 2
end_of_json = content.rfind(b"}\n") - 2
# Handle both Unix (\n) and Windows (\r\n) line endings
end_of_json = content.rfind(b"}\n")
if end_of_json == -1:
end_of_json = content.rfind(b"}\r\n")
if end_of_json == -1:
self.log.error("Unable to find end of JSON block in dumpsys output")
return
end_of_json -= 2
json_content = content[start_of_json:end_of_json].rstrip()
parsed = self.indented_dump_parser(json_content)

View File

@@ -4,13 +4,13 @@
# https://license.mvt.re/1.1/
from datetime import datetime
from typing import Any
from typing import Any, Dict, List, Union
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from mvt.common.utils import convert_datetime_to_iso
from .artifact import AndroidArtifact
RISKY_PERMISSIONS = ["REQUEST_INSTALL_PACKAGES"]
RISKY_PACKAGES = ["com.android.shell"]
@@ -20,9 +20,9 @@ class DumpsysAppopsArtifact(AndroidArtifact):
Parser for dumpsys app ops info
"""
def serialize(self, result: ModuleAtomicResult) -> ModuleSerializedResult:
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for perm in result["permissions"]:
for perm in record["permissions"]:
if "entries" not in perm:
continue
@@ -33,7 +33,7 @@ class DumpsysAppopsArtifact(AndroidArtifact):
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{result['package_name']} access to "
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
}
)
@@ -43,51 +43,51 @@ class DumpsysAppopsArtifact(AndroidArtifact):
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc_match = self.indicators.check_app_id(result.get("package_name"))
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
# We use a placeholder entry to create a basic alert even without permission entries.
placeholder_entry = {"access": "Unknown", "timestamp": ""}
detected_permissions = []
for perm in result["permissions"]:
if (
perm["name"] in RISKY_PERMISSIONS
# and perm["access"] == "allow"
):
for entry in sorted(
perm["entries"] or [placeholder_entry],
key=lambda x: x["timestamp"],
):
cleaned_result = result.copy()
cleaned_result["permissions"] = [perm]
self.alertstore.medium(
f"Package '{result['package_name']}' had risky permission '{perm['name']}' set to '{entry['access']}' at {entry['timestamp']}",
detected_permissions.append(perm)
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
self.log.warning(
"Package '%s' had risky permission '%s' set to '%s' at %s",
result["package_name"],
perm["name"],
entry["access"],
entry["timestamp"],
cleaned_result,
)
elif result["package_name"] in RISKY_PACKAGES:
for entry in sorted(
perm["entries"] or [placeholder_entry],
key=lambda x: x["timestamp"],
):
cleaned_result = result.copy()
cleaned_result["permissions"] = [perm]
self.alertstore.medium(
f"Risky package '{result['package_name']}' had '{perm['name']}' permission set to '{entry['access']}' at {entry['timestamp']}",
detected_permissions.append(perm)
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
self.log.warning(
"Risky package '%s' had '%s' permission set to '%s' at %s",
result["package_name"],
perm["name"],
entry["access"],
entry["timestamp"],
cleaned_result,
)
if detected_permissions:
# We clean the result to only include the risky permission, otherwise the timeline
# will be polluted with all the other irrelevant permissions
cleaned_result = result.copy()
cleaned_result["permissions"] = detected_permissions
self.detected.append(cleaned_result)
def parse(self, output: str) -> None:
# self.results: List[Dict[str, Any]] = []
perm: dict[str, Any] = {}
package: dict[str, Any] = {}
entry: dict[str, Any] = {}
self.results: List[Dict[str, Any]] = []
perm = {}
package = {}
entry = {}
uid = None
in_packages = False

View File

@@ -3,9 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from typing import Any
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from typing import Union
from .artifact import AndroidArtifact
@@ -15,7 +13,7 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
Parser for dumpsys dattery daily updates.
"""
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
def serialize(self, record: dict) -> Union[dict, list]:
return {
"timestamp": record["from"],
"module": self.__class__.__name__,
@@ -29,16 +27,15 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
return
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def parse(self, output: str) -> None:
daily = None
daily_updates: list[dict[str, Any]] = []
daily_updates = []
for line in output.splitlines():
if line.startswith(" Daily from "):
if len(daily_updates) > 0:

View File

@@ -16,11 +16,10 @@ class DumpsysBatteryHistoryArtifact(AndroidArtifact):
return
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def parse(self, data: str) -> None:

View File

@@ -20,11 +20,10 @@ class DumpsysDBInfoArtifact(AndroidArtifact):
for result in self.results:
path = result.get("path", "")
for part in path.split("/"):
ioc_match = self.indicators.check_app_id(part)
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_app_id(part)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def parse(self, output: str) -> None:

View File

@@ -12,11 +12,10 @@ class DumpsysPackageActivitiesArtifact(AndroidArtifact):
return
for activity in self.results:
ioc_match = self.indicators.check_app_id(activity["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", activity, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_app_id(activity["package_name"])
if ioc:
activity["matched_indicator"] = ioc
self.detected.append(activity)
continue
def parse(self, content: str):

View File

@@ -4,10 +4,9 @@
# https://license.mvt.re/1.1/
import re
from typing import Any, Dict, List
from typing import Any, Dict, List, Union
from mvt.android.utils import ROOT_PACKAGES
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from .artifact import AndroidArtifact
@@ -15,28 +14,25 @@ from .artifact import AndroidArtifact
class DumpsysPackagesArtifact(AndroidArtifact):
def check_indicators(self) -> None:
for result in self.results:
# XXX: De-duplication Package detections
if result["package_name"] in ROOT_PACKAGES:
self.alertstore.medium(
f'Found an installed package related to rooting/jailbreaking: "{result["package_name"]}"',
"",
result,
self.log.warning(
'Found an installed package related to rooting/jailbreaking: "%s"',
result["package_name"],
)
self.alertstore.log_latest()
self.detected.append(result)
continue
if not self.indicators:
continue
ioc_match = self.indicators.check_app_id(result.get("package_name", ""))
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
ioc = self.indicators.check_app_id(result.get("package_name", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
def serialize(self, record: dict) -> Union[dict, list]:
records = []
timestamps = [
{"event": "package_install", "timestamp": record["timestamp"]},
{
@@ -63,15 +59,15 @@ class DumpsysPackagesArtifact(AndroidArtifact):
"""
Parse one entry of a dumpsys package information
"""
details: Dict[str, Any] = {
details = {
"uid": "",
"version_name": "",
"version_code": "",
"timestamp": "",
"first_install_time": "",
"last_update_time": "",
"permissions": list(),
"requested_permissions": list(),
"permissions": [],
"requested_permissions": [],
}
in_install_permissions = False
in_runtime_permissions = False
@@ -149,7 +145,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
results = []
package_name = None
package = {}
lines: list[str] = []
lines = []
for line in output.splitlines():
if line.startswith(" Package ["):
if len(lines) > 0:

View File

@@ -16,11 +16,10 @@ class DumpsysPlatformCompatArtifact(AndroidArtifact):
return
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def parse(self, data: str) -> None:

View File

@@ -50,18 +50,14 @@ class DumpsysReceiversArtifact(AndroidArtifact):
if not self.indicators:
continue
ioc_match = self.indicators.check_app_id(receiver["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message,
"",
{intent: receiver},
matched_indicator=ioc_match.ioc,
)
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
continue
def parse(self, output: str) -> None:
self.results: dict[str, list[dict[str, str]]] = {}
self.results = {}
in_receiver_resolver_table = False
in_non_data_actions = False

View File

@@ -2,13 +2,13 @@
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from typing import Union
from .artifact import AndroidArtifact
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
class FileTimestampsArtifact(AndroidArtifact):
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for ts in set(

View File

@@ -39,10 +39,10 @@ class GetProp(AndroidArtifact):
if not matches or len(matches[0]) != 2:
continue
prop_entry = {"name": matches[0][0], "value": matches[0][1]}
self.results.append(prop_entry)
entry = {"name": matches[0][0], "value": matches[0][1]}
self.results.append(entry)
def get_device_timezone(self) -> str | None:
def get_device_timezone(self) -> str:
"""
Get the device timezone from the getprop results
@@ -59,17 +59,13 @@ class GetProp(AndroidArtifact):
self.log.info("%s: %s", entry["name"], entry["value"])
if entry["name"] == "ro.build.version.security_patch":
warning_message = warn_android_patch_level(entry["value"], self.log)
self.alertstore.medium(warning_message, "", entry)
warn_android_patch_level(entry["value"], self.log)
if not self.indicators:
return
for result in self.results:
ioc_match = self.indicators.check_android_property_name(
result.get("name", "")
)
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_android_property_name(result.get("name", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -1,197 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from typing import Any
from .artifact import AndroidArtifact
SUSPICIOUS_MOUNT_POINTS = [
"/system",
"/vendor",
"/product",
"/system_ext",
]
SUSPICIOUS_OPTIONS = [
"rw",
"remount",
"noatime",
"nodiratime",
]
ALLOWLIST_NOATIME = [
"/system_dlkm",
"/system_ext",
"/product",
"/vendor",
"/vendor_dlkm",
]
class Mounts(AndroidArtifact):
"""
This artifact parses mount information from /proc/mounts or similar mount data.
It can detect potentially suspicious mount configurations that may indicate
a rooted or compromised device.
"""
def parse(self, entry: str) -> None:
"""
Parse mount information from the provided entry.
Examples:
/dev/block/bootdevice/by-name/system /system ext4 ro,seclabel,relatime 0 0
/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)
"""
self.results: list[dict[str, Any]] = []
for line in entry.splitlines():
line = line.strip()
if not line:
continue
device = None
mount_point = None
filesystem_type = None
mount_options = ""
if " on " in line and " type " in line:
try:
# Format: device on mount_point type filesystem_type (options)
device_part, rest = line.split(" on ", 1)
device = device_part.strip()
# Split by 'type' to get mount_point and filesystem info
mount_part, fs_part = rest.split(" type ", 1)
mount_point = mount_part.strip()
# Parse filesystem and options
if "(" in fs_part and fs_part.endswith(")"):
# Format: filesystem_type (options)
fs_and_opts = fs_part.strip()
paren_idx = fs_and_opts.find("(")
filesystem_type = fs_and_opts[:paren_idx].strip()
mount_options = fs_and_opts[paren_idx + 1 : -1].strip()
else:
# No options in parentheses, just filesystem type
filesystem_type = fs_part.strip()
mount_options = ""
# Skip if we don't have essential info
if not device or not mount_point or not filesystem_type:
continue
# Parse options into list
options_list = (
[opt.strip() for opt in mount_options.split(",") if opt.strip()]
if mount_options
else []
)
# Check if it's a system partition
is_system_partition = mount_point in SUSPICIOUS_MOUNT_POINTS or any(
mount_point.startswith(sp) for sp in SUSPICIOUS_MOUNT_POINTS
)
# Check if it's mounted read-write
is_read_write = "rw" in options_list
mount_entry = {
"device": device,
"mount_point": mount_point,
"filesystem_type": filesystem_type,
"mount_options": mount_options,
"options_list": options_list,
"is_system_partition": is_system_partition,
"is_read_write": is_read_write,
}
self.results.append(mount_entry)
except ValueError:
# If parsing fails, skip this line
continue
else:
# Skip lines that don't match expected format
continue
def check_indicators(self) -> None:
"""
Check for suspicious mount configurations that may indicate root access
or other security concerns.
"""
system_rw_mounts = []
suspicious_mounts = []
for mount in self.results:
mount_point = mount["mount_point"]
options = mount["options_list"]
# Check for system partitions mounted as read-write
if mount["is_system_partition"] and mount["is_read_write"]:
system_rw_mounts.append(mount)
if mount_point == "/system":
self.alertstore.high(
"Root detected /system partition is mounted as read-write (rw)",
"",
mount,
)
else:
self.alertstore.high(
f"System partition {mount_point} is mounted as read-write (rw). This may indicate system modifications.",
"",
mount,
)
# Check for other suspicious mount options
suspicious_opts = [opt for opt in options if opt in SUSPICIOUS_OPTIONS]
if suspicious_opts and mount["is_system_partition"]:
if (
"noatime" in mount["mount_options"]
and mount["mount_point"] in ALLOWLIST_NOATIME
):
continue
suspicious_mounts.append(mount)
self.alertstore.high(
f"Suspicious mount options found for {mount_point}: {', '.join(suspicious_opts)}",
"",
mount,
)
# Log interesting mount information
if mount_point == "/data" or mount_point.startswith("/sdcard"):
self.log.info(
"Data partition: %s mounted as %s with options: %s",
mount_point,
mount["filesystem_type"],
mount["mount_options"],
)
self.log.info("Parsed %d mount entries", len(self.results))
# Check indicators if available
if not self.indicators:
return
for mount in self.results:
# Check if any mount points match indicators
ioc = self.indicators.check_file_path(mount.get("mount_point", ""))
if ioc:
self.alertstore.critical(
f"Mount point matches indicator: {mount.get('mount_point', '')}",
"",
mount,
matched_indicator=ioc,
)
# Check device paths for indicators
ioc = self.indicators.check_file_path(mount.get("device", ""))
if ioc:
self.alertstore.critical(
f"Device path matches indicator: {mount.get('device', '')}",
"",
mount,
matched_indicator=ioc,
)

View File

@@ -58,15 +58,13 @@ class Processes(AndroidArtifact):
if result["proc_name"] == "gatekeeperd":
continue
ioc_match = self.indicators.check_app_id(proc_name)
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_app_id(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc_match = self.indicators.check_process(proc_name)
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_process(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -4,18 +4,17 @@
# https://license.mvt.re/1.1/
import datetime
from typing import List, Optional
from typing import List, Optional, Union
import betterproto
import pydantic
import betterproto
from dateutil import parser
from mvt.android.parsers.proto.tombstone import Tombstone
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from mvt.common.utils import convert_datetime_to_iso
from mvt.android.parsers.proto.tombstone import Tombstone
from .artifact import AndroidArtifact
TOMBSTONE_DELIMITER = "*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***"
# Map the legacy crash file keys to the new format.
@@ -54,7 +53,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
file_name: str
file_timestamp: str # We store the timestamp as a string to avoid timezone issues
build_fingerprint: str
revision: str
revision: int
arch: Optional[str] = None
timestamp: str # We store the timestamp as a string to avoid timezone issues
process_uptime: Optional[int] = None
@@ -71,13 +70,13 @@ class TombstoneCrashResult(pydantic.BaseModel):
class TombstoneCrashArtifact(AndroidArtifact):
"""
""" "
Parser for Android tombstone crash files.
This parser can parse both text and protobuf tombstone crash files.
"""
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
def serialize(self, record: dict) -> Union[dict, list]:
return {
"timestamp": record["timestamp"],
"module": self.__class__.__name__,
@@ -93,21 +92,18 @@ class TombstoneCrashArtifact(AndroidArtifact):
return
for result in self.results:
ioc_match = self.indicators.check_process(result["process_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_process(result["process_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
if result.get("command_line", []):
command_name = result.get("command_line")[0].split("/")[-1]
command_name = result["command_line"][0]
ioc_match = self.indicators.check_process(command_name)
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
ioc = self.indicators.check_process(command_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
SUSPICIOUS_UIDS = [
@@ -116,19 +112,18 @@ class TombstoneCrashArtifact(AndroidArtifact):
2000, # shell
]
if result["uid"] in SUSPICIOUS_UIDS:
self.alertstore.medium(
(
f"Potentially suspicious crash in process '{result['process_name']}' "
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
),
"",
result,
self.log.warning(
f"Potentially suspicious crash in process '{result['process_name']}' "
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
)
self.detected.append(result)
def parse_protobuf(
self, file_name: str, file_timestamp: datetime.datetime, data: bytes
) -> None:
"""Parse Android tombstone crash files from a protobuf object."""
"""
Parse Android tombstone crash files from a protobuf object.
"""
tombstone_pb = Tombstone().parse(data)
tombstone_dict = tombstone_pb.to_dict(
betterproto.Casing.SNAKE, include_default_values=True
@@ -149,23 +144,21 @@ class TombstoneCrashArtifact(AndroidArtifact):
def parse(
self, file_name: str, file_timestamp: datetime.datetime, content: bytes
) -> None:
"""Parse text Android tombstone crash files."""
"""
Parse text Android tombstone crash files.
"""
# Split the tombstone file into a dictonary
tombstone_dict = {
"file_name": file_name,
"file_timestamp": convert_datetime_to_iso(file_timestamp),
}
lines = content.decode("utf-8").splitlines()
for line_num, line in enumerate(lines, 1):
for line in lines:
if not line.strip() or TOMBSTONE_DELIMITER in line:
continue
try:
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
if self._parse_tombstone_line(
line, key, destination_key, tombstone_dict
):
break
except Exception as e:
raise ValueError(f"Error parsing line {line_num}: {str(e)}")
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
self._parse_tombstone_line(line, key, destination_key, tombstone_dict)
# Validate the tombstone and add it to the results
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
@@ -175,7 +168,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
self, line: str, key: str, destination_key: str, tombstone: dict
) -> bool:
if not line.startswith(f"{key}"):
return False
return None
if key == "pid":
return self._load_pid_line(line, tombstone)
@@ -194,7 +187,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
raise ValueError(f"Expected key {key}, got {line_key}")
value_clean = value.strip().strip("'")
if destination_key == "uid":
if destination_key in ["uid", "revision"]:
tombstone[destination_key] = int(value_clean)
elif destination_key == "process_uptime":
# eg. "Process uptime: 40s"
@@ -207,50 +200,51 @@ class TombstoneCrashArtifact(AndroidArtifact):
return True
def _load_pid_line(self, line: str, tombstone: dict) -> bool:
try:
parts = line.split(" >>> ") if " >>> " in line else line.split(">>>")
process_info = parts[0]
pid_part, tid_part, name_part = [part.strip() for part in line.split(",")]
# Parse pid, tid, name from process info
info_parts = [p.strip() for p in process_info.split(",")]
for info in info_parts:
key, value = info.split(":", 1)
key = key.strip()
value = value.strip()
pid_key, pid_value = pid_part.split(":", 1)
if pid_key != "pid":
raise ValueError(f"Expected key pid, got {pid_key}")
pid_value = int(pid_value.strip())
if key == "pid":
tombstone["pid"] = int(value)
elif key == "tid":
tombstone["tid"] = int(value)
elif key == "name":
tombstone["process_name"] = value
tid_key, tid_value = tid_part.split(":", 1)
if tid_key != "tid":
raise ValueError(f"Expected key tid, got {tid_key}")
tid_value = int(tid_value.strip())
# Extract binary path if it exists
if len(parts) > 1:
tombstone["binary_path"] = parts[1].strip().rstrip(" <")
name_key, name_value = name_part.split(":", 1)
if name_key != "name":
raise ValueError(f"Expected key name, got {name_key}")
name_value = name_value.strip()
process_name, binary_path = self._parse_process_name(name_value, tombstone)
return True
tombstone["pid"] = pid_value
tombstone["tid"] = tid_value
tombstone["process_name"] = process_name
tombstone["binary_path"] = binary_path
return True
except Exception as e:
raise ValueError(f"Failed to parse PID line: {str(e)}")
def _parse_process_name(self, process_name_part, tombstone: dict) -> bool:
process_name, process_path = process_name_part.split(">>>")
process_name = process_name.strip()
binary_path = process_path.strip().split(" ")[0]
return process_name, binary_path
def _load_signal_line(self, line: str, tombstone: dict) -> bool:
signal_part, code_part = map(str.strip, line.split(",")[:2])
signal, code, _ = [part.strip() for part in line.split(",", 2)]
signal = signal.split("signal ")[1]
signal_code, signal_name = signal.split(" ")
signal_name = signal_name.strip("()")
def parse_part(part: str, prefix: str) -> tuple[int, str]:
match = part.split(prefix)[1]
number = int(match.split()[0])
name = match.split("(")[1].split(")")[0] if "(" in match else "UNKNOWN"
return number, name
signal_number, signal_name = parse_part(signal_part, "signal ")
code_number, code_name = parse_part(code_part, "code ")
code_part = code.split("code ")[1]
code_number, code_name = code_part.split(" ")
code_name = code_name.strip("()")
tombstone["signal_info"] = {
"code": code_number,
"code": int(code_number),
"code_name": code_name,
"name": signal_name,
"number": signal_number,
"number": int(signal_code),
}
return True
@@ -262,6 +256,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
@staticmethod
def _parse_timestamp_string(timestamp: str) -> str:
timestamp_parsed = parser.parse(timestamp)
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
return convert_datetime_to_iso(local_timestamp)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,37 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.common.command import Command
from .modules.adb import ADB_MODULES
log = logging.getLogger(__name__)
class CmdAndroidCheckADB(Command):
def __init__(
self,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
module_options=module_options,
log=log,
)
self.name = "check-adb"
self.modules = ADB_MODULES

View File

@@ -9,192 +9,59 @@ import zipfile
from pathlib import Path
from typing import List, Optional
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.androidqf import ANDROIDQF_MODULES
from .modules.androidqf.base import AndroidQFModule
log = logging.getLogger(__name__)
class NoAndroidQFTargetPath(Exception):
pass
class NoAndroidQFBugReport(Exception):
pass
class NoAndroidQFBackup(Exception):
pass
class CmdAndroidCheckAndroidQF(Command):
def __init__(
self,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
ioc_files: Optional[list] = None,
iocs: Optional[Indicators] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
hashes: Optional[bool] = False,
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
hashes: bool = False,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
iocs=iocs,
module_name=module_name,
serial=serial,
module_options=module_options,
hashes=hashes,
sub_command=sub_command,
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
)
self.name = "check-androidqf"
self.modules = ANDROIDQF_MODULES
self.__format: Optional[str] = None
self.__zip: Optional[zipfile.ZipFile] = None
self.__files: List[str] = []
self.format: Optional[str] = None
self.archive: Optional[zipfile.ZipFile] = None
self.files: List[str] = []
def init(self):
if not self.target_path:
raise NoAndroidQFTargetPath
if os.path.isdir(self.target_path):
self.__format = "dir"
self.format = "dir"
parent_path = Path(self.target_path).absolute().parent.as_posix()
target_abs_path = os.path.abspath(self.target_path)
for root, subdirs, subfiles in os.walk(target_abs_path):
for fname in subfiles:
file_path = os.path.relpath(os.path.join(root, fname), parent_path)
self.__files.append(file_path)
self.files.append(file_path)
elif os.path.isfile(self.target_path):
self.__format = "zip"
self.__zip = zipfile.ZipFile(self.target_path)
self.__files = self.__zip.namelist()
self.format = "zip"
self.archive = zipfile.ZipFile(self.target_path)
self.files = self.archive.namelist()
def module_init(self, module: AndroidQFModule) -> None: # type: ignore[override]
if self.__format == "zip" and self.__zip:
module.from_zip(self.__zip, self.__files)
return
if not self.target_path:
raise NoAndroidQFTargetPath
parent_path = Path(self.target_path).absolute().parent.as_posix()
module.from_dir(parent_path, self.__files)
def load_bugreport(self) -> zipfile.ZipFile:
bugreport_zip_path = None
for file_name in self.__files:
if file_name.endswith("bugreport.zip"):
bugreport_zip_path = file_name
break
def module_init(self, module):
if self.format == "zip":
module.from_zip_file(self.archive, self.files)
else:
raise NoAndroidQFBugReport
if self.__format == "zip" and self.__zip:
handle = self.__zip.open(bugreport_zip_path)
return zipfile.ZipFile(handle)
if self.__format == "dir" and self.target_path:
parent_path = Path(self.target_path).absolute().parent.as_posix()
bug_report_path = os.path.join(parent_path, bugreport_zip_path)
return zipfile.ZipFile(bug_report_path)
raise NoAndroidQFBugReport
def load_backup(self) -> bytes:
backup_ab_path = None
for file_name in self.__files:
if file_name.endswith("backup.ab"):
backup_ab_path = file_name
break
else:
raise NoAndroidQFBackup
if self.__format == "zip" and self.__zip:
backup_file_handle = self.__zip.open(backup_ab_path)
return backup_file_handle.read()
if self.__format == "dir" and self.target_path:
parent_path = Path(self.target_path).absolute().parent.as_posix()
backup_path = os.path.join(parent_path, backup_ab_path)
with open(backup_path, "rb") as backup_file:
backup_ab_data = backup_file.read()
return backup_ab_data
raise NoAndroidQFBackup
def run_bugreport_cmd(self) -> bool:
bugreport = None
try:
bugreport = self.load_bugreport()
except NoAndroidQFBugReport:
self.log.warning(
"Skipping bugreport modules as no bugreport.zip found in AndroidQF data."
)
return False
else:
cmd = CmdAndroidCheckBugreport(
target_path=None,
results_path=self.results_path,
ioc_files=self.ioc_files,
iocs=self.iocs,
module_options=self.module_options,
hashes=self.hashes,
sub_command=True,
)
cmd.from_zip(bugreport)
cmd.run()
self.timeline.extend(cmd.timeline)
self.alertstore.extend(cmd.alertstore.alerts)
finally:
if bugreport:
bugreport.close()
def run_backup_cmd(self) -> bool:
try:
backup = self.load_backup()
except NoAndroidQFBackup:
self.log.warning(
"Skipping backup modules as no backup.ab found in AndroidQF data."
)
return False
cmd = CmdAndroidCheckBackup(
target_path=None,
results_path=self.results_path,
ioc_files=self.ioc_files,
iocs=self.iocs,
module_options=self.module_options,
hashes=self.hashes,
sub_command=True,
)
cmd.from_ab(backup)
cmd.run()
self.timeline.extend(cmd.timeline)
self.alertstore.extend(cmd.alertstore.alerts)
return True
def finish(self) -> None:
"""
Run the bugreport and backup modules if the respective files are found in the AndroidQF data.
"""
self.run_bugreport_cmd()
self.run_backup_cmd()
module.from_folder(parent_path, self.files)

View File

@@ -11,7 +11,7 @@ import tarfile
from pathlib import Path
from typing import List, Optional
from mvt.android.modules.backup.base import BackupModule
from mvt.android.modules.backup.base import BackupExtraction
from mvt.android.modules.backup.helpers import prompt_or_load_android_backup_password
from mvt.android.parsers.backup import (
AndroidBackupParsingError,
@@ -20,7 +20,6 @@ from mvt.android.parsers.backup import (
parse_backup_file,
)
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.backup import BACKUP_MODULES
@@ -33,81 +32,72 @@ class CmdAndroidCheckBackup(Command):
target_path: Optional[str] = None,
results_path: Optional[str] = None,
ioc_files: Optional[list] = None,
iocs: Optional[Indicators] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
hashes: Optional[bool] = False,
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
hashes: bool = False,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
iocs=iocs,
module_name=module_name,
serial=serial,
module_options=module_options,
hashes=hashes,
sub_command=sub_command,
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
)
self.name = "check-backup"
self.modules = BACKUP_MODULES
self.__type: str = ""
self.__tar: Optional[tarfile.TarFile] = None
self.__files: List[str] = []
def from_ab(self, ab_file_bytes: bytes) -> None:
self.__type = "ab"
header = parse_ab_header(ab_file_bytes)
if not header["backup"]:
log.critical("Invalid backup format, file should be in .ab format")
sys.exit(1)
password = None
if header["encryption"] != "none":
password = prompt_or_load_android_backup_password(log, self.module_options)
if not password:
log.critical("No backup password provided.")
sys.exit(1)
try:
tardata = parse_backup_file(ab_file_bytes, password=password)
except InvalidBackupPassword:
log.critical("Invalid backup password")
sys.exit(1)
except AndroidBackupParsingError as exc:
log.critical("Impossible to parse this backup file: %s", exc)
log.critical("Please use Android Backup Extractor (ABE) instead")
sys.exit(1)
dbytes = io.BytesIO(tardata)
self.__tar = tarfile.open(fileobj=dbytes)
for member in self.__tar:
self.__files.append(member.name)
self.backup_type: str = ""
self.backup_archive: Optional[tarfile.TarFile] = None
self.backup_files: List[str] = []
def init(self) -> None:
if not self.target_path:
return
if os.path.isfile(self.target_path):
self.__type = "ab"
self.backup_type = "ab"
with open(self.target_path, "rb") as handle:
ab_file_bytes = handle.read()
self.from_ab(ab_file_bytes)
data = handle.read()
header = parse_ab_header(data)
if not header["backup"]:
log.critical("Invalid backup format, file should be in .ab format")
sys.exit(1)
password = None
if header["encryption"] != "none":
password = prompt_or_load_android_backup_password(
log, self.module_options
)
if not password:
log.critical("No backup password provided.")
sys.exit(1)
try:
tardata = parse_backup_file(data, password=password)
except InvalidBackupPassword:
log.critical("Invalid backup password")
sys.exit(1)
except AndroidBackupParsingError as exc:
log.critical("Impossible to parse this backup file: %s", exc)
log.critical("Please use Android Backup Extractor (ABE) instead")
sys.exit(1)
dbytes = io.BytesIO(tardata)
self.backup_archive = tarfile.open(fileobj=dbytes)
for member in self.backup_archive:
self.backup_files.append(member.name)
elif os.path.isdir(self.target_path):
self.__type = "folder"
self.backup_type = "folder"
self.target_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
for fname in subfiles:
self.__files.append(
self.backup_files.append(
os.path.relpath(os.path.join(root, fname), self.target_path)
)
else:
@@ -117,12 +107,8 @@ class CmdAndroidCheckBackup(Command):
)
sys.exit(1)
def module_init(self, module: BackupModule) -> None: # type: ignore[override]
if self.__type == "folder":
module.from_dir(self.target_path, self.__files)
def module_init(self, module: BackupExtraction) -> None: # type: ignore[override]
if self.backup_type == "folder":
module.from_folder(self.target_path, self.backup_files)
else:
module.from_ab(self.target_path, self.__tar, self.__files)
def finish(self) -> None:
if self.__tar:
self.__tar.close()
module.from_ab(self.target_path, self.backup_archive, self.backup_files)

View File

@@ -11,7 +11,6 @@ from zipfile import ZipFile
from mvt.android.modules.bugreport.base import BugReportModule
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.bugreport import BUGREPORT_MODULES
@@ -24,80 +23,54 @@ class CmdAndroidCheckBugreport(Command):
target_path: Optional[str] = None,
results_path: Optional[str] = None,
ioc_files: Optional[list] = None,
iocs: Optional[Indicators] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
hashes: Optional[bool] = False,
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
hashes: bool = False,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
iocs=iocs,
module_name=module_name,
serial=serial,
module_options=module_options,
hashes=hashes,
sub_command=sub_command,
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
)
self.name = "check-bugreport"
self.modules = BUGREPORT_MODULES
self.__format: str = ""
self.__zip: Optional[ZipFile] = None
self.__files: List[str] = []
def from_dir(self, dir_path: str) -> None:
"""This method is used to initialize the bug report analysis from an
uncompressed directory.
"""
self.__format = "dir"
self.target_path = dir_path
parent_path = Path(dir_path).absolute().as_posix()
for root, _, subfiles in os.walk(os.path.abspath(dir_path)):
for file_name in subfiles:
file_path = os.path.relpath(os.path.join(root, file_name), parent_path)
self.__files.append(file_path)
def from_zip(self, bugreport_zip: ZipFile) -> None:
"""This method is used to initialize the bug report analysis from a
compressed archive.
"""
# NOTE: This will be invoked either by the CLI directly,or by the
# check-androidqf command. We need this because we want to support
# check-androidqf to analyse compressed archives itself too.
# So, we'll need to extract bugreport.zip from a 'androidqf.zip', and
# since nothing is written on disk, we need to be able to pass this
# command a ZipFile instance in memory.
self.__format = "zip"
self.__zip = bugreport_zip
for file_name in self.__zip.namelist():
self.__files.append(file_name)
self.bugreport_format: str = ""
self.bugreport_archive: Optional[ZipFile] = None
self.bugreport_files: List[str] = []
def init(self) -> None:
if not self.target_path:
return
if os.path.isfile(self.target_path):
self.from_zip(ZipFile(self.target_path))
self.bugreport_format = "zip"
self.bugreport_archive = ZipFile(self.target_path)
for file_name in self.bugreport_archive.namelist():
self.bugreport_files.append(file_name)
elif os.path.isdir(self.target_path):
self.from_dir(self.target_path)
self.bugreport_format = "dir"
parent_path = Path(self.target_path).absolute().as_posix()
for root, _, subfiles in os.walk(os.path.abspath(self.target_path)):
for file_name in subfiles:
file_path = os.path.relpath(
os.path.join(root, file_name), parent_path
)
self.bugreport_files.append(file_path)
def module_init(self, module: BugReportModule) -> None: # type: ignore[override]
if self.__format == "zip":
module.from_zip(self.__zip, self.__files)
if self.bugreport_format == "zip":
module.from_zip(self.bugreport_archive, self.bugreport_files)
else:
module.from_dir(self.target_path, self.__files)
module.from_folder(self.target_path, self.bugreport_files)
def finish(self) -> None:
if self.__zip:
self.__zip.close()
if self.bugreport_archive:
self.bugreport_archive.close()

View File

@@ -0,0 +1,184 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import json
import logging
import os
from typing import Callable, Optional, Union
from rich.progress import track
from mvt.common.module import InsufficientPrivileges
from .modules.adb.base import AndroidExtraction
from .modules.adb.packages import Packages
log = logging.getLogger(__name__)
class DownloadAPKs(AndroidExtraction):
"""DownloadAPKs is the main class operating the download of APKs
from the device.
"""
def __init__(
self,
results_path: Optional[str] = None,
all_apks: bool = False,
packages: Optional[list] = None,
) -> None:
"""Initialize module.
:param results_path: Path to the folder where data should be stored
:param all_apks: Boolean indicating whether to download all packages
or filter known-goods
:param packages: Provided list of packages, typically for JSON checks
"""
super().__init__(results_path=results_path, log=log)
self.packages = packages
self.all_apks = all_apks
self.results_path_apks = None
@classmethod
def from_json(cls, json_path: str) -> Callable:
"""Initialize this class from an existing apks.json file.
:param json_path: Path to the apks.json file to parse.
"""
with open(json_path, "r", encoding="utf-8") as handle:
packages = json.load(handle)
return cls(packages=packages)
def pull_package_file(
self, package_name: str, remote_path: str
) -> Union[str, None]:
"""Pull files related to specific package from the device.
:param package_name: Name of the package to download
:param remote_path: Path to the file to download
:returns: Path to the local copy
"""
log.info("Downloading %s ...", remote_path)
file_name = ""
if "==/" in remote_path:
file_name = "_" + remote_path.split("==/")[1].replace(".apk", "")
local_path = os.path.join(
self.results_path_apks, f"{package_name}{file_name}.apk"
)
name_counter = 0
while True:
if not os.path.exists(local_path):
break
name_counter += 1
local_path = os.path.join(
self.results_path_apks, f"{package_name}{file_name}_{name_counter}.apk"
)
try:
self._adb_download(remote_path, local_path)
except InsufficientPrivileges:
log.error(
"Unable to pull package file from %s: insufficient privileges, "
"it might be a system app",
remote_path,
)
self._adb_reconnect()
return None
except Exception as exc:
log.exception("Failed to pull package file from %s: %s", remote_path, exc)
self._adb_reconnect()
return None
return local_path
def get_packages(self) -> None:
"""Use the Packages adb module to retrieve the list of packages.
We reuse the same extraction logic to then download the APKs.
"""
self.log.info("Retrieving list of installed packages...")
m = Packages()
m.log = self.log
m.serial = self.serial
m.run()
self.packages = m.results
def pull_packages(self) -> None:
"""Download all files of all selected packages from the device."""
log.info(
"Starting extraction of installed APKs at folder %s", self.results_path
)
# If the user provided the flag --all-apks we select all packages.
packages_selection = []
if self.all_apks:
log.info("Selected all %d available packages", len(self.packages))
packages_selection = self.packages
else:
# Otherwise we loop through the packages and get only those that
# are not marked as system.
for package in self.packages:
if not package.get("system", False):
packages_selection.append(package)
log.info(
'Selected only %d packages which are not marked as "system"',
len(packages_selection),
)
if len(packages_selection) == 0:
log.info("No packages were selected for download")
return
log.info("Downloading packages from device. This might take some time ...")
self.results_path_apks = os.path.join(self.results_path, "apks")
if not os.path.exists(self.results_path_apks):
os.makedirs(self.results_path_apks, exist_ok=True)
for i in track(
range(len(packages_selection)),
description=f"Downloading {len(packages_selection)} packages...",
):
package = packages_selection[i]
log.info(
"[%d/%d] Package: %s",
i,
len(packages_selection),
package["package_name"],
)
# Sometimes the package path contains multiple lines for multiple
# apks. We loop through each line and download each file.
for package_file in package["files"]:
device_path = package_file["path"]
local_path = self.pull_package_file(
package["package_name"], device_path
)
if not local_path:
continue
package_file["local_path"] = local_path
log.info("Download of selected packages completed")
def save_json(self) -> None:
json_path = os.path.join(self.results_path, "apks.json")
with open(json_path, "w", encoding="utf-8") as handle:
json.dump(self.packages, handle, indent=4)
def run(self) -> None:
self.get_packages()
self._adb_connect()
self.pull_packages()
self.save_json()
self._adb_disconnect()

View File

@@ -4,7 +4,15 @@
# https://license.mvt.re/1.1/
from .chrome_history import ChromeHistory
from .dumpsys_accessibility import DumpsysAccessibility
from .dumpsys_activities import DumpsysActivities
from .dumpsys_appops import DumpsysAppOps
from .dumpsys_battery_daily import DumpsysBatteryDaily
from .dumpsys_battery_history import DumpsysBatteryHistory
from .dumpsys_dbinfo import DumpsysDBInfo
from .dumpsys_adbstate import DumpsysADBState
from .dumpsys_full import DumpsysFull
from .dumpsys_receivers import DumpsysReceivers
from .files import Files
from .getprop import Getprop
from .logcat import Logcat
@@ -24,7 +32,15 @@ ADB_MODULES = [
Getprop,
Settings,
SELinuxStatus,
DumpsysBatteryHistory,
DumpsysBatteryDaily,
DumpsysReceivers,
DumpsysActivities,
DumpsysAccessibility,
DumpsysDBInfo,
DumpsysADBState,
DumpsysFull,
DumpsysAppOps,
Packages,
Logcat,
RootBinaries,

File diff suppressed because it is too large Load Diff

View File

@@ -6,13 +6,8 @@
import logging
import os
import sqlite3
from typing import Optional
from typing import Optional, Union
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from .base import AndroidExtraction
@@ -30,7 +25,7 @@ class ChromeHistory(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: ModuleResults = [],
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
@@ -42,7 +37,7 @@ class ChromeHistory(AndroidExtraction):
)
self.results = []
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
def serialize(self, record: dict) -> Union[dict, list]:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -56,11 +51,9 @@ class ChromeHistory(AndroidExtraction):
return
for result in self.results:
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
if self.indicators.check_url(result["url"]):
self.detected.append(result)
continue
def _parse_db(self, db_path: str) -> None:
"""Parse a Chrome History database file.

View File

@@ -0,0 +1,49 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
from .base import AndroidExtraction
class DumpsysAccessibility(DumpsysAccessibilityArtifact, AndroidExtraction):
"""This module extracts stats on accessibility."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys accessibility")
self._adb_disconnect()
self.parse(output)
for result in self.results:
self.log.info(
'Found installed accessibility service "%s"', result.get("service")
)
self.log.info(
"Identified a total of %d accessibility services", len(self.results)
)

View File

@@ -0,0 +1,45 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_package_activities import (
DumpsysPackageActivitiesArtifact,
)
from .base import AndroidExtraction
class DumpsysActivities(DumpsysPackageActivitiesArtifact, AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
self.results = results if results else []
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys package")
self._adb_disconnect()
self.parse(output)
self.log.info("Extracted %d package activities", len(self.results))

Some files were not shown because too many files have changed in this diff Show More