diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 131a7e8f5..fb85ab467 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,7 +8,7 @@ on: jobs: build: name: Test for Py${{ matrix.python-version }} - if: github.event.review.state == 'APPROVED' + if: github.event.review.state == 'APPROVED' || github.event_name == 'workflow_dispatch' runs-on: ${{ matrix.os }} strategy: max-parallel: 5 @@ -19,7 +19,7 @@ jobs: - uses: actions/checkout@v4 - name: Install poetry run: | - pipx install poetry + pipx install poetry==1.8.4 - name: NetExec set up python ${{ matrix.python-version }} on ${{ matrix.os }} uses: actions/setup-python@v5 with: diff --git a/LICENSE b/LICENSE index 07c28cf6b..0447e3d2f 100755 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2023, Marshall-Hallenbeck, NeffIsBack, zblurx, mpgn_x64 +Copyright (c) 2025, Marshall-Hallenbeck, NeffIsBack, zblurx, mpgn_x64 Copyright (c) 2022, byt3bl33d3r All rights reserved. diff --git a/nxc/cli.py b/nxc/cli.py index 462061692..a8a818f5b 100755 --- a/nxc/cli.py +++ b/nxc/cli.py @@ -98,6 +98,13 @@ def gen_cli_args(): kerberos_group.add_argument("--use-kcache", action="store_true", help="Use Kerberos authentication from ccache file (KRB5CCNAME)") kerberos_group.add_argument("--aesKey", metavar="AESKEY", nargs="+", help="AES key to use for Kerberos Authentication (128 or 256 bits)") kerberos_group.add_argument("--kdcHost", metavar="KDCHOST", help="FQDN of the domain controller. If omitted it will use the domain part (FQDN) specified in the target parameter") + + certificate_group = std_parser.add_argument_group("Certificate", "Options for certificate authentication") + certificate_group.add_argument("--pfx-cert", metavar="PFXCERT", help="Use certificate authentication from pfx file .pfx") + certificate_group.add_argument("--pfx-base64", metavar="PFXB64", help="Use certificate authentication from pfx file encoded in base64") + certificate_group.add_argument("--pfx-pass", metavar="PFXPASS", help="Password of the pfx certificate") + certificate_group.add_argument("--cert-pem", metavar="CERTPEM", help="Use certificate authentication from PEM file") + certificate_group.add_argument("--key-pem", metavar="KEYPEM", help="Private key for the PEM format") server_group = std_parser.add_argument_group("Servers", "Options for nxc servers") server_group.add_argument("--server", choices={"http", "https"}, default="https", help="use the selected server") diff --git a/nxc/connection.py b/nxc/connection.py index e2114a7d9..3beb84fd6 100755 --- a/nxc/connection.py +++ b/nxc/connection.py @@ -1,4 +1,7 @@ import random +import sys +import contextlib + from os.path import isfile from threading import BoundedSemaphore from functools import wraps @@ -13,10 +16,9 @@ from nxc.logger import nxc_logger, NXCAdapter from nxc.context import Context from nxc.protocols.ldap.laps import laps_search +from nxc.helpers.pfx import pfx_auth from impacket.dcerpc.v5 import transport -import sys -import contextlib sem = BoundedSemaphore(1) global_failed_logins = 0 @@ -548,6 +550,14 @@ def login(self): self.logger.info("Successfully authenticated using Kerberos cache") return True + if self.args.pfx_cert or self.args.pfx_base64 or self.args.cert_pem: + self.logger.debug("Trying to authenticate using Certificate pfx") + if not self.args.username: + self.logger.fail("You must specify a username when using certificate authentication") + return False + with sem: + return pfx_auth(self) + if hasattr(self.args, "laps") and self.args.laps: self.logger.debug("Trying to authenticate using LAPS") username[0], secret[0], domain[0] = laps_search(self, username, secret, cred_type, domain, self.dns_server) diff --git a/nxc/helpers/pfx.py b/nxc/helpers/pfx.py new file mode 100644 index 000000000..4d084468a --- /dev/null +++ b/nxc/helpers/pfx.py @@ -0,0 +1,538 @@ +# SECUREAUTH LABS. Copyright 2018 SecureAuth Corporation. All rights reserved. +# +# This software is provided under under a slightly modified version +# of the Apache Software License. See the accompanying LICENSE file +# for more information. +# +# Author: +# Alberto Solino (@agsolino) +# Dirk-jan Mollema (@_dirkjan) +# +# Description: +# This script will use an existing TGT to request a PAC for the current user using U2U. +# When the TGT was obtained using PKINIT, the resulting PAC will contain the NT hash which can be +# used for silver tickets and for backwards compatibility with other tooling. +# +# References: +# +# U2U: https://tools.ietf.org/html/draft-ietf-cat-user2user-02 +# +# Based on examples from minikerberos by skelsec +# Parts of this code was inspired by the following project by @rubin_mor +# https://github.com/morRubin/AzureADJoinedMachinePTC +# +# Author: +# Tamas Jos (@skelsec) +# Dirk-jan Mollema (@_dirkjan) +# + +import os +import secrets +import hashlib +import datetime +import logging +import random +import base64 + +from binascii import unhexlify, hexlify + +from oscrypto.keys import parse_pkcs12, parse_certificate, parse_private +from oscrypto.asymmetric import rsa_pkcs1v15_sign, load_private_key + +from asn1crypto import cms +from asn1crypto import algos +from asn1crypto import core +from asn1crypto import keys + +from minikerberos.pkinit import PKINIT, DirtyDH +from minikerberos.protocol.constants import NAME_TYPE, PaDataType +from minikerberos.protocol.encryption import Enctype, _enctype_table, Key +from minikerberos.protocol.asn1_structs import KDC_REQ_BODY, PrincipalName, KDCOptions, EncASRepPart, AS_REQ, PADATA_TYPE, \ + PA_PAC_REQUEST +from minikerberos.protocol.rfc4556 import PKAuthenticator, AuthPack, PA_PK_AS_REP, KDCDHKeyInfo, PA_PK_AS_REQ + +from pyasn1.codec.der import decoder, encoder +from pyasn1.type.univ import noValue + +from impacket.dcerpc.v5.rpcrt import TypeSerialization1 +from impacket.krb5 import constants +from impacket.krb5.asn1 import AP_REQ, AS_REP, TGS_REQ, Authenticator, TGS_REP, seq_set, seq_set_iter, EncTicketPart, AD_IF_RELEVANT, Ticket as TicketAsn1 +from impacket.krb5.kerberosv5 import sendReceive +from impacket.krb5.pac import PACTYPE, PAC_INFO_BUFFER, PAC_CREDENTIAL_INFO, \ + PAC_CREDENTIAL_DATA, NTLM_SUPPLEMENTAL_CREDENTIAL +from impacket.krb5.types import Principal, KerberosTime, Ticket + +# Imports for pfx_auth +from minikerberos.network.clientsocket import KerberosClientSocket +from minikerberos.common.target import KerberosTarget +from minikerberos.common.ccache import CCACHE + +from impacket.krb5.ccache import CCache as impacket_CCache + +from nxc.paths import NXC_PATH + + +class myPKINIT(PKINIT): + """ + Copy of minikerberos PKINIT + With some changes where it differs from PKINIT used in NegoEx + """ + + @staticmethod + def from_pfx(pfxfile, pfxpass, dh_params=None, b64=False): + with open(pfxfile, "rb") as f: + pfxdata = f.read() + + if b64: + pfxdata = base64.b64decode(pfxdata) + + return myPKINIT.from_pfx_data(pfxdata, pfxpass, dh_params) + + @staticmethod + def from_pfx_data(pfxdata, pfxpass, dh_params=None): + pkinit = myPKINIT() + # oscrypto does not seem to support pfx without password, so convert it to PEM using cryptography instead + if not pfxpass: + from cryptography.hazmat.primitives.serialization import pkcs12 + from cryptography.hazmat.primitives import serialization + privkey, cert, extra_certs = pkcs12.load_key_and_certificates(pfxdata, None) + pem_key = privkey.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + pkinit.privkey = load_private_key(parse_private(pem_key)) + pem_cert = cert.public_bytes( + encoding=serialization.Encoding.PEM + ) + pkinit.certificate = parse_certificate(pem_cert) + else: + if isinstance(pfxpass, str): + pfxpass = pfxpass.encode() + pkinit.privkeyinfo, pkinit.certificate, pkinit.extra_certs = parse_pkcs12(pfxdata, password=pfxpass) + pkinit.privkey = load_private_key(pkinit.privkeyinfo) + pkinit.setup(dh_params=dh_params) + return pkinit + + @staticmethod + def from_pem(certfile, privkeyfile, dh_params=None): + pkinit = myPKINIT() + with open(certfile, "rb") as f: + pkinit.certificate = parse_certificate(f.read()) + with open(privkeyfile, "rb") as f: + pkinit.privkey = load_private_key(parse_private(f.read())) + pkinit.setup(dh_params=dh_params) + return pkinit + + def sign_authpack(self, data, wrap_signed=False): + return self.sign_authpack_native(data, wrap_signed) + + def setup(self, dh_params=None): + self.issuer = self.certificate.issuer.native["common_name"] + if dh_params is None: + print("Generating DH params...") + print("DH params generated.") + else: + if isinstance(dh_params, dict): + self.diffie = DirtyDH.from_dict(dh_params) + elif isinstance(dh_params, bytes): + self.diffie = DirtyDH.from_asn1(dh_params) + elif isinstance(dh_params, DirtyDH): + self.diffie = dh_params + else: + raise Exception("DH params must be either a bytearray or a dict") + + def build_asreq(self, domain=None, cname=None, kdcopts=None): + if kdcopts is None: + kdcopts = ["forwardable", "renewable", "renewable-ok"] + if isinstance(kdcopts, list): + kdcopts = set(kdcopts) + if cname is not None: + if isinstance(cname, str): + cname = [cname] + else: + cname = [self.cname] + + now = datetime.datetime.now(datetime.timezone.utc) + + kdc_req_body_data = {} + kdc_req_body_data["kdc-options"] = KDCOptions(kdcopts) + kdc_req_body_data["cname"] = PrincipalName({"name-type": NAME_TYPE.PRINCIPAL.value, "name-string": cname}) + kdc_req_body_data["realm"] = domain.upper() + kdc_req_body_data["sname"] = PrincipalName({"name-type": NAME_TYPE.SRV_INST.value, "name-string": ["krbtgt", domain.upper()]}) + kdc_req_body_data["till"] = (now + datetime.timedelta(days=1)).replace(microsecond=0) + kdc_req_body_data["rtime"] = (now + datetime.timedelta(days=1)).replace(microsecond=0) + kdc_req_body_data["nonce"] = secrets.randbits(31) + kdc_req_body_data["etype"] = [18, 17] # 23 breaks... + kdc_req_body = KDC_REQ_BODY(kdc_req_body_data) + + checksum = hashlib.sha1(kdc_req_body.dump()).digest() + + authenticator = {} + authenticator["cusec"] = now.microsecond + authenticator["ctime"] = now.replace(microsecond=0) + authenticator["nonce"] = secrets.randbits(31) + authenticator["paChecksum"] = checksum + + dp = {} + dp["p"] = self.diffie.p + dp["g"] = self.diffie.g + dp["q"] = 0 # mandatory parameter, but it is not needed + + pka = {} + pka["algorithm"] = "1.2.840.10046.2.1" + pka["parameters"] = keys.DomainParameters(dp) + + spki = {} + spki["algorithm"] = keys.PublicKeyAlgorithm(pka) + spki["public_key"] = self.diffie.get_public_key() + + authpack = {} + authpack["pkAuthenticator"] = PKAuthenticator(authenticator) + authpack["clientPublicValue"] = keys.PublicKeyInfo(spki) + authpack["clientDHNonce"] = self.diffie.dh_nonce + + authpack = AuthPack(authpack) + signed_authpack = self.sign_authpack(authpack.dump(), wrap_signed=True) + + payload = PA_PK_AS_REQ() + payload["signedAuthPack"] = signed_authpack + + pa_data_1 = {} + pa_data_1["padata-type"] = PaDataType.PK_AS_REQ.value + pa_data_1["padata-value"] = payload.dump() + + pa_data_0 = {} + pa_data_0["padata-type"] = int(PADATA_TYPE("PA-PAC-REQUEST")) + pa_data_0["padata-value"] = PA_PAC_REQUEST({"include-pac": True}).dump() + + asreq = {} + asreq["pvno"] = 5 + asreq["msg-type"] = 10 + asreq["padata"] = [pa_data_0, pa_data_1] + asreq["req-body"] = kdc_req_body + + return AS_REQ(asreq).dump() + + def sign_authpack_native(self, data, wrap_signed=False): + """ + Creating PKCS7 blob which contains the following things: + + 1. 'data' blob which is an ASN1 encoded "AuthPack" structure + 2. the certificate used to sign the data blob + 3. the singed 'signed_attrs' structure (ASN1) which points to the "data" structure (in point 1) + """ + da = {} + da["algorithm"] = algos.DigestAlgorithmId("1.3.14.3.2.26") # for sha1 + + si = {} + si["version"] = "v1" + si["sid"] = cms.IssuerAndSerialNumber({ + "issuer": self.certificate.issuer, + "serial_number": self.certificate.serial_number, + }) + + si["digest_algorithm"] = algos.DigestAlgorithm(da) + si["signed_attrs"] = [ + cms.CMSAttribute({"type": "content_type", "values": ["1.3.6.1.5.2.3.1"]}), # indicates that the encap_content_info's authdata struct (marked with OID '1.3.6.1.5.2.3.1' is signed ) + cms.CMSAttribute({"type": "message_digest", "values": [hashlib.sha1(data).digest()]}), # hash of the data, the data itself will not be signed, but this block of data will be. + ] + si["signature_algorithm"] = algos.SignedDigestAlgorithm({"algorithm": "1.2.840.113549.1.1.1"}) + si["signature"] = rsa_pkcs1v15_sign(self.privkey, cms.CMSAttributes(si["signed_attrs"]).dump(), "sha1") + + ec = {} + ec["content_type"] = "1.3.6.1.5.2.3.1" + ec["content"] = data + + sd = {} + sd["version"] = "v3" + sd["digest_algorithms"] = [algos.DigestAlgorithm(da)] # must have only one + sd["encap_content_info"] = cms.EncapsulatedContentInfo(ec) + sd["certificates"] = [self.certificate] + sd["signer_infos"] = cms.SignerInfos([cms.SignerInfo(si)]) + + if wrap_signed is True: + ci = {} + ci["content_type"] = "1.2.840.113549.1.7.2" # signed data OID + ci["content"] = cms.SignedData(sd) + return cms.ContentInfo(ci).dump() + + return cms.SignedData(sd).dump() + + def decrypt_asrep(self, as_rep): + def truncate_key(value, keysize): + output = b"" + currentNum = 0 + while len(output) < keysize: + currentDigest = hashlib.sha1(bytes([currentNum]) + value).digest() + if len(output) + len(currentDigest) > keysize: + output += currentDigest[:keysize - len(output)] + break + output += currentDigest + currentNum += 1 + + return output + + for pa in as_rep["padata"]: + if pa["padata-type"] == 17: + pkasrep = PA_PK_AS_REP.load(pa["padata-value"]).native + break + else: + raise Exception("PA_PK_AS_REP not found!") + ci = cms.ContentInfo.load(pkasrep["dhSignedData"]).native + sd = ci["content"] + keyinfo = sd["encap_content_info"] + if keyinfo["content_type"] != "1.3.6.1.5.2.3.2": + raise Exception("Keyinfo content type unexpected value") + authdata = KDCDHKeyInfo.load(keyinfo["content"]).native + pubkey = int("".join(["1"] + [str(x) for x in authdata["subjectPublicKey"]]), 2) + + pubkey = int.from_bytes(core.BitString(authdata["subjectPublicKey"]).dump()[7:], "big", signed=False) + shared_key = self.diffie.exchange(pubkey) + + server_nonce = pkasrep["serverDHNonce"] + fullKey = shared_key + self.diffie.dh_nonce + server_nonce + + etype = as_rep["enc-part"]["etype"] + cipher = _enctype_table[etype] + if etype == Enctype.AES256: + t_key = truncate_key(fullKey, 32) + elif etype == Enctype.AES128: + t_key = truncate_key(fullKey, 16) + elif etype == Enctype.RC4: + raise NotImplementedError("RC4 key truncation documentation missing. it is different from AES") + + key = Key(cipher.enctype, t_key) + enc_data = as_rep["enc-part"]["cipher"] + logging.info("AS-REP encryption key (you might need this later):") + logging.info(hexlify(t_key).decode("utf-8")) + dec_data = cipher.decrypt(key, 3, enc_data) + encasrep = EncASRepPart.load(dec_data).native + cipher = _enctype_table[int(encasrep["key"]["keytype"])] + session_key = Key(cipher.enctype, encasrep["key"]["keyvalue"]) + return encasrep, session_key, cipher, hexlify(t_key).decode("utf-8") + + +class GETPAC: + + def printPac(self, data, key=None): + nthash = None + encTicketPart = decoder.decode(data, asn1Spec=EncTicketPart())[0] + adIfRelevant = decoder.decode(encTicketPart["authorization-data"][0]["ad-data"], asn1Spec=AD_IF_RELEVANT())[ + 0] + # So here we have the PAC + pacType = PACTYPE(adIfRelevant[0]["ad-data"].asOctets()) + buff = pacType["Buffers"] + found = False + for _bufferN in range(pacType["cBuffers"]): + infoBuffer = PAC_INFO_BUFFER(buff) + data = pacType["Buffers"][infoBuffer["Offset"] - 8:][:infoBuffer["cbBufferSize"]] + if logging.getLogger().level == logging.DEBUG: + print("TYPE 0x%x" % infoBuffer["ulType"]) + if infoBuffer["ulType"] == 2: + found = True + credinfo = PAC_CREDENTIAL_INFO(data) + if logging.getLogger().level == logging.DEBUG: + credinfo.dump() + newCipher = _enctype_table[credinfo["EncryptionType"]] + out = newCipher.decrypt(key, 16, credinfo["SerializedData"]) + type1 = TypeSerialization1(out) + # I'm skipping here 4 bytes with its the ReferentID for the pointer + newdata = out[len(type1) + 4:] + pcc = PAC_CREDENTIAL_DATA(newdata) + if logging.getLogger().level == logging.DEBUG: + pcc.dump() + for cred in pcc["Credentials"]: + credstruct = NTLM_SUPPLEMENTAL_CREDENTIAL(b"".join(cred["Credentials"])) + if logging.getLogger().level == logging.DEBUG: + credstruct.dump() + + logging.info("Recovered NT Hash") + logging.info(hexlify(credstruct["NtPassword"]).decode("utf-8")) + nthash = hexlify(credstruct["NtPassword"]).decode("utf-8") + + buff = buff[len(infoBuffer):] + + if not found: + logging.info("Did not find the PAC_CREDENTIAL_INFO in the PAC. Are you sure your TGT originated from a PKINIT operation?") + return nthash + + def __init__(self, username, domain, kdcHost, key, tgt): + self.__username = username + self.__domain = domain.upper() + self.__kdcHost = kdcHost + self.__asrep_key = key + self.__tgt = tgt["KDC_REP"] + self.__cipher = tgt["cipher"] + self.__sessionKey = tgt["sessionKey"] + + def dump(self): + # Try all requested protocols until one works. + tgt = self.__tgt + cipher = self.__cipher + sessionKey = self.__sessionKey + + decodedTGT = decoder.decode(tgt, asn1Spec=AS_REP())[0] + + # Extract the ticket from the TGT + ticket = Ticket() + ticket.from_asn1(decodedTGT["ticket"]) + + apReq = AP_REQ() + apReq["pvno"] = 5 + apReq["msg-type"] = int(constants.ApplicationTagNumbers.AP_REQ.value) + + opts = [] + apReq["ap-options"] = constants.encodeFlags(opts) + seq_set(apReq, "ticket", ticket.to_asn1) + + authenticator = Authenticator() + authenticator["authenticator-vno"] = 5 + authenticator["crealm"] = str(decodedTGT["crealm"]) + + clientName = Principal() + clientName.from_asn1(decodedTGT, "crealm", "cname") + + seq_set(authenticator, "cname", clientName.components_to_asn1) + + now = datetime.datetime.utcnow() + authenticator["cusec"] = now.microsecond + authenticator["ctime"] = KerberosTime.to_asn1(now) + + if logging.getLogger().level == logging.DEBUG: + logging.debug("AUTHENTICATOR") + print(authenticator.prettyPrint()) + print("\n") + + encodedAuthenticator = encoder.encode(authenticator) + + # Key Usage 7 + # TGS-REQ PA-TGS-REQ padata AP-REQ Authenticator (includes + # TGS authenticator subkey), encrypted with the TGS session + # key (Section 5.5.1) + encryptedEncodedAuthenticator = cipher.encrypt(sessionKey, 7, encodedAuthenticator, None) + + apReq["authenticator"] = noValue + apReq["authenticator"]["etype"] = cipher.enctype + apReq["authenticator"]["cipher"] = encryptedEncodedAuthenticator + + encodedApReq = encoder.encode(apReq) + + tgsReq = TGS_REQ() + + tgsReq["pvno"] = 5 + tgsReq["msg-type"] = int(constants.ApplicationTagNumbers.TGS_REQ.value) + + tgsReq["padata"] = noValue + tgsReq["padata"][0] = noValue + tgsReq["padata"][0]["padata-type"] = int(constants.PreAuthenticationDataTypes.PA_TGS_REQ.value) + tgsReq["padata"][0]["padata-value"] = encodedApReq + + reqBody = seq_set(tgsReq, "req-body") + + opts = [] + opts.append(constants.KDCOptions.forwardable.value) + opts.append(constants.KDCOptions.renewable.value) + opts.append(constants.KDCOptions.canonicalize.value) + opts.append(constants.KDCOptions.enc_tkt_in_skey.value) + + reqBody["kdc-options"] = constants.encodeFlags(opts) + + serverName = Principal(self.__username, type=constants.PrincipalNameType.NT_UNKNOWN.value) + + seq_set(reqBody, "sname", serverName.components_to_asn1) + reqBody["realm"] = str(decodedTGT["crealm"]) + + now = datetime.datetime.utcnow() + datetime.timedelta(days=1) + + reqBody["till"] = KerberosTime.to_asn1(now) + reqBody["nonce"] = random.getrandbits(31) + seq_set_iter(reqBody, "etype", + (int(cipher.enctype), int(constants.EncryptionTypes.rc4_hmac.value))) + + myTicket = ticket.to_asn1(TicketAsn1()) + seq_set_iter(reqBody, "additional-tickets", (myTicket,)) + if logging.getLogger().level == logging.DEBUG: + logging.debug("Final TGS") + print(tgsReq.prettyPrint()) + if logging.getLogger().level == logging.DEBUG: + logging.debug("Final TGS") + print(tgsReq.prettyPrint()) + + message = encoder.encode(tgsReq) + logging.info("Requesting ticket to self with PAC") + + r = sendReceive(message, self.__domain, self.__kdcHost) + + tgs = decoder.decode(r, asn1Spec=TGS_REP())[0] + + if logging.getLogger().level == logging.DEBUG: + logging.debug("TGS_REP") + print(tgs.prettyPrint()) + + cipherText = tgs["ticket"]["enc-part"]["cipher"] + + # Key Usage 2 + # AS-REP Ticket and TGS-REP Ticket (includes tgs session key or + # application session key), encrypted with the service key + # (section 5.4.2) + + # S4USelf + U2U uses this other key + plainText = cipher.decrypt(sessionKey, 2, cipherText) + specialkey = Key(18, unhexlify(self.__asrep_key)) + return self.printPac(plainText, specialkey) + + +def pfx_auth(self): + """Handles the authentication using a PFX or PEM file""" + # Static DH params because the ones generated by cryptography are considered unsafe by AD for some weird reason + dhparams = { + "p": int("00ffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece65381ffffffffffffffff", 16), + "g": 2 + } + self.logger.info("Loading certificate and key from file") + + # Load the certificate and key from file + if self.args.pfx_cert or self.args.pfx_base64: + pfx = self.args.pfx_cert if self.args.pfx_cert else self.args.pfx_base64 + ini = myPKINIT.from_pfx(pfx, self.args.pfx_pass, dhparams, bool(self.args.pfx_base64)) + elif self.args.cert_pem and self.args.key_pem: + ini = myPKINIT.from_pem(self.args.cert_pem, self.args.key_pem, dhparams) + else: + self.logger.fail("You must either specify a PFX file + optional password or a combination of Cert PEM file and Private key PEM file") + return None + + username = self.args.username[0] + log_ccache = os.path.expanduser(f"{NXC_PATH}/logs/{self.hostname}_{self.host}_{datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')}-{username}.ccache".replace(":", "-")) + + # Request a TGT with the cert data + req = ini.build_asreq(self.domain, username) + self.logger.info("Requesting TGT") + + sock = KerberosClientSocket(KerberosTarget(self.kdcHost)) + try: + res = sock.sendrecv(req) + except Exception as e: + self.logger.fail(str(e)) + return False + + encasrep, session_key, cipher, key = ini.decrypt_asrep(res.native) + ccache_minikerberos = CCACHE() + ccache_minikerberos.add_tgt(res.native, encasrep) + ccache_minikerberos.to_file(log_ccache) + self.logger.info(f"Saved TGT to file {log_ccache}") + self.logger.info(f"Using Kerberos Cache {log_ccache}") + ccache = impacket_CCache.loadFile(log_ccache) + principal = f"krbtgt/{self.domain.upper()}@{self.domain.upper()}" + creds = ccache.getCredential(principal) + if creds is not None: + tgt = creds.toTGT() + dumper = GETPAC(username, self.domain, self.kdcHost, key, tgt) + nthash = dumper.dump() + if not self.kerberos: + self.hash_login(self.domain, username, nthash) + else: + self.kerberos_login(self.domain, username, "", nthash, "", self.kdcHost, False) + + self.logger.info("Successfully authenticated using Certificate") + return True diff --git a/nxc/modules/enum_av.py b/nxc/modules/enum_av.py index 5946bd15e..8a7425569 100644 --- a/nxc/modules/enum_av.py +++ b/nxc/modules/enum_av.py @@ -84,10 +84,7 @@ def detect_running_processes(self, context, connection, results): prod_results = results.setdefault(product["name"], {}) prod_results.setdefault("pipes", []).append(pipe) except Exception as e: - if "STATUS_ACCESS_DENIED" in str(e): - context.log.fail("Error STATUS_ACCESS_DENIED while enumerating pipes, probably due to using SMBv1") - else: - context.log.fail(str(e)) + context.log.fail(str(e)) def dump_results(self, results, context): if not results: diff --git a/nxc/modules/wcc.py b/nxc/modules/wcc.py index 52f516b9f..fd2f7d1ca 100644 --- a/nxc/modules/wcc.py +++ b/nxc/modules/wcc.py @@ -54,7 +54,7 @@ def __init__(self, name, description="", checkers=None, checker_args=None, check self.reasons = [] def run(self): - for checker, args, kwargs in zip(self.checkers, self.checker_args, self.checker_kwargs): + for checker, args, kwargs in zip(self.checkers, self.checker_args, self.checker_kwargs, strict=True): if checker is None: checker = HostChecker.check_registry diff --git a/nxc/modules/winscp.py b/nxc/modules/winscp.py index 3770f000a..15afb971b 100644 --- a/nxc/modules/winscp.py +++ b/nxc/modules/winscp.py @@ -5,7 +5,6 @@ # - https://github.com/rapid7/metasploit-framework/blob/master/lib/rex/parser/winscp.rb import traceback -from typing import Tuple from impacket.dcerpc.v5.rpcrt import DCERPCException from impacket.dcerpc.v5 import rrp from impacket.examples.secretsdump import RemoteOperations @@ -98,7 +97,7 @@ def decrypt_passwd(self, host: str, username: str, password: str) -> str: clearpass = clearpass[len(key):] return clearpass - def dec_next_char(self, pass_bytes) -> "Tuple[int, bytes]": + def dec_next_char(self, pass_bytes) -> tuple[int, bytes]: """ Decrypts the first byte of the password and returns the decrypted byte and the remaining bytes. diff --git a/nxc/nxcdb.py b/nxc/nxcdb.py index 7ae45479a..4ebe1f8c0 100644 --- a/nxc/nxcdb.py +++ b/nxc/nxcdb.py @@ -153,7 +153,7 @@ def do_export(self, line): if cred[4] == "hash": usernames.append(cred[2]) passwords.append(cred[3]) - output_list = [":".join(combination) for combination in zip(usernames, passwords)] + output_list = [":".join(combination) for combination in zip(usernames, passwords, strict=True)] write_list(filename, output_list) else: print(f"[-] No such export option: {line[1]}") diff --git a/nxc/protocols/ldap.py b/nxc/protocols/ldap.py index 4c5215546..e77a23f31 100644 --- a/nxc/protocols/ldap.py +++ b/nxc/protocols/ldap.py @@ -205,6 +205,9 @@ def create_conn_obj(self): except Exception as e: self.logger.debug("Exception:", exc_info=True) self.logger.info(f"Skipping item, cannot process due to error {e}") + except ConnectionRefusedError as e: + self.logger.debug(f"{e} on host {self.host}") + return False except OSError as e: self.logger.error(f"Error getting ldap info {e}") @@ -1114,7 +1117,7 @@ def printTable(items, header): rbcdRights.append(str(rbcd.get("sAMAccountName"))) rbcdObjType.append(str(rbcd.get("objectCategory"))) - for rights, objType in zip(rbcdRights, rbcdObjType): + for rights, objType in zip(rbcdRights, rbcdObjType, strict=True): answers.append([rights, objType, "Resource-Based Constrained", sAMAccountName]) if delegation in ["Unconstrained", "Constrained", "Constrained w/ Protocol Transition"]: diff --git a/nxc/protocols/mssql/database.py b/nxc/protocols/mssql/database.py index 9b6edf85a..94ba5e3e4 100755 --- a/nxc/protocols/mssql/database.py +++ b/nxc/protocols/mssql/database.py @@ -189,7 +189,7 @@ def add_admin_user(self, credtype, domain, username, password, host, user_id=Non nxc_logger.debug(f"Hosts: {hosts}") if users is not None and hosts is not None: - for user, host in zip(users, hosts): + for user, host in zip(users, hosts, strict=True): user_id = user[0] host_id = host[0] link = {"userid": user_id, "hostid": host_id} diff --git a/nxc/protocols/nfs.py b/nxc/protocols/nfs.py index ccaceba4d..bd1456c21 100644 --- a/nxc/protocols/nfs.py +++ b/nxc/protocols/nfs.py @@ -167,7 +167,7 @@ def shares(self): # Mount shares and check permissions self.logger.highlight(f"{'UID':<11}{'Perms':<9}{'Storage Usage':<17}{'Share':<30} {'Access List':<15}") self.logger.highlight(f"{'---':<11}{'-----':<9}{'-------------':<17}{'-----':<30} {'-----------':<15}") - for share, network in zip(shares, networks): + for share, network in zip(shares, networks, strict=True): try: mnt_info = self.mount.mnt(share, self.auth) self.logger.debug(f"Mounted {share} - {mnt_info}") @@ -225,7 +225,7 @@ def enum_shares(self): networks = self.export_info(self.mount.export()) self.logger.display("Enumerating NFS Shares Directories") - for share, network in zip(shares, networks): + for share, network in zip(shares, networks, strict=True): try: mount_info = self.mount.mnt(share, self.auth) self.logger.debug(f"Mounted {share} - {mount_info}") diff --git a/nxc/protocols/smb.py b/nxc/protocols/smb.py index bb3cba178..924d72db1 100755 --- a/nxc/protocols/smb.py +++ b/nxc/protocols/smb.py @@ -156,7 +156,8 @@ def __init__(self, args, db, host): self.remote_ops = None self.bootkey = None self.output_filename = None - self.smbv1 = None + self.smbv1 = None # Check if SMBv1 is supported + self.smbv3 = None # Check if SMBv3 is supported self.is_timeouted = False self.signing = False self.smb_share_name = smb_share_name @@ -295,6 +296,10 @@ def enum_host_info(self): except Exception as e: self.logger.debug(f"Error logging off system: {e}") + # Check smbv1 + if not self.args.no_smbv1: + self.smbv1 = self.create_smbv1_conn(check=True) + # DCOM connection with kerberos needed self.remoteName = self.host if not self.kerberos else f"{self.hostname}.{self.targetDomain}" @@ -538,10 +543,10 @@ def hash_login(self, domain, username, ntlm_hash): self.create_conn_obj() return False - def create_smbv1_conn(self): - self.logger.debug(f"Creating SMBv1 connection to {self.host}") + def create_smbv1_conn(self, check=False): + self.logger.info(f"Creating SMBv1 connection to {self.host}") try: - self.conn = SMBConnection( + conn = SMBConnection( self.remoteName, self.host, None, @@ -549,6 +554,9 @@ def create_smbv1_conn(self): preferredDialect=SMB_DIALECT, timeout=self.args.smb_timeout, ) + self.smbv1 = True + if not check: + self.conn = conn except OSError as e: if "Connection reset by peer" in str(e): self.logger.info(f"SMBv1 might be disabled on {self.host}") @@ -567,7 +575,7 @@ def create_smbv1_conn(self): return True def create_smbv3_conn(self): - self.logger.debug(f"Creating SMBv3 connection to {self.host}") + self.logger.info(f"Creating SMBv3 connection to {self.host}") try: self.conn = SMBConnection( self.remoteName, @@ -576,32 +584,35 @@ def create_smbv3_conn(self): self.port, timeout=self.args.smb_timeout, ) + self.smbv3 = True except (Exception, NetBIOSTimeout, OSError) as e: - self.logger.info(f"Error creating SMBv3 connection to {self.host}: {e}") + if "timed out" in str(e): + self.is_timeouted = True + self.logger.debug(f"Timeout creating SMBv3 connection to {self.host}") + else: + self.logger.info(f"Error creating SMBv3 connection to {self.host}: {e}") return False return True - def create_conn_obj(self, no_smbv1=False): + def create_conn_obj(self): """ Tries to create a connection object to the target host. - On first try, it will try to create a SMBv1 connection. + On first try, it will try to create a SMBv3 connection. On further tries, it will remember which SMB version is supported and create a connection object accordingly. :param no_smbv1: If True, it will not try to create a SMBv1 connection """ - no_smbv1 = self.args.no_smbv1 if self.args.no_smbv1 else no_smbv1 - # Initial negotiation - if not no_smbv1 and self.smbv1 is None: - self.smbv1 = self.create_smbv1_conn() - if self.smbv1: + if self.smbv3 is None: + self.smbv3 = self.create_smbv3_conn() + if self.smbv3: return True elif not self.is_timeouted: - return self.create_smbv3_conn() - elif not no_smbv1 and self.smbv1: - return self.create_smbv1_conn() - else: + return self.create_smbv1_conn() + elif self.smbv3: return self.create_smbv3_conn() + else: + return self.create_smbv1_conn() def check_if_admin(self): self.logger.debug(f"Checking if user is admin on {self.host}") diff --git a/nxc/protocols/smb/database.py b/nxc/protocols/smb/database.py index b21fa6755..91cbf9188 100755 --- a/nxc/protocols/smb/database.py +++ b/nxc/protocols/smb/database.py @@ -2,7 +2,6 @@ import sys import warnings from datetime import datetime -from typing import Optional from sqlalchemy import func, Table, select, delete from sqlalchemy.dialects.sqlite import Insert # used for upsert @@ -350,7 +349,7 @@ def add_admin_user(self, credtype, domain, username, password, host, user_id=Non hosts = self.get_hosts(host) if users and hosts: - for user, host in zip(users, hosts): + for user, host in zip(users, hosts, strict=True): user_id = user[0] host_id = host[0] link = {"userid": user_id, "hostid": host_id} @@ -693,7 +692,7 @@ def add_domain_backupkey(self, domain: str, pvk: bytes): except Exception as e: nxc_logger.debug(f"Issue while inserting DPAPI Backup Key: {e}") - def get_domain_backupkey(self, domain: Optional[str] = None): + def get_domain_backupkey(self, domain: str | None = None): """ Get domain backupkey :domain is the domain fqdn @@ -748,11 +747,11 @@ def add_dpapi_secrets( def get_dpapi_secrets( self, filter_term=None, - host: Optional[str] = None, - dpapi_type: Optional[str] = None, - windows_user: Optional[str] = None, - username: Optional[str] = None, - url: Optional[str] = None, + host: str | None = None, + dpapi_type: str | None = None, + windows_user: str | None = None, + username: str | None = None, + url: str | None = None, ): """Get dpapi secrets from nxcdb""" q = select(self.DpapiSecrets) diff --git a/nxc/protocols/smb/samrfunc.py b/nxc/protocols/smb/samrfunc.py index b970a62fa..ef849be59 100644 --- a/nxc/protocols/smb/samrfunc.py +++ b/nxc/protocols/smb/samrfunc.py @@ -77,7 +77,7 @@ def get_local_administrators(self): member_sids = self.samr_query.get_alias_members(domain_handle, self.groups["Administrators"]) member_names = self.lsa_query.lookup_sids(member_sids) - for sid, name in zip(member_sids, member_names): + for sid, name in zip(member_sids, member_names, strict=True): print(f"{name} - {sid}") diff --git a/nxc/protocols/ssh/database.py b/nxc/protocols/ssh/database.py index e94704be3..88aa6e278 100644 --- a/nxc/protocols/ssh/database.py +++ b/nxc/protocols/ssh/database.py @@ -256,7 +256,7 @@ def add_admin_user(self, credtype, username, secret, host_id=None, cred_id=None) hosts = self.get_hosts(host_id) if creds and hosts: - for cred, host in zip(creds, hosts): + for cred, host in zip(creds, hosts, strict=True): cred_id = cred[0] host_id = host[0] link = {"credid": cred_id, "hostid": host_id} diff --git a/nxc/protocols/winrm/database.py b/nxc/protocols/winrm/database.py index ffb00a09b..fdc09c080 100644 --- a/nxc/protocols/winrm/database.py +++ b/nxc/protocols/winrm/database.py @@ -213,7 +213,7 @@ def add_admin_user(self, credtype, domain, username, password, host, user_id=Non hosts = self.get_hosts(host) if users and hosts: - for user, host in zip(users, hosts): + for user, host in zip(users, hosts, strict=True): user_id = user[0] host_id = host[0] link = {"userid": user_id, "hostid": host_id} diff --git a/pyproject.toml b/pyproject.toml index 54a027f0d..f53cf95ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -126,7 +126,7 @@ line-length = 65000 # Allow unused variables when underscore-prefixed. dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" -target-version = "py37" +target-version = "py310" [tool.ruff.flake8-quotes] docstring-quotes = "double"