安装
pip install XXDScorecard
使用
Scorecard development utilities.
import pandas as pd
import XXDScorecard.XXDBinning as binning
from sklearn.model_selection import train_test_split
df = pd.read_csv('data.csv')
train_df, test_df = train_test_split(df,
test_size = 0.3,
random_state = 100,
stratify = df.flgGood)
一、数值型
nb = binning.XXDNumberBin()
1. 数值型等频分箱
nb.pct_bin(train_df, 'req_inc_ratio', 'flgGood', max_bin = 10)
2. 分箱结果
nb.get_bin_stats()
3. WOE图
nb.plot_woe()
4. 测试集转woe
nb.trans_to_woe(test_df['req_inc_ratio'])
5. 手动调整分箱
nb.manual_bin(train_df, 'req_inc_ratio', 'flgGood', [20,30,40])
6. 自动单调分箱
nb.monotone_bin(train_df, 'req_inc_ratio', 'flgGood', max_bin = 3)
二、字符型
cb = binning.XXDCharBin()
1. 自动分箱
cb.pct_bin(train_df, 'name', 'flgGood')
2. WOE图
cb.plot_woe()
3. 分箱结果
cb.get_bin_stats()
4. 字符型手动分箱
cb.manual_bin(train_df,
'name',
'flgGood',
[['yuqing', 'xuxiaodong'],
['jack ma'],
['yq', 'dd', 'xxd', 'qq']])
5. 测试集转woe
cb.trans_to_woe(test_df['name'])
三、源码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
用于数值型和字符型变量的分箱类
支持
手动分箱
等频分箱
单调分箱
IV、WOE计算及转换
"""
from pandas.api.types import is_string_dtype
from pandas.api.types import is_numeric_dtype
import matplotlib.pyplot as plt
plt.style.use('seaborn')
import pandas as pd
import numpy as np
from pylab import mpl
mpl.rcParams['font.sans-serif'] = ['SimHei'] # 指定默认字体
mpl.rcParams['axes.unicode_minus'] = False # 解决保存图像是负号'-'显示为方块的问题
class XXDNumberBin():
    """Bin a numeric variable against a 0/1 target and derive WoE / IV statistics.

    Three binning strategies are offered: manual cut-offs (``manual_bin``),
    equal-frequency binning (``pct_bin``) and equal-frequency binning that is
    coarsened until the WoE is monotone (``monotone_bin``). Missing values are
    always collected in a dedicated bin labelled ``-1``. In the statistics,
    "bad" counts the rows whose target equals 1 and "good" the rows whose
    target equals 0.
    """

    def __init__(self):
        # Per-bin statistics table; stays None until a binning method has run.
        self.__bin_stats = None
        # Name of the binned variable; stays None until a binning method has run.
        self.__varname = None

    def __require_fitted(self):
        """Raise ValueError when no binning method has been called yet."""
        if self.__bin_stats is None:
            raise ValueError('ERROR: 尚未调用分箱函数,无法转换!')

    def get_bin_stats(self):
        """Return the per-bin statistics with a fresh 0..n-1 index (None before binning)."""
        if self.__bin_stats is None:
            return None
        return self.__bin_stats.reset_index(drop=True)

    def get_cutoff(self):
        """Return the observed maximum of every non-missing bin as a list (None before binning)."""
        if self.__bin_stats is None:
            return None
        return self.__bin_stats.Max.dropna().tolist()

    def trans_bin_to_woe(self, B):
        """Map a Series of bin labels to the WoE of the corresponding bin.

        B: Series of bin labels as produced by ``trans_to_bin``.
        Raises ValueError if no binning method has been called.
        """
        self.__require_fitted()
        woe = self.__bin_stats['WoE'].sort_index()
        return B.map(lambda label: woe[label])

    def plot_woe(self, title=None):
        """Draw a bar chart of the WoE per bin and print the cut-offs.

        title: chart title; defaults to the name of the binned variable.
        Raises ValueError if no binning method has been called.
        """
        self.__require_fitted()
        woe = self.__bin_stats[['WoE', 'Range']].sort_index()
        plt.clf()
        if title is None:
            title = self.__varname
        plt.title('{}(WOE)'.format(title))
        plt.bar(range(len(woe)), woe.WoE, tick_label=woe.Range)
        plt.show()
        print('Cutoff:{}'.format(self.get_cutoff()))

    def get_iv(self):
        """Return the total information value of the current binning."""
        self.__require_fitted()
        return self.__bin_stats['TotalIV'].iloc[0]

    def get_varname(self):
        """Return the name of the binned variable (None before binning)."""
        return self.__varname

    def trans_to_bin(self, X):
        """Assign every value of X to a bin label of the current binning.

        A value goes to the first bin whose observed maximum is >= the value;
        values above the largest maximum go to the bin with the largest
        maximum. Missing values go to the missing bin when the training data
        had one, otherwise to the bin with the smallest maximum.

        X: Series; converted to float when it is not numeric.
        Raises ValueError if no binning method has been called.
        """
        self.__require_fitted()
        if not is_numeric_dtype(X):
            X = X.astype(float)
        # NaN (the missing bin) sorts first so that index[0] is the fallback
        # for missing input and index[-1] the bin with the largest maximum.
        cuts = self.__bin_stats['Max'].sort_values(na_position='first')
        mx = cuts.max()
        top_label = cuts.index[-1]
        assigned = X.map(
            lambda v: (cuts >= v).idxmax() if v <= mx else top_label,
            na_action='ignore')
        return assigned.fillna(cuts.index[0])

    def trans_to_woe(self, X):
        """Translate every value of X into the WoE of the bin it falls in.

        A value gets the WoE of the first bin whose observed maximum is >= the
        value; values above the largest maximum get the WoE of the bin with
        the highest label, and missing values get the WoE of the bin with the
        lowest label (the missing bin ``-1`` when the training data had one).

        X: Series; converted to float when it is not numeric.
        Raises ValueError if no binning method has been called.
        """
        self.__require_fitted()
        if not is_numeric_dtype(X):
            X = X.astype(float)
        cuts = self.__bin_stats['Max'].sort_values(na_position='first')
        mx = cuts.max()
        woe = self.__bin_stats['WoE'].sort_index()
        top_woe = woe.iloc[-1]
        mapped = X.map(
            lambda v: woe[(cuts >= v).idxmax()] if v <= mx else top_woe,
            na_action='ignore')
        return mapped.fillna(woe.iloc[0])

    def __cc(self, dfx):
        """Summarise one bin: value range and record / good / bad counts."""
        mx = dfx.XX.max()
        mn = dfx.XX.min()
        cnt = len(dfx)
        bad = dfx.YY.sum()
        good = cnt - bad
        # A bin whose maximum is NaN holds only missing values.
        label = '<={:.3f}'.format(mx) if pd.notna(mx) else 'Miss'
        return pd.Series({'Var': self.__varname, 'Range': label,
                          'Min': mn, 'Max': mx, 'CntRec': cnt,
                          'CntGood': good, 'CntBad': bad})

    def calc_stats(self, data):
        """Compute counts, rates, odds, WoE and IV for every bin.

        data: DataFrame with the columns ``bin``, ``XX`` (values) and ``YY``
              (0/1 target).
        Returns a DataFrame indexed by bin label. Infinite per-bin IV values
        are counted as 0 in ``TotalIV``.
        """
        res = data.groupby(data['bin']).apply(self.__cc)
        cntg = (data.YY == 0).sum()
        cntb = (data.YY == 1).sum()
        res['Pct'] = res.CntRec / len(data)
        res['PctBad'] = res.CntBad / cntb
        res['PctGood'] = res.CntGood / cntg
        res['BadRate'] = res.CntBad / res.CntRec
        res['CumGood'] = res.CntGood.cumsum()
        res['CumBad'] = res.CntBad.cumsum()
        res['Odds'] = res.BadRate / (1 - res.BadRate)
        res['LnOdds'] = np.log(res.Odds)
        res['WoE'] = np.log(res.PctBad / res.PctGood)
        res['IV'] = (res.PctBad - res.PctGood) * res.WoE
        res['TotalIV'] = res.IV.replace({np.inf: 0, -np.inf: 0}).sum()
        return res

    def __prepare(self, df, x, y):
        """Record the variable name, validate the inputs and build the working frame.

        Returns ``(XX, data, cnt)``: the numeric values, a DataFrame with the
        columns ``XX`` / ``YY`` and the number of non-missing values.
        """
        self.__varname = x
        XX, YY = df[x], df[y]
        assert YY.isin([0, 1]).all(), 'ERROR: {} 目标变量非0/1!'.format(y)
        if not is_numeric_dtype(XX):
            XX = XX.astype(float)
        data = pd.DataFrame({'XX': XX, 'YY': YY})
        cnt = XX.count()
        assert cnt > 0, 'ERROR: "{}" 变量值全为 NULL !'.format(x)
        return XX, data, cnt

    @staticmethod
    def __rank_bins(ZZ, max_bin):
        """Split percentile ranks into ``max_bin`` equal-width rank intervals.

        Missing ranks are assigned the label ``-1``.
        """
        edges = pd.Series(np.linspace(0, 1, max_bin + 1))
        return ZZ.map(lambda r: (edges >= r).idxmax(), na_action='ignore').fillna(-1)

    def manual_bin(self, df, x, y, cutoff=None):
        """Bin the variable with user supplied cut-offs.

        df: data
        x: name of the variable to bin
        y: name of the 0/1 target column
        cutoff: upper bounds (inclusive) of the bins, in any order; a final
                open-ended bin is always appended.
        """
        XX, data, _ = self.__prepare(df, x, y)
        bounds = list(cutoff) if cutoff is not None else []
        # Re-number after sorting so that bin labels follow the value order
        # even when the cut-offs are passed unsorted.
        edges = pd.Series(bounds + [np.inf]).sort_values().reset_index(drop=True)
        mx = edges.max()
        last_label = edges.index[-1]
        data['bin'] = XX.map(
            lambda v: (edges >= v).idxmax() if v <= mx else last_label,
            na_action='ignore').fillna(-1)
        self.__bin_stats = self.calc_stats(data)

    def pct_bin(self, df, x, y, max_bin=10, min_pct=0.06):
        """Equal-frequency binning.

        The number of bins is reduced until every non-missing bin holds at
        least ``min_pct`` of all records (or only one bin is left).

        df: data
        x: name of the variable to bin
        y: name of the 0/1 target column
        max_bin: maximum number of non-missing bins
        min_pct: minimum share of all records per non-missing bin
        """
        XX, data, cnt = self.__prepare(df, x, y)
        min_sample = int(len(XX) * min_pct)
        if cnt <= min_sample:
            print('WARN: "{}" 非空值少于 {} !'.format(x, min_pct))
        nuniq = XX.nunique()
        if nuniq <= 50:
            print('WARN: "{}" 数值型变量只有 {} 个取值!'.format(x, nuniq))
        ZZ = XX.rank(pct=True)
        while True:
            bins = self.__rank_bins(ZZ, max_bin)
            # The missing bin (-1) is not controlled by max_bin, so it must
            # not take part in the minimum-size check; otherwise a small
            # missing group would collapse the variable to a single bin.
            smallest = bins[bins != -1].value_counts().min()
            if smallest >= min_sample or cnt <= min_sample or max_bin <= 1:
                break
            max_bin -= 1
        data['bin'] = bins
        self.__bin_stats = self.calc_stats(data)

    def monotone_bin(self, df, x, y, max_bin=10):
        """Equal-frequency binning coarsened until the WoE is monotone.

        df: data
        x: name of the variable to bin
        y: name of the 0/1 target column
        max_bin: number of non-missing bins to start from
        """
        XX, data, _ = self.__prepare(df, x, y)
        ZZ = XX.rank(pct=True)
        while True:
            data['bin'] = self.__rank_bins(ZZ, max_bin)
            res = self.calc_stats(data).sort_index()
            # Monotonicity is judged on the non-missing bins only.
            woe = res[~res.Max.isna()].WoE
            monotone = woe.is_monotonic_decreasing or woe.is_monotonic_increasing
            # Stop at one bin even if the WoE is NaN (e.g. single-class
            # target); going further would leave no bin edges at all.
            if monotone or max_bin <= 1:
                break
            max_bin -= 1
        self.__bin_stats = res
class XXDCharBin():
    """Bin a categorical (string) variable against a 0/1 target and derive WoE / IV.

    Bins are lists of category values. ``manual_bin`` takes them from the
    caller; ``pct_bin`` builds them automatically by grouping categories with
    a similar target mean. Bin ``i`` holds the values of the i-th list,
    values that match no list go to bin ``-1`` and null values to bin ``-2``.
    In the statistics, "bad" counts the rows whose target equals 1 and "good"
    the rows whose target equals 0.
    """

    def __init__(self):
        # Per-bin statistics table; stays None until a binning method has run.
        self.__bin_stats = None
        # Name of the binned variable; stays None until a binning method has run.
        self.__varname = None
        # Category lists of the current binning, positionally matching the bin labels.
        self.__bins = []

    def __require_fitted(self):
        """Raise ValueError when no binning method has been called yet."""
        if self.__bin_stats is None:
            raise ValueError('ERROR: 尚未调用分箱函数,无法转换!')

    def get_bin_stats(self):
        """Return a copy of the per-bin statistics (None before binning)."""
        if self.__bin_stats is None:
            return None
        return self.__bin_stats.copy()

    def trans_bin_to_woe(self, B):
        """Map a Series of bin labels to WoE values.

        Missing labels receive the WoE of the bin with the lowest label.
        Raises ValueError if no binning method has been called.
        """
        self.__require_fitted()
        woe = self.__bin_stats['WoE'].sort_index()
        return B.map(lambda label: woe[label], na_action='ignore').fillna(woe.iloc[0])

    def plot_woe(self, title=None):
        """Draw a bar chart of the WoE per bin (ascending) and print the bin contents.

        title: chart title; defaults to the name of the binned variable.
        Raises ValueError if no binning method has been called.
        """
        self.__require_fitted()
        woe = self.__bin_stats[['WoE', 'Range']].sort_values(by='WoE')
        plt.clf()
        if title is None:
            title = self.__varname
        plt.title('{}(WOE)'.format(title))
        plt.bar(range(len(woe)), woe.WoE)
        plt.show()
        print(woe.Range.reset_index(drop=True))

    def get_iv(self):
        """Return the total information value of the current binning."""
        self.__require_fitted()
        return self.__bin_stats['TotalIV'].iloc[0]

    def get_varname(self):
        """Return the name of the binned variable (None before binning)."""
        return self.__varname

    def trans_to_bin(self, X):
        """Assign every value of X to a bin label; unknown values get ``-1``.

        X: Series; converted to str when it is not a string dtype.
        Raises ValueError if no binning method has been called.
        """
        self.__require_fitted()
        if not is_string_dtype(X):
            X = X.astype(str)
        data = X.to_frame()
        data['bin'] = -1
        for label, values in enumerate(self.__bins):
            data.loc[X.isin(values), 'bin'] = label
        return data['bin']

    def trans_to_woe(self, X):
        """Translate every value of X into the WoE of its bin.

        Values that belong to no bin receive the WoE of the bin with the
        lowest label. The same fallback is used for values of a configured
        bin that matched no training rows, since no WoE exists for it.

        X: Series; converted to str when it is not a string dtype.
        Raises ValueError if no binning method has been called.
        """
        self.__require_fitted()
        if not is_string_dtype(X):
            X = X.astype(str)
        data = X.to_frame()
        woe = self.__bin_stats['WoE'].sort_index()
        data['woe'] = woe.iloc[0]
        for label, values in enumerate(self.__bins):
            # A bin without training rows has no statistics row; looking it
            # up would raise KeyError, so its values keep the fallback.
            if label in woe.index:
                data.loc[X.isin(values), 'woe'] = woe[label]
        return data['woe']

    def __cc(self, dfx):
        """Summarise one bin: distinct values and record / good / bad counts."""
        cnt = len(dfx)
        bad = dfx.YY.sum()
        good = cnt - bad
        return pd.Series({'Var': self.__varname, 'Range': dfx.XX.unique(),
                          'CntRec': cnt, 'CntGood': good, 'CntBad': bad})

    def calc_stats(self, data):
        """Compute counts, rates, odds, WoE and IV for every bin.

        data: DataFrame with the columns ``bin``, ``XX`` (values) and ``YY``
              (0/1 target).
        Returns a DataFrame indexed by bin label. Infinite per-bin IV values
        are counted as 0 in ``TotalIV``.
        """
        res = data.groupby(data['bin']).apply(self.__cc)
        cntg = (data.YY == 0).sum()
        cntb = (data.YY == 1).sum()
        res['Pct'] = res.CntRec / len(data)
        res['PctBad'] = res.CntBad / cntb
        res['PctGood'] = res.CntGood / cntg
        res['BadRate'] = res.CntBad / res.CntRec
        res['CumGood'] = res.CntGood.cumsum()
        res['CumBad'] = res.CntBad.cumsum()
        res['Odds'] = res.BadRate / (1 - res.BadRate)
        res['LnOdds'] = np.log(res.Odds)
        res['WoE'] = np.log(res.PctBad / res.PctGood)
        res['IV'] = (res.PctBad - res.PctGood) * res.WoE
        res['TotalIV'] = res.IV.replace({np.inf: 0, -np.inf: 0}).sum()
        return res

    def manual_bin(self, df, x, y, bins=None):
        """Bin the variable with user supplied category groups.

        df: data
        x: name of the variable to bin
        y: name of the 0/1 target column
        bins: list of value lists, e.g. [['a'], ['b'], ['c', 'd'], ['e']]
        """
        groups = list(bins) if bins is not None else []
        self.__varname = x
        data = pd.DataFrame({'XX': df[x], 'YY': df[y]})
        assert data.YY.isin([0, 1]).all(), 'ERROR: {} 目标变量非0/1!'.format(y)
        if not is_string_dtype(data.XX):
            data['XX'] = data.XX.astype(str)
        cnt = data.XX.count()
        assert cnt > 0, 'ERROR: "{}" 变量值全为 NULL !'.format(x)
        data['bin'] = -1
        for label, values in enumerate(groups):
            data.loc[data.XX.isin(values), 'bin'] = label
        data.loc[data.XX.isnull(), 'bin'] = -2
        self.__bins = groups
        self.__bin_stats = self.calc_stats(data)

    def pct_bin(self, df, x, y, sp_bins=None, max_bin=10):
        """Automatic binning of a categorical variable.

        Categories outside ``sp_bins`` are ranked by their target mean and the
        percentile ranks are split into ``max_bin`` equal-width intervals; the
        resulting groups are appended to ``sp_bins`` and handed to
        ``manual_bin``.

        df: data
        x: name of the variable to bin
        y: name of the 0/1 target column
        sp_bins: bins for special values, e.g. [['a'], ['b'], ['c', 'd'], ['e']]
        max_bin: maximum number of automatically built bins
        """
        special = list(sp_bins) if sp_bins is not None else []
        spvars = [value for group in special for value in group]
        assert len(set(spvars)) == len(spvars), 'ERROR: "{}" : sp_bins are overlapping!'.format(x)
        data = pd.DataFrame({'XX': df[x], 'YY': df[y]})
        assert data.YY.isin([0, 1]).all(), 'ERROR: {} 目标变量非0/1!'.format(y)
        data = data.dropna()
        cnt = data.shape[0]
        assert cnt > 0, 'ERROR: "{}" 变量值全为 NULL !'.format(x)
        if not is_string_dtype(data.XX):
            data['XX'] = data.XX.astype(str)
        nuniq = data.XX.nunique()
        if nuniq > 50:
            print('WARN: "{}" 字符型变量取值数超过 {} 个!'.format(x, nuniq))
        db = data[~data.XX.isin(spvars)]
        dbr = db.groupby('XX').YY.mean().reset_index()
        if dbr.empty:
            # Every value is covered by sp_bins: nothing left to group.
            auto_bins = []
        else:
            dbr['rr'] = dbr.YY.rank(pct=True)
            edges = pd.Series(np.linspace(0, 1, max_bin + 1))
            dbr['bin'] = dbr.rr.map(lambda r: (edges >= r).idxmax())
            auto_bins = [grp.XX.tolist() for _, grp in dbr.groupby('bin')]
        self.manual_bin(df, x, y, special + auto_bins)