개요
아래와 같이 S3 Select 기능을 이용해 S3 버킷에 저장된 GZIP으로 압축된 JSON 데이터의 내용을 읽어 들이려고 한다.
import boto3
client = boto3.client('s3')
response = client.select_object_content(
Bucket=bucket,
Key=obj_key,
Expression='SQL',
ExpressionType='SQL',
InputSerialization={
'CompressionType': 'GZIP',
'JSON': {
'Type': 'LINES'
}
},
OutputSerialization={
'CSV': {},
},
)
for event in response.get('Payload'):
if 'Records' in event:
records = event['Records']['Payload'].decode('utf-8')
payloads = (''.join(r for r in records))
기존에는 S3 Select를 이용해 필요한 데이터만 읽으려고 했으나, S3 Select의 경우 레코드 제한으로 인해 동작하지 않는 경우가 있었다.
OverMaxRecordSize: The character number in one record is more than our max Threshold, maxCharsPerRecord: 1,048,576
이런 경우는 AWS 기능의 한계로, 별 수 없이 S3 데이터를 전부 읽어 필요한 데이터를 파싱해야 한다.
Pandas 사용
S3는 CSV, JSON, Parquet 형식의 데이터에 대해 Select 기능을 제공한다. 이 중 별도의 압축 형식인 Parquet을 제외한 CSV와 JSON의 경우, 압축된 데이터가 아니거나 GZIP, BZIP2으로 압축되었을 때 Select를 할 수 있다.
Pandas가 제공하는 read_csv, read_json 함수는 Pandas가 동작하는 시스템 내에 위치한 파일 뿐만 아니라 URL을 통해서 데이터를 읽을 수 있고, compression 매개변수를 이용하면 특정 방식으로 압축된 데이터 또한 읽을 수 있다. compression 매개변수에는 'zip', 'gzip', 'bz2', 'zstd', 'tar'를 지정할 수 있다.
따라서 Pandas를 이용하는 경우, 아래와 같은 코드로 데이터를 읽을 수 있다.
import pandas as pd
bucket = 'bucket name'
data_key = 'data key'
data_location = f's3://{bucket}/{data_key}'
# CSV
data = pd.read_csv(data_location, compression='gzip')
# JSON
data = pd.read_json(data_location, compression='gzip', lines=True|False)
참고 문서
https://stackoverflow.com/questions/41161006/reading-contents-of-a-gzip-file-from-a-aws-s3-in-python
https://pandas.pydata.org/docs/reference/api/pandas.read_json.html