From 3fedca008765e3a04a25d301b64b9243e3332bdb Mon Sep 17 00:00:00 2001 From: speyrefitte Date: Wed, 21 Jan 2026 17:03:17 +0100 Subject: [PATCH] plugins: Add kerberos plugin --- .../framework/plugins/windows/kerberos.py | 1042 +++++++++++++++++ .../framework/symbols/windows/kerberos.json | 629 ++++++++++ 2 files changed, 1671 insertions(+) create mode 100644 volatility3/framework/plugins/windows/kerberos.py create mode 100644 volatility3/framework/symbols/windows/kerberos.json diff --git a/volatility3/framework/plugins/windows/kerberos.py b/volatility3/framework/plugins/windows/kerberos.py new file mode 100644 index 0000000000..60ae3633a1 --- /dev/null +++ b/volatility3/framework/plugins/windows/kerberos.py @@ -0,0 +1,1042 @@ +""" +Klist plugin made with love by the Airbus CERT team + +The plugin intend to: + - Sessions : list sessions managed by the kerberos security provider + - Tickets : list ticket in cache + - Dump : Dump ticket in kirbi format + - VadTicketScan : Carve ticket into lsass process Vad +""" +import logging +from typing import Iterable, Tuple, List, Optional, Callable + +import os +import pefile + +from volatility3.framework import interfaces, symbols, exceptions, constants, objects +from volatility3.framework import renderers +from volatility3.framework.configuration import requirements +from volatility3.framework.layers import scanners +from volatility3.framework.objects import utility +from volatility3.framework.renderers import format_hints +from volatility3.framework.symbols import intermed +from volatility3.framework.symbols.windows import pdbutil, versions +from volatility3.framework.symbols.windows.extensions import pe, conversion +from volatility3.plugins.windows import pslist, vadinfo, pe_symbols + +vollog = logging.getLogger(__name__) + +def UnwrapOr(default): + def wrapper_1(function): + """Decorator to protect accessor of particular fields""" + def wrapper_2(*args, **kwargs): + try: + return function(*args, **kwargs) + except Exception as e: + return default + + return wrapper_2 + return wrapper_1 + +def _layout_scanner( + source: interfaces.objects.ObjectInterface, + object_type: str, + condition: Callable, + start=0x0, + increment=0x8, + max_offset=512 + ): + """Will scan an object layout memory by casting it into object_type and respect condition + source: object which want to test the layout + condition: function call to to know if object was found + + :return: list of offset from source where condition is valid + """ + result = [] + # align for 64 bits + offset = start + layer = source._context.layers[source.vol.layer_name] + while offset < max_offset: + obj = source._context.object( + layer_name=source.vol.layer_name, + object_type=object_type, + offset=source.vol.offset + offset + ) + + if condition(obj): + result.append(offset) + + offset += 0x8 + return result + +def _scan_for_list_header( + obj: interfaces.objects.ObjectInterface, + max_offset=512, + max_element=50, + inc=0x8 + ): + """Will test the LIST_ENTRY by looping over all element and test if it come back to the original + + """ + result = [] + # align for 64 bits + offset = 0 + while (offset < max_offset and inc > 0) or (offset > max_offset and inc < 0): + list_head = obj._context.object( + layer_name=obj.vol.layer_name, + object_type=obj._context.modules['kernel'].symbol_table_name+constants.BANG+'_LIST_ENTRY', + offset=obj.vol.offset + offset + ) + + try: + index_element = 0 + element = list_head + while element.Flink != list_head.vol.offset: + element = element.Flink + index_element += 1 + if index_element > max_element: + raise exceptions.VolatilityException("Max element reach before looping") + + #find + result.append(offset) + + except: + """The list does not loop on self element so not a list""" + + offset += inc + return result + +def _scan_for_unicode_string(obj: interfaces.objects.ObjectInterface): + """Search for unicode string where pattern is MaxLength = Length + 2 (null terminator)""" + return _layout_scanner( + source=obj, + object_type=obj._context.modules['kernel'].symbol_table_name+constants.BANG+'_UNICODE_STRING', + condition=lambda s: s.Length + 2 == s.MaximumLength and obj._context.layers[obj.vol.layer_name].is_valid(s.Buffer, s.MaximumLength) + ) + +def _scan_for_timestamp(obj, max_offset=512): + """ Search for timestamp integer that start with 0x01D""" + return _layout_scanner( + source=obj, + object_type=obj._context.modules['kernel'].symbol_table_name+constants.BANG+'unsigned long long', + condition=lambda timestamp: (timestamp & 0xff00000000000000 == 0x0100000000000000) and not obj._context.layers[obj.vol.layer_name].is_valid(timestamp, 8) + ) + +def _etype_str(etype:int) -> str: + if etype == 18: + return "AES_256_CTS_HMAC_SHA_196" + elif etype == 17: + return "AES_128_CTS_HMAC_SHA_196" + elif etype == 23: + return "RC4_HMAC" + else: + return "UNKNOWN_ETYPE" + +class KERB_INTERNAL_NAME(objects.StructType): + """Use to reprensent an internal name in kerberos + + The name is concataned using '/' char + """ + @UnwrapOr("UNKNOWN") + def get_string(self): + result = [] + for i in range(0, self.NbString): + result.append( + self._context.object( + layer_name=self.vol.layer_name, + object_type=self.array.get_symbol_table_name() + constants.BANG + "_UNICODE_STRING", + offset=self.array.vol.offset + i * 0x10 + ).String + ) + return "/".join(result) + + +class KERB_CREDENTIAL(objects.StructType): + """Use to format the session id to have the same everywhere""" + __primary_offset__ = None + + @UnwrapOr(None) + def get_primary(self): + """ Search for primary struct whch start by two unicode string""" + self.__class__.__primary_offset__ = self.__class__.__primary_offset__ or _scan_for_unicode_string(self)[0] + + return self._context.object( + layer_name=self.vol.layer_name, + object_type=self.get_symbol_table_name() + constants.BANG + "_KERB_PRIMARY_CREDENTIAL", + offset= self.vol.offset + self.__class__.__primary_offset__ + ) + + def get_logon_session_id(self): + if self.LogonSessionId_1.HighPart == 0 and self.LogonSessionId_1.LowPart == 0: + return self.LogonSessionId_2 + else: + return self.LogonSessionId_1 + + +class KERB_PRIMARY_CREDENTIAL(objects.StructType): + __ticket_cache_offset__ = None + + @UnwrapOr(None) + def get_ticket_cache(self): + """Ticket cache are three _LIST_ENTRY in the Credential struct""" + self.__class__.__ticket_cache_offset__ = self.__class__.__ticket_cache_offset__ or _scan_for_list_header(self)[:3] + return [ + self._context.object( + layer_name=self.vol.layer_name, + object_type=self.Username.get_symbol_table_name() + constants.BANG + "_LIST_ENTRY", + offset=self.vol.offset + offset + ) for offset in self.__class__.__ticket_cache_offset__ + ] + +class KERB_TICKET_INFO(objects.StructType): + """Dump ticket info name into Kirbi format""" + __ticket_time_offset__ = None + __ticket_offset__ = None + __session_key_offset__ = None + __client_name_offset__ = None + + def reset_cache(): + KERB_TICKET_INFO.__ticket_time_offset__ = None + KERB_TICKET_INFO.__ticket_offset__ = None + KERB_TICKET_INFO.__session_key_offset__ = None + KERB_TICKET_INFO.__client_name_offset__ = None + + def _scan_for_ticket(self): + """Search for Ticket where tkt_vno is always 5 (specification) and EType is in [1, 3, 17, 18, 23, 24]""" + return _layout_scanner( + source=self, + object_type=self.get_symbol_table_name() + constants.BANG + "_KERB_TICKET", + condition=lambda ticket: ticket.tkt_vno == 5 and ticket.EType in [1, 3, 17, 18, 23, 24] + )[0] + + def _scan_for_session_key(self): + """Session key layout is more challenging and could failed as the layout is very dependant to Windows version""" + for key_session_type in ["KERB_SESSION_KEY_V1", "KERB_SESSION_KEY_V2"]: + + scan = _layout_scanner( + source=self, + object_type=self.get_symbol_table_name() + constants.BANG + key_session_type, + condition=lambda session_key: (session_key.EType == 18 and session_key.Size == 0x20) or (session_key.EType == 17 and session_key.Size == 0x10) or (session_key.EType == 23 and session_key.Size == 0x10) + ) + + if scan: + return scan[0], key_session_type + + return None + + def _scan_for_client_name(self): + """client name is the first valid address from kdcCalled offset""" + return _layout_scanner( + source=self, + start=self.KdcCalled.vol.offset + 0x10 - self.vol.offset, + object_type=self.get_symbol_table_name() + constants.BANG + "unsigned long long", + condition=lambda pointer: self._context.layers[self.vol.layer_name].is_valid(pointer) + )[0] + + + def get_client_name(self): + self.__class__.__client_name_offset__ = self.__class__.__client_name_offset__ or self._scan_for_client_name() + return self._context.object( + layer_name=self.vol.layer_name, + object_type=self.get_symbol_table_name() + constants.BANG + "pointer", + subtype=self._context.modules[self.get_symbol_table_name()].get_type("_KERB_INTERNAL_NAME"), + offset=self.vol.offset + self.__class__.__client_name_offset__ + ) + + @UnwrapOr(-1) + def get_ticket_time(self): + """ Search for at least three timestamp in the info layout""" + self.__class__.__ticket_time_offset__ = self.__class__.__ticket_time_offset__ or (_scan_for_timestamp(self) + [0,0,0])[:3] + return [ + self._context.object( + layer_name=self.vol.layer_name, + object_type=self.get_symbol_table_name( ) + constants.BANG + "unsigned long long", + offset=self.vol.offset + offset + ) for offset in self.__class__.__ticket_time_offset__ + ] + + @UnwrapOr(None) + def get_ticket(self): + """The layout of ticket is just after the renew time""" + self.__class__.__ticket_offset__ = self.__class__.__ticket_offset__ or self._scan_for_ticket() + return self._context.object( + layer_name=self.vol.layer_name, + object_type=self.get_symbol_table_name() + constants.BANG + "_KERB_TICKET", + offset=self.vol.offset + self.__class__.__ticket_offset__ + ) + + @UnwrapOr(None) + def get_session_key(self): + self.__class__.__session_key_offset__ = self.__class__.__session_key_offset__ or self._scan_for_session_key() + return self._context.object( + layer_name=self.vol.layer_name, + object_type=self.get_symbol_table_name() + constants.BANG + self.__class__.__session_key_offset__[1], + offset=self.vol.offset + self.__class__.__session_key_offset__[0] + ) + + @UnwrapOr(0) + def get_ticket_flags(self): + """Ticket flag are 0x16 far from Client Name scan result""" + self.__class__.__client_name_offset__ = self.__class__.__client_name_offset__ or self._scan_for_client_name() + return self._context.object( + layer_name=self.vol.layer_name, + object_type=self.get_symbol_table_name() + constants.BANG + "unsigned long long", + offset=self.vol.offset + self.__class__.__client_name_offset__ + 0x10 + ) + + @UnwrapOr("Unable to parse service name") + def get_service_name(self): + return self.ServiceName + + @UnwrapOr(-1) + def get_start_time(self): + return self.get_ticket_time()[0] + + @UnwrapOr(-1) + def get_end_time(self): + return self.get_ticket_time()[1] + + @UnwrapOr(-1) + def get_renew_time(self): + return self.get_ticket_time()[2] + + + def toKRBCRED(self): + """Function inspired by impacket convertion script""" + try: + from impacket.krb5.asn1 import AS_REP, seq_set, TGS_REP, EncTGSRepPart, EncASRepPart, Ticket, KRB_CRED, EncKrbCredPart, KrbCredInfo, seq_set_iter, PrincipalName, EncryptedData + from impacket.krb5.types import KerberosTime + from pyasn1.type.univ import noValue + from datetime import datetime, timezone + from pyasn1.codec.der import decoder, encoder + except: + raise exceptions.VolatilityException("Unable to dump ticket, install impacket before") + + session_key = self.get_session_key() + if session_key is None: + raise Exception("Unable to find session key") + + ticket = self.get_ticket() + if ticket is None: + raise Exception("Unable to find ticket") + + krbCredInfo = KrbCredInfo() + + krbCredInfo['key'] = noValue + krbCredInfo['key']['keytype'] = session_key.EType + krbCredInfo['key']['keyvalue'] = self._context.layers[self.vol.layer_name].read(session_key.Buffer, session_key.Size) + + + krbCredInfo['prealm'] = ticket.Realm.dereference().cast("string", encoding="utf-8", errors="replace", max_length=512) + + krbCredInfo['pname'] = noValue + krbCredInfo['pname']['name-type'] = self.get_client_name().dereference().PrincipalType + seq_set_iter(krbCredInfo['pname'], 'name-string', (self.get_client_name().dereference().get_string(),)) + + krbCredInfo['flags'] = self.get_ticket_flags() + + krbCredInfo['starttime'] = KerberosTime.to_asn1(conversion.wintime_to_datetime(self.get_start_time())) + krbCredInfo['endtime'] = KerberosTime.to_asn1(conversion.wintime_to_datetime(self.get_end_time())) + krbCredInfo['renew-till'] = KerberosTime.to_asn1(conversion.wintime_to_datetime(self.get_renew_time())) + + krbCredInfo['srealm'] = self.ServerRealm.String + + krbCredInfo['sname'] = noValue + krbCredInfo['sname']['name-type'] = self.ServiceName.dereference().PrincipalType + tmp_service_class, tmp_service_hostname, *_ = self.ServiceName.dereference().get_string().split('/') + ["unknown", "unknown"] + seq_set_iter(krbCredInfo['sname'], 'name-string', (tmp_service_class, tmp_service_hostname)) + + encKrbCredPart = EncKrbCredPart() + seq_set_iter(encKrbCredPart, 'ticket-info', (krbCredInfo,)) + + krbCred = KRB_CRED() + krbCred['pvno'] = 5 + krbCred['msg-type'] = 22 + + krbCred['enc-part'] = noValue + krbCred['enc-part']['etype'] = 0 + krbCred['enc-part']['cipher'] = encoder.encode(encKrbCredPart) + + newticket = Ticket() + newticket['tkt-vno'] = ticket.tkt_vno + + newticket['sname'] = noValue + newticket['sname']['name-type'] = self.ServiceName.dereference().PrincipalType + seq_set_iter(newticket['sname'], 'name-string', (tmp_service_class, tmp_service_hostname)) + + newticket['realm'] = self.ServerRealm.String + + newticket['enc-part']['etype'] = ticket.EType + newticket['enc-part']['kvno'] = ticket.Kvno + newticket['enc-part']['cipher'] = self._context.layers[self.vol.layer_name].read(ticket.Cipher, ticket.CipherLength) + + seq_set_iter(krbCred, 'tickets', (newticket,)) + + encodedKrbCred = encoder.encode(krbCred) + + return encodedKrbCred + +class KlistCommand: + + def _find_global_session_table_with_pdb_symbols( + self, + kerberos_symbols: str, + kerberos_types: interfaces.context.ModuleInterface, + proc_layer_name: str, + kerberos_base: int, + ) -> interfaces.objects.ObjectInterface: + """ + Find the Global Session Table base on public symbol + """ + + + kerberos_module = self.context.module( + kerberos_symbols, layer_name=proc_layer_name, offset=kerberos_base + ) + + KerbGlobalLogonSessionTableAddr = kerberos_module.get_symbol( + "?KerbGlobalLogonSessionTable@@3U_RTL_AVL_TABLE@@A" + ) + + if KerbGlobalLogonSessionTableAddr is None: + raise exceptions.SymbolError("Unable to find the KerbGlobalLogonSessionTable symbol") + + KerbGlobalLogonSessionTable = kerberos_types.object( + object_type="_RTL_AVL_TABLE", + offset=KerbGlobalLogonSessionTableAddr.address + ) + + return KerbGlobalLogonSessionTable + + def _get_kerberos_types( + self, + context: interfaces.context.ContextInterface, + config, + config_path: str, + proc_layer_name: str, + kerberos_base: int, + ) -> interfaces.context.ModuleInterface: + """ + Builds a symbol table from the kerberos types generated after binary analysis + + Args: + context: the context to operate upon + config: + config_path: + proc_layer_name: name of the lsass.exe process layer + kerberos_base: base address of kerberos.dll inside of lsass.exe + """ + + kernel = self.context.modules[self.config["kernel"]] + table_mapping = {"nt_symbols": kernel.symbol_table_name} + is_64bit = symbols.symbol_table_is_64bit( + context=context, symbol_table_name=kernel.symbol_table_name + ) + + kerberos_symbol_table = intermed.IntermediateSymbolTable.create( + context=context, + config_path=config_path, + sub_path="windows", + filename="kerberos", + class_types={ + "_KERB_INTERNAL_NAME": KERB_INTERNAL_NAME, + "_KERB_CREDENTIAL": KERB_CREDENTIAL, + "_KERB_PRIMARY_CREDENTIAL": KERB_PRIMARY_CREDENTIAL, + "_KERB_TICKET_INFO": KERB_TICKET_INFO + }, + table_mapping=table_mapping, + ) + + return context.module( + kerberos_symbol_table, proc_layer_name, offset=kerberos_base + ) + + def _find_lsass_proc( + self, proc_list: Iterable + ) -> Tuple[interfaces.context.ContextInterface, str]: + """ + Walks the process list and returns the first valid lsass instances. + There should be only one lsass process, but malware will often use the + process name to try and blend in. + + Args: + proc_list: The process list generator + + Return: + The process object for lsass + """ + + for proc in proc_list: + try: + proc_layer_name = proc.add_process_layer() + + return proc, proc_layer_name + + except exceptions.InvalidAddressException as excp: + vollog.debug( + f"Invalid address {excp.invalid_address} in layer {excp.layer_name}" + ) + + return None, None + + def _find_kerberos( + self, lsass_proc: interfaces.context.ContextInterface + ) -> Tuple[int, int]: + """ + """ + for vad in lsass_proc.get_vad_root().traverse(): + filename = vad.get_file_name() + + if isinstance(filename, str) and filename.lower().endswith("kerberos.dll"): + base = vad.get_start() + return base, vad.get_size() + + return None, None + + def _search_sessions_from_root( + self, avl_root_node: interfaces.objects.ObjectInterface + ) -> List[interfaces.objects.ObjectInterface]: + """ + Recurcively parse the tree to find each node from the BalancedRoot + """ + + return self._search_sessions(avl_root_node.LeftChild) + self._search_sessions(avl_root_node.RightChild) + + def _search_sessions( + self, avl_node: interfaces.objects.ObjectInterface + ) -> List[interfaces.objects.ObjectInterface]: + """ + Will output every node of the tree casted into _KERB_LOGON_SESSION_ENTRY + """ + + if avl_node == 0: + return [] + try: + return [avl_node.dereference().cast("_KERB_LOGON_SESSION_ENTRY")] + self._search_sessions(avl_node.LeftChild) + self._search_sessions(avl_node.RightChild) + except exceptions.PagedInvalidAddressException: + return [] + + + def _find_kerberos_ticket( + self, + kerberos_types: interfaces.context.ModuleInterface, + kerberos_logon_session: interfaces.objects.ObjectInterface, + ): + + ticket_caches = kerberos_logon_session.Credentials.get_primary().get_ticket_cache() + + for cache in ticket_caches: + try: + next_ticket = cache.Flink + while next_ticket != cache.vol.offset: + ticket_info = kerberos_types.object( + object_type="_KERB_TICKET_INFO", + offset=next_ticket, + absolute=True + ) + yield kerberos_logon_session, ticket_info + next_ticket = next_ticket.Flink + except exceptions.InvalidAddressException: + vollog.info("_LIST_ENTRY corrupted") + + def _load_kerberos_type_and_symbol( + self, + procs + ) -> Tuple[str, int, interfaces.context.ModuleInterface, str]: + """Will find type and symbol for the kerberos.dll + """ + kernel = self.context.modules[self.config["kernel"]] + + if not symbols.symbol_table_is_64bit( + context=self.context, symbol_table_name=kernel.symbol_table_name + ): + vollog.info("This plugin only supports 64bit Windows memory samples") + return None + + + + lsass_proc, proc_layer_name = self._find_lsass_proc(procs) + + if not lsass_proc: + vollog.info( + "Unable to find a valid lsass.exe process in the process list. This should never happen. Analysis cannot proceed." + ) + return None + + kerberos_base, kerberos_size = self._find_kerberos(lsass_proc) + if not kerberos_base: + vollog.info( + "Unable to find the location of kerberos.dll inside of lsass.exe. Analysis cannot proceed." + ) + return None + + kerberos_types = self._get_kerberos_types( + self.context, self.config, self.config_path, proc_layer_name, kerberos_base + ) + + try: + kerberos_symbols = pdbutil.PDBUtility.symbol_table_from_pdb( + self.context, + interfaces.configuration.path_join(self.config_path, "kerberos"), + proc_layer_name, + "KERBEROS.pdb", + kerberos_base, + kerberos_size, + ) + except exceptions.VolatilityException as e: + vollog.debug( + f"Unable to use the kerberos PDB. Stopping PDB symbols based analysis : {e}" + ) + return None + + return proc_layer_name, kerberos_base, kerberos_types, kerberos_symbols + + def _find_kerberos_sessions( + self, + proc_layer_name, + kerberos_base, + kerberos_types, + kerberos_symbols + ) -> List[interfaces.objects.ObjectInterface]: + """ + This function will find the Global session table, parse the tree and output all found sessions + as _KERB_LOGON_SESSION_ENTRY + """ + + global_session_table = self._find_global_session_table_with_pdb_symbols( + kerberos_symbols, kerberos_types, proc_layer_name, kerberos_base + ) + + return self._search_sessions_from_root(global_session_table.BalancedRoot) + + def _lsass_proc_filter(self, proc): + """ + Used to filter to only lsass.exe processes + + There should only be one of these, but malware can/does make lsass.exe + named processes to blend in or uses lsass.exe as a process hollowing target + """ + process_name = utility.array_to_string(proc.ImageFileName) + + return process_name != "lsass.exe" + +class Sessions(interfaces.plugins.PluginInterface, KlistCommand): + """Looks for sessions managed by the kerberos security provider""" + + _required_framework_version = (2, 4, 0) + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls): + # Since we're calling the plugin, make sure we have the plugin's requirements + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="vadinfo", component=vadinfo.VadInfo, version=(2, 0, 0) + ), + requirements.VersionRequirement( + name="pdbutil", component=pdbutil.PDBUtility, version=(1, 0, 0) + ), + requirements.VersionRequirement( + name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="bytes_scanner", + component=scanners.BytesScanner, + version=(1, 0, 0), + ), + ] + + def _generator(self, procs): + """ + Args: + procs: the process list filtered to lsass.exe instances + """ + proc_layer_name, kerberos_base, kerberos_types, kerberos_symbols = self._load_kerberos_type_and_symbol(procs) + kerberos_sessions = self._find_kerberos_sessions( + proc_layer_name, kerberos_base, kerberos_types, kerberos_symbols + ) + + + for session in kerberos_sessions: + logon_session_entry = session.entry.dereference().cast('_KERB_LOGON_SESSION') + try: + yield (0, + ( + "%s:%s"%(hex(logon_session_entry.Credentials.get_logon_session_id().HighPart)[2:],hex(logon_session_entry.Credentials.get_logon_session_id().LowPart)[2:]), + logon_session_entry.Credentials.get_primary().Username.String, + logon_session_entry.Credentials.get_primary().Realm.String, + len(list(self._find_kerberos_ticket(kerberos_types, logon_session_entry))) + ) + ) + except exceptions.InvalidAddressException as e: + vollog.debug( + "Unable to parse session %s"%str(e) + ) + continue + + def run(self): + return renderers.TreeGrid( + [ + ("Session", str), + ("Domain", str), + ("TargetUsername", str), + ("Nb Tickets", int), + ], + self._generator( + pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=self._lsass_proc_filter, + ) + ), + ) + +class Tickets(interfaces.plugins.PluginInterface, KlistCommand): + """Looks for tickets managed by the kerberos security provider""" + + _required_framework_version = (2, 4, 0) + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls): + # Since we're calling the plugin, make sure we have the plugin's requirements + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="vadinfo", component=vadinfo.VadInfo, version=(2, 0, 0) + ), + requirements.VersionRequirement( + name="pdbutil", component=pdbutil.PDBUtility, version=(1, 0, 0) + ), + requirements.VersionRequirement( + name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="bytes_scanner", + component=scanners.BytesScanner, + version=(1, 0, 0), + ) + ] + + def _generator(self, procs): + """ + """ + proc_layer_name, kerberos_base, kerberos_types, kerberos_symbols = self._load_kerberos_type_and_symbol(procs) + kerberos_sessions = self._find_kerberos_sessions( + proc_layer_name, kerberos_base, kerberos_types, kerberos_symbols + ) + + for session in kerberos_sessions: + logon_session_entry = session.entry.dereference().cast('_KERB_LOGON_SESSION') + + for session, ticket_info in self._find_kerberos_ticket(kerberos_types, logon_session_entry): + primary = session.Credentials.get_primary() + username = "UNKNOWN" + realm = "UNKNOWN" + if not primary is None: + username = session.Credentials.get_primary().Username.String + realm = session.Credentials.get_primary().Realm.String + + ticket = ticket_info.get_ticket() + ticket_etype = -1 + if not ticket is None: + ticket_etype = ticket.EType + + session_key = ticket_info.get_session_key() + session_key_etype = -1 + if not session_key is None: + session_key_etype = session_key.EType + + yield (0, ( + format_hints.Hex(ticket_info.vol.offset), + "%s @ %s"%(username, realm), + "%s @ %s"%(ticket_info.ServiceName.dereference().get_string(), realm), + _etype_str(ticket_etype), + format_hints.Hex(ticket_info.get_ticket_flags()), + str(conversion.wintime_to_datetime(ticket_info.get_start_time())), + str(conversion.wintime_to_datetime(ticket_info.get_end_time())), + str(conversion.wintime_to_datetime(ticket_info.get_renew_time())), + _etype_str(session_key_etype), + ticket_info.KdcCalled.String, + )) + + + def run(self): + return renderers.TreeGrid( + [ + ("Address", format_hints.Hex), + ("Client", str), + ("Server", str), + ("KerbTicket Encryption Type", str), + ("Ticket Flags", format_hints.Hex), + ("Start Time", str), + ("End Time", str), + ("Renew Time", str), + ("Session Key Type", str), + ("Kdc Called", str), + ], + self._generator( + pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=self._lsass_proc_filter, + ) + ), + ) + + +class Dump(interfaces.plugins.PluginInterface, KlistCommand): + """Dump in memory ticket into Kirbi format""" + + _required_framework_version = (2, 4, 0) + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls): + # Since we're calling the plugin, make sure we have the plugin's requirements + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="vadinfo", component=vadinfo.VadInfo, version=(2, 0, 0) + ), + requirements.VersionRequirement( + name="pdbutil", component=pdbutil.PDBUtility, version=(1, 0, 0) + ), + requirements.VersionRequirement( + name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="bytes_scanner", + component=scanners.BytesScanner, + version=(1, 0, 0), + ), + requirements.ListRequirement( + name="address", + element_type=int, + description="Address of ticket to dump", + optional=False, + ), + requirements.ListRequirement( + name="output", + element_type=str, + description="Output file path", + optional=False, + ) + ] + + def _generator(self, procs): + proc_layer_name, kerberos_base, kerberos_types, kerberos_symbols = self._load_kerberos_type_and_symbol(procs) + kerberos_sessions = self._find_kerberos_sessions( + proc_layer_name, kerberos_base, kerberos_types, kerberos_symbols + ) + + for address in self.config["address"]: + try: + ticket = kerberos_types.object( + object_type="_KERB_TICKET_INFO", + offset=address, + absolute=True + ) + + path = os.path.join(self.config["output"][0], "%s.kirbi"%hex(ticket.vol.offset)) + with open(path, "wb") as output: + output.write(ticket.toKRBCRED()) + yield (0, (path,)) + except Exception as e: + yield (0, ("%s : %s"%(hex(address), str(e)),)) + + def run(self): + procs = pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=self._lsass_proc_filter, + ) + + return renderers.TreeGrid( + [ + ("Output", str) + ], + self._generator( + pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=self._lsass_proc_filter, + ) + ), + ) + +class VadTicketScan(interfaces.plugins.PluginInterface, KlistCommand): + """Scan VAD space and carve ticket layout to find unreferenced ticket in the cache + """ + + _required_framework_version = (2, 4, 0) + _version = (0, 1, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(3, 0, 0) + ) + ] + + def _generator(self, procs): + lsass_proc, proc_layer_name = self._find_lsass_proc(procs) + + if not lsass_proc: + vollog.info( + "Unable to find a valid lsass.exe process in the process list. This should never happen. Analysis cannot proceed." + ) + return None + + kerberos_base, kerberos_size = self._find_kerberos(lsass_proc) + if not kerberos_base: + vollog.info( + "Unable to find the location of kerberos.dll inside of lsass.exe. Analysis cannot proceed." + ) + return None + + kerberos_types = self._get_kerberos_types( + self.context, self.config, self.config_path, proc_layer_name, kerberos_base + ) + + kernel = self.context.modules[self.config["kernel"]] + + sanity_check = 1024 * 1024 * 1024 # 1 GB + + + max_vad_size = 0 + vad_maps_to_scan = [] + + for start, size in self.get_vad_maps(lsass_proc): + if size > sanity_check: + vollog.debug( + f"VAD at 0x{start:x} over sanity-check size, not scanning" + ) + continue + max_vad_size = max(max_vad_size, size) + vad_maps_to_scan.append((start, size)) + + if not vad_maps_to_scan: + vollog.warning( + f"No VADs were found for task {lsass_proc.UniqueProcessId}, not scanning" + ) + return + + for start, size in vad_maps_to_scan: + + buffer = self.context.layers[proc_layer_name].read(start, size, pad=True) + # We will find every 5 value which is one of the marker of kerberos ticket + offset = buffer.find(5) + while offset != -1: + try: + ticket = kerberos_types.object( + object_type="_KERB_TICKET", + offset=start + offset - 4, + absolute=True + ) + if ticket.tkt_vno == 5 and ticket.EType in [1, 3, 17, 18, 23, 24]: + #Found a candidate try to retrieve ticket_info list header in backward + header_offset = _scan_for_list_header(ticket, max_offset=-0x200, inc=-0x8)[0] + + # reset cache to avoid layout computing on bad object + KERB_TICKET_INFO.reset_cache() + + ticket_info = kerberos_types.object( + object_type="_KERB_TICKET_INFO", + offset=ticket.vol.offset + header_offset, + absolute=True + ) + + yield (0, ( + format_hints.Hex(ticket_info.vol.offset), + str(ticket_info.get_client_name().dereference().get_string()), + str(ticket_info.ServiceName.dereference().get_string()), + _etype_str(int(ticket_info.get_ticket().EType)), + format_hints.Hex(int(ticket_info.get_ticket_flags())), + str(conversion.wintime_to_datetime(ticket_info.get_start_time())), + str(conversion.wintime_to_datetime(ticket_info.get_end_time())), + str(conversion.wintime_to_datetime(ticket_info.get_renew_time())), + _etype_str(int(ticket_info.get_session_key().EType)), + ticket_info.KdcCalled.String, + )) + + except: + pass + + offset = buffer.find(5, offset + 1) + + + + @staticmethod + def get_vad_maps( + task: interfaces.objects.ObjectInterface, + ) -> Iterable[Tuple[int, int]]: + """Creates a map of start/end addresses within a virtual address + descriptor tree. + + Args: + task: The EPROCESS object of which to traverse the vad tree + + Returns: + An iterable of tuples containing start and size for each descriptor + """ + vad_root = task.get_vad_root() + for vad in vad_root.traverse(): + yield (vad.get_start(), vad.get_size()) + + def run(self): + procs = pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=self._lsass_proc_filter, + ) + + return renderers.TreeGrid( + [ + ("Address", format_hints.Hex), + ("Client", str), + ("Server", str), + ("KerbTicket Encryption Type", str), + ("Ticket Flags", format_hints.Hex), + ("Start Time", str), + ("End Time", str), + ("Renew Time", str), + ("Session Key Type", str), + ("Kdc Called", str), + ], + self._generator( + pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=self._lsass_proc_filter, + ) + ), + ) \ No newline at end of file diff --git a/volatility3/framework/symbols/windows/kerberos.json b/volatility3/framework/symbols/windows/kerberos.json new file mode 100644 index 0000000000..416c087c82 --- /dev/null +++ b/volatility3/framework/symbols/windows/kerberos.json @@ -0,0 +1,629 @@ +{ + "symbols": {}, + "user_types": { + "_LARGE_INTEGER": { + "fields": { + "HighPart": { + "offset": 4, + "type": { + "kind": "base", + "name": "unsigned long" + } + }, + "LowPart": { + "offset": 0, + "type": { + "kind": "base", + "name": "unsigned long" + } + }, + "QuadPart": { + "offset": 0, + "type": { + "kind": "base", + "name": "unsigned long long" + } + } + }, + "kind": "union", + "size": 8 + }, + "LUID": { + "fields": { + "LowPart": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 0 + }, + "HighPart": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 4 + } + }, + "kind": "struct", + "size": 8 + }, + "RTL_BALANCED_LINKS": { + "fields": { + "Parent": { + "type": { + "kind": "pointer", + "subtype": { + "kind": "struct", + "name": "RTL_BALANCED_LINKS" + } + }, + "offset": 0 + }, + "LeftChild": { + "type": { + "kind": "pointer", + "subtype": { + "kind": "struct", + "name": "RTL_BALANCED_LINKS" + } + }, + "offset": 8 + }, + "RightChild": { + "type": { + "kind": "pointer", + "subtype": { + "kind": "struct", + "name": "RTL_BALANCED_LINKS" + } + }, + "offset": 16 + }, + "Balance": { + "type": { + "kind": "base", + "name": "unsigned char" + }, + "offset": 24 + }, + "Reserved": { + "type": { + "kind": "array", + "subtype": { + "kind": "base", + "name": "unsigned char" + }, + "count": 3 + }, + "offset": 25 + }, + "pad": { + "type": { + "kind": "array", + "subtype": { + "kind": "base", + "name": "unsigned char" + }, + "count": 4 + }, + "offset": 28 + } + }, + "kind": "struct", + "size": 32 + }, + "_RTL_AVL_TABLE": { + "fields": { + "BalancedRoot": { + "type": { + "kind": "struct", + "name": "RTL_BALANCED_LINKS" + }, + "offset": 0 + }, + "OrderedPointer": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 32 + }, + "WhichOrderedElement": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 40 + }, + "NumberGenericTableElements": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 44 + }, + "DepthOfTree": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 48 + }, + "pad1": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 52 + }, + "RestartKey": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 56 + }, + "DeleteCount": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 64 + }, + "pad2": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 68 + }, + "CompareRoutine": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 72 + }, + "AllocateRoutine": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 80 + }, + "FreeRoutine": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 88 + }, + "TableContext": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 96 + } + }, + "kind": "struct", + "size": 104 + }, + "_KERB_PLAINTEXT_PASSWORD": { + "fields": { + "unkObj": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 0 + }, + "isProtected": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 8 + }, + "str": { + "type": { + "kind": "struct", + "name": "nt_symbols!_UNICODE_STRING" + }, + "offset": 16 + } + }, + "kind": "struct", + "size": 32 + }, + "KERB_SESSION_KEY_V1": { + "fields": { + "EType": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 0 + }, + "Size": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 8 + }, + "Buffer": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 16 + } + }, + "kind": "struct", + "size": 24 + }, + "KERB_SESSION_KEY_V2": { + "fields": { + "EType": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 4 + }, + "Size": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 8 + }, + "Buffer": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 16 + } + }, + "kind": "struct", + "size": 24 + }, + "_KERB_TICKET": { + "fields": { + "pad": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 0 + }, + "tkt_vno": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 4 + }, + "Realm": { + "type": { + "kind": "pointer", + "subtype":{ + "kind": "base", + "name": "unsigned char" + } + }, + "offset": 8 + }, + "unk_0": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 16 + }, + "Sname": { + "type": { + "kind": "pointer", + "subtype": { + "kind": "struct", + "name": "_KERB_INTERNAL_NAME" + } + }, + "offset": 24 + }, + "unk_1": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 32 + }, + "EType": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 36 + }, + "Kvno": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 40 + }, + "CipherLength": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 48 + }, + "Cipher": { + "type": { + "kind": "pointer", + "subtype": { + "kind": "base", + "name": "unsigned char" + } + }, + "offset": 56 + } + }, + "kind": "struct", + "size": 64 + }, + "_KERB_PRIMARY_CREDENTIAL": { + "fields": { + "Username": { + "type": { + "kind": "struct", + "name": "nt_symbols!_UNICODE_STRING" + }, + "offset": 0 + }, + "Realm": { + "type": { + "kind": "struct", + "name": "nt_symbols!_UNICODE_STRING" + }, + "offset": 16 + } + }, + "kind": "struct", + "size": 0 + }, + "_KERB_CREDENTIAL": { + "fields": { + "head": { + "type": { + "kind": "struct", + "name": "nt_symbols!_LIST_ENTRY" + }, + "offset": 0 + }, + "LogonSessionId_1": { + "type": { + "kind": "struct", + "name": "LUID" + }, + "offset": 56 + }, + "LogonSessionId_2": { + "type": { + "kind": "struct", + "name": "LUID" + }, + "offset": 64 + } + }, + + "kind": "struct", + "size": 0 + }, + "_KERB_LOGON_SESSION": { + "fields": { + "type": { + "type": { + "kind": "base", + "name": "unsigned long long" + }, + "offset": 0 + }, + "Credentials": { + "type": { + "kind": "struct", + "name": "_KERB_CREDENTIAL" + }, + "offset": 8 + } + }, + "kind": "struct", + "size": 488 + }, + "_KERB_LOGON_SESSION_ENTRY": { + "fields": { + "header": { + "type": { + "kind": "struct", + "name": "RTL_BALANCED_LINKS" + }, + "offset": 0 + }, + "entry": { + "type": { + "kind": "pointer", + "subtype": { + "kind": "struct", + "name": "_KERB_LOGON_SESSION" + } + }, + "offset": 32 + } + }, + "kind": "struct", + "size": 40 + }, + "_KERB_TICKET_INFO": { + "fields": { + "head": { + "type": { + "kind": "struct", + "name": "nt_symbols!_LIST_ENTRY" + }, + "offset": 0 + }, + "ServiceName": { + "type": { + "kind": "pointer", + "subtype": { + "kind": "struct", + "name": "_KERB_INTERNAL_NAME" + } + }, + "offset": 32 + }, + "TargetName": { + "type": { + "kind": "pointer", + "subtype": { + "kind": "struct", + "name": "_KERB_INTERNAL_NAME" + } + }, + "offset": 40 + }, + "ServerRealm": { + "type": { + "kind": "struct", + "name": "nt_symbols!_UNICODE_STRING" + }, + "offset": 48 + }, + "TargetDomainName": { + "type": { + "kind": "struct", + "name": "nt_symbols!_UNICODE_STRING" + }, + "offset": 64 + }, + "AltTargetDomainName": { + "type": { + "kind": "struct", + "name": "nt_symbols!_UNICODE_STRING" + }, + "offset": 96 + }, + "KdcCalled": { + "type": { + "kind": "struct", + "name": "nt_symbols!_UNICODE_STRING" + }, + "offset": 112 + }, + "ClientName": { + "type": { + "kind": "pointer", + "subtype": { + "kind": "struct", + "name": "_KERB_INTERNAL_NAME" + } + }, + "offset": 144 + } + }, + "kind": "struct", + "size": 368 + }, + "_KERB_INTERNAL_NAME": { + "fields": { + "PrincipalType": { + "type": { + "kind": "base", + "name": "unsigned short" + }, + "offset": 0 + }, + "NbString": { + "type": { + "kind": "base", + "name": "unsigned short" + }, + "offset": 2 + }, + "pad": { + "type": { + "kind": "base", + "name": "unsigned int" + }, + "offset": 4 + }, + "array": { + "type": { + "kind": "struct", + "name": "nt_symbols!_UNICODE_STRING" + }, + "offset": 8 + } + }, + "kind": "struct", + "size": 24 + } + }, + "enums": {}, + "base_types": { + "unsigned long": { + "kind": "int", + "size": 4, + "signed": false, + "endian": "little" + }, + "unsigned long long": { + "kind": "int", + "size": 8, + "signed": false, + "endian": "little" + }, + "unsigned char": { + "kind": "char", + "size": 1, + "signed": false, + "endian": "little" + }, + "pointer": { + "kind": "int", + "size": 8, + "signed": false, + "endian": "little" + }, + "unsigned int": { + "kind": "int", + "size": 4, + "signed": false, + "endian": "little" + }, + "unsigned short": { + "kind": "int", + "size": 2, + "signed": false, + "endian": "little" + }, + "long": { + "kind": "int", + "size": 4, + "signed": false, + "endian": "little" + } + }, + "metadata": { + "producer": { + "version": "0.0.1", + "name": "airbus-cert by c2json", + "datetime": "2025-12-15T16:36:36.703042" + }, + "format": "4.0.0" + } +}