1
mirror of https://github.com/mvt-project/mvt synced 2025-10-21 22:42:15 +02:00

Compare commits

..

1 Commits

Author SHA1 Message Date
Donncha Ó Cearbhaill
95b2f04db6 WIP for Triangulation post-processing module 2023-06-28 21:46:18 +02:00
150 changed files with 2472 additions and 3914 deletions

View File

@@ -1,11 +0,0 @@
name: Black
on: [push]
jobs:
black:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: psf/black@stable
with:
options: "--check"

View File

@@ -2,8 +2,8 @@
Python script to download the Apple RSS feed and parse it. Python script to download the Apple RSS feed and parse it.
""" """
import json
import os import os
import json
import urllib.request import urllib.request
from xml.dom.minidom import parseString from xml.dom.minidom import parseString
@@ -12,7 +12,7 @@ from packaging import version
def download_apple_rss(feed_url): def download_apple_rss(feed_url):
with urllib.request.urlopen(feed_url) as f: with urllib.request.urlopen(feed_url) as f:
rss_feed = f.read().decode("utf-8") rss_feed = f.read().decode('utf-8')
print("Downloaded RSS feed from Apple.") print("Downloaded RSS feed from Apple.")
return rss_feed return rss_feed
@@ -27,10 +27,7 @@ def parse_latest_ios_versions(rss_feed_text):
continue continue
import re import re
build_match = re.match(r"iOS (?P<version>[\d\.]+) (?P<beta>beta )?(\S*)?\((?P<build>.*)\)", title)
build_match = re.match(
r"iOS (?P<version>[\d\.]+) (?P<beta>beta )?(\S*)?\((?P<build>.*)\)", title
)
if not build_match: if not build_match:
print("Could not parse iOS build:", title) print("Could not parse iOS build:", title)
continue continue
@@ -65,22 +62,16 @@ def update_mvt(mvt_checkout_path, latest_ios_versions):
print("No new iOS versions found.") print("No new iOS versions found.")
else: else:
print("Found {} new iOS versions.".format(new_entry_count)) print("Found {} new iOS versions.".format(new_entry_count))
new_version_list = sorted( new_version_list = sorted(current_versions, key=lambda x: version.Version(x["version"]))
current_versions, key=lambda x: version.Version(x["version"])
)
with open(version_path, "w") as version_file: with open(version_path, "w") as version_file:
json.dump(new_version_list, version_file, indent=4) json.dump(new_version_list, version_file, indent=4)
def main(): def main():
print("Downloading RSS feed...") print("Downloading RSS feed...")
mvt_checkout_path = os.path.abspath( mvt_checkout_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))
os.path.join(os.path.dirname(__file__), "../../../")
)
rss_feed = download_apple_rss( rss_feed = download_apple_rss("https://developer.apple.com/news/releases/rss/releases.rss")
"https://developer.apple.com/news/releases/rss/releases.rss"
)
latest_ios_version = parse_latest_ios_versions(rss_feed) latest_ios_version = parse_latest_ios_versions(rss_feed)
update_mvt(mvt_checkout_path, latest_ios_version) update_mvt(mvt_checkout_path, latest_ios_version)

View File

@@ -4,7 +4,6 @@ check:
flake8 flake8
pytest -q pytest -q
ruff check -q . ruff check -q .
black --check .
clean: clean:
rm -rf $(PWD)/build $(PWD)/dist $(PWD)/mvt.egg-info rm -rf $(PWD)/build $(PWD)/dist $(PWD)/mvt.egg-info

View File

@@ -11,24 +11,10 @@
Mobile Verification Toolkit (MVT) is a collection of utilities to simplify and automate the process of gathering forensic traces helpful to identify a potential compromise of Android and iOS devices. Mobile Verification Toolkit (MVT) is a collection of utilities to simplify and automate the process of gathering forensic traces helpful to identify a potential compromise of Android and iOS devices.
It has been developed and released by the [Amnesty International Security Lab](https://www.amnesty.org/en/tech/) in July 2021 in the context of the [Pegasus Project](https://forbiddenstories.org/about-the-pegasus-project/) along with [a technical forensic methodology](https://www.amnesty.org/en/latest/research/2021/07/forensic-methodology-report-how-to-catch-nso-groups-pegasus/). It continues to be maintained by Amnesty International and other contributors. It has been developed and released by the [Amnesty International Security Lab](https://www.amnesty.org/en/tech/) in July 2021 in the context of the [Pegasus project](https://forbiddenstories.org/about-the-pegasus-project/) along with [a technical forensic methodology and forensic evidence](https://www.amnesty.org/en/latest/research/2021/07/forensic-methodology-report-how-to-catch-nso-groups-pegasus/).
> **Note** *Warning*: MVT is a forensic research tool intended for technologists and investigators. Using it requires understanding the basics of forensic analysis and using command-line tools. This is not intended for end-user self-assessment. If you are concerned with the security of your device please seek expert assistance.
> MVT is a forensic research tool intended for technologists and investigators. It requires understanding digital forensics and using command-line tools. This is not intended for end-user self-assessment. If you are concerned with the security of your device please seek reputable expert assistance.
>
### Indicators of Compromise
MVT supports using public [indicators of compromise (IOCs)](https://github.com/mvt-project/mvt-indicators) to scan mobile devices for potential traces of targeting or infection by known spyware campaigns. This includes IOCs published by [Amnesty International](https://github.com/AmnestyTech/investigations/) and other research groups.
> **Warning**
> Public indicators of compromise are insufficient to determine that a device is "clean", and not targeted with a particular spyware tool. Reliance on public indicators alone can miss recent forensic traces and give a false sense of security.
>
> Reliable and comprehensive digital forensic support and triage requires access to non-public indicators, research and threat intelligence.
>
>Such support is available to civil society through [Amnesty International's Security Lab](https://www.amnesty.org/en/tech/) or through our forensic partnership with [Access Nows Digital Security Helpline](https://www.accessnow.org/help/).
More information about using indicators of compromise with MVT is available in the [documentation](https://docs.mvt.re/en/latest/iocs/).
## Installation ## Installation

View File

@@ -1,27 +0,0 @@
# Development
The Mobile Verification Toolkit team welcomes contributions of new forensic modules or other contributions which help improve the software.
## Testing
MVT uses `pytest` for unit and integration tests. Code style consistency is maintained with `flake8`, `ruff` and `black`. All can
be run automatically with:
```bash
make check
```
Run these tests before making new commits or opening pull requests.
## Profiling
Some MVT modules extract and process significant amounts of data during the analysis process or while checking results against known indicators. Care must be
take to avoid inefficient code paths as we add new modules.
MVT modules can be profiled with Python built-in `cProfile` by setting the `MVT_PROFILE` environment variable.
```bash
MVT_PROFILE=1 dev/mvt-ios check-backup test_backup
```
Open an issue or PR if you are encountering significant performance issues when analyzing a device with MVT.

View File

@@ -43,6 +43,6 @@ export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
- [This repository](https://github.com/Te-k/stalkerware-indicators) contains IOCs for Android stalkerware including [a STIX MVT-compatible file](https://raw.githubusercontent.com/Te-k/stalkerware-indicators/master/generated/stalkerware.stix2). - [This repository](https://github.com/Te-k/stalkerware-indicators) contains IOCs for Android stalkerware including [a STIX MVT-compatible file](https://raw.githubusercontent.com/Te-k/stalkerware-indicators/master/generated/stalkerware.stix2).
- We are also maintaining [a list of IOCs](https://github.com/mvt-project/mvt-indicators) in STIX format from public spyware campaigns. - We are also maintaining [a list of IOCs](https://github.com/mvt-project/mvt-indicators) in STIX format from public spyware campaigns.
You can automaticallly download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators from the [mvt-indicators](https://github.com/mvt-project/mvt-indicators/blob/main/indicators.yaml) repository and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by MVT. You can automaticallly download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators listed [here](https://github.com/mvt-project/mvt/blob/main/public_indicators.json) and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by MVT.
Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs. Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs.

View File

@@ -1,7 +1,7 @@
site_name: Mobile Verification Toolkit site_name: Mobile Verification Toolkit
repo_url: https://github.com/mvt-project/mvt repo_url: https://github.com/mvt-project/mvt
edit_uri: edit/main/docs/ edit_uri: edit/main/docs/
copyright: Copyright &copy; 2021-2023 MVT Project Developers copyright: Copyright &copy; 2021-2022 MVT Project Developers
site_description: Mobile Verification Toolkit Documentation site_description: Mobile Verification Toolkit Documentation
markdown_extensions: markdown_extensions:
- attr_list - attr_list
@@ -46,5 +46,4 @@ nav:
- Check an Android Backup (SMS messages): "android/backup.md" - Check an Android Backup (SMS messages): "android/backup.md"
- Download APKs: "android/download_apks.md" - Download APKs: "android/download_apks.md"
- Indicators of Compromise: "iocs.md" - Indicators of Compromise: "iocs.md"
- Development: "development.md"
- License: "license.md" - License: "license.md"

File diff suppressed because it is too large Load Diff

View File

@@ -14,6 +14,7 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckADB(Command): class CmdAndroidCheckADB(Command):
def __init__( def __init__(
self, self,
target_path: Optional[str] = None, target_path: Optional[str] = None,
@@ -21,17 +22,11 @@ class CmdAndroidCheckADB(Command):
ioc_files: Optional[list] = None, ioc_files: Optional[list] = None,
module_name: Optional[str] = None, module_name: Optional[str] = None,
serial: Optional[str] = None, serial: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
) -> None: ) -> None:
super().__init__( super().__init__(target_path=target_path, results_path=results_path,
target_path=target_path, ioc_files=ioc_files, module_name=module_name,
results_path=results_path, serial=serial, fast_mode=fast_mode, log=log)
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
log=log,
)
self.name = "check-adb" self.name = "check-adb"
self.modules = ADB_MODULES self.modules = ADB_MODULES

View File

@@ -14,6 +14,7 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckAndroidQF(Command): class CmdAndroidCheckAndroidQF(Command):
def __init__( def __init__(
self, self,
target_path: Optional[str] = None, target_path: Optional[str] = None,
@@ -21,19 +22,13 @@ class CmdAndroidCheckAndroidQF(Command):
ioc_files: Optional[list] = None, ioc_files: Optional[list] = None,
module_name: Optional[str] = None, module_name: Optional[str] = None,
serial: Optional[str] = None, serial: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
hashes: bool = False, hashes: Optional[bool] = False,
) -> None: ) -> None:
super().__init__( super().__init__(target_path=target_path, results_path=results_path,
target_path=target_path, ioc_files=ioc_files, module_name=module_name,
results_path=results_path, serial=serial, fast_mode=fast_mode, hashes=hashes,
ioc_files=ioc_files, log=log)
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
hashes=hashes,
log=log,
)
self.name = "check-androidqf" self.name = "check-androidqf"
self.modules = ANDROIDQF_MODULES self.modules = ANDROIDQF_MODULES

View File

@@ -14,12 +14,9 @@ from typing import List, Optional
from rich.prompt import Prompt from rich.prompt import Prompt
from mvt.android.modules.backup.base import BackupExtraction from mvt.android.modules.backup.base import BackupExtraction
from mvt.android.parsers.backup import ( from mvt.android.parsers.backup import (AndroidBackupParsingError,
AndroidBackupParsingError, InvalidBackupPassword, parse_ab_header,
InvalidBackupPassword, parse_backup_file)
parse_ab_header,
parse_backup_file,
)
from mvt.common.command import Command from mvt.common.command import Command
from .modules.backup import BACKUP_MODULES from .modules.backup import BACKUP_MODULES
@@ -28,6 +25,7 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBackup(Command): class CmdAndroidCheckBackup(Command):
def __init__( def __init__(
self, self,
target_path: Optional[str] = None, target_path: Optional[str] = None,
@@ -35,19 +33,13 @@ class CmdAndroidCheckBackup(Command):
ioc_files: Optional[list] = None, ioc_files: Optional[list] = None,
module_name: Optional[str] = None, module_name: Optional[str] = None,
serial: Optional[str] = None, serial: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
hashes: bool = False, hashes: Optional[bool] = False,
) -> None: ) -> None:
super().__init__( super().__init__(target_path=target_path, results_path=results_path,
target_path=target_path, ioc_files=ioc_files, module_name=module_name,
results_path=results_path, serial=serial, fast_mode=fast_mode, hashes=hashes,
ioc_files=ioc_files, log=log)
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
hashes=hashes,
log=log,
)
self.name = "check-backup" self.name = "check-backup"
self.modules = BACKUP_MODULES self.modules = BACKUP_MODULES
@@ -93,18 +85,16 @@ class CmdAndroidCheckBackup(Command):
self.target_path = Path(self.target_path).absolute().as_posix() self.target_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)): for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
for fname in subfiles: for fname in subfiles:
self.backup_files.append( self.backup_files.append(os.path.relpath(os.path.join(root, fname),
os.path.relpath(os.path.join(root, fname), self.target_path) self.target_path))
)
else: else:
log.critical( log.critical("Invalid backup path, path should be a folder or an "
"Invalid backup path, path should be a folder or an " "Android Backup (.ab) file")
"Android Backup (.ab) file"
)
sys.exit(1) sys.exit(1)
def module_init(self, module: BackupExtraction) -> None: # type: ignore[override] def module_init(self, module: BackupExtraction) -> None: # type: ignore[override]
if self.backup_type == "folder": if self.backup_type == "folder":
module.from_folder(self.target_path, self.backup_files) module.from_folder(self.target_path, self.backup_files)
else: else:
module.from_ab(self.target_path, self.backup_archive, self.backup_files) module.from_ab(self.target_path, self.backup_archive,
self.backup_files)

View File

@@ -18,6 +18,7 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBugreport(Command): class CmdAndroidCheckBugreport(Command):
def __init__( def __init__(
self, self,
target_path: Optional[str] = None, target_path: Optional[str] = None,
@@ -25,19 +26,13 @@ class CmdAndroidCheckBugreport(Command):
ioc_files: Optional[list] = None, ioc_files: Optional[list] = None,
module_name: Optional[str] = None, module_name: Optional[str] = None,
serial: Optional[str] = None, serial: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
hashes: bool = False, hashes: Optional[bool] = False,
) -> None: ) -> None:
super().__init__( super().__init__(target_path=target_path, results_path=results_path,
target_path=target_path, ioc_files=ioc_files, module_name=module_name,
results_path=results_path, serial=serial, fast_mode=fast_mode, hashes=hashes,
ioc_files=ioc_files, log=log)
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
hashes=hashes,
log=log,
)
self.name = "check-bugreport" self.name = "check-bugreport"
self.modules = BUGREPORT_MODULES self.modules = BUGREPORT_MODULES
@@ -60,9 +55,8 @@ class CmdAndroidCheckBugreport(Command):
parent_path = Path(self.target_path).absolute().as_posix() parent_path = Path(self.target_path).absolute().as_posix()
for root, _, subfiles in os.walk(os.path.abspath(self.target_path)): for root, _, subfiles in os.walk(os.path.abspath(self.target_path)):
for file_name in subfiles: for file_name in subfiles:
file_path = os.path.relpath( file_path = os.path.relpath(os.path.join(root, file_name),
os.path.join(root, file_name), parent_path parent_path)
)
self.bugreport_files.append(file_path) self.bugreport_files.append(file_path)
def module_init(self, module: BugReportModule) -> None: # type: ignore[override] def module_init(self, module: BugReportModule) -> None: # type: ignore[override]

View File

@@ -26,7 +26,7 @@ class DownloadAPKs(AndroidExtraction):
def __init__( def __init__(
self, self,
results_path: Optional[str] = None, results_path: Optional[str] = None,
all_apks: bool = False, all_apks: Optional[bool] = False,
packages: Optional[list] = None, packages: Optional[list] = None,
) -> None: ) -> None:
"""Initialize module. """Initialize module.
@@ -66,31 +66,27 @@ class DownloadAPKs(AndroidExtraction):
if "==/" in remote_path: if "==/" in remote_path:
file_name = "_" + remote_path.split("==/")[1].replace(".apk", "") file_name = "_" + remote_path.split("==/")[1].replace(".apk", "")
local_path = os.path.join( local_path = os.path.join(self.results_path_apks,
self.results_path_apks, f"{package_name}{file_name}.apk" f"{package_name}{file_name}.apk")
)
name_counter = 0 name_counter = 0
while True: while True:
if not os.path.exists(local_path): if not os.path.exists(local_path):
break break
name_counter += 1 name_counter += 1
local_path = os.path.join( local_path = os.path.join(self.results_path_apks,
self.results_path_apks, f"{package_name}{file_name}_{name_counter}.apk" f"{package_name}{file_name}_{name_counter}.apk")
)
try: try:
self._adb_download(remote_path, local_path) self._adb_download(remote_path, local_path)
except InsufficientPrivileges: except InsufficientPrivileges:
log.error( log.error("Unable to pull package file from %s: insufficient privileges, "
"Unable to pull package file from %s: insufficient privileges, " "it might be a system app", remote_path)
"it might be a system app",
remote_path,
)
self._adb_reconnect() self._adb_reconnect()
return None return None
except Exception as exc: except Exception as exc:
log.exception("Failed to pull package file from %s: %s", remote_path, exc) log.exception("Failed to pull package file from %s: %s",
remote_path, exc)
self._adb_reconnect() self._adb_reconnect()
return None return None
@@ -110,10 +106,10 @@ class DownloadAPKs(AndroidExtraction):
self.packages = m.results self.packages = m.results
def pull_packages(self) -> None: def pull_packages(self) -> None:
"""Download all files of all selected packages from the device.""" """Download all files of all selected packages from the device.
log.info( """
"Starting extraction of installed APKs at folder %s", self.results_path log.info("Starting extraction of installed APKs at folder %s",
) self.results_path)
# If the user provided the flag --all-apks we select all packages. # If the user provided the flag --all-apks we select all packages.
packages_selection = [] packages_selection = []
@@ -127,10 +123,8 @@ class DownloadAPKs(AndroidExtraction):
if not package.get("system", False): if not package.get("system", False):
packages_selection.append(package) packages_selection.append(package)
log.info( log.info("Selected only %d packages which are not marked as \"system\"",
'Selected only %d packages which are not marked as "system"', len(packages_selection))
len(packages_selection),
)
if len(packages_selection) == 0: if len(packages_selection) == 0:
log.info("No packages were selected for download") log.info("No packages were selected for download")
@@ -142,26 +136,19 @@ class DownloadAPKs(AndroidExtraction):
if not os.path.exists(self.results_path_apks): if not os.path.exists(self.results_path_apks):
os.makedirs(self.results_path_apks, exist_ok=True) os.makedirs(self.results_path_apks, exist_ok=True)
for i in track( for i in track(range(len(packages_selection)),
range(len(packages_selection)), description=f"Downloading {len(packages_selection)} packages..."):
description=f"Downloading {len(packages_selection)} packages...",
):
package = packages_selection[i] package = packages_selection[i]
log.info( log.info("[%d/%d] Package: %s", i, len(packages_selection),
"[%d/%d] Package: %s", package["package_name"])
i,
len(packages_selection),
package["package_name"],
)
# Sometimes the package path contains multiple lines for multiple # Sometimes the package path contains multiple lines for multiple
# apks. We loop through each line and download each file. # apks. We loop through each line and download each file.
for package_file in package["files"]: for package_file in package["files"]:
device_path = package_file["path"] device_path = package_file["path"]
local_path = self.pull_package_file( local_path = self.pull_package_file(package["package_name"],
package["package_name"], device_path device_path)
)
if not local_path: if not local_path:
continue continue

View File

@@ -23,24 +23,8 @@ from .settings import Settings
from .sms import SMS from .sms import SMS
from .whatsapp import Whatsapp from .whatsapp import Whatsapp
ADB_MODULES = [ ADB_MODULES = [ChromeHistory, SMS, Whatsapp, Processes, Getprop, Settings,
ChromeHistory, SELinuxStatus, DumpsysBatteryHistory, DumpsysBatteryDaily,
SMS, DumpsysReceivers, DumpsysActivities, DumpsysAccessibility,
Whatsapp, DumpsysDBInfo, DumpsysFull, DumpsysAppOps, Packages, Logcat,
Processes, RootBinaries, Files]
Getprop,
Settings,
SELinuxStatus,
DumpsysBatteryHistory,
DumpsysBatteryDaily,
DumpsysReceivers,
DumpsysActivities,
DumpsysAccessibility,
DumpsysDBInfo,
DumpsysFull,
DumpsysAppOps,
Packages,
Logcat,
RootBinaries,
Files,
]

View File

@@ -16,20 +16,13 @@ from typing import Callable, Optional
from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb
from adb_shell.auth.keygen import keygen, write_public_keyfile from adb_shell.auth.keygen import keygen, write_public_keyfile
from adb_shell.auth.sign_pythonrsa import PythonRSASigner from adb_shell.auth.sign_pythonrsa import PythonRSASigner
from adb_shell.exceptions import ( from adb_shell.exceptions import (AdbCommandFailureException, DeviceAuthError,
AdbCommandFailureException, UsbDeviceNotFoundError, UsbReadFailedError)
DeviceAuthError,
UsbDeviceNotFoundError,
UsbReadFailedError,
)
from rich.prompt import Prompt from rich.prompt import Prompt
from usb1 import USBErrorAccess, USBErrorBusy from usb1 import USBErrorAccess, USBErrorBusy
from mvt.android.parsers.backup import ( from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header,
InvalidBackupPassword, parse_backup_file)
parse_ab_header,
parse_backup_file,
)
from mvt.common.module import InsufficientPrivileges, MVTModule from mvt.common.module import InsufficientPrivileges, MVTModule
ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey") ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey")
@@ -44,18 +37,13 @@ class AndroidExtraction(MVTModule):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.device = None self.device = None
self.serial = None self.serial = None
@@ -90,49 +78,36 @@ class AndroidExtraction(MVTModule):
try: try:
self.device = AdbDeviceUsb(serial=self.serial) self.device = AdbDeviceUsb(serial=self.serial)
except UsbDeviceNotFoundError: except UsbDeviceNotFoundError:
self.log.critical( self.log.critical("No device found. Make sure it is connected and unlocked.")
"No device found. Make sure it is connected and unlocked."
)
sys.exit(-1) sys.exit(-1)
# Otherwise we try to use the TCP transport. # Otherwise we try to use the TCP transport.
else: else:
addr = self.serial.split(":") addr = self.serial.split(":")
if len(addr) < 2: if len(addr) < 2:
raise ValueError( raise ValueError("TCP serial number must follow the format: `address:port`")
"TCP serial number must follow the format: `address:port`"
)
self.device = AdbDeviceTcp( self.device = AdbDeviceTcp(addr[0], int(addr[1]),
addr[0], int(addr[1]), default_transport_timeout_s=30.0 default_transport_timeout_s=30.)
)
while True: while True:
try: try:
self.device.connect(rsa_keys=[signer], auth_timeout_s=5) self.device.connect(rsa_keys=[signer], auth_timeout_s=5)
except (USBErrorBusy, USBErrorAccess): except (USBErrorBusy, USBErrorAccess):
self.log.critical( self.log.critical("Device is busy, maybe run `adb kill-server` and try again.")
"Device is busy, maybe run `adb kill-server` and try again."
)
sys.exit(-1) sys.exit(-1)
except DeviceAuthError: except DeviceAuthError:
self.log.error( self.log.error("You need to authorize this computer on the Android device. "
"You need to authorize this computer on the Android device. " "Retrying in 5 seconds...")
"Retrying in 5 seconds..."
)
time.sleep(5) time.sleep(5)
except UsbReadFailedError: except UsbReadFailedError:
self.log.error( self.log.error("Unable to connect to the device over USB. "
"Unable to connect to the device over USB. " "Try to unplug, plug the device and start again.")
"Try to unplug, plug the device and start again."
)
sys.exit(-1) sys.exit(-1)
except OSError as exc: except OSError as exc:
if exc.errno == 113 and self.serial: if exc.errno == 113 and self.serial:
self.log.critical( self.log.critical("Unable to connect to the device %s: "
"Unable to connect to the device %s: " "did you specify the correct IP address?",
"did you specify the correct IP address?", self.serial)
self.serial,
)
sys.exit(-1) sys.exit(-1)
else: else:
break break
@@ -169,11 +144,9 @@ class AndroidExtraction(MVTModule):
def _adb_root_or_die(self) -> None: def _adb_root_or_die(self) -> None:
"""Check if we have a `su` binary, otherwise raise an Exception.""" """Check if we have a `su` binary, otherwise raise an Exception."""
if not self._adb_check_if_root(): if not self._adb_check_if_root():
raise InsufficientPrivileges( raise InsufficientPrivileges("This module is optionally available "
"This module is optionally available " "in case the device is already rooted."
"in case the device is already rooted." " Do NOT root your own device!")
" Do NOT root your own device!"
)
def _adb_command_as_root(self, command): def _adb_command_as_root(self, command):
"""Execute an adb shell command. """Execute an adb shell command.
@@ -204,7 +177,7 @@ class AndroidExtraction(MVTModule):
remote_path: str, remote_path: str,
local_path: str, local_path: str,
progress_callback: Optional[Callable] = None, progress_callback: Optional[Callable] = None,
retry_root: Optional[bool] = True, retry_root: Optional[bool] = True
) -> None: ) -> None:
"""Download a file form the device. """Download a file form the device.
@@ -219,48 +192,41 @@ class AndroidExtraction(MVTModule):
self.device.pull(remote_path, local_path, progress_callback) self.device.pull(remote_path, local_path, progress_callback)
except AdbCommandFailureException as exc: except AdbCommandFailureException as exc:
if retry_root: if retry_root:
self._adb_download_root(remote_path, local_path, progress_callback) self._adb_download_root(remote_path, local_path,
progress_callback)
else: else:
raise Exception( raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
f"Unable to download file {remote_path}: {exc}"
) from exc
def _adb_download_root( def _adb_download_root(
self, self,
remote_path: str, remote_path: str,
local_path: str, local_path: str,
progress_callback: Optional[Callable] = None, progress_callback: Optional[Callable] = None
) -> None: ) -> None:
try: try:
# Check if we have root, if not raise an Exception. # Check if we have root, if not raise an Exception.
self._adb_root_or_die() self._adb_root_or_die()
# We generate a random temporary filename. # We generate a random temporary filename.
allowed_chars = ( allowed_chars = (string.ascii_uppercase
string.ascii_uppercase + string.ascii_lowercase + string.digits + string.ascii_lowercase
) + string.digits)
tmp_filename = "tmp_" + "".join(random.choices(allowed_chars, k=10)) tmp_filename = "tmp_" + ''.join(random.choices(allowed_chars, k=10))
# We create a temporary local file. # We create a temporary local file.
new_remote_path = f"/sdcard/{tmp_filename}" new_remote_path = f"/sdcard/{tmp_filename}"
# We copy the file from the data folder to /sdcard/. # We copy the file from the data folder to /sdcard/.
cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}") cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
if ( if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
cp_output.startswith("cp: ")
and "No such file or directory" in cp_output
):
raise Exception(f"Unable to process file {remote_path}: File not found") raise Exception(f"Unable to process file {remote_path}: File not found")
if cp_output.startswith("cp: ") and "Permission denied" in cp_output: if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
raise Exception( raise Exception(f"Unable to process file {remote_path}: Permission denied")
f"Unable to process file {remote_path}: Permission denied"
)
# We download from /sdcard/ to the local temporary file. # We download from /sdcard/ to the local temporary file.
# If it doesn't work now, don't try again (retry_root=False) # If it doesn't work now, don't try again (retry_root=False)
self._adb_download( self._adb_download(new_remote_path, local_path, progress_callback,
new_remote_path, local_path, progress_callback, retry_root=False retry_root=False)
)
# Delete the copy on /sdcard/. # Delete the copy on /sdcard/.
self._adb_command(f"rm -rf {new_remote_path}") self._adb_command(f"rm -rf {new_remote_path}")
@@ -268,7 +234,8 @@ class AndroidExtraction(MVTModule):
except AdbCommandFailureException as exc: except AdbCommandFailureException as exc:
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
def _adb_process_file(self, remote_path: str, process_routine: Callable) -> None: def _adb_process_file(self, remote_path: str,
process_routine: Callable) -> None:
"""Download a local copy of a file which is only accessible as root. """Download a local copy of a file which is only accessible as root.
This is a wrapper around process_routine. This is a wrapper around process_routine.
@@ -306,10 +273,8 @@ class AndroidExtraction(MVTModule):
self._adb_command(f"rm -f {new_remote_path}") self._adb_command(f"rm -f {new_remote_path}")
def _generate_backup(self, package_name: str) -> bytes: def _generate_backup(self, package_name: str) -> bytes:
self.log.info( self.log.info("Please check phone and accept Android backup prompt. "
"Please check phone and accept Android backup prompt. " "You may need to set a backup password. \a")
"You may need to set a backup password. \a"
)
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over # TODO: Base64 encoding as temporary fix to avoid byte-mangling over
# the shell transport... # the shell transport...
@@ -319,19 +284,19 @@ class AndroidExtraction(MVTModule):
header = parse_ab_header(backup_output) header = parse_ab_header(backup_output)
if not header["backup"]: if not header["backup"]:
self.log.error( self.log.error("Extracting SMS via Android backup failed. "
"Extracting SMS via Android backup failed. " "No valid backup data found.")
"No valid backup data found."
)
return None return None
if header["encryption"] == "none": if header["encryption"] == "none":
return parse_backup_file(backup_output, password=None) return parse_backup_file(backup_output, password=None)
for _ in range(0, 3): for _ in range(0, 3):
backup_password = Prompt.ask("Enter backup password", password=True) backup_password = Prompt.ask("Enter backup password",
password=True)
try: try:
decrypted_backup_tar = parse_backup_file(backup_output, backup_password) decrypted_backup_tar = parse_backup_file(backup_output,
backup_password)
return decrypted_backup_tar return decrypted_backup_tar
except InvalidBackupPassword: except InvalidBackupPassword:
self.log.error("You provided the wrong password! Please try again...") self.log.error("You provided the wrong password! Please try again...")

View File

@@ -8,7 +8,8 @@ import os
import sqlite3 import sqlite3
from typing import Optional, Union from typing import Optional, Union
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso from mvt.common.utils import (convert_chrometime_to_datetime,
convert_datetime_to_iso)
from .base import AndroidExtraction from .base import AndroidExtraction
@@ -23,18 +24,13 @@ class ChromeHistory(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = [] self.results = []
def serialize(self, record: dict) -> Union[dict, list]: def serialize(self, record: dict) -> Union[dict, list]:
@@ -43,7 +39,7 @@ class ChromeHistory(AndroidExtraction):
"module": self.__class__.__name__, "module": self.__class__.__name__,
"event": "visit", "event": "visit",
"data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, " "data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, "
f"redirect source: {record['redirect_source']})", f"redirect source: {record['redirect_source']})"
} }
def check_indicators(self) -> None: def check_indicators(self) -> None:
@@ -63,8 +59,7 @@ class ChromeHistory(AndroidExtraction):
assert isinstance(self.results, list) # assert results type for mypy assert isinstance(self.results, list) # assert results type for mypy
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute("""
"""
SELECT SELECT
urls.id, urls.id,
urls.url, urls.url,
@@ -74,35 +69,31 @@ class ChromeHistory(AndroidExtraction):
FROM urls FROM urls
JOIN visits ON visits.url = urls.id JOIN visits ON visits.url = urls.id
ORDER BY visits.visit_time; ORDER BY visits.visit_time;
""" """)
)
for item in cur: for item in cur:
self.results.append( self.results.append({
{ "id": item[0],
"id": item[0], "url": item[1],
"url": item[1], "visit_id": item[2],
"visit_id": item[2], "timestamp": item[3],
"timestamp": item[3], "isodate": convert_datetime_to_iso(
"isodate": convert_datetime_to_iso( convert_chrometime_to_datetime(item[3])),
convert_chrometime_to_datetime(item[3]) "redirect_source": item[4],
), })
"redirect_source": item[4],
}
)
cur.close() cur.close()
conn.close() conn.close()
self.log.info("Extracted a total of %d history items", len(self.results)) self.log.info("Extracted a total of %d history items",
len(self.results))
def run(self) -> None: def run(self) -> None:
self._adb_connect() self._adb_connect()
try: try:
self._adb_process_file( self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH),
os.path.join("/", CHROME_HISTORY_PATH), self._parse_db self._parse_db)
)
except Exception as exc: except Exception as exc:
self.log.error(exc) self.log.error(exc)

View File

@@ -19,18 +19,13 @@ class DumpsysAccessibility(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None: def check_indicators(self) -> None:
if not self.indicators: if not self.indicators:
@@ -51,10 +46,8 @@ class DumpsysAccessibility(AndroidExtraction):
self.results = parse_dumpsys_accessibility(output) self.results = parse_dumpsys_accessibility(output)
for result in self.results: for result in self.results:
self.log.info( self.log.info("Found installed accessibility service \"%s\"",
'Found installed accessibility service "%s"', result.get("service") result.get("service"))
)
self.log.info( self.log.info("Identified a total of %d accessibility services",
"Identified a total of %d accessibility services", len(self.results) len(self.results))
)

View File

@@ -19,18 +19,13 @@ class DumpsysActivities(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = results if results else {} self.results = results if results else {}

View File

@@ -21,18 +21,13 @@ class DumpsysAppOps(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]: def serialize(self, record: dict) -> Union[dict, list]:
records = [] records = []
@@ -42,15 +37,13 @@ class DumpsysAppOps(AndroidExtraction):
for entry in perm["entries"]: for entry in perm["entries"]:
if "timestamp" in entry: if "timestamp" in entry:
records.append( records.append({
{ "timestamp": entry["timestamp"],
"timestamp": entry["timestamp"], "module": self.__class__.__name__,
"module": self.__class__.__name__, "event": entry["access"],
"event": entry["access"], "data": f"{record['package_name']} access to "
"data": f"{record['package_name']} access to " f"{perm['name']}: {entry['access']}",
f"{perm['name']}: {entry['access']}", })
}
)
return records return records
@@ -64,14 +57,10 @@ class DumpsysAppOps(AndroidExtraction):
continue continue
for perm in result["permissions"]: for perm in result["permissions"]:
if ( if (perm["name"] == "REQUEST_INSTALL_PACKAGES"
perm["name"] == "REQUEST_INSTALL_PACKAGES" and perm["access"] == "allow"):
and perm["access"] == "allow" self.log.info("Package %s with REQUEST_INSTALL_PACKAGES "
): "permission", result["package_name"])
self.log.info(
"Package %s with REQUEST_INSTALL_PACKAGES " "permission",
result["package_name"],
)
def run(self) -> None: def run(self) -> None:
self._adb_connect() self._adb_connect()
@@ -80,6 +69,5 @@ class DumpsysAppOps(AndroidExtraction):
self.results = parse_dumpsys_appops(output) self.results = parse_dumpsys_appops(output)
self.log.info( self.log.info("Extracted a total of %d records from app-ops manager",
"Extracted a total of %d records from app-ops manager", len(self.results) len(self.results))
)

View File

@@ -19,18 +19,13 @@ class DumpsysBatteryDaily(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]: def serialize(self, record: dict) -> Union[dict, list]:
return { return {
@@ -38,7 +33,7 @@ class DumpsysBatteryDaily(AndroidExtraction):
"module": self.__class__.__name__, "module": self.__class__.__name__,
"event": "battery_daily", "event": "battery_daily",
"data": f"Recorded update of package {record['package_name']} " "data": f"Recorded update of package {record['package_name']} "
f"with vers {record['vers']}", f"with vers {record['vers']}"
} }
def check_indicators(self) -> None: def check_indicators(self) -> None:
@@ -59,6 +54,5 @@ class DumpsysBatteryDaily(AndroidExtraction):
self.results = parse_dumpsys_battery_daily(output) self.results = parse_dumpsys_battery_daily(output)
self.log.info( self.log.info("Extracted %d records from battery daily stats",
"Extracted %d records from battery daily stats", len(self.results) len(self.results))
)

View File

@@ -19,18 +19,13 @@ class DumpsysBatteryHistory(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None: def check_indicators(self) -> None:
if not self.indicators: if not self.indicators:
@@ -50,4 +45,5 @@ class DumpsysBatteryHistory(AndroidExtraction):
self.results = parse_dumpsys_battery_history(output) self.results = parse_dumpsys_battery_history(output)
self.log.info("Extracted %d records from battery history", len(self.results)) self.log.info("Extracted %d records from battery history",
len(self.results))

View File

@@ -21,18 +21,13 @@ class DumpsysDBInfo(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None: def check_indicators(self) -> None:
if not self.indicators: if not self.indicators:
@@ -54,7 +49,5 @@ class DumpsysDBInfo(AndroidExtraction):
self.results = parse_dumpsys_dbinfo(output) self.results = parse_dumpsys_dbinfo(output)
self.log.info( self.log.info("Extracted a total of %d records from database information",
"Extracted a total of %d records from database information", len(self.results))
len(self.results),
)

View File

@@ -18,18 +18,13 @@ class DumpsysFull(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def run(self) -> None: def run(self) -> None:
self._adb_connect() self._adb_connect()

View File

@@ -25,18 +25,13 @@ class DumpsysReceivers(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = results if results else {} self.results = results if results else {}
@@ -47,31 +42,21 @@ class DumpsysReceivers(AndroidExtraction):
for intent, receivers in self.results.items(): for intent, receivers in self.results.items():
for receiver in receivers: for receiver in receivers:
if intent == INTENT_NEW_OUTGOING_SMS: if intent == INTENT_NEW_OUTGOING_SMS:
self.log.info( self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
'Found a receiver to intercept outgoing SMS messages: "%s"', receiver["receiver"])
receiver["receiver"],
)
elif intent == INTENT_SMS_RECEIVED: elif intent == INTENT_SMS_RECEIVED:
self.log.info( self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
'Found a receiver to intercept incoming SMS messages: "%s"', receiver["receiver"])
receiver["receiver"],
)
elif intent == INTENT_DATA_SMS_RECEIVED: elif intent == INTENT_DATA_SMS_RECEIVED:
self.log.info( self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
'Found a receiver to intercept incoming data SMS message: "%s"', receiver["receiver"])
receiver["receiver"],
)
elif intent == INTENT_PHONE_STATE: elif intent == INTENT_PHONE_STATE:
self.log.info( self.log.info("Found a receiver monitoring "
"Found a receiver monitoring " "telephony state/incoming calls: \"%s\"",
'telephony state/incoming calls: "%s"', receiver["receiver"])
receiver["receiver"],
)
elif intent == INTENT_NEW_OUTGOING_CALL: elif intent == INTENT_NEW_OUTGOING_CALL:
self.log.info( self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
'Found a receiver monitoring outgoing calls: "%s"', receiver["receiver"])
receiver["receiver"],
)
ioc = self.indicators.check_app_id(receiver["package_name"]) ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc: if ioc:

View File

@@ -30,18 +30,13 @@ class Files(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.full_find = False self.full_find = False
def serialize(self, record: dict) -> Union[dict, list, None]: def serialize(self, record: dict) -> Union[dict, list, None]:
@@ -58,15 +53,12 @@ class Files(AndroidExtraction):
def check_indicators(self) -> None: def check_indicators(self) -> None:
for result in self.results: for result in self.results:
if result.get("is_suid"): if result.get("is_suid"):
self.log.warning( self.log.warning("Found an SUID file in a non-standard directory \"%s\".",
'Found an SUID file in a non-standard directory "%s".', result["path"])
result["path"],
)
if self.indicators and self.indicators.check_file_path(result["path"]): if self.indicators and self.indicators.check_file_path(result["path"]):
self.log.warning( self.log.warning("Found a known suspicous file at path: \"%s\"",
'Found a known suspicous file at path: "%s"', result["path"] result["path"])
)
self.detected.append(result) self.detected.append(result)
def backup_file(self, file_path: str) -> None: def backup_file(self, file_path: str) -> None:
@@ -81,13 +73,13 @@ class Files(AndroidExtraction):
local_file_path = os.path.join(local_files_folder, local_file_name) local_file_path = os.path.join(local_files_folder, local_file_name)
try: try:
self._adb_download(remote_path=file_path, local_path=local_file_path) self._adb_download(remote_path=file_path,
local_path=local_file_path)
except Exception: except Exception:
pass pass
else: else:
self.log.info( self.log.info("Downloaded file %s to local copy at %s",
"Downloaded file %s to local copy at %s", file_path, local_file_path file_path, local_file_path)
)
def find_files(self, folder: str) -> None: def find_files(self, folder: str) -> None:
assert isinstance(self.results, list) assert isinstance(self.results, list)
@@ -100,21 +92,20 @@ class Files(AndroidExtraction):
if len(file_line) < 6: if len(file_line) < 6:
self.log.info("Skipping invalid file info - %s", file_line.rstrip()) self.log.info("Skipping invalid file info - %s", file_line.rstrip())
continue continue
[unix_timestamp, mode, size, owner, group, full_path] = file_info [unix_timestamp, mode, size,
owner, group, full_path] = file_info
mod_time = convert_unix_to_iso(unix_timestamp) mod_time = convert_unix_to_iso(unix_timestamp)
self.results.append( self.results.append({
{ "path": full_path,
"path": full_path, "modified_time": mod_time,
"modified_time": mod_time, "mode": mode,
"mode": mode, "is_suid": (int(mode, 8) & stat.S_ISUID) == 2048,
"is_suid": (int(mode, 8) & stat.S_ISUID) == 2048, "is_sgid": (int(mode, 8) & stat.S_ISGID) == 1024,
"is_sgid": (int(mode, 8) & stat.S_ISGID) == 1024, "size": size,
"size": size, "owner": owner,
"owner": owner, "group": group,
"group": group, })
}
)
else: else:
output = self._adb_command(f"find '{folder}' -type f 2> /dev/null") output = self._adb_command(f"find '{folder}' -type f 2> /dev/null")
for file_line in output.splitlines(): for file_line in output.splitlines():
@@ -132,15 +123,15 @@ class Files(AndroidExtraction):
self.find_files(tmp_folder) self.find_files(tmp_folder)
for entry in self.results: for entry in self.results:
self.log.info("Found file in tmp folder at path %s", entry.get("path")) self.log.info("Found file in tmp folder at path %s",
entry.get("path"))
self.backup_file(entry.get("path")) self.backup_file(entry.get("path"))
for media_folder in ANDROID_MEDIA_FOLDERS: for media_folder in ANDROID_MEDIA_FOLDERS:
self.find_files(media_folder) self.find_files(media_folder)
self.log.info( self.log.info("Found %s files in primary Android tmp and media folders",
"Found %s files in primary Android tmp and media folders", len(self.results) len(self.results))
)
if self.fast_mode: if self.fast_mode:
self.log.info("Flag --fast was enabled: skipping full file listing") self.log.info("Flag --fast was enabled: skipping full file listing")

View File

@@ -20,18 +20,13 @@ class Getprop(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = {} if not results else results self.results = {} if not results else results
@@ -57,11 +52,10 @@ class Getprop(AndroidExtraction):
if entry.get("name", "") != "ro.build.version.security_patch": if entry.get("name", "") != "ro.build.version.security_patch":
continue continue
patch_date = datetime.strptime(entry["value"], "%Y-%m-%d") patch_date = datetime.strptime(entry["value"], "%Y-%m-%d")
if (datetime.now() - patch_date) > timedelta(days=6 * 30): if (datetime.now() - patch_date) > timedelta(days=6*30):
self.log.warning( self.log.warning("This phone has not received security updates "
"This phone has not received security updates " "for more than six months (last update: %s)",
"for more than six months (last update: %s)", entry["value"])
entry["value"],
)
self.log.info("Extracted %d Android system properties", len(self.results)) self.log.info("Extracted %d Android system properties",
len(self.results))

View File

@@ -18,40 +18,37 @@ class Logcat(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def run(self) -> None: def run(self) -> None:
self._adb_connect() self._adb_connect()
# Get the current logcat. # Get the current logcat.
output = self._adb_command('logcat -d -b all "*:V"') output = self._adb_command("logcat -d -b all \"*:V\"")
# Get the locat prior to last reboot. # Get the locat prior to last reboot.
last_output = self._adb_command('logcat -L -b all "*:V"') last_output = self._adb_command("logcat -L -b all \"*:V\"")
if self.results_path: if self.results_path:
logcat_path = os.path.join(self.results_path, "logcat.txt") logcat_path = os.path.join(self.results_path,
"logcat.txt")
with open(logcat_path, "w", encoding="utf-8") as handle: with open(logcat_path, "w", encoding="utf-8") as handle:
handle.write(output) handle.write(output)
self.log.info("Current logcat logs stored at %s", logcat_path) self.log.info("Current logcat logs stored at %s",
logcat_path)
logcat_last_path = os.path.join(self.results_path, "logcat_last.txt") logcat_last_path = os.path.join(self.results_path,
"logcat_last.txt")
with open(logcat_last_path, "w", encoding="utf-8") as handle: with open(logcat_last_path, "w", encoding="utf-8") as handle:
handle.write(last_output) handle.write(last_output)
self.log.info( self.log.info("Logcat logs prior to last reboot stored at %s",
"Logcat logs prior to last reboot stored at %s", logcat_last_path logcat_last_path)
)
self._adb_disconnect() self._adb_disconnect()

View File

@@ -93,65 +93,59 @@ class Packages(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]: def serialize(self, record: dict) -> Union[dict, list]:
records = [] records = []
timestamps = [ timestamps = [
{"event": "package_install", "timestamp": record["timestamp"]}, {
"event": "package_install",
"timestamp": record["timestamp"]
},
{ {
"event": "package_first_install", "event": "package_first_install",
"timestamp": record["first_install_time"], "timestamp": record["first_install_time"]
},
{
"event": "package_last_update",
"timestamp": record["last_update_time"]
}, },
{"event": "package_last_update", "timestamp": record["last_update_time"]},
] ]
for timestamp in timestamps: for timestamp in timestamps:
records.append( records.append({
{ "timestamp": timestamp["timestamp"],
"timestamp": timestamp["timestamp"], "module": self.__class__.__name__,
"module": self.__class__.__name__, "event": timestamp["event"],
"event": timestamp["event"], "data": f"{record['package_name']} (system: {record['system']},"
"data": f"{record['package_name']} (system: {record['system']}," f" third party: {record['third_party']})",
f" third party: {record['third_party']})", })
}
)
return records return records
def check_indicators(self) -> None: def check_indicators(self) -> None:
for result in self.results: for result in self.results:
if result["package_name"] in ROOT_PACKAGES: if result["package_name"] in ROOT_PACKAGES:
self.log.warning( self.log.warning("Found an installed package related to "
"Found an installed package related to " "rooting/jailbreaking: \"%s\"",
'rooting/jailbreaking: "%s"', result["package_name"])
result["package_name"],
)
self.detected.append(result) self.detected.append(result)
continue continue
if result["package_name"] in SECURITY_PACKAGES and result["disabled"]: if result["package_name"] in SECURITY_PACKAGES and result["disabled"]:
self.log.warning( self.log.warning("Found a security package disabled: \"%s\"",
'Found a security package disabled: "%s"', result["package_name"] result["package_name"])
)
if result["package_name"] in SYSTEM_UPDATE_PACKAGES and result["disabled"]: if result["package_name"] in SYSTEM_UPDATE_PACKAGES and result["disabled"]:
self.log.warning( self.log.warning("System OTA update package \"%s\" disabled on the phone",
'System OTA update package "%s" disabled on the phone', result["package_name"])
result["package_name"],
)
if not self.indicators: if not self.indicators:
continue continue
@@ -245,24 +239,22 @@ class Packages(AndroidExtraction):
for file_path in output.splitlines(): for file_path in output.splitlines():
file_path = file_path.strip() file_path = file_path.strip()
md5 = self._adb_command(f"md5sum {file_path}").split(" ", maxsplit=1)[0] md5 = self._adb_command(
sha1 = self._adb_command(f"sha1sum {file_path}").split(" ", maxsplit=1)[0] f"md5sum {file_path}").split(" ", maxsplit=1)[0]
sha256 = self._adb_command(f"sha256sum {file_path}").split(" ", maxsplit=1)[ sha1 = self._adb_command(
0 f"sha1sum {file_path}").split(" ", maxsplit=1)[0]
] sha256 = self._adb_command(
sha512 = self._adb_command(f"sha512sum {file_path}").split(" ", maxsplit=1)[ f"sha256sum {file_path}").split(" ", maxsplit=1)[0]
0 sha512 = self._adb_command(
] f"sha512sum {file_path}").split(" ", maxsplit=1)[0]
package_files.append( package_files.append({
{ "path": file_path,
"path": file_path, "md5": md5,
"md5": md5, "sha1": sha1,
"sha1": sha1, "sha256": sha256,
"sha256": sha256, "sha512": sha512,
"sha512": sha512, })
}
)
return package_files return package_files
@@ -298,7 +290,8 @@ class Packages(AndroidExtraction):
"files": package_files, "files": package_files,
} }
dumpsys_package = self._adb_command(f"dumpsys package {package_name}") dumpsys_package = self._adb_command(
f"dumpsys package {package_name}")
package_details = self.parse_package_for_details(dumpsys_package) package_details = self.parse_package_for_details(dumpsys_package)
new_package.update(package_details) new_package.update(package_details)
@@ -331,12 +324,10 @@ class Packages(AndroidExtraction):
dangerous_permissions_count += 1 dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD: if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info( self.log.info("Third-party package \"%s\" requested %d "
'Third-party package "%s" requested %d ' "potentially dangerous permissions",
"potentially dangerous permissions", result["package_name"],
result["package_name"], dangerous_permissions_count)
dangerous_permissions_count,
)
packages_to_lookup = [] packages_to_lookup = []
for result in self.results: for result in self.results:
@@ -344,18 +335,14 @@ class Packages(AndroidExtraction):
continue continue
packages_to_lookup.append(result) packages_to_lookup.append(result)
self.log.info( self.log.info("Found non-system package with name \"%s\" installed by \"%s\" on %s",
'Found non-system package with name "%s" installed by "%s" on %s', result["package_name"], result["installer"],
result["package_name"], result["timestamp"])
result["installer"],
result["timestamp"],
)
if not self.fast_mode: if not self.fast_mode:
self.check_virustotal(packages_to_lookup) self.check_virustotal(packages_to_lookup)
self.log.info( self.log.info("Extracted at total of %d installed package names",
"Extracted at total of %d installed package names", len(self.results) len(self.results))
)
self._adb_disconnect() self._adb_disconnect()

View File

@@ -17,18 +17,13 @@ class Processes(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None: def check_indicators(self) -> None:
if not self.indicators: if not self.indicators:
@@ -87,4 +82,5 @@ class Processes(AndroidExtraction):
self._adb_disconnect() self._adb_disconnect()
self.log.info("Extracted records on a total of %d processes", len(self.results)) self.log.info("Extracted records on a total of %d processes",
len(self.results))

View File

@@ -17,18 +17,13 @@ class RootBinaries(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def run(self) -> None: def run(self) -> None:
root_binaries = [ root_binaries = [
@@ -61,6 +56,6 @@ class RootBinaries(AndroidExtraction):
continue continue
self.detected.append(root_binary) self.detected.append(root_binary)
self.log.warning('Found root binary "%s"', root_binary) self.log.warning("Found root binary \"%s\"", root_binary)
self._adb_disconnect() self._adb_disconnect()

View File

@@ -19,18 +19,13 @@ class SELinuxStatus(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = {} if not results else results self.results = {} if not results else results
@@ -45,4 +40,4 @@ class SELinuxStatus(AndroidExtraction):
if status == "enforcing": if status == "enforcing":
self.log.info("SELinux is being regularly enforced") self.log.info("SELinux is being regularly enforced")
else: else:
self.log.warning('SELinux status is "%s"!', status) self.log.warning("SELinux status is \"%s\"!", status)

View File

@@ -53,7 +53,7 @@ ANDROID_DANGEROUS_SETTINGS = [
"description": "enabled installation of non Google Play apps", "description": "enabled installation of non Google Play apps",
"key": "install_non_market_apps", "key": "install_non_market_apps",
"safe_value": "0", "safe_value": "0",
}, }
] ]
@@ -65,18 +65,13 @@ class Settings(AndroidExtraction):
file_path: Optional[str] = None, file_path: Optional[str] = None,
target_path: Optional[str] = None, target_path: Optional[str] = None,
results_path: Optional[str] = None, results_path: Optional[str] = None,
fast_mode: bool = False, fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__), log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None, results: Optional[list] = None
) -> None: ) -> None:
super().__init__( super().__init__(file_path=file_path, target_path=target_path,
file_path=file_path, results_path=results_path, fast_mode=fast_mode,
target_path=target_path, log=log, results=results)
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = {} if not results else results self.results = {} if not results else results
@@ -87,12 +82,8 @@ class Settings(AndroidExtraction):
# Check if one of the dangerous settings is using an unsafe # Check if one of the dangerous settings is using an unsafe
# value (different than the one specified). # value (different than the one specified).
if danger["key"] == key and danger["safe_value"] != value: if danger["key"] == key and danger["safe_value"] != value:
self.log.warning( self.log.warning("Found suspicious setting \"%s = %s\" (%s)",
'Found suspicious setting "%s = %s" (%s)', key, value, danger["description"])
key,
value,
danger["description"],
)
break break
def run(self) -> None: def run(self) -> None:

Some files were not shown because too many files have changed in this diff Show More