-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
1dc7691
commit 4736e06
Showing
16 changed files
with
204 additions
and
119 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file was deleted.
Oops, something went wrong.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,13 @@ | ||
from pyky.ccakem import kem_keygen512, kem_encaps512, kem_decaps512 | ||
from pyky.ccakem import kem_keygen1024, kem_encaps1024, kem_decaps1024 | ||
|
||
def generate_kyber_keypair(): | ||
"""Generate a Kyber key pair.""" | ||
return kem_keygen512() | ||
return kem_keygen1024() | ||
|
||
def encrypt_session_key(public_key, session_key): | ||
def encrypt_session_key(public_key): | ||
"""Encrypt a session key using Kyber.""" | ||
return kem_encaps512(public_key) | ||
return kem_encaps1024(public_key) | ||
|
||
def decrypt_session_key(private_key, encrypted_session_key): | ||
"""Decrypt a session key using Kyber.""" | ||
return kem_decaps512(private_key, encrypted_session_key) | ||
return kem_decaps1024(private_key, encrypted_session_key) |
Large diffs are not rendered by default.
Oops, something went wrong.
Large diffs are not rendered by default.
Oops, something went wrong.
Empty file.
File renamed without changes.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import time | ||
from tqdm import tqdm | ||
from Crypto.Cipher import AES | ||
from Crypto.Util.Padding import pad, unpad | ||
from src.key_management import generate_kyber_keypair, encrypt_session_key, decrypt_session_key | ||
from src.encryption import generate_aes_key, aes_encrypt | ||
from src.decryption import aes_decrypt | ||
|
||
|
||
# Convert list of signed integers to bytes | ||
def convert_to_bytes(signed_int_list): | ||
return bytes([x & 0xFF for x in signed_int_list]) | ||
|
||
def loading_message(message, duration=0.1): | ||
"""Simulate a loading process with a message and duration.""" | ||
print(message) | ||
for _ in tqdm(range(100), desc=message, ncols=100, ascii=True, bar_format='{l_bar}{bar} | {percentage:3.0f}%'): | ||
time.sleep(duration / 100) | ||
print() | ||
|
||
def test_main(): | ||
# Generate Kyber key pair | ||
loading_message("1. Generating Kyber key pair...") | ||
priv_key, pub_key = generate_kyber_keypair() | ||
print("Kyber key pair generated.\n") | ||
|
||
|
||
# Encrypt AES session key with Kyber public key | ||
loading_message("2. Encrypting session key with Kyber public key...") | ||
shared_secret, cipher = encrypt_session_key(pub_key) | ||
print("AES session key encrypted with Kyber public key.\n") | ||
|
||
# Get Symmetric key for AES | ||
loading_message("3. AES session key...") | ||
aes_key = convert_to_bytes(shared_secret) | ||
print("AES session key generated.\n") | ||
|
||
# Encrypt a sample message with AES | ||
loading_message("Encrypting message with AES...") | ||
message = b"Hello, this is a test message!" | ||
encrypted_message = aes_encrypt(message, aes_key) | ||
print("Message encrypted with AES.\n") | ||
|
||
# Decrypt AES session key with Kyber private key | ||
loading_message("Decrypting AES symmetric key with Kyber private key and cipher text...") | ||
decrypted_aes_key = decrypt_session_key(priv_key, cipher) | ||
print("AES session key decrypted with Kyber private key.\n") | ||
|
||
# Decrypt the message with the decrypted AES key | ||
loading_message("Decrypting message with AES...") | ||
decrypted_message = aes_decrypt(encrypted_message, convert_to_bytes(decrypted_aes_key)) | ||
print("Message decrypted with AES.\n") | ||
|
||
# Check if the original message and decrypted message are the same | ||
assert message == decrypted_message, "Decryption failed: original and decrypted messages do not match!" | ||
print("Encryption and decryption test passed!") | ||
|
||
print("Original Message:", message) | ||
print("Decrypted Message:", decrypted_message) | ||
print("***TEST***=",message==decrypted_message) | ||
|
||
if __name__ == "__main__": | ||
loading_message("***This is the general test for Encryption and DEcryption using Kyber and AES.***", duration=1) | ||
test_main() | ||
print("Test completed successfully!") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import os | ||
from src.encryption import generate_aes_key, aes_encrypt | ||
from src.decryption import aes_decrypt | ||
from src.key_management import generate_kyber_keypair, encrypt_session_key, decrypt_session_key | ||
from tqdm import tqdm | ||
import time | ||
|
||
def loading_message(message, duration=0.1): | ||
"""Simulate a loading process with a message and duration.""" | ||
print(message) | ||
for _ in tqdm(range(100), desc=message, ncols=100, ascii=True, bar_format='{l_bar}{bar} | {percentage:3.0f}%'): | ||
time.sleep(duration / 100) | ||
print() | ||
|
||
folder_path = '/Users/abhisekjha/Multipath_Research/PQC/pqc_aes_multipath/data/input' | ||
files = os.listdir(folder_path) | ||
|
||
|
||
|
||
def convert_to_bytes(signed_int_list): | ||
return bytes([x & 0xFF for x in signed_int_list]) | ||
|
||
def pqc_multipath(): | ||
|
||
loading_message("0. Reading Encoded Message from Input folder") | ||
for file_name in files: | ||
file_path = os.path.join(folder_path, file_name) | ||
with open(file_path, 'rb') as file: | ||
M = file.read() | ||
|
||
|
||
#kyber keys | ||
loading_message("1. Generating Kyber Key Pair") | ||
private_key, public_key = generate_kyber_keypair() | ||
|
||
aes_key, cipher= encrypt_session_key(public_key) | ||
|
||
loading_message("2. Encrypt the message") | ||
encrypted_message = aes_encrypt(M, convert_to_bytes(aes_key)) | ||
|
||
|
||
loading_message("3. Decrypt the aes key with kyber private key") | ||
decrypted_aes_key = decrypt_session_key(private_key, cipher) | ||
|
||
loading_message("4. Decrypt the message") | ||
decrypted_message = aes_decrypt(encrypted_message, convert_to_bytes(decrypted_aes_key)) | ||
|
||
print("***Is Original Message and Decrypted Message SAME***??=>", M == decrypted_message) | ||
|
||
if __name__== "__main__": | ||
pqc_multipath() | ||
print("PQC Multipath Done") |
This file was deleted.
Oops, something went wrong.