1
mirror of https://github.com/mvt-project/mvt synced 2025-11-13 01:37:36 +01:00

Compare commits

..

1 Commits

Author SHA1 Message Date
tek
2025e2edf2 Adds iOS 18.6.1 2025-08-20 11:07:42 +02:00
89 changed files with 1452 additions and 1301 deletions

View File

@@ -23,7 +23,7 @@ install:
python3 -m pip install --upgrade -e . python3 -m pip install --upgrade -e .
test-requirements: test-requirements:
python3 -m pip install --upgrade --group dev python3 -m pip install --upgrade -r test-requirements.txt
generate-proto-parsers: generate-proto-parsers:
# Generate python parsers for protobuf files # Generate python parsers for protobuf files

View File

@@ -1,5 +1,5 @@
mkdocs==1.6.1 mkdocs==1.6.1
mkdocs-autorefs==1.4.3 mkdocs-autorefs==1.4.2
mkdocs-material==9.6.20 mkdocs-material==9.6.16
mkdocs-material-extensions==1.3.1 mkdocs-material-extensions==1.3.1
mkdocstrings==0.30.1 mkdocstrings==0.30.0

View File

@@ -1,11 +1,13 @@
[project] [project]
name = "mvt" name = "mvt"
dynamic = ["version"] dynamic = ["version"]
authors = [{ name = "Claudio Guarnieri", email = "nex@nex.sx" }] authors = [
{name = "Claudio Guarnieri", email = "nex@nex.sx"}
]
maintainers = [ maintainers = [
{ name = "Etienne Maynier", email = "tek@randhome.io" }, {name = "Etienne Maynier", email = "tek@randhome.io"},
{ name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org" }, {name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org"},
{ name = "Rory Flynn", email = "rory.flynn@amnesty.org" }, {name = "Rory Flynn", email = "rory.flynn@amnesty.org"}
] ]
description = "Mobile Verification Toolkit" description = "Mobile Verification Toolkit"
readme = "README.md" readme = "README.md"
@@ -14,7 +16,7 @@ classifiers = [
"Development Status :: 5 - Production/Stable", "Development Status :: 5 - Production/Stable",
"Intended Audience :: Information Technology", "Intended Audience :: Information Technology",
"Operating System :: OS Independent", "Operating System :: OS Independent",
"Programming Language :: Python", "Programming Language :: Python"
] ]
dependencies = [ dependencies = [
"click==8.2.1", "click==8.2.1",
@@ -35,7 +37,6 @@ dependencies = [
"pydantic-settings==2.10.1", "pydantic-settings==2.10.1",
"NSKeyedUnArchiver==1.5.2", "NSKeyedUnArchiver==1.5.2",
"python-dateutil==2.9.0.post0", "python-dateutil==2.9.0.post0",
"tzdata==2025.2",
] ]
requires-python = ">= 3.10" requires-python = ">= 3.10"
@@ -44,31 +45,20 @@ homepage = "https://docs.mvt.re/en/latest/"
repository = "https://github.com/mvt-project/mvt" repository = "https://github.com/mvt-project/mvt"
[project.scripts] [project.scripts]
mvt-ios = "mvt.ios:cli" mvt-ios = "mvt.ios:cli"
mvt-android = "mvt.android:cli" mvt-android = "mvt.android:cli"
[dependency-groups]
dev = [
"requests>=2.31.0",
"pytest>=7.4.3",
"pytest-cov>=4.1.0",
"pytest-github-actions-annotate-failures>=0.2.0",
"pytest-mock>=3.14.0",
"stix2>=3.0.1",
"ruff>=0.1.6",
"mypy>=1.7.1",
"betterproto[compiler]",
]
[build-system] [build-system]
requires = ["setuptools>=61.0"] requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[tool.coverage.run] [tool.coverage.run]
omit = ["tests/*"] omit = [
"tests/*",
]
[tool.coverage.html] [tool.coverage.html]
directory = "htmlcov" directory= "htmlcov"
[tool.mypy] [tool.mypy]
install_types = true install_types = true
@@ -78,13 +68,15 @@ packages = "src"
[tool.pytest.ini_options] [tool.pytest.ini_options]
addopts = "-ra -q --cov=mvt --cov-report html --junitxml=pytest.xml --cov-report=term-missing:skip-covered" addopts = "-ra -q --cov=mvt --cov-report html --junitxml=pytest.xml --cov-report=term-missing:skip-covered"
testpaths = ["tests"] testpaths = [
"tests"
]
[tool.ruff.lint] [tool.ruff.lint]
select = ["C90", "E", "F", "W"] # flake8 default set select = ["C90", "E", "F", "W"] # flake8 default set
ignore = [ ignore = [
"E501", # don't enforce line length violations "E501", # don't enforce line length violations
"C901", # complex-structure "C901", # complex-structure
# These were previously ignored but don't seem to be required: # These were previously ignored but don't seem to be required:
# "E265", # no-space-after-block-comment # "E265", # no-space-after-block-comment
@@ -96,14 +88,14 @@ ignore = [
] ]
[tool.ruff.lint.per-file-ignores] [tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"] # unused-import "__init__.py" = ["F401"] # unused-import
[tool.ruff.lint.mccabe] [tool.ruff.lint.mccabe]
max-complexity = 10 max-complexity = 10
[tool.setuptools] [tool.setuptools]
include-package-data = true include-package-data = true
package-dir = { "" = "src" } package-dir = {"" = "src"}
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["src"] where = ["src"]
@@ -112,4 +104,4 @@ where = ["src"]
mvt = ["ios/data/*.json"] mvt = ["ios/data/*.json"]
[tool.setuptools.dynamic] [tool.setuptools.dynamic]
version = { attr = "mvt.common.version.MVT_VERSION" } version = {attr = "mvt.common.version.MVT_VERSION"}

View File

@@ -1,186 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from typing import Any
from .artifact import AndroidArtifact
SUSPICIOUS_MOUNT_POINTS = [
"/system",
"/vendor",
"/product",
"/system_ext",
]
SUSPICIOUS_OPTIONS = [
"rw",
"remount",
"noatime",
"nodiratime",
]
ALLOWLIST_NOATIME = [
"/system_dlkm",
"/system_ext",
"/product",
"/vendor",
"/vendor_dlkm",
]
class Mounts(AndroidArtifact):
"""
This artifact parses mount information from /proc/mounts or similar mount data.
It can detect potentially suspicious mount configurations that may indicate
a rooted or compromised device.
"""
def parse(self, entry: str) -> None:
"""
Parse mount information from the provided entry.
Examples:
/dev/block/bootdevice/by-name/system /system ext4 ro,seclabel,relatime 0 0
/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)
"""
self.results: list[dict[str, Any]] = []
for line in entry.splitlines():
line = line.strip()
if not line:
continue
device = None
mount_point = None
filesystem_type = None
mount_options = ""
if " on " in line and " type " in line:
try:
# Format: device on mount_point type filesystem_type (options)
device_part, rest = line.split(" on ", 1)
device = device_part.strip()
# Split by 'type' to get mount_point and filesystem info
mount_part, fs_part = rest.split(" type ", 1)
mount_point = mount_part.strip()
# Parse filesystem and options
if "(" in fs_part and fs_part.endswith(")"):
# Format: filesystem_type (options)
fs_and_opts = fs_part.strip()
paren_idx = fs_and_opts.find("(")
filesystem_type = fs_and_opts[:paren_idx].strip()
mount_options = fs_and_opts[paren_idx + 1 : -1].strip()
else:
# No options in parentheses, just filesystem type
filesystem_type = fs_part.strip()
mount_options = ""
# Skip if we don't have essential info
if not device or not mount_point or not filesystem_type:
continue
# Parse options into list
options_list = (
[opt.strip() for opt in mount_options.split(",") if opt.strip()]
if mount_options
else []
)
# Check if it's a system partition
is_system_partition = mount_point in SUSPICIOUS_MOUNT_POINTS or any(
mount_point.startswith(sp) for sp in SUSPICIOUS_MOUNT_POINTS
)
# Check if it's mounted read-write
is_read_write = "rw" in options_list
mount_entry = {
"device": device,
"mount_point": mount_point,
"filesystem_type": filesystem_type,
"mount_options": mount_options,
"options_list": options_list,
"is_system_partition": is_system_partition,
"is_read_write": is_read_write,
}
self.results.append(mount_entry)
except ValueError:
# If parsing fails, skip this line
continue
else:
# Skip lines that don't match expected format
continue
def check_indicators(self) -> None:
"""
Check for suspicious mount configurations that may indicate root access
or other security concerns.
"""
system_rw_mounts = []
suspicious_mounts = []
for mount in self.results:
mount_point = mount["mount_point"]
options = mount["options_list"]
# Check for system partitions mounted as read-write
if mount["is_system_partition"] and mount["is_read_write"]:
system_rw_mounts.append(mount)
if mount_point == "/system":
self.log.warning(
"Root detected /system partition is mounted as read-write (rw). "
)
else:
self.log.warning(
"System partition %s is mounted as read-write (rw). This may indicate system modifications.",
mount_point,
)
# Check for other suspicious mount options
suspicious_opts = [opt for opt in options if opt in SUSPICIOUS_OPTIONS]
if suspicious_opts and mount["is_system_partition"]:
if (
"noatime" in mount["mount_options"]
and mount["mount_point"] in ALLOWLIST_NOATIME
):
continue
suspicious_mounts.append(mount)
self.log.warning(
"Suspicious mount options found for %s: %s",
mount_point,
", ".join(suspicious_opts),
)
# Log interesting mount information
if mount_point == "/data" or mount_point.startswith("/sdcard"):
self.log.info(
"Data partition: %s mounted as %s with options: %s",
mount_point,
mount["filesystem_type"],
mount["mount_options"],
)
self.log.info("Parsed %d mount entries", len(self.results))
# Check indicators if available
if not self.indicators:
return
for mount in self.results:
# Check if any mount points match indicators
ioc = self.indicators.check_file_path(mount.get("mount_point", ""))
if ioc:
mount["matched_indicator"] = ioc
self.detected.append(mount)
# Check device paths for indicators
ioc = self.indicators.check_file_path(mount.get("device", ""))
if ioc:
mount["matched_indicator"] = ioc
self.detected.append(mount)

View File

@@ -53,7 +53,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
file_name: str file_name: str
file_timestamp: str # We store the timestamp as a string to avoid timezone issues file_timestamp: str # We store the timestamp as a string to avoid timezone issues
build_fingerprint: str build_fingerprint: str
revision: str revision: int
arch: Optional[str] = None arch: Optional[str] = None
timestamp: str # We store the timestamp as a string to avoid timezone issues timestamp: str # We store the timestamp as a string to avoid timezone issues
process_uptime: Optional[int] = None process_uptime: Optional[int] = None
@@ -70,7 +70,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
class TombstoneCrashArtifact(AndroidArtifact): class TombstoneCrashArtifact(AndroidArtifact):
""" """ "
Parser for Android tombstone crash files. Parser for Android tombstone crash files.
This parser can parse both text and protobuf tombstone crash files. This parser can parse both text and protobuf tombstone crash files.
@@ -121,7 +121,9 @@ class TombstoneCrashArtifact(AndroidArtifact):
def parse_protobuf( def parse_protobuf(
self, file_name: str, file_timestamp: datetime.datetime, data: bytes self, file_name: str, file_timestamp: datetime.datetime, data: bytes
) -> None: ) -> None:
"""Parse Android tombstone crash files from a protobuf object.""" """
Parse Android tombstone crash files from a protobuf object.
"""
tombstone_pb = Tombstone().parse(data) tombstone_pb = Tombstone().parse(data)
tombstone_dict = tombstone_pb.to_dict( tombstone_dict = tombstone_pb.to_dict(
betterproto.Casing.SNAKE, include_default_values=True betterproto.Casing.SNAKE, include_default_values=True
@@ -142,23 +144,21 @@ class TombstoneCrashArtifact(AndroidArtifact):
def parse( def parse(
self, file_name: str, file_timestamp: datetime.datetime, content: bytes self, file_name: str, file_timestamp: datetime.datetime, content: bytes
) -> None: ) -> None:
"""Parse text Android tombstone crash files.""" """
Parse text Android tombstone crash files.
"""
# Split the tombstone file into a dictonary
tombstone_dict = { tombstone_dict = {
"file_name": file_name, "file_name": file_name,
"file_timestamp": convert_datetime_to_iso(file_timestamp), "file_timestamp": convert_datetime_to_iso(file_timestamp),
} }
lines = content.decode("utf-8").splitlines() lines = content.decode("utf-8").splitlines()
for line_num, line in enumerate(lines, 1): for line in lines:
if not line.strip() or TOMBSTONE_DELIMITER in line: if not line.strip() or TOMBSTONE_DELIMITER in line:
continue continue
try: for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items(): self._parse_tombstone_line(line, key, destination_key, tombstone_dict)
if self._parse_tombstone_line(
line, key, destination_key, tombstone_dict
):
break
except Exception as e:
raise ValueError(f"Error parsing line {line_num}: {str(e)}")
# Validate the tombstone and add it to the results # Validate the tombstone and add it to the results
tombstone = TombstoneCrashResult.model_validate(tombstone_dict) tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
@@ -168,7 +168,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
self, line: str, key: str, destination_key: str, tombstone: dict self, line: str, key: str, destination_key: str, tombstone: dict
) -> bool: ) -> bool:
if not line.startswith(f"{key}"): if not line.startswith(f"{key}"):
return False return None
if key == "pid": if key == "pid":
return self._load_pid_line(line, tombstone) return self._load_pid_line(line, tombstone)
@@ -187,7 +187,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
raise ValueError(f"Expected key {key}, got {line_key}") raise ValueError(f"Expected key {key}, got {line_key}")
value_clean = value.strip().strip("'") value_clean = value.strip().strip("'")
if destination_key == "uid": if destination_key in ["uid", "revision"]:
tombstone[destination_key] = int(value_clean) tombstone[destination_key] = int(value_clean)
elif destination_key == "process_uptime": elif destination_key == "process_uptime":
# eg. "Process uptime: 40s" # eg. "Process uptime: 40s"
@@ -200,50 +200,51 @@ class TombstoneCrashArtifact(AndroidArtifact):
return True return True
def _load_pid_line(self, line: str, tombstone: dict) -> bool: def _load_pid_line(self, line: str, tombstone: dict) -> bool:
try: pid_part, tid_part, name_part = [part.strip() for part in line.split(",")]
parts = line.split(" >>> ") if " >>> " in line else line.split(">>>")
process_info = parts[0]
# Parse pid, tid, name from process info pid_key, pid_value = pid_part.split(":", 1)
info_parts = [p.strip() for p in process_info.split(",")] if pid_key != "pid":
for info in info_parts: raise ValueError(f"Expected key pid, got {pid_key}")
key, value = info.split(":", 1) pid_value = int(pid_value.strip())
key = key.strip()
value = value.strip()
if key == "pid": tid_key, tid_value = tid_part.split(":", 1)
tombstone["pid"] = int(value) if tid_key != "tid":
elif key == "tid": raise ValueError(f"Expected key tid, got {tid_key}")
tombstone["tid"] = int(value) tid_value = int(tid_value.strip())
elif key == "name":
tombstone["process_name"] = value
# Extract binary path if it exists name_key, name_value = name_part.split(":", 1)
if len(parts) > 1: if name_key != "name":
tombstone["binary_path"] = parts[1].strip().rstrip(" <") raise ValueError(f"Expected key name, got {name_key}")
name_value = name_value.strip()
process_name, binary_path = self._parse_process_name(name_value, tombstone)
return True tombstone["pid"] = pid_value
tombstone["tid"] = tid_value
tombstone["process_name"] = process_name
tombstone["binary_path"] = binary_path
return True
except Exception as e: def _parse_process_name(self, process_name_part, tombstone: dict) -> bool:
raise ValueError(f"Failed to parse PID line: {str(e)}") process_name, process_path = process_name_part.split(">>>")
process_name = process_name.strip()
binary_path = process_path.strip().split(" ")[0]
return process_name, binary_path
def _load_signal_line(self, line: str, tombstone: dict) -> bool: def _load_signal_line(self, line: str, tombstone: dict) -> bool:
signal_part, code_part = map(str.strip, line.split(",")[:2]) signal, code, _ = [part.strip() for part in line.split(",", 2)]
signal = signal.split("signal ")[1]
signal_code, signal_name = signal.split(" ")
signal_name = signal_name.strip("()")
def parse_part(part: str, prefix: str) -> tuple[int, str]: code_part = code.split("code ")[1]
match = part.split(prefix)[1] code_number, code_name = code_part.split(" ")
number = int(match.split()[0]) code_name = code_name.strip("()")
name = match.split("(")[1].split(")")[0] if "(" in match else "UNKNOWN"
return number, name
signal_number, signal_name = parse_part(signal_part, "signal ")
code_number, code_name = parse_part(code_part, "code ")
tombstone["signal_info"] = { tombstone["signal_info"] = {
"code": code_number, "code": int(code_number),
"code_name": code_name, "code_name": code_name,
"name": signal_name, "name": signal_name,
"number": signal_number, "number": int(signal_code),
} }
return True return True
@@ -255,6 +256,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
@staticmethod @staticmethod
def _parse_timestamp_string(timestamp: str) -> str: def _parse_timestamp_string(timestamp: str) -> str:
timestamp_parsed = parser.parse(timestamp) timestamp_parsed = parser.parse(timestamp)
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion. # HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc) local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
return convert_datetime_to_iso(local_timestamp) return convert_datetime_to_iso(local_timestamp)

View File

@@ -31,8 +31,6 @@ from mvt.common.help import (
HELP_MSG_HASHES, HELP_MSG_HASHES,
HELP_MSG_CHECK_IOCS, HELP_MSG_CHECK_IOCS,
HELP_MSG_STIX2, HELP_MSG_STIX2,
HELP_MSG_DISABLE_UPDATE_CHECK,
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
) )
from mvt.common.logo import logo from mvt.common.logo import logo
from mvt.common.updates import IndicatorsUpdates from mvt.common.updates import IndicatorsUpdates
@@ -55,37 +53,12 @@ log = logging.getLogger("mvt")
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
def _get_disable_flags(ctx):
"""Helper function to safely get disable flags from context."""
if ctx.obj is None:
return False, False
return (
ctx.obj.get("disable_version_check", False),
ctx.obj.get("disable_indicator_check", False),
)
# ============================================================================== # ==============================================================================
# Main # Main
# ============================================================================== # ==============================================================================
@click.group(invoke_without_command=False) @click.group(invoke_without_command=False)
@click.option( def cli():
"--disable-update-check", is_flag=True, help=HELP_MSG_DISABLE_UPDATE_CHECK logo()
)
@click.option(
"--disable-indicator-update-check",
is_flag=True,
help=HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
)
@click.pass_context
def cli(ctx, disable_update_check, disable_indicator_update_check):
ctx.ensure_object(dict)
ctx.obj["disable_version_check"] = disable_update_check
ctx.obj["disable_indicator_check"] = disable_indicator_update_check
logo(
disable_version_check=disable_update_check,
disable_indicator_check=disable_indicator_update_check,
)
# ============================================================================== # ==============================================================================
@@ -193,8 +166,6 @@ def check_adb(
module_name=module, module_name=module,
serial=serial, serial=serial,
module_options=module_options, module_options=module_options,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
) )
if list_modules: if list_modules:
@@ -241,8 +212,6 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
ioc_files=iocs, ioc_files=iocs,
module_name=module, module_name=module,
hashes=True, hashes=True,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
) )
if list_modules: if list_modules:
@@ -305,8 +274,6 @@ def check_backup(
"interactive": not non_interactive, "interactive": not non_interactive,
"backup_password": cli_load_android_backup_password(log, backup_password), "backup_password": cli_load_android_backup_password(log, backup_password),
}, },
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
) )
if list_modules: if list_modules:
@@ -371,8 +338,6 @@ def check_androidqf(
"interactive": not non_interactive, "interactive": not non_interactive,
"backup_password": cli_load_android_backup_password(log, backup_password), "backup_password": cli_load_android_backup_password(log, backup_password),
}, },
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
) )
if list_modules: if list_modules:
@@ -407,13 +372,7 @@ def check_androidqf(
@click.argument("FOLDER", type=click.Path(exists=True)) @click.argument("FOLDER", type=click.Path(exists=True))
@click.pass_context @click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder): def check_iocs(ctx, iocs, list_modules, module, folder):
cmd = CmdCheckIOCS( cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
target_path=folder,
ioc_files=iocs,
module_name=module,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
)
cmd.modules = BACKUP_MODULES + ADB_MODULES + BUGREPORT_MODULES cmd.modules = BACKUP_MODULES + ADB_MODULES + BUGREPORT_MODULES
if list_modules: if list_modules:

View File

@@ -7,7 +7,6 @@ import logging
from typing import Optional from typing import Optional
from mvt.common.command import Command from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.adb import ADB_MODULES from .modules.adb import ADB_MODULES
@@ -20,28 +19,18 @@ class CmdAndroidCheckADB(Command):
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
ioc_files: Optional[list] = None, ioc_files: Optional[list] = None,
iocs: Optional[Indicators] = None,
module_name: Optional[str] = None, module_name: Optional[str] = None,
serial: Optional[str] = None, serial: Optional[str] = None,
module_options: Optional[dict] = None, module_options: Optional[dict] = None,
hashes: Optional[bool] = False,
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
) -> None: ) -> None:
super().__init__( super().__init__(
target_path=target_path, target_path=target_path,
results_path=results_path, results_path=results_path,
ioc_files=ioc_files, ioc_files=ioc_files,
iocs=iocs,
module_name=module_name, module_name=module_name,
serial=serial, serial=serial,
module_options=module_options, module_options=module_options,
hashes=hashes,
sub_command=sub_command,
log=log, log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
) )
self.name = "check-adb" self.name = "check-adb"

View File

@@ -9,186 +9,59 @@ import zipfile
from pathlib import Path from pathlib import Path
from typing import List, Optional from typing import List, Optional
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
from mvt.common.command import Command from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.androidqf import ANDROIDQF_MODULES from .modules.androidqf import ANDROIDQF_MODULES
from .modules.androidqf.base import AndroidQFModule
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class NoAndroidQFTargetPath(Exception):
pass
class NoAndroidQFBugReport(Exception):
pass
class NoAndroidQFBackup(Exception):
pass
class CmdAndroidCheckAndroidQF(Command): class CmdAndroidCheckAndroidQF(Command):
def __init__( def __init__(
self, self,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
ioc_files: Optional[list] = None, ioc_files: Optional[list] = None,
iocs: Optional[Indicators] = None,
module_name: Optional[str] = None, module_name: Optional[str] = None,
serial: Optional[str] = None, serial: Optional[str] = None,
module_options: Optional[dict] = None, module_options: Optional[dict] = None,
hashes: Optional[bool] = False, hashes: bool = False,
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
) -> None: ) -> None:
super().__init__( super().__init__(
target_path=target_path, target_path=target_path,
results_path=results_path, results_path=results_path,
ioc_files=ioc_files, ioc_files=ioc_files,
iocs=iocs,
module_name=module_name, module_name=module_name,
serial=serial, serial=serial,
module_options=module_options, module_options=module_options,
hashes=hashes, hashes=hashes,
sub_command=sub_command,
log=log, log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
) )
self.name = "check-androidqf" self.name = "check-androidqf"
self.modules = ANDROIDQF_MODULES self.modules = ANDROIDQF_MODULES
self.__format: Optional[str] = None self.format: Optional[str] = None
self.__zip: Optional[zipfile.ZipFile] = None self.archive: Optional[zipfile.ZipFile] = None
self.__files: List[str] = [] self.files: List[str] = []
def init(self): def init(self):
if os.path.isdir(self.target_path): if os.path.isdir(self.target_path):
self.__format = "dir" self.format = "dir"
parent_path = Path(self.target_path).absolute().parent.as_posix() parent_path = Path(self.target_path).absolute().parent.as_posix()
target_abs_path = os.path.abspath(self.target_path) target_abs_path = os.path.abspath(self.target_path)
for root, subdirs, subfiles in os.walk(target_abs_path): for root, subdirs, subfiles in os.walk(target_abs_path):
for fname in subfiles: for fname in subfiles:
file_path = os.path.relpath(os.path.join(root, fname), parent_path) file_path = os.path.relpath(os.path.join(root, fname), parent_path)
self.__files.append(file_path) self.files.append(file_path)
elif os.path.isfile(self.target_path): elif os.path.isfile(self.target_path):
self.__format = "zip" self.format = "zip"
self.__zip = zipfile.ZipFile(self.target_path) self.archive = zipfile.ZipFile(self.target_path)
self.__files = self.__zip.namelist() self.files = self.archive.namelist()
def module_init(self, module: AndroidQFModule) -> None: # type: ignore[override] def module_init(self, module):
if self.__format == "zip" and self.__zip: if self.format == "zip":
module.from_zip(self.__zip, self.__files) module.from_zip_file(self.archive, self.files)
return
if not self.target_path:
raise NoAndroidQFTargetPath
parent_path = Path(self.target_path).absolute().parent.as_posix()
module.from_dir(parent_path, self.__files)
def load_bugreport(self) -> zipfile.ZipFile:
bugreport_zip_path = None
for file_name in self.__files:
if file_name.endswith("bugreport.zip"):
bugreport_zip_path = file_name
break
else: else:
raise NoAndroidQFBugReport
if self.__format == "zip" and self.__zip:
handle = self.__zip.open(bugreport_zip_path)
return zipfile.ZipFile(handle)
if self.__format == "dir" and self.target_path:
parent_path = Path(self.target_path).absolute().parent.as_posix() parent_path = Path(self.target_path).absolute().parent.as_posix()
bug_report_path = os.path.join(parent_path, bugreport_zip_path) module.from_folder(parent_path, self.files)
return zipfile.ZipFile(bug_report_path)
raise NoAndroidQFBugReport
def load_backup(self) -> bytes:
backup_ab_path = None
for file_name in self.__files:
if file_name.endswith("backup.ab"):
backup_ab_path = file_name
break
else:
raise NoAndroidQFBackup
if self.__format == "zip" and self.__zip:
backup_file_handle = self.__zip.open(backup_ab_path)
return backup_file_handle.read()
if self.__format == "dir" and self.target_path:
parent_path = Path(self.target_path).absolute().parent.as_posix()
backup_path = os.path.join(parent_path, backup_ab_path)
with open(backup_path, "rb") as backup_file:
backup_ab_data = backup_file.read()
return backup_ab_data
raise NoAndroidQFBackup
def run_bugreport_cmd(self) -> bool:
try:
bugreport = self.load_bugreport()
except NoAndroidQFBugReport:
self.log.warning(
"Skipping bugreport modules as no bugreport.zip found in AndroidQF data."
)
return False
else:
cmd = CmdAndroidCheckBugreport(
target_path=None,
results_path=self.results_path,
ioc_files=self.ioc_files,
iocs=self.iocs,
module_options=self.module_options,
hashes=self.hashes,
sub_command=True,
)
cmd.from_zip(bugreport)
cmd.run()
self.detected_count += cmd.detected_count
self.timeline.extend(cmd.timeline)
self.timeline_detected.extend(cmd.timeline_detected)
def run_backup_cmd(self) -> bool:
try:
backup = self.load_backup()
except NoAndroidQFBackup:
self.log.warning(
"Skipping backup modules as no backup.ab found in AndroidQF data."
)
return False
else:
cmd = CmdAndroidCheckBackup(
target_path=None,
results_path=self.results_path,
ioc_files=self.ioc_files,
iocs=self.iocs,
module_options=self.module_options,
hashes=self.hashes,
sub_command=True,
)
cmd.from_ab(backup)
cmd.run()
self.detected_count += cmd.detected_count
self.timeline.extend(cmd.timeline)
self.timeline_detected.extend(cmd.timeline_detected)
def finish(self) -> None:
"""
Run the bugreport and backup modules if the respective files are found in the AndroidQF data.
"""
self.run_bugreport_cmd()
self.run_backup_cmd()

View File

@@ -20,7 +20,6 @@ from mvt.android.parsers.backup import (
parse_backup_file, parse_backup_file,
) )
from mvt.common.command import Command from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.backup import BACKUP_MODULES from .modules.backup import BACKUP_MODULES
@@ -33,28 +32,20 @@ class CmdAndroidCheckBackup(Command):
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
ioc_files: Optional[list] = None, ioc_files: Optional[list] = None,
iocs: Optional[Indicators] = None,
module_name: Optional[str] = None, module_name: Optional[str] = None,
serial: Optional[str] = None, serial: Optional[str] = None,
module_options: Optional[dict] = None, module_options: Optional[dict] = None,
hashes: Optional[bool] = False, hashes: bool = False,
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
) -> None: ) -> None:
super().__init__( super().__init__(
target_path=target_path, target_path=target_path,
results_path=results_path, results_path=results_path,
ioc_files=ioc_files, ioc_files=ioc_files,
iocs=iocs,
module_name=module_name, module_name=module_name,
serial=serial, serial=serial,
module_options=module_options, module_options=module_options,
hashes=hashes, hashes=hashes,
sub_command=sub_command,
log=log, log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
) )
self.name = "check-backup" self.name = "check-backup"
@@ -64,34 +55,6 @@ class CmdAndroidCheckBackup(Command):
self.backup_archive: Optional[tarfile.TarFile] = None self.backup_archive: Optional[tarfile.TarFile] = None
self.backup_files: List[str] = [] self.backup_files: List[str] = []
def from_ab(self, ab_file_bytes: bytes) -> None:
self.backup_type = "ab"
header = parse_ab_header(ab_file_bytes)
if not header["backup"]:
log.critical("Invalid backup format, file should be in .ab format")
sys.exit(1)
password = None
if header["encryption"] != "none":
password = prompt_or_load_android_backup_password(log, self.module_options)
if not password:
log.critical("No backup password provided.")
sys.exit(1)
try:
tardata = parse_backup_file(ab_file_bytes, password=password)
except InvalidBackupPassword:
log.critical("Invalid backup password")
sys.exit(1)
except AndroidBackupParsingError as exc:
log.critical("Impossible to parse this backup file: %s", exc)
log.critical("Please use Android Backup Extractor (ABE) instead")
sys.exit(1)
dbytes = io.BytesIO(tardata)
self.backup_archive = tarfile.open(fileobj=dbytes)
for member in self.backup_archive:
self.backup_files.append(member.name)
def init(self) -> None: def init(self) -> None:
if not self.target_path: if not self.target_path:
return return
@@ -99,8 +62,35 @@ class CmdAndroidCheckBackup(Command):
if os.path.isfile(self.target_path): if os.path.isfile(self.target_path):
self.backup_type = "ab" self.backup_type = "ab"
with open(self.target_path, "rb") as handle: with open(self.target_path, "rb") as handle:
ab_file_bytes = handle.read() data = handle.read()
self.from_ab(ab_file_bytes)
header = parse_ab_header(data)
if not header["backup"]:
log.critical("Invalid backup format, file should be in .ab format")
sys.exit(1)
password = None
if header["encryption"] != "none":
password = prompt_or_load_android_backup_password(
log, self.module_options
)
if not password:
log.critical("No backup password provided.")
sys.exit(1)
try:
tardata = parse_backup_file(data, password=password)
except InvalidBackupPassword:
log.critical("Invalid backup password")
sys.exit(1)
except AndroidBackupParsingError as exc:
log.critical("Impossible to parse this backup file: %s", exc)
log.critical("Please use Android Backup Extractor (ABE) instead")
sys.exit(1)
dbytes = io.BytesIO(tardata)
self.backup_archive = tarfile.open(fileobj=dbytes)
for member in self.backup_archive:
self.backup_files.append(member.name)
elif os.path.isdir(self.target_path): elif os.path.isdir(self.target_path):
self.backup_type = "folder" self.backup_type = "folder"
@@ -119,6 +109,6 @@ class CmdAndroidCheckBackup(Command):
def module_init(self, module: BackupExtraction) -> None: # type: ignore[override] def module_init(self, module: BackupExtraction) -> None: # type: ignore[override]
if self.backup_type == "folder": if self.backup_type == "folder":
module.from_dir(self.target_path, self.backup_files) module.from_folder(self.target_path, self.backup_files)
else: else:
module.from_ab(self.target_path, self.backup_archive, self.backup_files) module.from_ab(self.target_path, self.backup_archive, self.backup_files)

View File

@@ -11,7 +11,6 @@ from zipfile import ZipFile
from mvt.android.modules.bugreport.base import BugReportModule from mvt.android.modules.bugreport.base import BugReportModule
from mvt.common.command import Command from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.bugreport import BUGREPORT_MODULES from .modules.bugreport import BUGREPORT_MODULES
@@ -24,80 +23,54 @@ class CmdAndroidCheckBugreport(Command):
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
ioc_files: Optional[list] = None, ioc_files: Optional[list] = None,
iocs: Optional[Indicators] = None,
module_name: Optional[str] = None, module_name: Optional[str] = None,
serial: Optional[str] = None, serial: Optional[str] = None,
module_options: Optional[dict] = None, module_options: Optional[dict] = None,
hashes: Optional[bool] = False, hashes: bool = False,
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
) -> None: ) -> None:
super().__init__( super().__init__(
target_path=target_path, target_path=target_path,
results_path=results_path, results_path=results_path,
ioc_files=ioc_files, ioc_files=ioc_files,
iocs=iocs,
module_name=module_name, module_name=module_name,
serial=serial, serial=serial,
module_options=module_options, module_options=module_options,
hashes=hashes, hashes=hashes,
sub_command=sub_command,
log=log, log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
) )
self.name = "check-bugreport" self.name = "check-bugreport"
self.modules = BUGREPORT_MODULES self.modules = BUGREPORT_MODULES
self.__format: str = "" self.bugreport_format: str = ""
self.__zip: Optional[ZipFile] = None self.bugreport_archive: Optional[ZipFile] = None
self.__files: List[str] = [] self.bugreport_files: List[str] = []
def from_dir(self, dir_path: str) -> None:
"""This method is used to initialize the bug report analysis from an
uncompressed directory.
"""
self.__format = "dir"
self.target_path = dir_path
parent_path = Path(dir_path).absolute().as_posix()
for root, _, subfiles in os.walk(os.path.abspath(dir_path)):
for file_name in subfiles:
file_path = os.path.relpath(os.path.join(root, file_name), parent_path)
self.__files.append(file_path)
def from_zip(self, bugreport_zip: ZipFile) -> None:
"""This method is used to initialize the bug report analysis from a
compressed archive.
"""
# NOTE: This will be invoked either by the CLI directly,or by the
# check-androidqf command. We need this because we want to support
# check-androidqf to analyse compressed archives itself too.
# So, we'll need to extract bugreport.zip from a 'androidqf.zip', and
# since nothing is written on disk, we need to be able to pass this
# command a ZipFile instance in memory.
self.__format = "zip"
self.__zip = bugreport_zip
for file_name in self.__zip.namelist():
self.__files.append(file_name)
def init(self) -> None: def init(self) -> None:
if not self.target_path: if not self.target_path:
return return
if os.path.isfile(self.target_path): if os.path.isfile(self.target_path):
self.from_zip(ZipFile(self.target_path)) self.bugreport_format = "zip"
self.bugreport_archive = ZipFile(self.target_path)
for file_name in self.bugreport_archive.namelist():
self.bugreport_files.append(file_name)
elif os.path.isdir(self.target_path): elif os.path.isdir(self.target_path):
self.from_dir(self.target_path) self.bugreport_format = "dir"
parent_path = Path(self.target_path).absolute().as_posix()
for root, _, subfiles in os.walk(os.path.abspath(self.target_path)):
for file_name in subfiles:
file_path = os.path.relpath(
os.path.join(root, file_name), parent_path
)
self.bugreport_files.append(file_path)
def module_init(self, module: BugReportModule) -> None: # type: ignore[override] def module_init(self, module: BugReportModule) -> None: # type: ignore[override]
if self.__format == "zip": if self.bugreport_format == "zip":
module.from_zip(self.__zip, self.__files) module.from_zip(self.bugreport_archive, self.bugreport_files)
else: else:
module.from_dir(self.target_path, self.__files) module.from_folder(self.target_path, self.bugreport_files)
def finish(self) -> None: def finish(self) -> None:
if self.__zip: if self.bugreport_archive:
self.__zip.close() self.bugreport_archive.close()

View File

@@ -4,7 +4,15 @@
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
from .chrome_history import ChromeHistory from .chrome_history import ChromeHistory
from .dumpsys_accessibility import DumpsysAccessibility
from .dumpsys_activities import DumpsysActivities
from .dumpsys_appops import DumpsysAppOps
from .dumpsys_battery_daily import DumpsysBatteryDaily
from .dumpsys_battery_history import DumpsysBatteryHistory
from .dumpsys_dbinfo import DumpsysDBInfo
from .dumpsys_adbstate import DumpsysADBState
from .dumpsys_full import DumpsysFull from .dumpsys_full import DumpsysFull
from .dumpsys_receivers import DumpsysReceivers
from .files import Files from .files import Files
from .getprop import Getprop from .getprop import Getprop
from .logcat import Logcat from .logcat import Logcat
@@ -24,7 +32,15 @@ ADB_MODULES = [
Getprop, Getprop,
Settings, Settings,
SELinuxStatus, SELinuxStatus,
DumpsysBatteryHistory,
DumpsysBatteryDaily,
DumpsysReceivers,
DumpsysActivities,
DumpsysAccessibility,
DumpsysDBInfo,
DumpsysADBState,
DumpsysFull, DumpsysFull,
DumpsysAppOps,
Packages, Packages,
Logcat, Logcat,
RootBinaries, RootBinaries,

View File

@@ -0,0 +1,49 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
from .base import AndroidExtraction
class DumpsysAccessibility(DumpsysAccessibilityArtifact, AndroidExtraction):
"""This module extracts stats on accessibility."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys accessibility")
self._adb_disconnect()
self.parse(output)
for result in self.results:
self.log.info(
'Found installed accessibility service "%s"', result.get("service")
)
self.log.info(
"Identified a total of %d accessibility services", len(self.results)
)

View File

@@ -0,0 +1,45 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_package_activities import (
DumpsysPackageActivitiesArtifact,
)
from .base import AndroidExtraction
class DumpsysActivities(DumpsysPackageActivitiesArtifact, AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
self.results = results if results else []
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys package")
self._adb_disconnect()
self.parse(output)
self.log.info("Extracted %d package activities", len(self.results))

View File

@@ -0,0 +1,45 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_adb import DumpsysADBArtifact
from .base import AndroidExtraction
class DumpsysADBState(DumpsysADBArtifact, AndroidExtraction):
"""This module extracts ADB keystore state."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys adb", decode=False)
self._adb_disconnect()
self.parse(output)
if self.results:
self.log.info(
"Identified a total of %d trusted ADB keys",
len(self.results[0].get("user_keys", [])),
)

View File

@@ -0,0 +1,46 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_appops import DumpsysAppopsArtifact
from .base import AndroidExtraction
class DumpsysAppOps(DumpsysAppopsArtifact, AndroidExtraction):
"""This module extracts records from App-op Manager."""
slug = "dumpsys_appops"
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys appops")
self._adb_disconnect()
self.parse(output)
self.log.info(
"Extracted a total of %d records from app-ops manager", len(self.results)
)

View File

@@ -0,0 +1,44 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_battery_daily import DumpsysBatteryDailyArtifact
from .base import AndroidExtraction
class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, AndroidExtraction):
"""This module extracts records from battery daily updates."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys batterystats --daily")
self._adb_disconnect()
self.parse(output)
self.log.info(
"Extracted %d records from battery daily stats", len(self.results)
)

View File

@@ -0,0 +1,42 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_battery_history import DumpsysBatteryHistoryArtifact
from .base import AndroidExtraction
class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, AndroidExtraction):
"""This module extracts records from battery history events."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys batterystats --history")
self._adb_disconnect()
self.parse(output)
self.log.info("Extracted %d records from battery history", len(self.results))

View File

@@ -0,0 +1,47 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_dbinfo import DumpsysDBInfoArtifact
from .base import AndroidExtraction
class DumpsysDBInfo(DumpsysDBInfoArtifact, AndroidExtraction):
"""This module extracts records from battery daily updates."""
slug = "dumpsys_dbinfo"
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys dbinfo")
self._adb_disconnect()
self.parse(output)
self.log.info(
"Extracted a total of %d records from database information",
len(self.results),
)

View File

@@ -0,0 +1,44 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_receivers import DumpsysReceiversArtifact
from .base import AndroidExtraction
class DumpsysReceivers(DumpsysReceiversArtifact, AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
self.results = results if results else {}
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys package")
self.parse(output)
self._adb_disconnect()
self.log.info("Extracted receivers for %d intents", len(self.results))

View File

@@ -107,7 +107,8 @@ class Packages(AndroidExtraction):
result["matched_indicator"] = ioc result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def check_virustotal(self, packages: list) -> None: @staticmethod
def check_virustotal(packages: list) -> None:
hashes = [] hashes = []
for package in packages: for package in packages:
for file in package.get("files", []): for file in package.get("files", []):
@@ -142,15 +143,8 @@ class Packages(AndroidExtraction):
for package in packages: for package in packages:
for file in package.get("files", []): for file in package.get("files", []):
if "package_name" in package: row = [package["package_name"], file["path"]]
row = [package["package_name"], file["path"]]
elif "name" in package:
row = [package["name"], file["path"]]
else:
self.log.error(
f"Package {package} has no name or package_name. packages.json or apks.json is malformed"
)
continue
if file["sha256"] in detections: if file["sha256"] in detections:
detection = detections[file["sha256"]] detection = detections[file["sha256"]]
positives = detection.split("/")[0] positives = detection.split("/")[0]

View File

@@ -3,22 +3,38 @@
# Use of this software is governed by the MVT License 1.1 that can be found at # Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
from .aqf_files import AQFFiles from .dumpsys_accessibility import DumpsysAccessibility
from .aqf_getprop import AQFGetProp from .dumpsys_activities import DumpsysActivities
from .aqf_packages import AQFPackages from .dumpsys_appops import DumpsysAppops
from .aqf_processes import AQFProcesses from .dumpsys_battery_daily import DumpsysBatteryDaily
from .aqf_settings import AQFSettings from .dumpsys_battery_history import DumpsysBatteryHistory
from .mounts import Mounts from .dumpsys_dbinfo import DumpsysDBInfo
from .root_binaries import RootBinaries from .dumpsys_packages import DumpsysPackages
from .dumpsys_receivers import DumpsysReceivers
from .dumpsys_adb import DumpsysADBState
from .getprop import Getprop
from .packages import Packages
from .dumpsys_platform_compat import DumpsysPlatformCompat
from .processes import Processes
from .settings import Settings
from .sms import SMS from .sms import SMS
from .files import Files
ANDROIDQF_MODULES = [ ANDROIDQF_MODULES = [
AQFPackages, DumpsysActivities,
AQFProcesses, DumpsysReceivers,
AQFGetProp, DumpsysAccessibility,
AQFSettings, DumpsysAppops,
AQFFiles, DumpsysDBInfo,
DumpsysBatteryDaily,
DumpsysBatteryHistory,
DumpsysADBState,
Packages,
DumpsysPlatformCompat,
Processes,
Getprop,
Settings,
SMS, SMS,
RootBinaries, DumpsysPackages,
Mounts, Files,
] ]

View File

@@ -37,11 +37,11 @@ class AndroidQFModule(MVTModule):
self.files: List[str] = [] self.files: List[str] = []
self.archive: Optional[zipfile.ZipFile] = None self.archive: Optional[zipfile.ZipFile] = None
def from_dir(self, parent_path: str, files: List[str]) -> None: def from_folder(self, parent_path: str, files: List[str]):
self.parent_path = parent_path self.parent_path = parent_path
self.files = files self.files = files
def from_zip(self, archive: zipfile.ZipFile, files: List[str]) -> None: def from_zip_file(self, archive: zipfile.ZipFile, files: List[str]):
self.archive = archive self.archive = archive
self.files = files self.files = files

View File

@@ -0,0 +1,51 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
from .base import AndroidQFModule
class DumpsysAccessibility(DumpsysAccessibilityArtifact, AndroidQFModule):
"""This module analyses dumpsys accessibility"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
data = self._get_file_content(dumpsys_file[0]).decode("utf-8", errors="replace")
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE accessibility:")
self.parse(content)
for result in self.results:
self.log.info(
'Found installed accessibility service "%s"', result.get("service")
)
self.log.info(
"Identified a total of %d accessibility services", len(self.results)
)

View File

@@ -0,0 +1,50 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_package_activities import (
DumpsysPackageActivitiesArtifact,
)
from .base import AndroidQFModule
class DumpsysActivities(DumpsysPackageActivitiesArtifact, AndroidQFModule):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
self.results = results if results else []
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
# Get data and extract the dumpsys section
data = self._get_file_content(dumpsys_file[0]).decode("utf-8", errors="replace")
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE package:")
# Parse it
self.parse(content)
self.log.info("Extracted %d package activities", len(self.results))

View File

@@ -0,0 +1,51 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_adb import DumpsysADBArtifact
from .base import AndroidQFModule
class DumpsysADBState(DumpsysADBArtifact, AndroidQFModule):
"""This module extracts ADB keystore state."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
full_dumpsys = self._get_file_content(dumpsys_file[0])
content = self.extract_dumpsys_section(
full_dumpsys,
b"DUMP OF SERVICE adb:",
binary=True,
)
self.parse(content)
if self.results:
self.log.info(
"Identified a total of %d trusted ADB keys",
len(self.results[0].get("user_keys", [])),
)

View File

@@ -0,0 +1,46 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_appops import DumpsysAppopsArtifact
from .base import AndroidQFModule
class DumpsysAppops(DumpsysAppopsArtifact, AndroidQFModule):
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
# Extract section
data = self._get_file_content(dumpsys_file[0])
section = self.extract_dumpsys_section(
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE appops:"
)
# Parse it
self.parse(section)
self.log.info("Identified %d applications in AppOps Manager", len(self.results))

View File

@@ -0,0 +1,46 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_battery_daily import DumpsysBatteryDailyArtifact
from .base import AndroidQFModule
class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, AndroidQFModule):
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
# Extract section
data = self._get_file_content(dumpsys_file[0])
section = self.extract_dumpsys_section(
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE batterystats:"
)
# Parse it
self.parse(section)
self.log.info("Extracted a total of %d battery daily stats", len(self.results))

View File

@@ -0,0 +1,46 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_battery_history import DumpsysBatteryHistoryArtifact
from .base import AndroidQFModule
class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, AndroidQFModule):
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
# Extract section
data = self._get_file_content(dumpsys_file[0])
section = self.extract_dumpsys_section(
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE batterystats:"
)
# Parse it
self.parse(section)
self.log.info("Extracted a total of %d battery daily stats", len(self.results))

View File

@@ -0,0 +1,46 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_dbinfo import DumpsysDBInfoArtifact
from .base import AndroidQFModule
class DumpsysDBInfo(DumpsysDBInfoArtifact, AndroidQFModule):
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
# Extract dumpsys DBInfo section
data = self._get_file_content(dumpsys_file[0])
section = self.extract_dumpsys_section(
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE dbinfo:"
)
# Parse it
self.parse(section)
self.log.info("Identified %d DB Info entries", len(self.results))

View File

@@ -0,0 +1,62 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Any, Dict, List, Optional
from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact
from mvt.android.modules.adb.packages import (
DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
)
from .base import AndroidQFModule
class DumpsysPackages(DumpsysPackagesArtifact, AndroidQFModule):
"""This module analyse dumpsys packages"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[List[Dict[str, Any]]] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if len(dumpsys_file) != 1:
self.log.info("Dumpsys file not found")
return
data = self._get_file_content(dumpsys_file[0]).decode("utf-8", errors="replace")
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE package:")
self.parse(content)
for result in self.results:
dangerous_permissions_count = 0
for perm in result["permissions"]:
if perm["name"] in DANGEROUS_PERMISSIONS:
dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info(
'Found package "%s" requested %d potentially dangerous permissions',
result["package_name"],
dangerous_permissions_count,
)
self.log.info("Extracted details on %d packages", len(self.results))

View File

@@ -0,0 +1,44 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_platform_compat import DumpsysPlatformCompatArtifact
from .base import AndroidQFModule
class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, AndroidQFModule):
"""This module extracts details on uninstalled apps."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
data = self._get_file_content(dumpsys_file[0]).decode("utf-8", errors="replace")
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE platform_compat:")
self.parse(content)
self.log.info("Found %d uninstalled apps", len(self.results))

View File

@@ -0,0 +1,49 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Any, Dict, List, Optional, Union
from mvt.android.artifacts.dumpsys_receivers import DumpsysReceiversArtifact
from .base import AndroidQFModule
class DumpsysReceivers(DumpsysReceiversArtifact, AndroidQFModule):
"""This module analyse dumpsys receivers"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Union[List[Any], Dict[str, Any], None] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
self.results = results if results else {}
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
data = self._get_file_content(dumpsys_file[0])
dumpsys_section = self.extract_dumpsys_section(
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE package:"
)
self.parse(dumpsys_section)
self.log.info("Extracted receivers for %d intents", len(self.results))

Some files were not shown because too many files have changed in this diff Show More