본문 바로가기
Data Science/Project

[홀로서기 #07] 80 Features, 8M Records... Pandas 말고 Dask!

by 루크 Luke 2022. 1. 19.
반응형

홀로서기 기획 연재물은 최근 개인 프로젝트를 진행하면서 겪은 어려움들을 기록한 지극히 개인적인 콘텐츠입니다.


빅데이터에 압도되다.

  누가 보면 비웃을 수도 있을 것이다. '이 친구, 초짜구만. 고작 800만 개로 빅데이터라니!'. 그렇다. 필자는 아직 주니어다. 기업 데이터라고 해도 많아 봐야 고작 몇 십만개 정도였다. 그리고 그런 경험 정도로, 이번 새로운 프로젝트를 시작했다. 무려 8M(M은 흔히 1백만 개를 이르는 말이다) 사이즈의 데이터를 다루는 프로젝트이다.

  기존에 Python으로 프로젝트를 진행할 때는, Pandas 모듈의 DataFrame을 활용했다. 기본 통계 분석부터, Matplotlib, Seaborn, Plotly Express까지 간편하게 연계하여 시각화를 진행했고, 잘 갖춰진 도구들 덕분에 효과적으로 인사이트를 낼 수 있었다. 그렇지만 이번엔 달랐다. 데이터를 Load 해오는 것부터 몇 십분을 기다려야 했고, Value 하나 탐색하는 데에도 꽤 긴 시간이 소요된다. 아, 이런게 빅데이터인가? Hadoop 등의 별도의 툴을 경험한 사람을 필요로 하는 이유가 바로 이런 것인가, 생각했다. 솔직히 말하면, 잠시 이 빅데이터에 압도된 심정에 취해, 무력감이 들기도 했다. 그동안 나는 무엇을 위해 파이썬을 배워왔는가, 하면서 감정이 요동치기도 했다. 별 것도 아닌 것 가지고 말이지.

 

돌고 돌아 Dask

  이번 프로젝트를 소개하자면,  딥러닝 모델을 활용해서 사이버 공격(DDoS 등) 등의 Anomaly를 탐지하는 것이다. Anomaly Detection 이라고도 하는데, 데이터셋은 2018년 뉴브런즈윅 대학교의 서버 로그 흐름 데이터(Logs of the University of New Brunswick's servers)를 사용한다. (참고로, 캐글 Kaggle은 정말 좋은 플랫폼인 것 같다.) 데이터셋은 

약 828만 개의 데이터는 총 80개의 컬럼으로 이루어져있다.

  데이터를 취합하고 느낀 것은, 이번 프로젝트의 핵심이 Feature의 특징(공통점이나 차이점)을 얼마나 잘 파악하느냐의 문제라는 것이다. 80개의 컬럼을 모두 사용하는 것은 모델 학습에 큰 시간과 에너지를 요구한다. Feature Selection과 관련한 부분도 이후에 프로젝트를 진행하면서 함께 캐볼 예정이다. 아무튼, Feature를 분석하기 위해서는, 결국 데이터들이 가진 Feature별 특성이나, Label별로 어떤 특징들을 구분지을 수 있는지 알아야 하는데, 판다스 모듈로는 효율적인 프로젝트 수행이 불가능하다는 것을 알았다.

  그래서, Dask를 만났다. pandas는 매우 강력하지만, 단점이 있다. 그것은 데이터가 많아질수록 사용하는 메모리가 늘어나고 속도가 매우 느려진다는 것이다. 1천만 Row도 처리가 가능하지만, 그 경우에 10기가의 메모리가 필요하고, 싱글 코어로만 작동하기 때문에 RAM 범위 안에서만 작동이 가능하다. (Colab에서 큰 사이즈의 데이터를 pandas로 돌리면 런타임이 계속 끊기는 이유이기도 하다.) 그래서 이러한 단점을 해소하기 위해, Modin, Dask, Vaex 등의 라이브러리가 존재한다.

  특징을 하나씩 살펴보면,

  • Dask
    • 기본적으로 pandas보다는 느리지만 큰 데이터에도 잘 작동한다. (느리다고 하는데, 체감상 더 빨랐다.)
    • 여러 개의 머신이나 하나의 머신에서 여러 개의 코어를 작동하기에 좋다.
    • 메모리를 초과하는 데이터도 다룰 수 있다.
  • Modin
    • pandas의 함수와 fallback을 재구현한 DataFrame을 위한 새로운 algebra
    • 어떤 코드 수정도 없이 행과 열을 자동으로 병렬로 사용
    • 내부적으로 Ray나 Dask를 사용함
    • dask가 pandas를 대체하기 위한 framework라면, modin은 그런 dask를 pandas 문법처럼 사용하게 해주는 api
  • Vaex
    • pandas와 상관없는 pandas와 유사한 새로운 프로젝트
    • 램에 효율적인 새로운 string 타입을 갖고 있어서, string 타입을 다룰 때 좋다.
    • 메모리 매핑 lazy computation
    • 수 억개의 데이터를 랩탑에서 사용 가능

 

  그래서 이번 프로젝트는, 그래도 가장 쓰기 편리하고 보편적으로 알려진 Dask를 활용해보기로 했다. pandas에 비해 눈에 띄게 달라진 부분은 데이터를 불러오는 부분이었다. 그래서 기본적인 전처리까지는 pandas로 진행해서 저장하고, 분석부터는 해당 데이터를 dask.DataFrame으로 불러와서 작업했다. 처음에 Dask를 설치하고, DataFrame을 불러올 때 애를 먹었다. 아래 코드를 보면 도움이 될 것이다.

# install code for Colab
!pip install "dask[complete]"
!python -m pip install dask[dataframe] --upgrade

- 기본적으로 dask[dataframe]을 설치해주면 되는데, Client 등을 활용하면 여러 개의 코어를 쓸 수 있다고 들었다. 그래서 dask[complete]로 설치해주고, dataframe만 upgrade를 진행해줬다. 설치만 해주면 에러가 뜨는 경우가 있으니, dataframe 버전을 업그레이드 하는 것도 잊지 말자.

 

# dask
import dask.dataframe as dd
from dask.distributed import Client
cluster = Client(n_workers=5)
df = dd.read_csv('/content/drive/MyDrive/Colab Notebooks/DL_proj/ddos_10days/concat/9days_concat.csv')

- 그 후에, import library를 수행한다. Client는 앞서 설명한대로, 코어 수를 지정해주는 코드라고 생각해주면 된다. 2든 3이든 1보다 큰 값을 넣어서 코어 수를 늘려준다. 이렇게 read_csv를 진행하면, Pandas와는 비교할 수 없게 빠른 속도로 데이터를 불러준다. (1분도 안 걸린다.)

 

# timestamp check
df.groupby(df['Timestamp']).count().compute()

- Dask에서의 가장 큰 특징은, 우리가 데이터 모양을 보려면 compute()를 무조건 찍어주어야 한다는 것이다. compute()없이 데이터프레임의 특정 메서드를 찍어보면, 요약된 느낌으로만 보여주는 경우가 많다. compute()를 꼭 찍어주자. (그리고 compute를 찍으면 오랜 시간 기다려줘야 한다.. 물론 체감상 pandas보다는 낫다.)

 

# dask progressbar setting
from dask.diagnostics import ProgressBar 
pbar = ProgressBar()
pbar.register()

- groupby 등의 데이터 작업을 하다보면 기다리다가 지치는 경우가 많다. 꼭 ProgressBar를 임포트해서 진행 상황을 봐주면서 작업하자. pandas의 tqdm과 비슷한 모듈이라고 느꼈는데, for문에서 주로 쓰는 것과 달리 compute() 상황에서 계속 progressbar를 보여줌으로써 우리의 무료함을 달래준다.

 

EDA에서 많이 쓰는 Dask Code

# display format
pd.options.display.float_format = '{:.3f}'.format

# describe
df.describe().compute()

# unique, value_counts
df.columnname.unique().compute()
df.columnname.value_counts().compute()

# groupby
df.groupby(columnname).count().compute()
df.groupby(columnname1, columnname2).count().compute()
df.groupby(columnname).mean().compute()
df.groupby(columnname).min().compute()
df.groupby(columnname).max().compute()
df.groupby(columnname).median().compute()

# Labeling
def func(x) :
    if x == 'A':
        return "LabelA"
    else:
        return "LabelB"

df['Label'] = df[feature].apply(lambda x : func(x))

 


Photo : Unsplash, "Data Science"

 

이번 프로젝트는, 서버 공격 등의 이상 탐지 모델을 만드는 것이다.

로그 흐름 데이터인 만큼, 대용량 데이터를 만져볼 좋은 기회인 듯하다.Dask와 익숙해지는 시간이 필요할 것 같다.

Anomaly detection project, New Hi! 홀로서기 #7 끝.


 
반응형

댓글