개요
운영 중인 서비스에서 Trino를 백엔드로 사용하고 있는데, 기존에는 Trino CLI나 별도의 프런트엔드를 사용해서 작업을 수행했었다. 최근 프로그래밍적인 방법을 사용해서 작업을 할 필요가 있어, 방법을 적어두려고 한다.
사용 언어는 Python이다.
필요 라이브러리 설치
Python으로 Trino를 다루기 위해서는 trino 클라이언트 라이브러리를 선택할 수 있다. trino 클라이언트 라이브러리는 아래와 같이 pip 명령어로 설치할 수 있는데……
pip install trino
테스트 환경이 컨테이너 기반이라서 그런지 trino 라이브러리로는 연결이 잘 안 됐다. 때문에 이 글에서는 sqlalchemy를 사용해 다뤄보려고 한다.
pip install sqlalchemy
사용법
연결 URL
sqlalchemy를 사용해서 데이터소스와 연결할 때는 sqlalchemy URL을 사용한다. Trino의 경우에는 아래와 같은 URL 형식을 따른다.
trino://<username>:<password>@<host>:<port>/<catalog>/<schema>
# 예시
trino://trino@trino:8080/test_cal
password나 schema는 선택사항이다. 이 글에서는 달리 지정하지 않고 사용한다.
연결 생성
sqlalchemy는 create_engine 함수를 사용해 엔진을 생성하고, 연결을 시도한다.
from sqlalchemy import create_engine
engine = create_engine(SQLALCHEMY_URL)
connection = engine.connect()
연결을 시도하는 시점은 engine 객체의 connect 함수를 호출할 때이다.
쿼리 수행
쿼리는 connection 객체의 execute 함수를 사용하여 수행할 수 있다. 단, execute 함수에 전달하는 질의문은 text라는 별도의 타입으로 전달해야 한다.
from sqlalchemy.sql.expression import text
# SELECT 결과 가져오기
rows = connection.execute(text(SQL)).fetchall()
# INSERT, UPDATE 등 변경사항 커밋
connection.execute(text(SQL)).commit()
쿼리를 수행한 뒤에는 그 결과를 fetchall로 가져오거나, commit을 사용해야 한다. 이 부분은 PyMySQL을 사용하는 방법과 동일한 것 같다.
예시
간단히 trino와 연결하여 카탈로그 목록을 확인해보는 코드를 작성한다.
from sqlalchemy import create_engine
from sqlalchemy.sql.expression import text
engine = create_engine(trino_url)
connection = engine.connect()
rows = connection.execute(text("SHOW catalogs")).fetchall()
rows
쿼리를 수행한 결과를 확인할 수 있다.
참고 문서
https://github.com/trinodb/trino-python-client
https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.TextClause