import math
import multiprocessing
from random import randint
from typing import Optional

from .primes import Primes
class DHCryptosystem:
    """Object containing all values of a textbook Diffie-Hellman key exchange.

    Every attribute is an ``int`` or ``None`` while it is still unknown:

    * ``prime``, ``generator``, ``alice_sends``, ``bob_sends`` are public.
    * ``alice_secret`` / ``alice_key`` are known only to Alice.
    * ``bob_secret`` / ``bob_key`` are known only to Bob.

    Random values come from :func:`random.randint`, which is not a
    cryptographically secure source (the calls are marked ``# nosec`` on
    purpose).
    """

    # Single message used for every "inconsistent input" failure of generate_rest.
    _INVALID_MESSAGE = "You can't generate a valid DHCryptosystem like that."

    def __init__(
        self,
        prime: Optional[int] = None,
        generator: Optional[int] = None,
        alice_secret: Optional[int] = None,
        bob_secret: Optional[int] = None,
        alice_sends: Optional[int] = None,
        bob_sends: Optional[int] = None,
        key: Optional[int] = None,
    ):
        self.prime = prime  # publicly known
        self.generator = generator  # publicly known
        self.alice_secret = alice_secret  # only Alice knows this
        self.bob_secret = bob_secret  # only Bob knows this
        self.alice_sends = alice_sends  # publicly known
        self.bob_sends = bob_sends  # publicly known
        # Both sides must end up with the same key, so one argument seeds both.
        self.alice_key = key  # only Alice knows this
        self.bob_key = key  # only Bob knows this

    def _require_usable_prime(self) -> None:
        """Raise ``ValueError`` unless ``self.prime`` allows a valid secret.

        Secrets must satisfy ``1 < secret < prime``, which is impossible for
        any prime below 3.
        """
        if self.prime is None:
            raise ValueError("prime must be set before values can be generated")
        if self.prime < 3:
            raise ValueError("prime must be at least 3")

    def _random_generator(self) -> int:
        """Draw a generator from ``[2, prime - 1]``.

        1 is excluded because it makes every public value and the key 1.
        """
        return randint(2, self.prime - 1)  # nosec

    def _random_secret(self) -> int:
        """Draw a secret from ``[2, prime - 1]``, the range ``generate_rest`` accepts."""
        return randint(2, self.prime - 1)  # nosec

    def generate_from(self, bit_length: Optional[int] = None, prime: Optional[int] = None):
        """Generates every value of the cryptosystem from a prime.

        All previously stored attributes are overwritten.

        Args:
            bit_length (int, optional): Bit length passed to ``Primes.get_prime``
                when ``prime`` is not given. Not necessary if ``prime`` is passed.
            prime (int, optional): Prime number base of the cryptosystem.

        Raises:
            ValueError: If neither ``bit_length`` nor ``prime`` is passed, or if
                the prime is smaller than 3.
        """
        if prime is None and bit_length is None:
            raise ValueError("Either prime or bit_length must be specified")
        self.prime = prime if prime is not None else Primes.get_prime(bit_length)
        self._require_usable_prime()

        self.generator = self._random_generator()
        self.alice_secret = self._random_secret()
        self.bob_secret = self._random_secret()
        # Public halves: generator ** secret mod prime.
        self.alice_sends = pow(self.generator, self.alice_secret, self.prime)
        self.bob_sends = pow(self.generator, self.bob_secret, self.prime)
        # Each side raises the other side's public half to its own secret.
        self.alice_key = pow(self.bob_sends, self.alice_secret, self.prime)
        self.bob_key = pow(self.alice_sends, self.bob_secret, self.prime)

    def generate_rest(self):
        """Generates the missing attributes of the DHCryptosystem if possible.

        ``prime`` must already be set. Missing generator/secrets are drawn at
        random; missing public values and keys are computed from them.

        Raises:
            ValueError: If ``prime`` is missing or smaller than 3, if a supplied
                secret is outside ``1 < secret < prime``, or if a supplied
                public value / key would have to match a value that was only
                just generated (the result would not be a valid cryptosystem).
        """
        self._require_usable_prime()

        # True as soon as anything has been filled in by this call.
        autogenerated = False

        if self.generator is None:
            self.generator = self._random_generator()
            autogenerated = True

        for secret_name in ("alice_secret", "bob_secret"):
            secret = getattr(self, secret_name)
            if secret is None:
                setattr(self, secret_name, self._random_secret())
                autogenerated = True
            elif not 1 < secret < self.prime:
                raise ValueError(self._INVALID_MESSAGE)

        # (target, base, exponent): target = base ** exponent mod prime.
        # Order matters: the keys are built from the public values above them.
        derived = (
            ("alice_sends", "generator", "alice_secret"),
            ("bob_sends", "generator", "bob_secret"),
            ("alice_key", "bob_sends", "alice_secret"),
            ("bob_key", "alice_sends", "bob_secret"),
        )
        for target, base, exponent in derived:
            if getattr(self, target) is None:
                value = pow(getattr(self, base), getattr(self, exponent), self.prime)
                setattr(self, target, value)
                autogenerated = True
            elif autogenerated:
                # A caller-supplied derived value cannot be trusted once any
                # earlier value was filled in by this call.
                raise ValueError(self._INVALID_MESSAGE)

    def __repr__(self):
        return (
            f'<DHCryptosystem prime="{self.prime}" generator="{self.generator}" '
            f'alice_secret="{self.alice_secret}" bob_secret="{self.bob_secret}" '
            f'alice_sends="{self.alice_sends}" bob_sends="{self.bob_sends}" '
            f'alice_key="{self.alice_key}" bob_key="{self.bob_key}">'
        )
class DHCracker:
    """Attacks that recover the shared Diffie-Hellman key from public values.

    Every ``crack_me`` argument only needs the attributes ``prime``,
    ``generator``, ``alice_sends`` and ``bob_sends``.
    """

    # Largest key that fits in the shared signed 64-bit slot used by brute force.
    _MAX_SHARED_KEY = 2 ** 63 - 1

    @classmethod
    def _chunker(cls, num_chunks: int, prime: int) -> list:
        """Splits the exponents ``0..prime`` into ``num_chunks`` consecutive ranges.

        The ranges do not overlap, together cover every exponent from 0 up to
        and including ``prime``, and differ in length by at most one. Integer
        arithmetic is used throughout so large primes are split exactly.

        Args:
            num_chunks (int): Number of chunks to create
            prime (int): Prime number to get the range from

        Returns:
            list: List of ``range`` objects to iterate through

        Raises:
            ValueError: If ``num_chunks`` is smaller than 1.
        """
        if num_chunks < 1:
            raise ValueError("num_chunks must be at least 1")
        base_size, leftover = divmod(prime + 1, num_chunks)
        chunks = []
        start = 0
        for index in range(num_chunks):
            # The first `leftover` chunks take one extra exponent each.
            stop = start + base_size + (1 if index < leftover else 0)
            chunks.append(range(start, stop))
            start = stop
        return chunks

    @classmethod
    def _multi_cracking(cls, crack_me, chunks: list) -> Optional[int]:
        """Spawns one cracker process per chunk and waits for all of them.

        Args:
            crack_me (DHCryptosystem object): Object containing the publicly known values of the cryptosystem.
            chunks (list): List of exponent ranges, one per process

        Returns:
            int or None: Key (int) if it was found, else None

        Raises:
            ValueError: If the prime is too large for the key to fit into the
                shared signed 64-bit slot.
        """
        if crack_me.prime - 1 > cls._MAX_SHARED_KEY:
            raise ValueError("brute force only supports primes of at most 2**63")
        # "q" = signed 64-bit; a plain C int would silently truncate keys >= 2**31.
        # -1 means "not found yet" (real keys are never negative).
        key = multiprocessing.Value("q", -1)
        jobs = []
        for chunk in chunks:
            process = multiprocessing.Process(target=cls._cracker, args=(crack_me, chunk, key))
            process.start()
            jobs.append(process)
        for process in jobs:
            process.join()
        return None if key.value == -1 else key.value

    @classmethod
    def _cracker(cls, crack_me, chunk: range, key):
        """Tries every exponent of one chunk; stores the key once a public value is hit.

        Args:
            crack_me (DHCryptosystem object): Object containing the publicly known values of the cryptosystem.
            chunk (range): Exponents this worker is responsible for
            key (multiprocessing.Value): Shared slot for the found key; -1 while nothing was found
        """
        prime = crack_me.prime
        generator = crack_me.generator
        alice_sends = crack_me.alice_sends
        bob_sends = crack_me.bob_sends
        for exponent in chunk:
            if key.value != -1:
                # Another worker already succeeded.
                return
            candidate = pow(generator, exponent, prime)
            if candidate == alice_sends:
                # exponent acts as Alice's secret.
                found = pow(bob_sends, exponent, prime)
            elif candidate == bob_sends:
                # exponent acts as Bob's secret.
                found = pow(alice_sends, exponent, prime)
            else:
                continue
            with key.get_lock():
                key.value = found
            return

    @classmethod
    def brute_force(cls, crack_me, num_cpus: int) -> Optional[int]:
        """Calculates the DHCryptosystem key by utilizing multiprocessing enhanced brute force. CPU and time intensive.

        Args:
            crack_me (DHCryptosystem object): Needs to be containing the publicly known values of the cryptosystem.
            num_cpus (int): Number of worker processes to start. The original
                author advises not exceeding the number of logical CPU cores,
                as that results in slower execution.

        Returns:
            int or None: int if a key was found, else None

        Raises:
            ValueError: If ``num_cpus`` is smaller than 1 or the prime exceeds 2**63.
        """
        chunks = cls._chunker(num_cpus, crack_me.prime)
        return cls._multi_cracking(crack_me, chunks)

    @classmethod
    def baby_step(cls, crack_me) -> Optional[int]:
        """Discrete logarithm problem solution using the Baby-step Giant-step algorithm. RAM intensive.

        Finds an exponent ``e`` with ``generator ** e == alice_sends (mod prime)``
        and returns ``bob_sends ** e mod prime``.

        Args:
            crack_me (DHCryptosystem object): Object containing the publicly known values of the cryptosystem.

        Returns:
            int or None: int if a key was found, else None
        """
        prime = crack_me.prime
        generator = crack_me.generator
        target = crack_me.alice_sends
        step_count = math.isqrt(prime) + 1

        # Baby steps: remember generator ** i -> i for i in 0..step_count.
        baby_steps = {}
        power = 1
        for i in range(step_count + 1):
            baby_steps[power] = i
            power = (power * generator) % prime

        # generator ** (-step_count) mod prime, using Fermat's little theorem
        # (generator ** (prime - 2) is the inverse of generator modulo a prime).
        giant_factor = pow(generator, (prime - 2) * step_count, prime)

        giant_step = target
        for j in range(step_count + 1):
            i = baby_steps.get(giant_step)
            if i is not None:
                exponent = j * step_count + i
                # Verify before trusting the hit (guards against degenerate input).
                if pow(generator, exponent, prime) == target:
                    return pow(crack_me.bob_sends, exponent, prime)
            giant_step = (giant_step * giant_factor) % prime
        return None

    @classmethod
    def mov_attack(cls, secret: int, g: int, order: int) -> Optional[int]:
        """The MOV attack on Elliptic curve DH.

        As implemented, this searches linearly for the smallest ``i`` in
        ``1..order`` with ``g ** i % (order + 1) == secret``.

        Args:
            secret (int): Value whose exponent is searched for
            g (int): Base that is raised to successive powers
            order (int): Largest exponent tried; the modulus is ``order + 1``

        Returns:
            int or None: int if secret was cracked, else None
        """
        modulus = order + 1
        power = 1
        for exponent in range(1, order + 1):
            # Running product keeps numbers small instead of recomputing g ** i.
            power = (power * g) % modulus
            if power == secret:
                return exponent
        return None