diff --git a/python/meterpreter/README.md b/python/meterpreter/README.md index c29c9956..2bee202a 100644 --- a/python/meterpreter/README.md +++ b/python/meterpreter/README.md @@ -22,3 +22,9 @@ python3 ./tests/test_ext_server_stdapi.py TestExtServerStdApi.test_stdapi_net_co # Or: python3 -m unittest tests.test_ext_server_stdapi.ExtServerStdApiFileSystemTest.test_stdapi_fs_stat ``` + +To debug tests, add the following code snippet to enter into an interactive debugger at the calling stack frame: + +```python +import pdb; pdb.set_trace() +``` diff --git a/python/meterpreter/ext_server_stdapi.py b/python/meterpreter/ext_server_stdapi.py index e3243851..d7abbcd8 100644 --- a/python/meterpreter/ext_server_stdapi.py +++ b/python/meterpreter/ext_server_stdapi.py @@ -899,7 +899,7 @@ def get_stat_buffer(path): st_buf += struct.pack('<QQQQ', long(si.st_size), long(si.st_atime), long(si.st_mtime), long(si.st_ctime)) return st_buf -def get_token_user(handle): +def get_token_user_sid(handle): TokenUser = 1 advapi32 = ctypes.windll.advapi32 advapi32.OpenProcessToken.argtypes = [ctypes.c_void_p, ctypes.c_uint32, ctypes.POINTER(ctypes.c_void_p)] @@ -913,9 +913,17 @@ def get_token_user(handle): ctypes.windll.kernel32.CloseHandle(token_handle) if not result: return None - return ctstruct_unpack(TOKEN_USER, token_user_buffer) + token_user = ctstruct_unpack(TOKEN_USER, token_user_buffer) -def get_username_from_token(token_user): + GetLengthSid = ctypes.windll.advapi32.GetLengthSid + GetLengthSid.argtypes = [ctypes.c_void_p] + GetLengthSid.restype = ctypes.c_uint32 + sid_length = GetLengthSid(token_user.User.Sid) + sid_bytes = ctypes.string_at(token_user.User.Sid, sid_length) + + return sid_bytes + +def get_username_from_sid(sid): user = (ctypes.c_char * 512)() domain = (ctypes.c_char * 512)() user_len = ctypes.c_uint32() @@ -926,7 +934,7 @@ def get_username_from_token(token_user): use.value = 0 LookupAccountSid = ctypes.windll.advapi32.LookupAccountSidA LookupAccountSid.argtypes = [ctypes.c_void_p] * 7 - if not LookupAccountSid(None, token_user.User.Sid, user, ctypes.byref(user_len), domain, ctypes.byref(domain_len), ctypes.byref(use)): + if not LookupAccountSid(None, sid, user, ctypes.byref(user_len), domain, ctypes.byref(domain_len), ctypes.byref(use)): return None return str(ctypes.string_at(domain)) + '\\' + str(ctypes.string_at(user)) @@ -1232,13 +1240,13 @@ def stdapi_sys_config_getenv(request, response): @register_function_if(has_windll) def stdapi_sys_config_getsid(request, response): - token = get_token_user(ctypes.windll.kernel32.GetCurrentProcess()) - if not token: + sid = get_token_user_sid(ctypes.windll.kernel32.GetCurrentProcess()) + if not sid: return error_result_windows(), response sid_str = ctypes.c_char_p() ConvertSidToStringSid = ctypes.windll.advapi32.ConvertSidToStringSidA ConvertSidToStringSid.argtypes = [ctypes.c_void_p, ctypes.c_void_p] - if not ConvertSidToStringSid(token.User.Sid, ctypes.byref(sid_str)): + if not ConvertSidToStringSid(sid, ctypes.byref(sid_str)): return error_result_windows(), response sid_str = str(ctypes.string_at(sid_str)) response += tlv_pack(TLV_TYPE_SID, sid_str) @@ -1249,10 +1257,10 @@ def stdapi_sys_config_getuid(request, response): if has_pwd: username = pwd.getpwuid(os.getuid()).pw_name elif has_windll: - token = get_token_user(ctypes.windll.kernel32.GetCurrentProcess()) - if not token: + sid = get_token_user_sid(ctypes.windll.kernel32.GetCurrentProcess()) + if not sid: return error_result_windows(), response - username = get_username_from_token(token) + username = get_username_from_sid(sid) if not username: return error_result_windows(), response else: @@ -1607,9 +1615,9 @@ def stdapi_sys_process_get_processes_via_windll(request, response): else: exe_path = '' process_username = '' - process_token_user = get_token_user(proc_h) - if process_token_user: - process_username = get_username_from_token(process_token_user) or '' + process_token_user_sid = get_token_user_sid(proc_h) + if process_token_user_sid: + process_username = get_username_from_sid(process_token_user_sid) or '' parch = windll_GetNativeSystemInfo() is_wow64 = ctypes.c_ubyte() is_wow64.value = 0 diff --git a/python/meterpreter/tests/test_ext_server_stdapi.py b/python/meterpreter/tests/test_ext_server_stdapi.py index d60356e7..4ed4181f 100644 --- a/python/meterpreter/tests/test_ext_server_stdapi.py +++ b/python/meterpreter/tests/test_ext_server_stdapi.py @@ -13,6 +13,9 @@ else: ERROR_SUCCESS = 0 +is_windows = sys.platform.startswith("win") +windows_only_test_reason = "Windows only test" + def create_meterpreter_context(): with open("meterpreter.py", "rb") as file: @@ -69,6 +72,13 @@ class ExtServerStdApiTest(unittest.TestCase): self.assertEqual(result[0], ERROR_SUCCESS) self.assertIsInstance(result[1], bytes) + def assertRegex(self, text, regexp, msg=None): + # Python 2.7 + if self.assertRegexpMatches: + self.assertRegexpMatches(text, regexp, msg) + else: + super().assertRegex(text, regexp, msg) + class ExtServerStdApiNetworkTest(ExtServerStdApiTest): def test_stdapi_net_config_get_interfaces(self): @@ -202,31 +212,6 @@ en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500 self.assertEqual(result, expected) - @mock.patch("subprocess.Popen") - def test_stdapi_sys_process_get_processes_via_ps(self, mock_popen): - command_result = b""" - PID PPID USER COMMAND - 1 0 root /sbin/launchd - 88 1 root /usr/sbin/syslogd - 89 1 root /usr/libexec/UserEventAgent (System) -""".lstrip() - - process_mock = mock.Mock() - attrs = { - "communicate.return_value": (command_result, b""), - "wait.return_value": ERROR_SUCCESS, - } - process_mock.configure_mock(**attrs) - mock_popen.return_value = process_mock - - request = bytes() - response = bytes() - result = self.ext_server_stdapi["stdapi_sys_process_get_processes_via_ps"]( - request, response - ) - - self.assertErrorSuccess(result) - class ExtServerStdApiFileSystemTest(ExtServerStdApiTest): def test_stdapi_fs_stat(self): @@ -273,5 +258,68 @@ class ExtServerStdApiFileSystemTest(ExtServerStdApiTest): self.assertMethodErrorSuccess("stdapi_fs_stat", request, response) +class ExtServerStdApiSysProcess(ExtServerStdApiTest): + def test_stdapi_sys_process_get_processes(self): + request = bytes() + response = bytes() + result = self.assertMethodErrorSuccess( + "stdapi_sys_process_get_processes", request, response + ) + + self.assertErrorSuccess(result) + + @mock.patch("subprocess.Popen") + def test_stdapi_sys_process_get_processes_via_ps(self, mock_popen): + command_result = b""" + PID PPID USER COMMAND + 1 0 root /sbin/launchd + 88 1 root /usr/sbin/syslogd + 89 1 root /usr/libexec/UserEventAgent (System) +""".lstrip() + + process_mock = mock.Mock() + attrs = { + "communicate.return_value": (command_result, b""), + "wait.return_value": ERROR_SUCCESS, + } + process_mock.configure_mock(**attrs) + mock_popen.return_value = process_mock + + request = bytes() + response = bytes() + result = self.ext_server_stdapi["stdapi_sys_process_get_processes_via_ps"]( + request, response + ) + + self.assertErrorSuccess(result) + + +class ExtServerStdApiSystemConfigTest(ExtServerStdApiTest): + def test_stdapi_sys_config_getuid(self): + request = bytes() + response = bytes() + _result_code, result_tlvs = self.assertMethodErrorSuccess( + "stdapi_sys_config_getuid", request, response + ) + + user_name = self.meterpreter_context["packet_get_tlv"]( + result_tlvs, self.ext_server_stdapi["TLV_TYPE_USER_NAME"] + ).get("value") + self.assertRegex(user_name, ".+") + + @unittest.skipUnless(is_windows, windows_only_test_reason) + def test_stdapi_sys_config_getsid(self): + request = bytes() + response = bytes() + _result_code, result_tlvs = self.assertMethodErrorSuccess( + "stdapi_sys_config_getsid", request, response + ) + + sid = self.meterpreter_context["packet_get_tlv"]( + result_tlvs, self.ext_server_stdapi["TLV_TYPE_SID"] + ).get("value") + self.assertRegex(sid, "S-1-5-.*") + + if __name__ == "__main__": unittest.main()