Python/NumPy | Pandas

[Pandas] apply 시 컬럼의 값에 따라 다른 함수 사용하기

비번변경 2024. 4. 23. 17:18

개요

Pandas에서 제공하는 apply 함수는 매개변수로 전달받은 함수를 적용하는 함수이다. 당연히 하나의 함수를 열이나 행에 적용하는데…… 각 데이터의 값에 따라 적용해야 하는 함수가 달라져야 하는 필요가 생겼다.

 

지금부터 다음의 문제 상황을 맞이했다고 해보자.

 

업무에서 처리되는 데이터는 S3에 다음과 같은 경로에 떨어진다.

's3://test-bucket/summary/titanic/class=First/date=2024-03-28/27b3ce6245b44ff0950e98419089e66c-0.parquet'
's3://test-bucket/summary/titanic/class=Second/date=2024-03-28/27b3ce6245b44ff0950e98419089e66c-0.parquet'
's3://test-bucket/summary/titanic/class=Third/date=2024-03-28/27b3ce6245b44ff0950e98419089e66c-0.parquet'
's3://test-bucket/raw/iris/species=setosa/year=2024/month=03/day=28/time=0000/2770df01aff1402db5b4de55d1be2a57-0.parquet'
's3://test-bucket/raw/iris/species=versicolor/year=2024/month=03/day=28/time=0000/2770df01aff1402db5b4de55d1be2a57-0.parquet'
's3://test-bucket/raw/iris/species=virginica/year=2024/month=03/day=28/time=0000/2770df01aff1402db5b4de55d1be2a57-0.parquet'

값을 보면 아래의 패턴을 확인할 수 있다.

- 최상위 prefix가 summary인 경우 : 날짜(시간) 파티션 컬럼이 date이고 패턴이 %Y-%m-%d이다.

- 최상위 prefix가 raw인 경우 : 날짜(시간) 파티션 컬럼이 year, month, day, time이고 패턴이 year=%Y/month=%m/day=%d/time=%H%M이다.

 

이때 time을 제외한 day를 기준으로 데이터가 존재하는지를 확인하려고 한다. 단, 데이터가 굉장히 많으므로 raw 데이터의 경우에는 day=%d 아래의 객체가 아니라 하위 prefix가 존재하는지만 확인하려고 한다.

- 최상위 prefix가 summary인 경우 : date=%Y-%m-%d/ 아래에 객체가 존재하는지 확인한다.

- 최상위 prefix가 raw인 경우 : year=%Y/month=%m/day=%d/ 아래에 하위 prefix가 존재하는지 확인한다.

 

s3 특정 prefix 아래 객체 목록을 가져오는 함수 list_objects와 s3 특정 prefix의 하위 prefix 목록을 가져오는 함수 list_subprefixs가 별도로 구현되어 있다고 할 때, 이를 어떻게 한 번에 처리할 수 있을까?

 

 

s3 관련 함수 구현

현재 s3 관련하여 구현되어 있는 함수는 다음과 같다.

def list_objects(bucket, prefix, client=get_client()):
    paginator = client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    response = list()
    for page in pages:
        response += page['Contents']
    return response


def list_subprefixs(bucket, prefix, client=get_client()):
    if prefix != '' and not prefix.endswith('/'):
        prefix += '/'
    paginator = client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/')
    response = list()
    for page in pages:
        response += page['CommonPrefixes']
    result = [r.get('Prefix') for r in response]
    return result

요청하지 않는 prefix를 대상으로 저장되어 있는 객체 목록 확인을 요청하면 응답에서 Contents, CommonPrefixes를 찾지 못해 오류가 발생하는 상태이므로, 예외 처리도 필요하다.

 

 

테스트 데이터

테스트를 위해 사용할 데이터는 다음과 같은 형태로 가공해 둔 상태라고 하자.

data = ['s3://test-bucket/summary/titanic/class=First/date=2024-03-28/27b3ce6245b44ff0950e98419089e66c-0.parquet',
's3://test-bucket/summary/titanic/class=Second/date=2024-03-28/27b3ce6245b44ff0950e98419089e66c-0.parquet',
's3://test-bucket/summary/titanic/class=Third/date=2024-03-28/27b3ce6245b44ff0950e98419089e66c-0.parquet',
's3://test-bucket/raw/iris/species=setosa/year=2024/month=03/day=28/time=0000/2770df01aff1402db5b4de55d1be2a57-0.parquet',
's3://test-bucket/raw/iris/species=versicolor/year=2024/month=03/day=28/time=0000/2770df01aff1402db5b4de55d1be2a57-0.parquet',
's3://test-bucket/raw/iris/species=virginica/year=2024/month=03/day=28/time=0000/2770df01aff1402db5b4de55d1be2a57-0.parquet']
column = ['obj_key']
df = pd.DataFrame(data, columns=column)
df['prefix'] = ["/".join(key.split('/')[3:-1]) for key in df['obj_key']]
df[['bucket', 'top_prefix']] = df['obj_key'].str.split('/').str[2:4].to_list()
df = df.drop(columns='obj_key')

 

s3 관련 함수도 간단히 함수 이름만 반환하도록 변경했다.

def list_objects(bucket, prefix, client=get_client()):
    return sys._getframe().f_code.co_name

def list_subprefixs(bucket, prefix, client=get_client()):
    return sys._getframe().f_code.co_name

 

 

접근

현황과 요구사항을 고려하면 다음과 같이 내용을 정리할 수 있다.

 

1. list_objects, list_subprefixs 함수 실행 시 예외 처리가 필요하다.

=> list_objects, list_subprefixs 함수 호출하고 예외 처리를 수행하는 함수 exists를 정의한 뒤, exists를 apply 한다.

 

2. list_objects, list_subprefixs 함수의 매개변수는 동일하다.

=> 함수 exists의 매개변수에 bucket, prefix, client가 포함되어야 한다.

 

3. top_prefix에 따라 list_objects, list_subprefixs 중 호출할 함수를 결정해야 한다.

=> 함수 exists 함수에 top_prefix 값을 전달받을 매개변수를 추가하고, 함수 내에서 조건문에 따라 호출할 함수를 결정한다.

 

 

구현

먼저 top_prefix에 따라 다른 함수를 apply 하는 부분이 잘 동작하는지 확인한다.

def exists(bucket, prefix, type, client=None):
    run_func = list_objects if type == 'summary' else list_subprefixs
    return run_func(bucket, prefix, client)
    
    
df['flag_exist'] = df.apply(lambda row: exists(row['bucket'], row['prefix'], row['top_prefix']) , axis=1)

top_prefix가 summary인 경우에는 list_objects 함수를 호출하고, top_prefix가 raw인 경우에는 list_subprefixs 함수를 호출한 모습을 확인할 수 있다.

이제 예외 처리 부분을 추가하면 최종적으로 구현한 exists 함수는 다음과 같은 형태가 된다.

def exists(bucket, prefix, type, client=None):
    run_func = list_objects if type == 'summary' else list_subprefixs
    try:
        run_func(bucket, prefix, client)
    except KeyError:
        return False
    return True

 

 

번외

만약 처리할 데이터 내의 top_prefix가 모두 동일하다면

즉, 한 번 실행할 때 한 종류의 데이터를 처리한다면 호출할 함수를 결정하는 조건문을 함수 밖으로 빼낼 수 있다.

exists 함수를 apply 하기 전에 호출할 함수를 결정하고, 해당 함수를 exists 함수에 매개변수로 전달하여 실행하는 방식이다.

def exists(search_func, bucket, prefix, client=None):
    try:
        run_func(bucket, prefix, client)
    except KeyError:
        return False
    return True
    
    
search_func = list_objects if df.loc[0, 'top_prefix'] == 'summary' else list_subprefixs
df['flag_exist'] = df.apply(lambda row: exists(search_func, row['bucket'], row['prefix']) , axis=1)