1
mirror of https://github.com/mvt-project/mvt synced 2025-11-13 01:37:36 +01:00

Compare commits

..

2 Commits

Author SHA1 Message Date
besendorf
e507c3ecbc Merge branch 'main' into fix/vt 2025-09-19 07:30:14 +02:00
Janik Besendorf
197c89b08f make virustotal check also work with androidqf extractions 2025-09-01 17:40:11 +02:00
8 changed files with 50 additions and 412 deletions

View File

@@ -2,4 +2,4 @@ mkdocs==1.6.1
mkdocs-autorefs==1.4.3
mkdocs-material==9.6.20
mkdocs-material-extensions==1.3.1
mkdocstrings==0.30.1
mkdocstrings==0.30.0

View File

@@ -35,7 +35,6 @@ dependencies = [
"pydantic-settings==2.10.1",
"NSKeyedUnArchiver==1.5.2",
"python-dateutil==2.9.0.post0",
"tzdata==2025.2",
]
requires-python = ">= 3.10"

View File

@@ -1,186 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from typing import Any
from .artifact import AndroidArtifact
SUSPICIOUS_MOUNT_POINTS = [
"/system",
"/vendor",
"/product",
"/system_ext",
]
SUSPICIOUS_OPTIONS = [
"rw",
"remount",
"noatime",
"nodiratime",
]
ALLOWLIST_NOATIME = [
"/system_dlkm",
"/system_ext",
"/product",
"/vendor",
"/vendor_dlkm",
]
class Mounts(AndroidArtifact):
"""
This artifact parses mount information from /proc/mounts or similar mount data.
It can detect potentially suspicious mount configurations that may indicate
a rooted or compromised device.
"""
def parse(self, entry: str) -> None:
"""
Parse mount information from the provided entry.
Examples:
/dev/block/bootdevice/by-name/system /system ext4 ro,seclabel,relatime 0 0
/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)
"""
self.results: list[dict[str, Any]] = []
for line in entry.splitlines():
line = line.strip()
if not line:
continue
device = None
mount_point = None
filesystem_type = None
mount_options = ""
if " on " in line and " type " in line:
try:
# Format: device on mount_point type filesystem_type (options)
device_part, rest = line.split(" on ", 1)
device = device_part.strip()
# Split by 'type' to get mount_point and filesystem info
mount_part, fs_part = rest.split(" type ", 1)
mount_point = mount_part.strip()
# Parse filesystem and options
if "(" in fs_part and fs_part.endswith(")"):
# Format: filesystem_type (options)
fs_and_opts = fs_part.strip()
paren_idx = fs_and_opts.find("(")
filesystem_type = fs_and_opts[:paren_idx].strip()
mount_options = fs_and_opts[paren_idx + 1 : -1].strip()
else:
# No options in parentheses, just filesystem type
filesystem_type = fs_part.strip()
mount_options = ""
# Skip if we don't have essential info
if not device or not mount_point or not filesystem_type:
continue
# Parse options into list
options_list = (
[opt.strip() for opt in mount_options.split(",") if opt.strip()]
if mount_options
else []
)
# Check if it's a system partition
is_system_partition = mount_point in SUSPICIOUS_MOUNT_POINTS or any(
mount_point.startswith(sp) for sp in SUSPICIOUS_MOUNT_POINTS
)
# Check if it's mounted read-write
is_read_write = "rw" in options_list
mount_entry = {
"device": device,
"mount_point": mount_point,
"filesystem_type": filesystem_type,
"mount_options": mount_options,
"options_list": options_list,
"is_system_partition": is_system_partition,
"is_read_write": is_read_write,
}
self.results.append(mount_entry)
except ValueError:
# If parsing fails, skip this line
continue
else:
# Skip lines that don't match expected format
continue
def check_indicators(self) -> None:
"""
Check for suspicious mount configurations that may indicate root access
or other security concerns.
"""
system_rw_mounts = []
suspicious_mounts = []
for mount in self.results:
mount_point = mount["mount_point"]
options = mount["options_list"]
# Check for system partitions mounted as read-write
if mount["is_system_partition"] and mount["is_read_write"]:
system_rw_mounts.append(mount)
if mount_point == "/system":
self.log.warning(
"Root detected /system partition is mounted as read-write (rw). "
)
else:
self.log.warning(
"System partition %s is mounted as read-write (rw). This may indicate system modifications.",
mount_point,
)
# Check for other suspicious mount options
suspicious_opts = [opt for opt in options if opt in SUSPICIOUS_OPTIONS]
if suspicious_opts and mount["is_system_partition"]:
if (
"noatime" in mount["mount_options"]
and mount["mount_point"] in ALLOWLIST_NOATIME
):
continue
suspicious_mounts.append(mount)
self.log.warning(
"Suspicious mount options found for %s: %s",
mount_point,
", ".join(suspicious_opts),
)
# Log interesting mount information
if mount_point == "/data" or mount_point.startswith("/sdcard"):
self.log.info(
"Data partition: %s mounted as %s with options: %s",
mount_point,
mount["filesystem_type"],
mount["mount_options"],
)
self.log.info("Parsed %d mount entries", len(self.results))
# Check indicators if available
if not self.indicators:
return
for mount in self.results:
# Check if any mount points match indicators
ioc = self.indicators.check_file_path(mount.get("mount_point", ""))
if ioc:
mount["matched_indicator"] = ioc
self.detected.append(mount)
# Check device paths for indicators
ioc = self.indicators.check_file_path(mount.get("device", ""))
if ioc:
mount["matched_indicator"] = ioc
self.detected.append(mount)

View File

@@ -53,7 +53,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
file_name: str
file_timestamp: str # We store the timestamp as a string to avoid timezone issues
build_fingerprint: str
revision: str
revision: int
arch: Optional[str] = None
timestamp: str # We store the timestamp as a string to avoid timezone issues
process_uptime: Optional[int] = None
@@ -70,7 +70,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
class TombstoneCrashArtifact(AndroidArtifact):
"""
""" "
Parser for Android tombstone crash files.
This parser can parse both text and protobuf tombstone crash files.
@@ -121,7 +121,9 @@ class TombstoneCrashArtifact(AndroidArtifact):
def parse_protobuf(
self, file_name: str, file_timestamp: datetime.datetime, data: bytes
) -> None:
"""Parse Android tombstone crash files from a protobuf object."""
"""
Parse Android tombstone crash files from a protobuf object.
"""
tombstone_pb = Tombstone().parse(data)
tombstone_dict = tombstone_pb.to_dict(
betterproto.Casing.SNAKE, include_default_values=True
@@ -142,23 +144,21 @@ class TombstoneCrashArtifact(AndroidArtifact):
def parse(
self, file_name: str, file_timestamp: datetime.datetime, content: bytes
) -> None:
"""Parse text Android tombstone crash files."""
"""
Parse text Android tombstone crash files.
"""
# Split the tombstone file into a dictonary
tombstone_dict = {
"file_name": file_name,
"file_timestamp": convert_datetime_to_iso(file_timestamp),
}
lines = content.decode("utf-8").splitlines()
for line_num, line in enumerate(lines, 1):
for line in lines:
if not line.strip() or TOMBSTONE_DELIMITER in line:
continue
try:
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
if self._parse_tombstone_line(
line, key, destination_key, tombstone_dict
):
break
except Exception as e:
raise ValueError(f"Error parsing line {line_num}: {str(e)}")
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
self._parse_tombstone_line(line, key, destination_key, tombstone_dict)
# Validate the tombstone and add it to the results
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
@@ -168,7 +168,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
self, line: str, key: str, destination_key: str, tombstone: dict
) -> bool:
if not line.startswith(f"{key}"):
return False
return None
if key == "pid":
return self._load_pid_line(line, tombstone)
@@ -187,7 +187,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
raise ValueError(f"Expected key {key}, got {line_key}")
value_clean = value.strip().strip("'")
if destination_key == "uid":
if destination_key in ["uid", "revision"]:
tombstone[destination_key] = int(value_clean)
elif destination_key == "process_uptime":
# eg. "Process uptime: 40s"
@@ -200,50 +200,51 @@ class TombstoneCrashArtifact(AndroidArtifact):
return True
def _load_pid_line(self, line: str, tombstone: dict) -> bool:
try:
parts = line.split(" >>> ") if " >>> " in line else line.split(">>>")
process_info = parts[0]
pid_part, tid_part, name_part = [part.strip() for part in line.split(",")]
# Parse pid, tid, name from process info
info_parts = [p.strip() for p in process_info.split(",")]
for info in info_parts:
key, value = info.split(":", 1)
key = key.strip()
value = value.strip()
pid_key, pid_value = pid_part.split(":", 1)
if pid_key != "pid":
raise ValueError(f"Expected key pid, got {pid_key}")
pid_value = int(pid_value.strip())
if key == "pid":
tombstone["pid"] = int(value)
elif key == "tid":
tombstone["tid"] = int(value)
elif key == "name":
tombstone["process_name"] = value
tid_key, tid_value = tid_part.split(":", 1)
if tid_key != "tid":
raise ValueError(f"Expected key tid, got {tid_key}")
tid_value = int(tid_value.strip())
# Extract binary path if it exists
if len(parts) > 1:
tombstone["binary_path"] = parts[1].strip().rstrip(" <")
name_key, name_value = name_part.split(":", 1)
if name_key != "name":
raise ValueError(f"Expected key name, got {name_key}")
name_value = name_value.strip()
process_name, binary_path = self._parse_process_name(name_value, tombstone)
return True
tombstone["pid"] = pid_value
tombstone["tid"] = tid_value
tombstone["process_name"] = process_name
tombstone["binary_path"] = binary_path
return True
except Exception as e:
raise ValueError(f"Failed to parse PID line: {str(e)}")
def _parse_process_name(self, process_name_part, tombstone: dict) -> bool:
process_name, process_path = process_name_part.split(">>>")
process_name = process_name.strip()
binary_path = process_path.strip().split(" ")[0]
return process_name, binary_path
def _load_signal_line(self, line: str, tombstone: dict) -> bool:
signal_part, code_part = map(str.strip, line.split(",")[:2])
signal, code, _ = [part.strip() for part in line.split(",", 2)]
signal = signal.split("signal ")[1]
signal_code, signal_name = signal.split(" ")
signal_name = signal_name.strip("()")
def parse_part(part: str, prefix: str) -> tuple[int, str]:
match = part.split(prefix)[1]
number = int(match.split()[0])
name = match.split("(")[1].split(")")[0] if "(" in match else "UNKNOWN"
return number, name
signal_number, signal_name = parse_part(signal_part, "signal ")
code_number, code_name = parse_part(code_part, "code ")
code_part = code.split("code ")[1]
code_number, code_name = code_part.split(" ")
code_name = code_name.strip("()")
tombstone["signal_info"] = {
"code": code_number,
"code": int(code_number),
"code_name": code_name,
"name": signal_name,
"number": signal_number,
"number": int(signal_code),
}
return True
@@ -255,6 +256,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
@staticmethod
def _parse_timestamp_string(timestamp: str) -> str:
timestamp_parsed = parser.parse(timestamp)
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
return convert_datetime_to_iso(local_timestamp)

View File

@@ -19,7 +19,6 @@ from .processes import Processes
from .settings import Settings
from .sms import SMS
from .files import Files
from .mounts import Mounts
ANDROIDQF_MODULES = [
DumpsysActivities,
@@ -38,5 +37,4 @@ ANDROIDQF_MODULES = [
SMS,
DumpsysPackages,
Files,
Mounts,
]

View File

@@ -1,74 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import json
from typing import Optional
from mvt.android.artifacts.mounts import Mounts as MountsArtifact
from .base import AndroidQFModule
class Mounts(MountsArtifact, AndroidQFModule):
"""This module extracts and analyzes mount information from AndroidQF acquisitions."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
self.results = []
def run(self) -> None:
"""
Run the mounts analysis module.
This module looks for mount information files collected by androidqf
and analyzes them for suspicious configurations, particularly focusing
on detecting root access indicators like /system mounted as read-write.
"""
mount_files = self._get_files_by_pattern("*/mounts.json")
if not mount_files:
self.log.info("No mount information file found")
return
self.log.info("Found mount information file: %s", mount_files[0])
try:
data = self._get_file_content(mount_files[0]).decode(
"utf-8", errors="replace"
)
except Exception as exc:
self.log.error("Failed to read mount information file: %s", exc)
return
# Parse the mount data
try:
json_data = json.loads(data)
if isinstance(json_data, list):
# AndroidQF format: array of strings like
# "/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)"
mount_content = "\n".join(json_data)
self.parse(mount_content)
except Exception as exc:
self.log.error("Failed to parse mount information: %s", exc)
return
self.log.info("Extracted a total of %d mount entries", len(self.results))

View File

@@ -1159,9 +1159,5 @@
{
"version": "26",
"build": "23A341"
},
{
"version": "26.0.1",
"build": "23A355"
}
]

View File

@@ -1,97 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pathlib import Path
from mvt.common.module import run_module
from ..utils import get_android_androidqf, list_files
class TestAndroidqfMountsArtifact:
def test_parse_mounts_token_checks(self):
"""
Test the artifact-level `parse` method using tolerant token checks.
Different parser variants may place mount tokens into different dict
keys (for example `mount_options`, `pass_num`, `dump_freq`, etc.). To
avoid brittle assertions we concatenate each parsed entry's values and
look for expected tokens (device names, mount points, options) somewhere
in the combined representation.
"""
from mvt.android.artifacts.mounts import Mounts as MountsArtifact
m = MountsArtifact()
mount_lines = [
"/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)",
"/dev/block/by-name/system on /system type ext4 (rw,seclabel,noatime)",
"/dev/block/by-name/data on /data type f2fs (rw,nosuid,nodev,noatime)",
]
mount_content = "\n".join(mount_lines)
# Parse the mount lines (artifact-level)
m.parse(mount_content)
# Basic sanity: parser should return one entry per input line
assert len(m.results) == 3, f"Expected 3 parsed mounts, got: {m.results}"
# Concatenate each entry's values into a single string so token checks
# are tolerant to which dict keys were used by the parser.
def concat_values(entry):
parts = []
for v in entry.values():
try:
parts.append(str(v))
except Exception:
# Skip values that can't be stringified
continue
return " ".join(parts)
concatenated = [concat_values(e) for e in m.results]
# Token expectations (tolerant):
# - Root line should include 'dm-12' and 'noatime' (and typically 'ro')
assert any("dm-12" in s and "noatime" in s for s in concatenated), (
f"No root-like tokens (dm-12 + noatime) found in parsed results: {concatenated}"
)
# - System line should include '/system' or 'by-name/system' and 'rw'
assert any(
(("by-name/system" in s or "/system" in s) and "rw" in s)
for s in concatenated
), (
f"No system-like tokens (system + rw) found in parsed results: {concatenated}"
)
# - Data line should include '/data' or 'by-name/data' and 'rw'
assert any(
(("by-name/data" in s or "/data" in s) and "rw" in s) for s in concatenated
), f"No data-like tokens (data + rw) found in parsed results: {concatenated}"
class TestAndroidqfMountsModule:
def test_androidqf_module_no_mounts_file(self):
"""
When no `mounts.json` is present in the androidqf dataset, the module
should not produce results nor detections.
"""
from mvt.android.modules.androidqf.mounts import Mounts
data_path = get_android_androidqf()
m = Mounts(target_path=data_path, log=logging)
files = list_files(data_path)
parent_path = Path(data_path).absolute().parent.as_posix()
m.from_folder(parent_path, files)
run_module(m)
# The provided androidqf test dataset does not include mounts.json, so
# results should remain empty.
assert len(m.results) == 0, (
f"Expected no results when mounts.json is absent, got: {m.results}"
)
assert len(m.detected) == 0, f"Expected no detections, got: {m.detected}"