# Source code for train_lib.security.SecurityProtocol

from .KeyManager import KeyManager
from .SymmetricEncryption import FileEncryptor
from .SecurityErrors import ValidationError
from .Hashing import *
from train_lib.docker_util.docker_ops import *
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.asymmetric import utils, padding
import os
from typing import Union
import time
from tarfile import TarInfo
import docker
import logging


class SecurityProtocol:
    """
    Class that performs the security protocol outlined in the security concept

    :param station_id: PID used to identify the station and to access the correct security values inside the
        train_config.json
    :type station_id: str
    :param config: either a string containing a path to the train_config.json or a dictionary containing the values
        parsed from said json file
    :type config: Union[str, dict]
    :param results_dir: path to the directory containing the results
    :param train_dir: path to the directory containing the immutable files defining a train
    :param docker_client: optional docker client used when updating an image; when it is not given a client is
        created from the environment each time an image is updated
    """

    def __init__(self, station_id: str, config: Union[str, dict], results_dir: str = None, train_dir: str = None,
                 docker_client=None):
        self.station_id = station_id
        self.key_manager = KeyManager(train_config=config)
        self.results_dir = results_dir
        self.train_dir = train_dir
        self.docker_client = docker_client

    def pre_run_protocol(self, img: str = None, private_key_path: str = None,
                         immutable_dir: str = "/opt/pht_train",
                         mutable_dir: str = "/opt/pht_results"):
        """
        Decrypts the files contained in the train and performs the steps necessary to validate a train before it
        is being run.

        When ``img`` and ``private_key_path`` are given, the files are extracted from the image. Otherwise the
        ``train_dir`` and ``results_dir`` given at construction are used.

        :param img: identifier of the image from which the security relevant files will be extracted
        :param private_key_path: path to the private key of the station, used to obtain the symmetric key
        :param immutable_dir: directory inside the image containing the immutable files
        :param mutable_dir: directory inside the image containing the mutable (result) files
        :raises ValidationError: if the number of immutable files differs from the stored file list, or if one of
            the validation steps detects a mismatch
        :raises ValueError: if neither an image with a private key nor the train and results directories are set
        :return:
        """
        logging.info("Executing pre-run protocol...")

        # Execute the protocol with directly passed files and the instances config file
        if img and private_key_path:
            logging.info("Extracting files from image...")
            # Get the content of the immutable files from the image as ByteObjects
            immutable_files, file_names = files_from_archive(extract_archive(img, immutable_dir))
            expected_file_list = self.key_manager.get_security_param("immutable_file_list")
            # Check that no files have been added or removed. This is an explicit check rather than an assert,
            # because asserts are stripped when python runs with -O
            if len(immutable_files) != len(expected_file_list):
                raise ValidationError("Immutable Files have changed: files have been added or removed")
            self.validate_immutable_files(
                files=immutable_files,
                immutable_file_names=file_names,
                ordered_file_list=expected_file_list)

            # The first station has no previous results, signatures or symmetric key to process
            if not self._is_first_station_on_route():
                self.verify_digital_signature()
                file_encryptor = FileEncryptor(
                    self.key_manager.get_sym_key(self.station_id, private_key_path=private_key_path))
                # Decrypt all previously encrypted files
                mutable_files, mf_members, mf_dir = result_files_from_archive(extract_archive(img, mutable_dir))
                decrypted_files = file_encryptor.decrypt_files(mutable_files, binary_files=True)
                self.validate_previous_results(files=decrypted_files)
                archive = self._make_results_archive(mf_dir, mf_members, decrypted_files)
                logging.info("Adding decrypted files to image")
                self._update_image(img, archive, results_path="/opt")
            logging.info("Pre-run protocol success")

        # Execute the protocol parsing the files from the given train and results directories
        elif self.results_dir and self.train_dir:
            self.validate_immutable_files(self.train_dir)
            if not self._is_first_station_on_route():
                self.verify_digital_signature()
                files = self._parse_files(self.results_dir)
                file_encryptor = FileEncryptor(self.key_manager.get_sym_key(self.station_id))
                # Decrypt all previously encrypted files
                file_encryptor.decrypt_files(files)
                self.validate_previous_results()
            logging.info("Pre-run protocol success")

        else:
            raise ValueError("Neither the instance variables for the train and results directories nor the "
                             "img and private_key_path arguments are set.")

    def post_run_protocol(self, img: str = None, private_key_path: str = None,
                          mutable_dir: str = "/opt/pht_results"):
        """
        Updates the necessary values in the train_config.json and encrypts the updated files after a successful
        train execution.

        :param img: identifier of the image <repository>:<tag>
        :param private_key_path: path to the private key associated with the current station and with the
            corresponding public key registered in vault under the PID chosen by the station
        :param mutable_dir: path to the directory in which the mutable files are stored
        :return:
        """
        # execute the post run station side extracting the relevant files from the image
        if img and private_key_path:
            logging.info(f"Executing post-run protocol - target image: {img} \n")
            # Get the mutable files and tar archive structure
            mutable_files, mf_members, mf_dir = result_files_from_archive(extract_archive(img, mutable_dir))
            # Run the post run protocol
            encrypted_mutable_files = self._post_run_outside_container(mutable_files, private_key_path)
            results_archive = self._make_results_archive(mf_dir, mf_members, encrypted_mutable_files)
            results_archive.seek(0)
            # update the container with the encrypted files
            self._update_image(img, results_archive, results_path="/opt", config_path="/opt")
            logging.info(f"Successfully executed post run protocol on img: {img}")
        # execute the post run protocol running inside the docker container
        else:
            logging.info("Executing post-run protocol: \n")
            self._post_run_in_container()

    def _post_run_outside_container(self, mutable_files: List[BytesIO], private_key_path: str) -> List[BytesIO]:
        """
        Performs the post run protocol on the mutable files extracted from an image.

        The hash of the results and its signature are stored in the train configuration, the digital signature
        of the train is extended, and the files are encrypted with a newly generated symmetric key. That key is
        stored in the configuration encrypted for the stations and for the user.

        :param mutable_files: list of BytesIO objects containing the mutable files for a train
        :param private_key_path: path to private key used to sign the results
        :return: the encrypted result files
        """
        logging.info(f"prev results hash {self.key_manager.get_security_param('e_d')}")
        # Update the hash value of the mutable files
        e_d = hash_results(result_files=mutable_files, session_id=self._session_id(), binary_files=True)
        self.key_manager.set_security_param("e_d", e_d.hex())
        logging.info(f"new results hash: {self.key_manager.get_security_param('e_d')}")

        # Load the local private key and sign the hash of the results files
        sk = self.key_manager.load_private_key(key_path=private_key_path)
        self.key_manager.set_security_param("e_d_sig", self._sign_prehashed(sk, e_d).hex())
        # Update the digital signature of the train
        self.sign_digital_signature(sk)

        # Rewind the files so that the encryption reads them from the start
        for file in mutable_files:
            file.seek(0)

        # Encrypt the files
        new_sym_key = self.key_manager.generate_symmetric_key()
        file_encryptor = FileEncryptor(new_sym_key)
        encrypted_results = file_encryptor.encrypt_files(mutable_files, binary_files=True)
        self._store_encrypted_symmetric_key(new_sym_key)
        return encrypted_results

    def _update_image(self, img, results_archive: BytesIO, results_path: str, config_path: str = None):
        """
        Create a container from the ``base`` tag of the image's repository, copy the result files, the user key
        and optionally the updated train_config.json into it, and commit it under the repository and tag of
        ``img``. The temporary container is removed afterwards, also when one of the steps fails.

        :param img: identifier of the image <repository>:<tag>
        :param results_archive: tar archive containing the result files
        :param results_path: path to write the results to
        :param config_path: path to write the updated train_config.json; the config is only written when given
        :raises ValueError: if ``img`` is not of the form <repository>:<tag>
        :return:
        """
        repo, tag = self._split_image_reference(img)
        client = self.docker_client if self.docker_client else docker.from_env()
        container = client.containers.create(f"{repo}:base")
        try:
            # If a config path is given update the train config inside the container
            if config_path:
                logging.info("Updating train config")
                config_archive = self._make_train_config_archive()
                config_archive.seek(0)
                container.put_archive(config_path, config_archive)
            # add the updated results archive
            logging.info("Adding encrypted result files")
            container.put_archive(results_path, results_archive)
            # add user key to opt directory
            logging.info("Updating user key file ")
            user_key = self._make_user_key()
            container.put_archive("/opt", user_key)
            # Commit the container under the original repository and tag
            container.commit(repository=repo, tag=tag)
            container.wait()
        finally:
            # Never leave the temporary container behind
            container.remove()

    @staticmethod
    def _split_image_reference(img: str):
        """
        Split an image identifier into repository and tag at the last colon, so that a registry port
        (e.g. ``localhost:5000/train:latest``) stays part of the repository.

        :param img: identifier of the image <repository>:<tag>
        :raises ValueError: if the identifier has no repository or no tag
        :return: tuple of (repository, tag)
        """
        repo, separator, tag = img.rpartition(":")
        # A "/" after the last colon means the colon belongs to a registry port and no tag was given
        if not (separator and repo and tag) or "/" in tag:
            raise ValueError(f"Image identifier must have the form <repository>:<tag>, got: {img}")
        return repo, tag

    @staticmethod
    def _make_results_archive(archive_members, file_members, updated_files):
        """
        Creates a tar archive containing the updated results

        :param archive_members: all members of the extracted results archive (directories and files)
        :param file_members: the members of the archive representing actual files
        :param updated_files: updated files associated with the file_members, in the same order, that will be
            written to the archive
        :return: in memory tar archive, rewound to its start, to be copied into the new image.
        """
        archive_obj = BytesIO()
        tar = tarfile.open(fileobj=archive_obj, mode="w")
        # Create directory structure
        for member in archive_members:
            if member not in file_members:
                tar.addfile(member)
        # Add the updated members to a new archive
        for file_member, updated_file in zip(file_members, updated_files):
            # The content changed, so the size stored in the tar header has to be updated
            file_member.size = updated_file.getbuffer().nbytes
            file_member.mtime = time.time()
            updated_file.seek(0)
            tar.addfile(file_member, fileobj=updated_file)
        # Close tarfile and reset BytesIO
        tar.close()
        archive_obj.seek(0)
        return archive_obj

    def _make_train_config_archive(self) -> BytesIO:
        """
        Create in memory tar archive containing the train configuration json file

        :return: tar archive, rewound to its start, with the single member train_config.json
        """
        archive_obj = BytesIO()
        tar = tarfile.open(fileobj=archive_obj, mode="w")
        data = self.key_manager.save_keyfile(binary_file=True)
        # Create TarInfo Object based on the data
        info = TarInfo(name="train_config.json")
        info.size = data.getbuffer().nbytes
        info.mtime = time.time()
        # add config data and reset the archive
        tar.addfile(info, data)
        tar.close()
        archive_obj.seek(0)
        return archive_obj

    def _make_user_key(self):
        """
        Create in memory tar archive containing the symmetric key encrypted for the user

        :return: tar archive, rewound to its start, with the single member user_sym_key.key
        """
        archive_obj = BytesIO()
        tar = tarfile.open(fileobj=archive_obj, mode="w")
        # Extract user key from config (stored as hex string) and convert it to bytesio
        data = BytesIO(bytes.fromhex(self.key_manager.get_security_param("user_encrypted_sym_key")))
        info = TarInfo(name="user_sym_key.key")
        info.size = data.getbuffer().nbytes
        info.mtime = time.time()
        tar.addfile(info, data)
        tar.close()
        archive_obj.seek(0)
        return archive_obj

    def _post_run_in_container(self):
        """
        Execute the post-run protocol inside of the container, working on the files in the results directory
        and loading the private key via the RSA_STATION_PRIVATE_KEY environment key.

        :return:
        """
        # Update the values hash and signature of the results
        files = self._parse_files(self.results_dir)
        e_d = hash_results(files, self._session_id())
        self.key_manager.set_security_param("e_d", e_d.hex())
        sk = self.key_manager.load_private_key(env_key="RSA_STATION_PRIVATE_KEY")
        self.key_manager.set_security_param("e_d_sig", self._sign_prehashed(sk, e_d).hex())
        # Sign the train after execution
        self.sign_digital_signature(sk)
        # Encrypt the files with a new symmetric key
        new_sym_key = self.key_manager.generate_symmetric_key()
        file_encryptor = FileEncryptor(new_sym_key)
        file_encryptor.encrypt_files(files)
        # Write new values to file
        self._store_encrypted_symmetric_key(new_sym_key)
        self.key_manager.save_keyfile()
        logging.info("Post-protocol success")

    def _store_encrypted_symmetric_key(self, sym_key):
        """
        Store the given symmetric key in the train configuration, encrypted for the stations ("encrypted_key")
        and with the rsa public key of the user ("user_encrypted_sym_key").

        :param sym_key: the symmetric key the result files have been encrypted with
        """
        self.key_manager.set_security_param("encrypted_key", self.key_manager.encrypt_symmetric_key(sym_key))
        user_public_key = self.key_manager.load_public_key(
            self.key_manager.get_security_param("rsa_user_public_key"))
        user_encrypted_sym_key = self.key_manager._rsa_pk_encrypt(sym_key, user_public_key)
        self.key_manager.set_security_param("user_encrypted_sym_key", user_encrypted_sym_key)

    def _session_id(self) -> bytes:
        """
        :return: the session id stored in the train configuration, decoded from its hex representation
        """
        return bytes.fromhex(self.key_manager.get_security_param("session_id"))

    @staticmethod
    def _sign_prehashed(sk, digest: bytes) -> bytes:
        """
        Sign an already computed SHA512 digest using RSA-PSS

        :param sk: private key used for signing
        :param digest: SHA512 digest to sign
        :return: the signature
        """
        return sk.sign(digest,
                       padding.PSS(mgf=padding.MGF1(hashes.SHA512()), salt_length=padding.PSS.MAX_LENGTH),
                       utils.Prehashed(hashes.SHA512()))

    @staticmethod
    def _verify_prehashed(pk, signature: bytes, digest: bytes):
        """
        Verify an RSA-PSS signature over an already computed SHA512 digest

        :param pk: public key of the signer
        :param signature: signature to check
        :param digest: SHA512 digest the signature was created for
        :raises: cryptography's InvalidSignature if the signature does not match
        """
        pk.verify(signature, digest,
                  padding.PSS(mgf=padding.MGF1(hashes.SHA512()), salt_length=padding.PSS.MAX_LENGTH),
                  utils.Prehashed(hashes.SHA512()))

    def validate_immutable_files(self, train_dir: str = None, files: list = None,
                                 ordered_file_list: List[str] = None,
                                 immutable_file_names: List[str] = None):
        """
        Checks if the hash of the immutable files is the same as the one stored at the creation of the train
        and that the stored signature of the user matches this hash.

        :param train_dir: directory containing the immutable files; takes precedence over ``files``
        :param files: the immutable files as binary file objects
        :param ordered_file_list: ordered list of the immutable files, passed on to the hashing for ``files``
        :param immutable_file_names: names belonging to ``files``, passed on to the hashing for ``files``
        :raises ValidationError: when the files to be executed do not match the agreed upon files
        :raises ValueError: when neither a train directory nor any files are given
        :return:
        """
        # Values needed to check the RSA-PSS signature the user created over the stored hash
        user_pk = self.key_manager.load_public_key(self.key_manager.get_security_param("rsa_user_public_key"))
        e_h = bytes.fromhex(self.key_manager.get_security_param("e_h"))
        e_h_sig = bytes.fromhex(self.key_manager.get_security_param("e_h_sig"))

        # now check before the run that no immutable files have changed, based on stored hash
        if train_dir:
            immutable_files = self._parse_files(train_dir)
            # The train config changes at every station and is therefore not part of the hash
            immutable_files = [str(file) for file in immutable_files if "train_config.json" not in str(file)]
            current_hash = hash_immutable_files(immutable_files,
                                                str(self.key_manager.get_security_param("user_id")),
                                                self._session_id())
        elif files:
            current_hash = hash_immutable_files(files,
                                                str(self.key_manager.get_security_param("user_id")),
                                                self._session_id(),
                                                binary_files=True,
                                                ordered_file_list=ordered_file_list,
                                                immutable_file_names=immutable_file_names)
        else:
            # Without this the hash below would be undefined and fail with an UnboundLocalError
            raise ValueError("Either train_dir or a non-empty list of files is required to validate the "
                             "immutable files.")

        logging.info(f"e_h: {e_h}")
        logging.info(f"file hash: {current_hash}")
        if e_h != current_hash:
            raise ValidationError("Immutable Files have changed")
        # Verify that the hash value corresponds with the signature
        self._verify_prehashed(user_pk, e_h_sig, current_hash)

    def validate_previous_results(self, files: List[BinaryIO] = None):
        """
        Verify that the results from the execution of the previous station did not change, by hashing the
        current results and checking them against the hash and signature stored by the previous station.

        :param files: the decrypted result files as binary file objects; when omitted the files found in the
            results directory are used
        :raises ValidationError: when the hash of the results does not match the stored hash
        """
        # hash and signature of the results of the previous station
        prev_results_hash = self.key_manager.get_security_param("e_d")
        results_sig = self.key_manager.get_security_param("e_d_sig")
        # Get public key of the previous station
        station_public_key = self.key_manager.load_public_key(
            self.key_manager.get_security_param("rsa_public_keys")[self._previous_station_id()])

        if files:
            results_hash = hash_results(files, session_id=self._session_id(), binary_files=True)
        else:
            files = self._parse_files(self.results_dir)
            results_hash = hash_results(files, self._session_id())

        self._verify_prehashed(station_public_key, bytes.fromhex(results_sig), results_hash)
        # Compare with the files currently present in the train
        if results_hash != bytes.fromhex(prev_results_hash):
            raise ValidationError("The previously hashed results do not match the stored ones")

    def sign_digital_signature(self, sk: "RSAPrivateKey"):
        """
        Update the digital signature of the train after successful execution of the train. If there is no
        previous signature present creates a signature based on the session id, otherwise the signature of the
        previous station is hashed, signed using the current stations private key and appended to the list of
        signatures stored in the train.

        :param sk: private key of the currently running station
        """
        signatures = self.key_manager.get_security_param("digital_signature")
        if signatures:
            # Chain onto the signature of the previous station
            seed = bytes.fromhex(signatures[-1]["sig"][0])
        else:
            # First signature on the route (no or empty list stored): start the chain with the session id
            seed = self._session_id()
            signatures = []

        hasher = hashes.Hash(hashes.SHA512(), default_backend())
        hasher.update(seed)
        digest = hasher.finalize()
        sig = self._sign_prehashed(sk, digest)
        signatures.append({"station": self.station_id, "sig": (sig.hex(), digest.hex())})
        self.key_manager.set_security_param("digital_signature", signatures)

    def verify_digital_signature(self):
        """
        Verifies the digital signature of the train by iterating over the list of signatures and verifying each
        one using the public key of the signing station stored in the train configuration json

        :raises ValidationError: if no signatures are stored in the train configuration
        :raises: cryptography's InvalidSignature if any of the signed values can not be validated using the
            provided public keys
        """
        ds = self.key_manager.get_security_param("digital_signature")
        if not ds:
            # An empty loop would otherwise accept a train whose signatures have been removed
            raise ValidationError("No digital signature present in the train configuration")
        public_keys = self.key_manager.get_security_param("rsa_public_keys")
        # NOTE(review): each signature is only checked against the digest stored next to it; the link between
        # a digest and the preceding signature is not recomputed here
        for sig in ds:
            public_key = self.key_manager.load_public_key(public_keys[sig["station"]])
            self._verify_prehashed(public_key, bytes.fromhex(sig["sig"][0]), bytes.fromhex(sig["sig"][1]))

    def _is_first_station_on_route(self):
        """
        Returns true if current station is the first station on the route

        :return: True if no hash of previous results ("e_d") is stored in the train configuration
        """
        # If there are no previous results the station is the first station on the route
        return self.key_manager.get_security_param("e_d") is None

    @staticmethod
    def _parse_files(target_dir):
        """
        Recursively collects the paths of all files below a directory

        :param target_dir: directory in which to find all files
        :return: list of the paths of all files found, each joined with the directory it was found in
        """
        files = []
        logging.info("Detecting files...")
        for dir_path, _dir_names, file_names in os.walk(target_dir):
            files.extend(os.path.join(dir_path, file) for file in file_names)
        logging.info(f"Found {len(files)} Files")
        return files

    def _previous_station_id(self):
        """
        :return: station id of previous station on route
        """
        # the station of the last entry in the list of digital signatures is the previous station
        return self.key_manager.get_security_param("digital_signature")[-1]["station"]