Source code for train_lib.security.Hashing

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from typing import List, Union, BinaryIO
import os


[docs]def hash_immutable_files(immutable_files: Union[List[str], List[BinaryIO]], user_id: str, session_id: bytes, binary_files=False, ordered_file_list: list = None, immutable_file_names: List[str] = None): """ Calculates the hash of all immutable files in the train, A, R, Q as well as the :param binary_files: boolean parameter indicating whether the files are binary files or file paths :param user_id: :param session_id: :param immutable_files: :return: byte object representing the hash of all files """ digest = hashes.Hash(hashes.SHA512(), backend=default_backend()) digest.update(user_id.encode("utf-8")) query_file = None if binary_files: if immutable_file_names: for f in ordered_file_list: # TODO ugly workaround fix this if f[:2] == "./": f = f[2:] index = immutable_file_names.index(f) if f == "query.json": query_file = immutable_files[index].read() data = immutable_files[index].read() digest.update(data) else: for file in immutable_files: digest.update(file.read()) else: for file in ordered_file_list: with open(file, "rb") as f: digest.update(f.read()) digest.update(session_id) if query_file: digest.update(query_file) return digest.finalize()
[docs]def hash_results(result_files: Union[List[str], List[BinaryIO]], session_id: bytes, binary_files=False): """ Creates a hash of the results of train execution :param result_files: List :param session_id: :return: """ digest = hashes.Hash(hashes.SHA512(), backend=default_backend()) if binary_files: for file in result_files: data = file.read() digest.update(data) else: for file in result_files: with open(file, "rb") as f: digest.update(f.read()) digest.update(session_id) return digest.finalize()