metasploit-payloads/python/meterpreter/tests/test_ext_server_stdapi.py

326 lines
11 KiB
Python

# coding=utf-8
import unittest
import code
import sys
import socket
import os
if sys.version_info >= (3, 3):
from unittest import mock
else:
import mock
ERROR_SUCCESS = 0
is_windows = sys.platform.startswith("win")
windows_only_test_reason = "Windows only test"
def create_meterpreter_context():
with open("meterpreter.py", "rb") as file:
# Read and patch out the Meterpreter socket connection logic side effect onwards
source = file.read()
source_without_socket_connection = source[
0 : source.index(b"# PATCH-SETUP-ENCRYPTION #")
]
context = {}
exec(source_without_socket_connection, context, context)
return context
def create_ext_server_stdapi_context(meterpreter, meterpreter_context):
with open("ext_server_stdapi.py", "rb") as file:
extension_content = file.read()
context = {}
context.update(meterpreter_context["EXPORTED_SYMBOLS"])
context["meterpreter"] = meterpreter
exec(extension_content, context, context)
return context
class MockMeterpreter:
def __init__(self):
self.extension_functions = {}
def register_extension(self, extension_name):
pass
def register_function(self, func):
self.extension_functions[func.__name__] = func
return func
class ExtServerStdApiTest(unittest.TestCase):
def setUp(self):
self.mock_meterpreter = MockMeterpreter()
self.meterpreter_context = create_meterpreter_context()
self.ext_server_stdapi = create_ext_server_stdapi_context(
self.mock_meterpreter, self.meterpreter_context
)
def assertMethodErrorSuccess(self, method_name, request, response):
result = self.ext_server_stdapi[method_name](request, response)
self.assertErrorSuccess(result)
return result
def assertErrorSuccess(self, result):
self.assertEqual(result[0], ERROR_SUCCESS)
self.assertIsInstance(result[1], bytes)
def assertRegex(self, text, regexp, msg=None):
if hasattr(super(self.__class__.__bases__[0], self), 'assertRegex'):
super(self.__class__.__bases__[0], self).assertRegex(text, regexp, msg)
else:
# Python 2.7 fallback
self.assertRegexpMatches(text, regexp, msg)
class ExtServerStdApiNetworkTest(ExtServerStdApiTest):
def test_stdapi_net_config_get_interfaces(self):
request = bytes()
response = bytes()
self.assertMethodErrorSuccess(
"stdapi_net_config_get_interfaces", request, response
)
def test_stdapi_net_config_get_routes(self):
request = bytes()
response = bytes()
self.assertMethodErrorSuccess("stdapi_net_config_get_routes", request, response)
def test_stdapi_sys_process_get_processes(self):
request = bytes()
response = bytes()
self.assertMethodErrorSuccess(
"stdapi_sys_process_get_processes", request, response
)
@mock.patch("subprocess.Popen")
def test_stdapi_net_config_get_routes_via_osx_netstat(self, mock_popen):
command_result = b"""
Routing tables
Internet:
Destination Gateway Flags Netif Expire
default 10.79.0.1 UGScg en0
192.168.1 link#6 UCS en0 !
Internet6:
Destination Gateway Flags Netif Expire
default fe80::%utun0 UGcIg utun0
fe80::e8fa:527d:5e1a:1122%en5 f3:3a:1c:c6:f7:75 UHLI lo0
fe80::e8fa:527d:5e1a:ae4c%bridge100 f3.3a.1c.c6.f7.75 UHLI lo0
""".lstrip()
process_mock = mock.Mock()
attrs = {
"communicate.return_value": (command_result, b""),
"wait.return_value": ERROR_SUCCESS,
}
process_mock.configure_mock(**attrs)
mock_popen.return_value = process_mock
result = self.ext_server_stdapi[
"stdapi_net_config_get_routes_via_osx_netstat"
]()
expected = [
{
"gateway": b"\nO\x00\x01",
"iface": "en0",
"metric": 0,
"netmask": b"\x00\x00\x00\x00",
"subnet": b"\x00\x00\x00\x00",
},
{
"gateway": b"\x00\x00\x00\x00",
"iface": "en0",
"metric": 0,
"netmask": b"\xff\xff\xff\xff",
"subnet": b"\xc0\xa8\x01\x00",
},
{
"gateway": b"\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
"iface": "utun0",
"metric": 0,
"netmask": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
"subnet": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
},
{
"gateway": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
"iface": "lo0",
"metric": 0,
"netmask": b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00",
"subnet": b'\xfe\x80\x00\x00\x00\x00\x00\x00\xe8\xfaR}^\x1a\x11"',
},
{
"gateway": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
"iface": "lo0",
"metric": 0,
"netmask": b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00",
"subnet": b"\xfe\x80\x00\x00\x00\x00\x00\x00\xe8\xfaR}^\x1a\xaeL",
},
]
self.assertEqual(result, expected)
@mock.patch("subprocess.Popen")
def test_stdapi_net_config_get_interfaces_via_osx_ifconfig(self, mock_popen):
command_result = b"""
en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
options=400<CHANNEL_IO>
ether 11:22:33:44:55:66
inet 192.168.1.166 netmask 0xffffff00 broadcast 192.168.1.255
media: autoselect
status: active
""".lstrip()
process_mock = mock.Mock()
attrs = {
"communicate.return_value": (command_result, b""),
"wait.return_value": ERROR_SUCCESS,
}
process_mock.configure_mock(**attrs)
mock_popen.return_value = process_mock
result = self.ext_server_stdapi[
"stdapi_net_config_get_interfaces_via_osx_ifconfig"
]()
expected = [
{
"addrs": [
(
socket.AF_INET,
b"\xc0\xa8\x01\xa6",
b"\xff\xff\xff\x00",
)
],
"flags": 8863,
"flags_str": "UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST",
"hw_addr": '\x11"3DUf',
"index": 0,
"mtu": 1500,
"name": "en0",
}
]
self.assertEqual(result, expected)
class ExtServerStdApiFileSystemTest(ExtServerStdApiTest):
def test_stdapi_fs_stat(self):
request = bytes()
request += self.meterpreter_context["tlv_pack"](
self.ext_server_stdapi["TLV_TYPE_FILE_PATH"],
os.path.dirname(os.path.abspath(__file__)),
)
response = bytes()
self.assertMethodErrorSuccess("stdapi_fs_stat", request, response)
# Older versions of Python on Windows return invalid/negative values for st_rdev
# https://github.com/python/cpython/commit/a10c1f221a5248cedf476736eea365e1dfc84910#diff-b419a047f587ec3afef8493e19dbfc142624bf278f3298bfc74729abd89e311d
@mock.patch("os.stat")
@mock.patch("sys.platform")
def test_stdapi_fs_stat_with_negative_st_rdev_on_windows(
self, mock_sys_platform, mock_os_stat
):
os_stat_result = mock.MagicMock()
os_stat_result.configure_mock(
**{
"st_mode": 33206,
"st_ino": 281474976726344,
"st_dev": 3323847249,
"st_nlink": 1,
"st_uid": 0,
"st_gid": 0,
"st_size": 9884,
"st_rdev": -1910224650,
"st_atime": 1686079301.2200336,
"st_mtime": 1686079301.2200336,
"st_ctime": 1686079301.2200336,
}
)
mock_os_stat.return_value = os_stat_result
mock_sys_platform.return_value = "win32"
request = bytes()
request += self.meterpreter_context["tlv_pack"](
self.ext_server_stdapi["TLV_TYPE_FILE_PATH"], "/mock/path"
)
response = bytes()
self.assertMethodErrorSuccess("stdapi_fs_stat", request, response)
class ExtServerStdApiSysProcess(ExtServerStdApiTest):
def test_stdapi_sys_process_get_processes(self):
request = bytes()
response = bytes()
result = self.assertMethodErrorSuccess(
"stdapi_sys_process_get_processes", request, response
)
self.assertErrorSuccess(result)
@mock.patch("subprocess.Popen")
def test_stdapi_sys_process_get_processes_via_ps(self, mock_popen):
command_result = b"""
PID PPID USER COMMAND
1 0 root /sbin/launchd
88 1 root /usr/sbin/syslogd
89 1 root /usr/libexec/UserEventAgent (System)
""".lstrip()
process_mock = mock.Mock()
attrs = {
"communicate.return_value": (command_result, b""),
"wait.return_value": ERROR_SUCCESS,
}
process_mock.configure_mock(**attrs)
mock_popen.return_value = process_mock
request = bytes()
response = bytes()
result = self.ext_server_stdapi["stdapi_sys_process_get_processes_via_ps"](
request, response
)
self.assertErrorSuccess(result)
class ExtServerStdApiSystemConfigTest(ExtServerStdApiTest):
def test_stdapi_sys_config_getuid(self):
request = bytes()
response = bytes()
_result_code, result_tlvs = self.assertMethodErrorSuccess(
"stdapi_sys_config_getuid", request, response
)
user_name = self.meterpreter_context["packet_get_tlv"](
result_tlvs, self.ext_server_stdapi["TLV_TYPE_USER_NAME"]
).get("value")
self.assertRegex(user_name, ".+")
@unittest.skipUnless(is_windows, windows_only_test_reason)
def test_stdapi_sys_config_getsid(self):
request = bytes()
response = bytes()
_result_code, result_tlvs = self.assertMethodErrorSuccess(
"stdapi_sys_config_getsid", request, response
)
sid = self.meterpreter_context["packet_get_tlv"](
result_tlvs, self.ext_server_stdapi["TLV_TYPE_SID"]
).get("value")
self.assertRegex(sid, "S-1-5-.*")
if __name__ == "__main__":
unittest.main()