1
mirror of https://github.com/rapid7/metasploit-payloads synced 2024-11-20 14:39:22 +01:00

Fix python meterpreter getuid crash on windows

This commit is contained in:
adfoster-r7 2023-06-21 13:00:49 +01:00
parent 1432ebbba3
commit 2e8e97cdb2
3 changed files with 100 additions and 38 deletions

View File

@ -22,3 +22,9 @@ python3 ./tests/test_ext_server_stdapi.py TestExtServerStdApi.test_stdapi_net_co
# Or:
python3 -m unittest tests.test_ext_server_stdapi.ExtServerStdApiFileSystemTest.test_stdapi_fs_stat
```
To debug tests, add the following code snippet to enter into an interactive debugger at the calling stack frame:
```python
import pdb; pdb.set_trace()
```

View File

@ -899,7 +899,7 @@ def get_stat_buffer(path):
st_buf += struct.pack('<QQQQ', long(si.st_size), long(si.st_atime), long(si.st_mtime), long(si.st_ctime))
return st_buf
def get_token_user(handle):
def get_token_user_sid(handle):
TokenUser = 1
advapi32 = ctypes.windll.advapi32
advapi32.OpenProcessToken.argtypes = [ctypes.c_void_p, ctypes.c_uint32, ctypes.POINTER(ctypes.c_void_p)]
@ -913,9 +913,17 @@ def get_token_user(handle):
ctypes.windll.kernel32.CloseHandle(token_handle)
if not result:
return None
return ctstruct_unpack(TOKEN_USER, token_user_buffer)
token_user = ctstruct_unpack(TOKEN_USER, token_user_buffer)
def get_username_from_token(token_user):
GetLengthSid = ctypes.windll.advapi32.GetLengthSid
GetLengthSid.argtypes = [ctypes.c_void_p]
GetLengthSid.restype = ctypes.c_uint32
sid_length = GetLengthSid(token_user.User.Sid)
sid_bytes = ctypes.string_at(token_user.User.Sid, sid_length)
return sid_bytes
def get_username_from_sid(sid):
user = (ctypes.c_char * 512)()
domain = (ctypes.c_char * 512)()
user_len = ctypes.c_uint32()
@ -926,7 +934,7 @@ def get_username_from_token(token_user):
use.value = 0
LookupAccountSid = ctypes.windll.advapi32.LookupAccountSidA
LookupAccountSid.argtypes = [ctypes.c_void_p] * 7
if not LookupAccountSid(None, token_user.User.Sid, user, ctypes.byref(user_len), domain, ctypes.byref(domain_len), ctypes.byref(use)):
if not LookupAccountSid(None, sid, user, ctypes.byref(user_len), domain, ctypes.byref(domain_len), ctypes.byref(use)):
return None
return str(ctypes.string_at(domain)) + '\\' + str(ctypes.string_at(user))
@ -1232,13 +1240,13 @@ def stdapi_sys_config_getenv(request, response):
@register_function_if(has_windll)
def stdapi_sys_config_getsid(request, response):
token = get_token_user(ctypes.windll.kernel32.GetCurrentProcess())
if not token:
sid = get_token_user_sid(ctypes.windll.kernel32.GetCurrentProcess())
if not sid:
return error_result_windows(), response
sid_str = ctypes.c_char_p()
ConvertSidToStringSid = ctypes.windll.advapi32.ConvertSidToStringSidA
ConvertSidToStringSid.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
if not ConvertSidToStringSid(token.User.Sid, ctypes.byref(sid_str)):
if not ConvertSidToStringSid(sid, ctypes.byref(sid_str)):
return error_result_windows(), response
sid_str = str(ctypes.string_at(sid_str))
response += tlv_pack(TLV_TYPE_SID, sid_str)
@ -1249,10 +1257,10 @@ def stdapi_sys_config_getuid(request, response):
if has_pwd:
username = pwd.getpwuid(os.getuid()).pw_name
elif has_windll:
token = get_token_user(ctypes.windll.kernel32.GetCurrentProcess())
if not token:
sid = get_token_user_sid(ctypes.windll.kernel32.GetCurrentProcess())
if not sid:
return error_result_windows(), response
username = get_username_from_token(token)
username = get_username_from_sid(sid)
if not username:
return error_result_windows(), response
else:
@ -1607,9 +1615,9 @@ def stdapi_sys_process_get_processes_via_windll(request, response):
else:
exe_path = ''
process_username = ''
process_token_user = get_token_user(proc_h)
if process_token_user:
process_username = get_username_from_token(process_token_user) or ''
process_token_user_sid = get_token_user_sid(proc_h)
if process_token_user_sid:
process_username = get_username_from_sid(process_token_user_sid) or ''
parch = windll_GetNativeSystemInfo()
is_wow64 = ctypes.c_ubyte()
is_wow64.value = 0

View File

@ -13,6 +13,9 @@ else:
ERROR_SUCCESS = 0
is_windows = sys.platform.startswith("win")
windows_only_test_reason = "Windows only test"
def create_meterpreter_context():
with open("meterpreter.py", "rb") as file:
@ -69,6 +72,13 @@ class ExtServerStdApiTest(unittest.TestCase):
self.assertEqual(result[0], ERROR_SUCCESS)
self.assertIsInstance(result[1], bytes)
def assertRegex(self, text, regexp, msg=None):
# Python 2.7
if self.assertRegexpMatches:
self.assertRegexpMatches(text, regexp, msg)
else:
super().assertRegex(text, regexp, msg)
class ExtServerStdApiNetworkTest(ExtServerStdApiTest):
def test_stdapi_net_config_get_interfaces(self):
@ -202,31 +212,6 @@ en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
self.assertEqual(result, expected)
@mock.patch("subprocess.Popen")
def test_stdapi_sys_process_get_processes_via_ps(self, mock_popen):
command_result = b"""
PID PPID USER COMMAND
1 0 root /sbin/launchd
88 1 root /usr/sbin/syslogd
89 1 root /usr/libexec/UserEventAgent (System)
""".lstrip()
process_mock = mock.Mock()
attrs = {
"communicate.return_value": (command_result, b""),
"wait.return_value": ERROR_SUCCESS,
}
process_mock.configure_mock(**attrs)
mock_popen.return_value = process_mock
request = bytes()
response = bytes()
result = self.ext_server_stdapi["stdapi_sys_process_get_processes_via_ps"](
request, response
)
self.assertErrorSuccess(result)
class ExtServerStdApiFileSystemTest(ExtServerStdApiTest):
def test_stdapi_fs_stat(self):
@ -273,5 +258,68 @@ class ExtServerStdApiFileSystemTest(ExtServerStdApiTest):
self.assertMethodErrorSuccess("stdapi_fs_stat", request, response)
class ExtServerStdApiSysProcess(ExtServerStdApiTest):
def test_stdapi_sys_process_get_processes(self):
request = bytes()
response = bytes()
result = self.assertMethodErrorSuccess(
"stdapi_sys_process_get_processes", request, response
)
self.assertErrorSuccess(result)
@mock.patch("subprocess.Popen")
def test_stdapi_sys_process_get_processes_via_ps(self, mock_popen):
command_result = b"""
PID PPID USER COMMAND
1 0 root /sbin/launchd
88 1 root /usr/sbin/syslogd
89 1 root /usr/libexec/UserEventAgent (System)
""".lstrip()
process_mock = mock.Mock()
attrs = {
"communicate.return_value": (command_result, b""),
"wait.return_value": ERROR_SUCCESS,
}
process_mock.configure_mock(**attrs)
mock_popen.return_value = process_mock
request = bytes()
response = bytes()
result = self.ext_server_stdapi["stdapi_sys_process_get_processes_via_ps"](
request, response
)
self.assertErrorSuccess(result)
class ExtServerStdApiSystemConfigTest(ExtServerStdApiTest):
def test_stdapi_sys_config_getuid(self):
request = bytes()
response = bytes()
_result_code, result_tlvs = self.assertMethodErrorSuccess(
"stdapi_sys_config_getuid", request, response
)
user_name = self.meterpreter_context["packet_get_tlv"](
result_tlvs, self.ext_server_stdapi["TLV_TYPE_USER_NAME"]
).get("value")
self.assertRegex(user_name, ".+")
@unittest.skipUnless(is_windows, windows_only_test_reason)
def test_stdapi_sys_config_getsid(self):
request = bytes()
response = bytes()
_result_code, result_tlvs = self.assertMethodErrorSuccess(
"stdapi_sys_config_getsid", request, response
)
sid = self.meterpreter_context["packet_get_tlv"](
result_tlvs, self.ext_server_stdapi["TLV_TYPE_SID"]
).get("value")
self.assertRegex(sid, "S-1-5-.*")
if __name__ == "__main__":
unittest.main()