1
mirror of https://github.com/mvt-project/mvt synced 2025-11-13 01:37:36 +01:00

Compare commits

..

2 Commits

Author SHA1 Message Date
Donncha Ó Cearbhaill
d82e55e12c Initial commit of anonymous usage telemetry 2025-11-07 17:23:59 +01:00
github-actions[bot]
981371bd8b Add new iOS versions and build numbers (#714)
Co-authored-by: DonnchaC <DonnchaC@users.noreply.github.com>
2025-11-06 19:18:07 +01:00
7 changed files with 168 additions and 6 deletions

View File

@@ -31,7 +31,7 @@ dependencies = [
"PyYAML>=6.0.2",
"pyahocorasick==2.2.0",
"betterproto==1.2.5",
"pydantic==2.12.3",
"pydantic==2.11.7",
"pydantic-settings==2.10.1",
"NSKeyedUnArchiver==1.5.2",
"python-dateutil==2.9.0.post0",

View File

@@ -36,7 +36,7 @@ from mvt.common.help import (
)
from mvt.common.logo import logo
from mvt.common.updates import IndicatorsUpdates
from mvt.common.utils import init_logging, set_verbose_logging
from mvt.common.utils import init_logging, set_verbose_logging, CommandWrapperGroup
from .cmd_check_adb import CmdAndroidCheckADB
from .cmd_check_androidqf import CmdAndroidCheckAndroidQF
@@ -68,7 +68,7 @@ def _get_disable_flags(ctx):
# ==============================================================================
# Main
# ==============================================================================
@click.group(invoke_without_command=False)
@click.group(invoke_without_command=False, cls=CommandWrapperGroup)
@click.option(
"--disable-update-check", is_flag=True, help=HELP_MSG_DISABLE_UPDATE_CHECK
)

View File

@@ -1,10 +1,11 @@
import os
import yaml
import json
import uuid
from typing import Tuple, Type, Optional
from appdirs import user_config_dir
from pydantic import AnyHttpUrl, Field
from pydantic import AnyHttpUrl, BaseModel, Field
from pydantic_settings import (
BaseSettings,
InitSettingsSource,
@@ -17,6 +18,22 @@ MVT_CONFIG_FOLDER = user_config_dir("mvt")
MVT_CONFIG_PATH = os.path.join(MVT_CONFIG_FOLDER, "config.yaml")
class TelemetrySettings(BaseModel):
"""
Settings used by the Telemetry module.
"""
ENABLED: bool = Field(True, description="Flag for telemetry collection")
ENDPOINT: AnyHttpUrl = Field(
"https://t.mvt.re/events", description="Telemetry collection endpoint"
)
DEVICE_ID: str | None = Field(
default=None,
required=True,
description="Anonymous Unique ID for use in telemetry",
)
class MVTSettings(BaseSettings):
model_config = SettingsConfigDict(
env_prefix="MVT_",
@@ -24,7 +41,7 @@ class MVTSettings(BaseSettings):
extra="ignore",
nested_model_default_partial_updates=True,
)
# Allow to decided if want to load environment variables
# Flag to enable or disable loading of environment variables.
load_env: bool = Field(True, exclude=True)
# General settings
@@ -51,6 +68,9 @@ class MVTSettings(BaseSettings):
PROFILE: bool = Field(False, description="Profile the execution of MVT modules")
HASH_FILES: bool = Field(False, description="Should MVT hash output files")
# Telemetry settings
TELEMETRY: TelemetrySettings = TelemetrySettings(include=True)
@classmethod
def settings_customise_sources(
cls,
@@ -95,6 +115,8 @@ class MVTSettings(BaseSettings):
"""
# Set invalid env prefix to avoid loading env variables.
settings = MVTSettings(load_env=False)
if not settings.TELEMETRY.DEVICE_ID:
settings.TELEMETRY.DEVICE_ID = str(uuid.uuid4())
settings.save_settings()
# Load the settings again with any ENV variables.

113
src/mvt/common/telemetry.py Normal file
View File

@@ -0,0 +1,113 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import sys
import platform
import requests
import json
import logging
import threading
from mvt.common.config import settings
from mvt.common.version import MVT_VERSION
logger = logging.getLogger(__name__)
class Telemetry(object):
"""
MVT collects anonymous telemetry to understand how MVT is used.
This data is helpful to prioritize features, identify platforms and versions. It
will also how many users are using custom indicators, modules and packages.
"""
def __init__(self):
self.endpoint = settings.TELEMETRY.ENDPOINT
self.device_id = settings.TELEMETRY.DEVICE_ID
def is_telemetry_enabled(self):
return settings.TELEMETRY.ENABLED
@staticmethod
def _installation_type():
"""Check if MVT is installed via pip, docker or source."""
if "site-packages" in __file__:
return "pypi"
elif os.environ.get("MVT_DOCKER_IMAGE", None):
return "docker"
else:
return "source"
def _get_device_properties(self):
return {
"os_type": platform.system(),
"os_version": platform.platform(),
"python_version": f"{platform.python_version()}/{platform.python_implementation()}",
"mvt_version": MVT_VERSION,
"mvt_installation_type": self._installation_type(),
"mvt_package_name": __package__,
"mvt_command": os.path.basename(sys.argv[0]),
"telemetry_version": "0.0.1",
}
def _build_event(self, event_name, event_properties):
return {
"event": event_name,
"distinct_id": self.device_id,
"properties": {
**self._get_device_properties(),
**event_properties,
},
}
def _send_event(self, event):
if not self.is_telemetry_enabled():
# Telemetry is disabled. Do not send any data.
return
event_json = json.dumps(event)
try:
telemetry_thread = threading.Thread(
target=self._send_event_thread, args=(event_json,)
)
telemetry_thread.start()
except Exception as e:
logger.debug(f"Failed to send telemetry data in a thread: {e}")
def _send_event_thread(self, event):
try:
response = requests.post(
self.endpoint,
data=json.dumps(event),
timeout=5,
headers={
"Content-Type": "application/json",
"User-Agent": f"mvt/{MVT_VERSION}",
},
)
response.raise_for_status()
except requests.RequestException as e:
logger.debug(f"Failed to send telemetry data: {e}")
def send_cli_command_event(self, command_name):
event = self._build_event(
event_name="run_mvt_cli_command",
event_properties={"cli_command_name": command_name},
)
self._send_event(event)
def send_module_detections_event(self, module_name, detections):
event = self._build_event(
event_name="module_detections",
event_properties={"module_name": module_name, "detections": detections},
)
self._send_event(event)
telemetry = Telemetry()

View File

@@ -10,12 +10,34 @@ import json
import logging
import os
import re
import click
from typing import Any, Iterator, Union
from rich.logging import RichHandler
from mvt.common.telemetry import telemetry
from mvt.common.config import settings
class CommandWrapperGroup(click.Group):
"""Allow hooks to run before and after MVT CLI commands"""
def add_command(self, cmd, name=None):
click.Group.add_command(self, cmd, name=name)
cmd.invoke = self.build_command_invoke(cmd.invoke)
def build_command_invoke(self, original_invoke):
def command_invoke(ctx):
"""Invoke the Click command"""
# Run telemetry before the command
telemetry.send_cli_command_event(ctx.command.name)
# Run the original command
original_invoke(ctx)
return command_invoke
class CustomJSONEncoder(json.JSONEncoder):
"""
Custom JSON encoder to handle non-standard types.

View File

@@ -18,6 +18,7 @@ from mvt.common.utils import (
generate_hashes_from_path,
init_logging,
set_verbose_logging,
CommandWrapperGroup,
)
from mvt.common.help import (
HELP_MSG_VERSION,
@@ -68,7 +69,7 @@ def _get_disable_flags(ctx):
# ==============================================================================
# Main
# ==============================================================================
@click.group(invoke_without_command=False)
@click.group(invoke_without_command=False, cls=CommandWrapperGroup)
@click.option(
"--disable-update-check", is_flag=True, help=HELP_MSG_DISABLE_UPDATE_CHECK
)

View File

@@ -1156,6 +1156,10 @@
"version": "18.7",
"build": "22H20"
},
{
"version": "18.7.2",
"build": "22H124"
},
{
"version": "26",
"build": "23A341"