Source code for train_lib.fhir.fhir_k_anonymity

from typing import List
import pandas as pd
from pandas.api.types import is_datetime64_any_dtype as is_datetime
from pandas.api.types import is_numeric_dtype, is_string_dtype, is_categorical_dtype
from icecream import ic


def is_k_anonymized(df: pd.DataFrame, k: int = 3, id_cols: List[str] = None) -> bool:
    """
    Check whether a dataframe satisfies k-anonymity for the given k.

    A dataframe is k-anonymous when every combination of values that occurs in
    the checked columns is shared by at least ``k`` rows. If ``id_cols`` is
    given, only these columns are checked; otherwise all columns are used.

    Missing values (NaN/NaT) are treated as a regular value, i.e. rows whose
    checked columns are missing in the same places fall into the same group.

    :param df: dataframe to check for k-anonymity
    :param k: the number of samples that need to have the same values
    :param id_cols: optional subset of columns in the dataframe that are
        exclusively checked for k-anonymity
    :return: boolean indicating whether the dataframe satisfies k-anonymity
    """
    cols = list(id_cols) if id_cols else list(df.columns)

    # No rows means there is no record that could violate k-anonymity.
    if len(df) == 0:
        return True

    # Without any column to compare, all rows are indistinguishable.
    if not cols:
        return len(df) >= k

    # Count the rows per distinct value combination in a single pass. Comparing
    # the actual values (instead of building a query string) keeps numeric and
    # datetime columns working and is safe for arbitrary column names/values.
    # observed=True avoids zero-sized groups for unused categorical levels.
    group_sizes = df.groupby(cols, dropna=False, observed=True, sort=False).size()
    return bool(group_sizes.min() >= k)
def anonymize(df: pd.DataFrame, k: int = 3, id_cols: List[str] = None) -> pd.DataFrame:
    """
    Attempt to generalize the given dataframe to make it k-anonymized.

    Datetime columns are passed through ``generalize_datetime_column`` and
    numeric columns through ``generalize_numeric_column``. Columns of any other
    dtype are currently left unchanged. The input dataframe is not modified.

    :param df: dataframe to anonymize
    :param k: the number of samples that need to share the same values
    :param id_cols: optional parameter specifying a subset of columns in the
        dataframe to generalize; the k-anonymity check is restricted to the
        same columns. If omitted, all columns are used.
    :return: the generalized copy of the dataframe if it satisfies
        k-anonymity; otherwise a message is printed and ``None`` is returned
    """
    anon_df = df.copy()

    # If id cols are given anonymize those, otherwise use all columns
    target_cols = id_cols if id_cols else df.columns
    for col in target_cols:
        if is_datetime(df[col]):
            anon_df[col] = generalize_datetime_column(df[col])
        elif is_numeric_dtype(df[col]):
            anon_df[col] = generalize_numeric_column(df[col])
        # TODO categorical/string variable handling
        # Every other dtype is kept as-is; anon_df already holds a copy of it.

    # Check the same columns that were generalized; columns outside id_cols
    # are not identifiers and must not influence the result.
    if is_k_anonymized(anon_df, k=k, id_cols=id_cols):
        return anon_df

    print("More generalization required")
    return None
def generalize_numeric_column(num_col: pd.Series):
    """
    Generalize a numeric column.

    No generalization is implemented yet: the column is handed back as-is
    (the very same object, not a copy).

    :param num_col: numeric column to generalize
    :return: the unchanged input column
    """
    return num_col
def generalize_datetime_column(date_col: pd.Series, level: int = 2):
    """
    Generalize a datetime column by reducing its resolution.

    The column is converted with ``pd.to_datetime`` and formatted as a string:

    * ``level == 2``: month and year, e.g. ``"03-2020"``
    * ``level == 3``: year only, e.g. ``"2020"``

    :param date_col: column holding datetimes (or values parseable as such)
    :param level: generalization level, either 2 or 3
    :return: series of formatted strings (missing dates stay missing), or
        ``None`` if ``level`` is not one of the supported levels
    """
    formats_by_level = {
        2: '%m-%Y',
        3: '%Y',
    }
    date_format = formats_by_level.get(level)
    if date_format is None:
        # Unsupported levels have always yielded None; keep that contract.
        return None

    col = pd.to_datetime(date_col)
    # Vectorized formatting; unlike a per-element strftime this tolerates NaT.
    return col.dt.strftime(date_format)