( reference : Machine Learning Data Lifecycle in Production )
Assignment 1. TensorFlow Data Validation (TFDV)
Goal
- generate & visualize statistics from df
- infer data schema
- detect & fix anomalies
1. Setup & Imports
import os
import pandas as pd
import tensorflow as tf
import tempfile, urllib, zipfile
import tensorflow_data_validation as tfdv
from tensorflow.python.lib.io import file_io
from tensorflow_data_validation.utils import slicing_util
from tensorflow_metadata.proto.v0.statistics_pb2 import DatasetFeatureStatisticsList, DatasetFeatureStatistics
tf.get_logger().setLevel('ERROR')
2. Import Dataset
# replace '?' with NA
df = pd.read_csv('dataset_diabetes/diabetic_data.csv', header=0, na_values = '?')
df.shape # (101766, 50)
(1) Data Split
Data Split
- train : eval : serving = 70 : 15 : 15
Drop label column
- only in SERVING df
def prepare_data_splits_from_dataframe(df):
    # (1) Data Split Length
    train_len = int(len(df) * 0.7)
    eval_serv_len = len(df) - train_len
    eval_len = eval_serv_len // 2
    serv_len = eval_serv_len - eval_len 
 
    # (2) Data Split 
    train_df = df.iloc[:train_len].sample(frac=1, random_state=48).reset_index(drop=True)
    eval_df = df.iloc[train_len: train_len + eval_len].sample(frac=1, random_state=48).reset_index(drop=True)
    serving_df = df.iloc[train_len + eval_len: train_len + eval_len + serv_len].sample(frac=1, random_state=48).reset_index(drop=True)
 
    # (3) Drop label column
    serving_df = serving_df.drop(['readmitted'], axis=1)
    return train_df, eval_df, serving_df
train_df, eval_df, serving_df = prepare_data_splits_from_dataframe(df)
- train_df : (71236, 50)
- eval_df : (15265, 50)
- serving_df : (15265, 49)
3. Generate & Visualize Statistics
make descriptive statistics from dataset ( = EDA )
(1) Remove Irrelevant Features
- class : tfdv.StatsOptions- params : feature_allowlist: features to include
 
- params : 
- 일부 변수 제거
features_to_remove = {'encounter_id', 'patient_nbr'}
approved_cols = [col for col in df.columns if (col not in features_to_remove)]
- StatsOptions class & 반드시 포함시킬 변수 지정
stats_options = tfdv.StatsOptions(feature_allowlist=approved_cols)
print(stats_options.feature_allowlist)
['race', 'gender', 'age', 'weight', 'admission_type_id', 'discharge_disposition_id', 'admission_source_id', 'time_in_hospital', 'payer_code', 'medical_specialty', 'num_lab_procedures', 'num_procedures', 'num_medications', 'number_outpatient', 'number_emergency', 'number_inpatient', 'diag_1', 'diag_2', 'diag_3', 'number_diagnoses', 'max_glu_serum', 'A1Cresult', 'metformin', 'repaglinide', 'nateglinide', 'chlorpropamide', 'glimepiride', 'acetohexamide', 'glipizide', 'glyburide', 'tolbutamide', 'pioglitazone', 'rosiglitazone', 'acarbose', 'miglitol', 'troglitazone', 'tolazamide', 'examide', 'citoglipton', 'insulin', 'glyburide-metformin', 'glipizide-metformin', 'glimepiride-pioglitazone', 'metformin-rosiglitazone', 'metformin-pioglitazone', 'change', 'diabetesMed', 'readmitted']
(2) Generate Training Statistics
output 형태 : DatasetFeatureStatisticsList
train_stats = tfdv.generate_statistics_from_dataframe(train_df, stats_options)
datasets {
  num_examples: 71236
  features {
    type: STRING
    string_stats {
      common_stats {
        num_non_missing: 69868
        num_missing: 1368
        min_num_values: 1
        max_num_values: 1
        avg_num_values: 1.0
        num_values_histogram {
          buckets {
            low_value: 1.0
            high_value: 1.0
            sample_count: 6986.8
          }
          buckets {
            low_value: 1.0
            high_value: 1.0
            sample_count: 6986.8
          }
          buckets {
            low_value: 1.0
            high_value: 1.0
...
    path {
      step: "readmitted"
    }
  }
}
- feature 개수 ( 48개 )
len(train_stats.datasets[0].features)
- data 개수 ( 71236개 )
train_stats.datasets[0].num_examples
- feature 명 ( 첫 번째 & 마지막)
train_stats.datasets[0].features[0].path.step[0] # race
train_stats.datasets[0].features[-1].path.step[0] # readmitted
(3) Visualize Training Statistics
tfdv.visualize_statistics(train_stats)

4. Infer a data schema
schema : 데이터의 특성을 정의함
- error 탐지에 사용될 수 있음
schema = tfdv.infer_schema(train_stats)
tfdv.display_schema(schema)


TRAINING DATASET만을 대상으로 data schema를 infer 한다
- evaluation & serving dataset을 대상으로 각각의 statistics를 계산한 뒤, 지금 이 schema와의 비교를 통해 anomaly, drift, skew등을 발견한다
len(schema.feature) # schema 내 feature의 개수 48
list(schema.feature)[1].domain #  2번째 feature
5. Calculate, Visualize, Fix Evaluation Anomalies
(1) Compare Training & Evaluation Statistics
- statistics 만들기
    - 이전과 동일, but 이번에는 evaluation data 넣어주기
 
eval_stats = tfdv.generate_statistics_from_dataframe(eval_df, stats_options=stats_options)
- visualize
지정해야할 파라미터들
- lhs_statistics : (좌) stat
- rhs_statistics : (우) stat
- lhs_name  : (좌) name
- rhs_name : (우) name
tfdv.visualize_statistics(lhs_statistics=eval_stats, 
                          rhs_statistics=train_stats,
                          lhs_name='EVAL_DATASET', 
                          rhs_name='TRAIN_DATASET')

(2) Detect Anomalies
위의 시각화 dashboard를 확인해보면, 특정한 “범주형 변수 glimepiride-pioglitazone”이
Training data에 1개의 unique 값과, Evaluation data에 2개 있음을 확인할 수 있다.
train_df["glimepiride-pioglitazone"].describe()
count     71236
unique        1
top          No
freq      71236
Name: glimepiride-pioglitazone, dtype: object
eval_df["glimepiride-pioglitazone"].describe()
count     15265
unique    2    
top       No   
freq      15264
Name: glimepiride-pioglitazone, dtype: object
이걸 일일히 다 확인하기가,,,,,
TFDV function을 사용하여 확인하자!
- tfdv.validate_statistics()- 인자 1) DatasetFeatureStatisticsList
- 인자 2) Schema
 
- tfdv.display_anomalies()- 인자 ) anomalies
 
def calculate_and_display_anomalies(statistics, schema):
    anomalies = tfdv.validate_statistics(statistics, schema)
    tfdv.display_anomalies(anomalies)
확인 해본 결과, 2개의 변수에 anomaly가 있음을 확인할 수 있다.
- [ ‘glimepiride-pioglitazone’ 변수 ]
    - schema ( TRAIN only )에는 없는 “Steady”라는 값이 evaluation에서 발견
 
- [ ‘medical_specialty’ 변수 ]
    - schema ( TRAIN only )에는 없는 “Neurophysiology”라는 값이 evaluation에서 발견
 
calculate_and_display_anomalies(eval_stats, schema=schema)

(3) Fix evaluation anomalies in the schema
위에서, evaluation에만 발견된 값을 (train) schema에 넣어줌으로써 문제 해결!
- domain.value.append(“feature_value”)
glimepiride_pioglitazone_domain = tfdv.get_domain(schema, 'glimepiride-pioglitazone') 
glimepiride_pioglitazone_domain.value.append('Steady')
medical_specialty_domain = tfdv.get_domain(schema, 'medical_specialty') 
medical_specialty_domain.value.append('Neurophysiology')
더 이상의 anomaly 가 발견되지 않는다 :)
calculate_and_display_anomalies(eval_stats, schema=schema)
6. Schema Environments
( 일반적으로 ) pipeline 내에 있는 모든 데이터셋들은 동일한 schema를 가져야 한다.
하지만, 예외가 있는데….
- label column : 이건 serving dataset에는 없다!
check anomalies in serving set
options = tfdv.StatsOptions(schema=schema, 
                            infer_type_from_schema=True, 
                            feature_allowlist=approved_cols)
이번엔, serving data를 넣어준다.
- readimitted칼럼은, serving dataset을 위해 drop됨을 알 수 있다.
serving_stats = tfdv.generate_statistics_from_dataframe(serving_df, 
                                                        stats_options=options)
calculate_and_display_anomalies(serving_stats, schema=schema)

Anomaly 기준 바꾸기
- Get the feature and relax to match 90% of the domain
payer_code = tfdv.get_feature(schema, 'payer_code')
payer_code.distribution_constraints.min_domain_mass = 0.9 
medical_specialty = tfdv.get_feature(schema, 'medical_specialty')
medical_specialty.distribution_constraints.min_domain_mass = 0.9 
calculate_and_display_anomalies(serving_stats, schema=schema)

(1) Modify Domains
여러 변수에서, 가질 수 있는 값이 총 4종류 (down, no, steady, up)이 있는 것을 알 수있다.
하지만, 많은 train df내의 변수들에는, 이 4종류가 전부 없는 경우가 있다. ( 아래 사진 확인 )

따라서, 이러한 4개 미만의 값들을 가진 변수들의 도메인을 전부 변경해주자! ( overwrite )
( 4개의 종류를 모두 가지고 있는 변수 중 하나인 metformin 을 사용 )
def modify_domain_of_features(features_list, schema, to_domain_name):
    for feature in features_list:
        tfdv.set_domain(schema, feature, to_domain_name)
    return schema
domain_change_features = ['repaglinide', 'nateglinide', 'chlorpropamide', 'glimepiride', 
                          'acetohexamide', 'glipizide', 'glyburide', 'tolbutamide', 'pioglitazone', 'rosiglitazone', 'acarbose', 'miglitol', 'troglitazone', 'tolazamide', 'examide', 'citoglipton', 'insulin', 'glyburide-metformin', 'glipizide-metformin', 'glimepiride-pioglitazone', 'metformin-rosiglitazone', 'metformin-pioglitazone']
schema = modify_domain_of_features(domain_change_features, schema, 'metformin')
tfdv.display_schema(schema)
마지막으로 확인해보자.
calculate_and_display_anomalies(serving_stats, schema=schema)

serving set에 없어야할 readmitted 칼럼만이 anomaly로 탐지되고, 나머지는 잘 해결된 것을 알 수 있다 :)
하지만…사실 이 또한 anomaly라고 볼 수 없다 ( 당연한거니까…! )
따라서, 이러한 오류(?) anomaly 문구(?)를 뜨지 않게 해보자.
schema.default_environment.append('TRAINING')
schema.default_environment.append('SERVING')
tfdv.get_feature(schema, 'readmitted').not_in_environment.append('SERVING')
serving_anomalies_with_env = tfdv.validate_statistics(serving_stats, schema, environment='SERVING')
tfdv.display_anomalies(serving_anomalies_with_env)
# no anomalies found!
7. Checking Data Drift & Skew
지금까지는 data validation 과정을 위해 anomaly detection을 했다.
하지만 이게 전부가 아니다. 우리는 data drift & data skew 또한 확인해야 한다~
특정 변수 ( diabetesMed )를 대상으로 skew 확인하기
특정 변수 ( payer_code )를 대상으로 drift 확인하기
- 기준 : L-infinity distance 0.3 ( 도메인 지식 필요 )
diabetes_med = tfdv.get_feature(schema, 'diabetesMed')
diabetes_med.skew_comparator.infinity_norm.threshold = 0.03 
payer_code = tfdv.get_feature(schema, 'payer_code')
payer_code.drift_comparator.infinity_norm.threshold = 0.03 
skew_drift_anomalies = tfdv.validate_statistics(train_stats, schema,
                                          previous_statistics=eval_stats,
                                          serving_statistics=serving_stats)
tfdv.display_anomalies(skew_drift_anomalies)

8. Display Stats for Data Slices
데이터를 slice로 나눈 뒤 ( ex. 특정 변수의 값을 기준으로 ),
이에 대한 분석을 따로 진행할 수도 있다.
# dataset들이 각각의 요소로 들어가 있는 "dataset_list"
def split_datasets(dataset_list):
    datasets = []
    for dataset in dataset_list.datasets:
        proto_list = DatasetFeatureStatisticsList()
        proto_list.datasets.extend([dataset])
        datasets.append(proto_list)
    return datasets
# 시각화
def display_stats_at_index(index, datasets):
    if index < len(datasets):
        print(datasets[index].datasets[0].name)
        tfdv.visualize_statistics(datasets[index])
def sliced_stats_for_slice_fn(slice_fn, approved_cols, dataframe, schema):
  # (1) slice할 옵션 ( ex. 나누는 기준 )
  slice_stats_options = tfdv.StatsOptions(schema=schema,
                                          slice_functions=[slice_fn],
                                          infer_type_from_schema=True,
                                          feature_allowlist=approved_cols)
  # (2) df를 csv로 바꾸기
  # ( slice function은, `tfdv.generate_statistics_from_csv`에서만 작동하므로 ) 
  CSV_PATH = 'slice_sample.csv'
  dataframe.to_csv(CSV_PATH)
  # (3) 나누어진 dataset 바탕으로 statistics 계산
  sliced_stats = tfdv.generate_statistics_from_csv(CSV_PATH, stats_options=slice_stats_options)
  # (4) DatasetFeatureStatisticsList() 형태로 변환 후 반환
  slice_info_datasets = split_datasets(sliced_stats)
  return slice_info_datasets
# (1) 나눌 기준 설정
slice_fn = slicing_util.get_feature_value_slicer(features={'medical_specialty': None})
# (2) 나눠진 데이터셋 ( 형태 : DatasetFeatureStatisticsList를 각각의 요소로 가지는 list )
slice_datasets = sliced_stats_for_slice_fn(slice_fn, approved_cols, dataframe=train_df, schema=schema)
medical_specialty 변수에는, 총 68종류의 값들이 있다.
이 각각을 조건으로 slicing하여, 필터링할 수 있다.

- ex) 10번째 : medical_specialty_Gastroenterology- medical_specialty == ‘Gastroenterology’ 조건
 
display_stats_at_index(10, slice_datasets) 

9. Freeze the schema
이제, 위처럼 만든 schema를 “frozen”된 상태로 저장할 수 있다.
이는, 새롭게 들어노느 데이터에 대해 validation을 진행할 때 사용할 수 있다!
OUTPUT_DIR = "output"
file_io.recursive_create_dir(OUTPUT_DIR)
schema_file = os.path.join(OUTPUT_DIR, 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file) 
