Tech Waves

produced by Hakuhodo DY ONE

テキストのみでデータ分析 - AIによるBigQueryワークフロー

はじめに

背景と目的

マーケターが市場分析や売上調査を行う際には、SQLの作成・実行から可視化まで多くの専門的スキルが求められ、業務のボトルネックになりがちです。

そこで今回、テキストでの要望をもとにAIがSQL生成・実行・可視化までを支援し、アウトプットまでの作業負荷を大幅に軽減するシステムを開発しました。本記事ではその仕組みと設計のポイントをご紹介します。

 

解決する課題

以下のフローを自動化します。現状では、対象とするテーブルはユーザーから指示されているものとしています。

  • SQL作成の自動化:テーブルスキーマ・ユーザーの入力テキストから、適切なBigQuery用SQLを生成。
  • SQLの実行:BigQuery APIを用いて、SQLの実行からデータ取得
  • 取得したデータから、可視化用グラフの作成

 

ソリューション全体像

本システムは 2 つの Cloud Run Functions(Generator / Executer)で構成されています。

Generator:ユーザーの自然言語入力を受け取り、BigQuery から取得したスキーマ情報を基に SQL を生成する。

Executer:Generator が生成した SQL を BigQuery API で実行し、結果セットを Generator に返却する。

 

Executerは入力されたSQL文字列からクエリの実行を行うだけですので、LLMは使用していません。

GeneratorのToolsとしての機能をサーバーで行っています。そのため、こちらを将来的にMCPサーバー化したいと考えています。

画面の実装も、フレームワークを用いずに直接pythonからHTMLを返す仕組みになっています。

 

Generatorの実装

SchemaResearcher クラスと Text2SQLAgentクラスの2つを実装します。


SchemaResearcherクラス

bigquery.Clientインスタンスを作成後、get_table関数からテーブル情報を取得し、スキーマ情報を返すように実装します。


class SchemaResearcher:
    def __init__(self, project_id: str):
        from google.cloud import bigquery
        self.client = bigquery.Client(project=project_id)

    def get_table_schema_info(self, dataset_id: str, table_id: str) -> str:
        table = self.client.get_table(f"{dataset_id}.{table_id}")
        return table.schema


Text2SQLAgentクラス

こちらはSQLを生成するAgentを作成します。

システムプロンプトは以下のように定義しています。

あなたは優秀な BigQuery SQL クエリ生成アシスタントです。

ユーザーの自然言語の質問を理解し、適切な SQL クエリを生成してください。

データベースのスキーマ情報を考慮し、正確なクエリを提供してください。

重要: SQL クエリを生成する際は、必ず以下の完全修飾テーブル名を使用してください: {self.full_table_name}

テーブル名の省略や、プレースホルダーは使用しないでください。 SQL クエリのみを返し、説明や追加のテキストは含めないでください。

 

プロンプトは、SchemaResearcherから得た情報とユーザーの入力テキストから以下のものを用意します。

f"""
以下はデータベースのスキーマ情報です:
{schema_info}
対象テーブル: {self.full_table_name}
ユーザーの質問に基づいて、適切なSQLクエリを生成してください:
{human_order}
必ず上記の完全修飾テーブル名(バッククォートで囲まれた形式)を使用してください。
SQLクエリのみを返してください。
"""

schema_info: SchemaResearcherのget_table_schema_infoから取得したテーブルのスキーマ情報

human_order: ユーザーの入力テキスト

こちらをLLMに投入することでSQLが返ってくるようになります。


Executerの実装

Generatorが生成したSQLを実行するインスタンスです。

ただし、セキュリティ上 SELECT文のみ許可、クエリサイズの制限を設けています。

なお、意図せぬテーブルの削除や巨大なSQLを何度も投入され予期せぬコストが発生しないように注意する仕組みは必要です。

また、メモリの制限上、データ件数は1,000件までとしています。

これらをチェックする validate_sql関数を以下のように定義しています。

def validate_sql(sql):
    """
    SQLクエリの安全性を検証
    Args:
        sql: 検証するSQLクエリ
    Returns:
        エラーメッセージ(問題なければNone)
    """
    
    # nullチェック
    if not sql or not sql.strip():
        return "SQL query is empty"

    sql_stripped = sql.strip().upper()

    # SELECT文のみ許可
    if not sql_stripped.startswith("SELECT"):
        return "Only SELECT queries are allowed"

    # 危険なキーワードのチェック
    dangerous_keywords = [
        'DROP', 'DELETE', 'INSERT', 'UPDATE', 'ALTER',
        'CREATE', 'TRUNCATE', 'GRANT', 'REVOKE', 'EXEC',
        'EXECUTE', 'MERGE', 'REPLACE'
    ]

    for keyword in dangerous_keywords:
        # 単語境界を考慮した検索
        if re.search(r'\b' + keyword + r'\b', sql_stripped):
            return f"Forbidden keyword detected: {keyword}"

    # セミコロンによる複数クエリの実行を防止
    if ';' in sql.strip()[:-1]:  # 末尾のセミコロンは許可
        return "Multiple statements are not allowed"

    return None

 

BigQueryへのSQL実行は、1000件のデータ取得およびスキャンサイズ1GBの制御事項を考慮するため、以下の設定を行います。

# LIMITが指定されていない場合、デフォルトで1000件に制限
sql_upper = sql.strip().upper()
if 'LIMIT' not in sql_upper:
    # 末尾のセミコロンを削除してからLIMITを追加
    if sql.strip().endswith(';'):
        sql = sql.strip()[:-1]
    sql += ' LIMIT 1000'

 

client = bigquery.Client()
# クエリ実行の設定 1GB以上の上限を超えるクエリは失敗する。課金はされない。
job_config = bigquery.QueryJobConfig(
    maximum_bytes_billed=10**9  # 1GB制限
)
query_job = client.query(sql, job_config=job_config)

 

課金の上限は状況により異なるため、環境変数にしてソースコード外で調整できるようにしても問題ありません。

また、IAMでアクセスできるサービスアカウントのみから実行を受け付ける設定をします。セキュリティタブで認証が必要(IAM)に設定すれば実行権限を持つSAに限定されます。(generator側にtokenの生成ロジックを実装する必要はあります。)

 

デモ実行

① 質問内容に『カテゴリ別売上を知りたい』と入力します。


② 対象テーブルのメタデータを読み取り、適切なSQLを生成します。


SQLの実行ボタンを押すと、BigQuery API経由でデータを取得し、テーブル表示します。


④ 『グラフを表示』のボタンを押すと、chart.jsを用いてバーチャートが表示されます。


まとめ

BigQuery内のデータに対し、ユーザーが入力した質問からSQLの作成、データの可視化まで実装してみました。

今回はBigQueryをデータソースとしましたが、他のCDP, API経由で取得したデータ等の取得、特定アプリケーションへの組み込み等も可能です。

SQLは書けないが、データを分析してみたい方」向けに実装してみてはいかがでしょうか。