chisataki’s blog

リコリス・リコイルじゃありません

EmbeddingBagレイヤーを用いて感情分析する

今回はPytorchのEmbeddingBagレイヤーを使い、日本語の入力文章がポジティブかネガティブか判定する簡易的な分類モデルを作成します。
単語の埋め込みベクトル化にはword2vecを用います。main関数内のパラメータを変えることでポジティブ、ニュートラル、ネガティブの3クラスなど、多クラス分類にも容易にカスタマイズできます。

EmbeddingBagを使う利点は、入力シーケンスがすべて同じ長さになるように、\<pad>でパディングする必要がなくなることです。

まずは以下のような教師データ20個をtsvファイルで準備します。
ラベル0はネガティブ、1はポジティブな文章です。

label    text
0   これはひどい映画でした。
1   この映画が気に入りました! 強くお勧めします。
0   ただひどい
1   良い映画と演技です
0   時間を無駄にしないでください - 本当にダメです
0   ひどい
1   素晴らしい映画。
0   これは才能の無駄遣いでした。
1   この映画がとても気に入りました。 見てください。
1   ここ数年で見た中で最高の映画。
0   演技も下手、脚本も下手。
1   この映画をみんなにお勧めします
1   面白くて楽しい。
0   私はこの映画がまったく好きではありませんでした。
1   古き良き時代の物語
0   その話は私には意味が分かりませんでした。
0   最初から最後まで素人っぽい。
1   この動きがとても気に入りました。 多くの楽しみ。
0   私はこの映画が嫌いで出て行きました。
1   すべての年齢層向けのスリル満点の冒険。


単語のベクトル化にはword2vecを事前にダウンロードしたものを使います。

from torchtext.vocab import Vectors

v=Vectors(name='japanese_word2vec_vectors.vec')


教師データを読み込み、label, textのボキャブラリーを構築します

import pandas as pd
from torchtext.vocab import vocab, build_vocab_from_iterator

df = pd.read_csv('sample_train.tsv', names=['label','text'], header=0, sep='\t')
# label
df['label'] = df['label'].astype(str) 
label_vocab = build_vocab_from_iterator(df['label'])

# text
text_vocab = vocab(v.stoi, min_freq=0, specials=('<unk>',), special_first=True)
text_vocab.set_default_index(0)


今回は単語分割にはjanomeを使用します。正規化などは行わず、単純にわかち書きのみ行います。

from janome.tokenizer import Tokenizer

j_t = Tokenizer()
def tokenizer_janome(text):
    return [tok for tok in j_t.tokenize(text, wakati=True)]


テキスト変換を行うtransformを定義します。文章の長さを整えるpaddingは行いません。

import torchtext.transforms as T

label_transform = T.Sequential(
    T.VocabTransform(label_vocab),
    T.ToTensor()
)
text_transform = T.Sequential(
    T.VocabTransform(text_vocab),
    T.ToTensor()
)


Dataloaderに渡すCollate_fnを定義します。EmbeddingBagに文章の区切り位置を示すoffsetを渡す必要があるので、ここで返却できるようにします。

def collate_batch(batch):

    label_l = []
    text_l = []
    offset_l = [0]
    for label, text in batch:
        label_l.append(label)
        tokens = text_transform(tokenizer_janome(text))
        offset_l.append(len(tokens))
        text_l.append(tokens)

    label_l = label_transform(label_l)
    offset_l = torch.tensor(offset_l[:-1]).cumsum(dim=0)    
    text_l = torch.cat(text_l)

    return label_l, offset_l, text_l


EmbeddingBagレイヤーを使用した分類クラスを定義します。

import torch.nn as nn

class TextClassificationModel(nn.Module):

    def __init__(self, pretrained_embeddings, num_class, freeze_embeddings = False):
        super().__init__()        
        self.embedding = nn.EmbeddingBag.from_pretrained(pretrained_embeddings,
                                                         freeze = freeze_embeddings,
                                                         )
        # EmbeggingBag で各文が埋め込みベクトルで表されます。
        # 単語の連続情報が失われるため、以下単純なニューラル ネットワークを使用します。
        self.fc = nn.Linear(pretrained_embeddings.shape[1], num_class) 
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)
        return self.fc(embedded)


train関数を定義します。最適化手法にはSGD、損失関数はCrossEntropyLoss()を使用しています。

import torch

def train(net, ldr, bs, me, le, lr):
    # パラメータ: network, Dataloader, batch_size, max_epochs, log_every, learn_rate
    net.train()
    opt = torch.optim.SGD(net.parameters(), lr=lr)
    loss_func = torch.nn.CrossEntropyLoss()  # 内部でsoftmaxが行われる
    print("\nStarting training")
    for epoch in range(0, me):
        epoch_loss = 0.0
        for bix, (labels, offsets, texts) in enumerate(ldr):
            opt.zero_grad()
            oupt = net(texts, offsets)  
            loss_val = loss_func(oupt, labels)  
            loss_val.backward() 
            epoch_loss += loss_val.item()  
            opt.step()  
        if epoch % le == 0:
            print("epoch = %4d   loss = %0.4f" % (epoch, epoch_loss))
    print("Done ")


ここまで準備できたら、main関数を定義します。

from torch.utils.data import DataLoader

def main():

    # dataloader作成
    bat_size = 2
    train_ldr = DataLoader(df.values, batch_size=bat_size, shuffle=False, collate_fn=collate_batch)

    # model作成
    num_class=2
    net = TextClassificationModel(pretrained_embeddings, num_class)

    # 訓練
    max_epochs = 150
    log_interval = 30
    lrn_rate = 0.01
    train(net, train_ldr, bat_size, max_epochs, log_interval, lrn_rate)

    # 予測
    text = "この映画めちゃ面白い"
    net.eval()
    with torch.no_grad():
        oupt = net(text_transform(tokenizer_janome(text)), torch.tensor([0]))     
    predict = torch.softmax(oupt, dim=1)

    print("text: {}".format(text))
    print("Sentiment prediction probabilities [neg, pos]: ")
    print(predict)
    


出力

...
text: この映画めちゃ面白い
Sentiment prediction probabilities [neg, pos]: 
tensor([[0.0188, 0.9812]])


参考記事: visualstudiomagazine.com


分散表現の事前学習済みモデル(fastText等)からボキャブラリーを作成する(pytorch)

古いバージョンのtorchtext(~0.12)ではtorchtext.data.Field.build_vocab(vectors = ...) でベクトル化したボキャブラリーが簡単に作れましたが、新しいバージョンではこの機能がなくなってしまい、Gloveやword2vec, fastTextなど単語分散表現の学習済みモデルからどのようにボキャブラリーを構築すればいいのかわかりづらかったので、以下にまとめ

環境
torchtext : 0.16

from torchtext.vocab import FastText,vocab
# FastText単語ベクトル読み込み
myvec = FastText()
# ボキャブラリー作成
myvocab = vocab(ordered_dict = myvec.stoi, min_freq = 0, specials = ('<unk>',), special_first = True)

# 出力
myvocab.get_itos()
> ['<unk>, ',', 'the', '.', 'and', 'of',...]
len(myvocab)
> 999995

備考:

  • vocab()引数のordered_dict に myvec.stoi (単語:インデックス にマッピングされた辞書) を渡す
  • min_freq(ボキャブラリーに含める最小出現数) はデフォルトで1になっているため、0を指定する。(fastTextやGloVeから辞書を作る場合、myvec.stoi = {',': 0, 'the': 1, '.': 2, 'and': 3, 'of': 4,...}となり、0番目の単語が無視されてしまう)


参考

pytorch.org

AWS↔️GCPサービス比較【機械学習/AIまわり】

AWSサービス 分類 サービス詳細 対応GCPサービス 備考
S3 ストレージ   Cloud Strage  
Amazon EMR ETL   Dataproc  
AWS Glue ETL   Dataproc
Data Catalog
Composer(Airflow)
Data Fusion
Dataprep
Glueはバッチ/ストリーミングに対応
Amazon Kinesis Data Analytics データ分析 ストリーミングデータをリアルタイムで変換および分析 Dataflow Kinesis Data Analytics:Apache Flink使用。ストリームデータのみ
Dataflow:Apache Beam使用。バッチ・ストリーム両対応
Amazon EMR データ分析   Dataproc  
Amazon Athena データ分析 データレイクへのクエリ BigQuery Athena:SQLを使用してS3から直で検索
BigQuery:外部テーブル使用でgcsから検索
Amazon Redshift データ分析   BigQuery
(Bigtable)
Bigtable:NoSQL. 金融データ向き
IoT Analytics データ分析 バイスの分析 -  
Lake Formation データレイク管理   - Glue + IAM + S3 + etc
Amazon Kinesis ストリームデータ ストリームデータをリアルタイムで収集、処理、分析 PubSub  
Amazon MSK ストリームデータ フルマネージドApache Kafkaサービス Confluent Cloud? MSKとConfluent Cloud比較
AWS IoT Core ストリームデータ   Cloud IoT Core
Leverege Connect
Cloud IoT Coreは2023年8月16日で廃止→Leverage Connectへ
Elemental MediaLive コンテンツ配信   LiveStreamAPI ライブ動画のエンコード
Amazon CloudFront コンテンツ配信   Cloud CDN
Media CDN
Cloud CDN: webアプリ用
Media CDN: 動画・ライブ配信
Amazon Personalize 機械学習/AI レコメンド Recommendations AI  
Amazon Polly 機械学習/AI テキスト→音声 Text-to-Speech  
Amazon Rekogniton 機械学習/AI 画像認識/分析 Vision AI  
Amazon SageMaker 機械学習/AI モデル構築用統合プラットフォーム Vertex AI  
Amazon SageMaker Autopilot 機械学習/AI 表形式データからモデルを自動的に構築 AutoML tables
BigQueryML
・BigQueryMLは自動ではモデルを選択してくれない
AutoML tablesは2024年1月でVertex AIに統合予定
Amazon SageMaker Camvas 機械学習/AI ノーコードでモデルを構築 AutoML AutoML:Vertex AIの一部
Amazon SageMaker Clarify 機械学習/AI バイアス検出、モデルの説明 Vertex Explainable AI  
Amazon SageMaker Datawrangler 機械学習/AI データ前処理 Vertex AI Vizier Vertex Ai Vizier:パラメータ最適化のみ
Amazon SageMaker Debugger 機械学習/AI レーニング監視、最適化    
Amazon SageMaker Edge 機械学習/AI IoTデバイス機械学習    
Amazon SageMaker Feature Store 機械学習/AI 特徴量を保存、共有、管理 Vertex AI Feature Store  
Amazon SageMaker Data Labeling (GroundTruth) 機械学習/AI データの自動ラベリング Vertex AI Data Labeling Service COVID-19の緊急事態が発生したため、Data Labeling Service は通知があるまで制限されるか、利用できません。(2023/06/20時点)
Amazon SageMaker JumpStart 機械学習/AI 事前構築済みの機械学習モデルソリューション    
Amazon SageMaker Model Monitor 機械学習/AI モデルの監視、アラート Vertex AI Model Monitoring  
Amazon SageMaker Studio 機械学習/AI 統合開発環境(IDE)    
Amazon SageMaker Pipelines 機械学習/AI ワークフローの作成、管理 Vertex AI Pipelines  
Amazon SageMaker Notebook 機械学習/AI JupyterNotebookベースの開発環境 Vertex AI Workbench  
Amazon Quicksite BI   Looker Studio(旧データポータル)
Looker
 
AWS Transfer Family データ転送 S3またはEFSへのデータ転送 Storage Transfer Service ・Storage Transfer Service:クラウド間、オンプレミスとクラウド間のデータ転送
参考)GCP転送サービス比較
AWS Snowball Edge データ転送 ペタバイト規模のデータ移行 Transfer Appliance ・Snowball はサービス全体、Snowball Edge はサービスが現在使用しているタイプのデバイスを指す

Transfer Appliance:大容量データ向け
Amazon Glacier バックアップ&アーカイブ バックアップ、アーカイブストレージ Cloud Strage Coldline
Cloud Strage Archive
Coldline:(目安) 90日に1回アクセスされるデータ
Archive :365日に1回アクセスされるデータ
AWS Backup バックアップ&アーカイブ バックアップおよびDR (災害対策) を一元的に管理 Backup and DR  

参考

AWS や Azure サービスと Google Cloud を比較する
AWS/Azure/Google Cloudサービス比較 2023.06

BigQuery SQLコマンド集-2023年度版

データセットの管理

データセットを作成する
CREATE SCHEMA `PROJECT_ID.DATASET_ID`
  OPTIONS (
    description = 'データセット説明',
    location = 'US'
)

location例:
US: マルチリージョン
asia-northeast1: 東京

BigQuery のロケーション  |  Google Cloud


プロジェクトの全データセット名をリスト表示
SELECT
  schema_name
FROM
  `PROJECT_ID`.INFORMATION_SCHEMA.SCHEMATA

INFORMATION_SCHEMA.SCHEMATA ビューから全データセット名、作成日時、更新日時等が取得できる


データセットの削除
DROP SCHEMA IF EXISTS `PROJECT_ID.DATASET_ID`

削除前にデータセット内のテーブルとビューを削除すること


テーブルの管理

テーブルの作成
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` (
  x INT64 OPTIONS (description = '列説明'),
  y STRING 
) OPTIONS (
    description = 'テーブル説明'
   )


テーブルのメタデータの表示
SELECT
  *
FROM
    `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`;

テーブル名、テーブルタイプ、作成日時、作成したコマンド(DDL)などが表示される


テーブルの削除
DROP TABLE `PROJECT_ID.DATASET_ID.TABLE_ID`


テーブルの外部エクスポート
EXPORT DATA
  OPTIONS (
    # 出力先URI
    uri = 'gs://bucket/folder/*.csv',
    format = 'CSV',
    overwrite = true,
    header = true,
    field_delimiter = ';')
AS (
  SELECT *
  FROM `PROJECT_ID.DATASET_ID.TABLE_ID`
);


ネストされた列と繰り返し列を含むテーブルを作成する
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` (
  id INT64,
  addresses ARRAY <
    STRUCT <
      address STRING,
      status STRING
    >
  >
)
  • 補足: 各IDごとに複数住所(address)とステータス(status)を格納できるテーブルを定義。
    ネストデータ:STRUCT
    繰り返しデータ:ARRAY
  • BigQueryでは多くの場合、ネストされた繰り返しフィールドを使用して単一のテーブルにクエリを実行すると、複数のテーブルを結合する場合よりも効率的に処理されます。


ネストされたレコードの挿入
INSERT INTO  
  `PROJECT_ID.DATASET_ID.TABLE_ID` 
  (id, addresses) 
VALUES 
  ("1",[("past","Tokyo"),("now","Kanagawa")]),
  ("2",[("past","Chiba"),("now","Saitama")])


ネストされた繰り返し列(addresses)に対して、特定の位置にある ARRAYの値を取得する
SELECT
  # 1番目の値
  addresses[offset(0)].address
FROM
  `PROJECT_ID.DATASET_ID.TABLE_ID` 
ORDER BY id
  • offset(index): インデックスは 0 から始まります。範囲外の場合はエラーが発生します。
  • safe_offset(index): インデックスは 0 から始まります。範囲外の場合にNULLを返します。
  • ordinal(index): インデックスは 1 から始まります。範囲外の場合はエラーが発生します。
  • safe_ordinal(index): インデックスは 1 から始まります。範囲外の場合にNULLを返します。


テーブルに空の列を追加する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` 
ADD COLUMN new_column STRING;


テーブルの列の名前を変更する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` 
RENAME COLUMN old_column TO renamed_column; 


テーブルの列のデフォルト値を変更する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` 
ALTER COLUMN x SET DEFAULT 0;


列のデフォルト値を削除する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` 
ALTER COLUMN x DROP DEFAULT;


パーティーション分割

(時間単位列)パーティション分割テーブルを作成する
CREATE TABLE
  `PROJECT_ID.DATASET_ID.TABLE_ID` (transaction_id INT64, transaction_date DATE)
PARTITION BY
  # transaction_date列で分割
  transaction_date
  OPTIONS (
    require_partition_filter = TRUE
  )
  • パーティション:特定の範囲でテーブルを分割する。パーティション作成するとテーブルの一部のみスキャン→コスト削減、パフォーマンス向上
    参考:パーティション列からレコードを絞り込むことをプルーニングと呼ぶ。条件式が定数式であること等、プルーニングが動作するためにはいくつか条件あり。詳細
  • 整数範囲、時間単位列、取り込み時間パーティショニングの3種類
  • オプションでrequire_partition_filter = TRUE にすると、SELECT時のフィルター指定(where句)を強制できる→where句を指定しないとエラー


(取り込み時間)パーティション分割テーブルを作成する
CREATE TABLE
  `PROJECT_ID.DATASET_ID.TABLE_ID` (transaction_id INT64)
PARTITION BY
  _PARTITIONDATE

PARTITIONTIME (あるいはPARTITIONDATE) 列は、レコードの取り込み時間/日付がレコード挿入時に自動的に格納される。


パーティション フィルタ要件を更新する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID`
SET OPTIONS (
  require_partition_filter = FALSE);


クラスタリング

クラスタ化テーブルを作成する
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID`
(
  customer_id STRING,
  product_id STRING,
  transaction_amount NUMERIC
)
CLUSTER BY
  # クラスタリング列
  customer_id,product_id;
  • 最大 4 つのクラスタリング列を指定可能。
  • 指定した列の順序によって、データの並べ替え順序が決まる。最も頻繁にフィルタリングまたは集計される列を最初に置く


クラスタリング列をWHERE句でフィルタリングする
SELECT
  SUM(x)
FROM
  `PROJECT_ID.DATASET_ID.TABLE_ID`
WHERE
  customer_id = "1" AND
  product_id = "002"
  • フィルタリング順序はクラスタリング指定列の順序になっている必要あり(例:上記customer_idとproduct_idの順序を逆にするとクエリが最適化されない)
  • パーティション同様、複雑な(動的に値が決まる)フィルタ式ではクラスタ化列を使用しない


テーブルクローンを作成する
CREATE TABLE
`PROJECT_ID.DATASET_ID.CRONED_TABLE_ID`
CLONE `PROJECT_ID.DATASET_ID.BASE_TABLE_ID`;
  • テーブルクローン:ベーステーブルの軽量で書き込み可能なコピー
  • 標準テーブル同様にクエリ、パーティション等可能
  • ビューや外部テーブルのクローンは不可(コピーと同じ)
  • ベーステーブルorクローンのデータを変更しても互いに反映されない(独立したテーブル)
  • コピーとの違いはストレージ料金。クローンは新たに変更・追加されたデータ分のみ課金される
    テーブルクローン概要


テーブルスナップショット

24 時間後に期限切れになるテーブルのスナップショットを作成
CREATE SNAPSHOT TABLE `PROJECT_ID.DATASET_ID.SNAPSHOT_TABLE_ID`
CLONE `PROJECT_ID.DATASET_ID.BASE_TABLE_ID`
OPTIONS (
  expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
);
  • テーブルスナップショット:別のテーブル(ベーステーブル)の軽量で読み取り専用のコピー
  • ベーステーブルと異なるデータセット内に作成することで、ベーステーブルのデータセットが誤って削除された場合でも、テーブル スナップショットからベーステーブルを復元できます
  • 過去 7 日間の任意の時点におけるテーブルのスナップショットを作成することが可能


テーブル スナップショットを新しいテーブルに復元
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID`
CLONE `PROJECT_ID.DATASET_ID.SNAPSHOT_TABLE_ID`


テーブル スナップショットのメタデータを取得する
select *
from `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLE_SNAPSHOTS`
where table_name = 'some_table_snapshot';


テーブル スナップショットをコピーする
CREATE SNAPSHOT TABLE `PROJECT_ID.DATASET_ID.COPIED_SNAPSHOT_TABLE_ID`
CLONE `PROJECT_ID.DATASET_ID.SNAPSHOT_TABLE_ID`;

テーブル スナップショットを作成するプロセスと似ています。相違点は、ソーステーブルとしてコピー元のテーブル スナップショットを指定することです


テーブル スナップショットを削除する
DROP SNAPSHOT TABLE `PROJECT_ID.DATASET_ID.SNAPSHOT_TABLE_ID`;


ビュー, マテリアライズド ビュー

ビューを作成する
CREATE VIEW `PROJECT_ID.DATASET_ID.TABLE_VIEW_ID` (
  customer_id,
  transaction_amount_sum) AS (
  SELECT
    customer_id,
    SUM(transaction_amount)
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_ID`
  GROUP BY
    customer_id
)

ビューを照会するたびに、ビューを定義しているクエリが実行されます。大規模なビューや計算負荷の高いビューを頻繁にクエリする場合は、マテリアライズド ビューの作成を検討


ビューを実行する
SELECT * FROM `PROJECT_ID.DATASET_ID.TABLE_VIEW_ID`

テーブルと同じ方法でビューにクエリを実行します。


マテリアライズド ビューを作成する
CREATE MATERIALIZED VIEW `PROJECT_ID.DATASET_ID.TABLE_MV_ID`
 AS (
  SELECT
    customer_id as id,
    SUM(transaction_amount) as amount_sum
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_ID`
  GROUP BY
    customer_id
)

・マテリアライズド ビューはクエリの結果を定期的にキャッシュに保存するため、同じデータをベーステーブルから取得するクエリよりも高速で、消費するリソースも少なくて済みます 詳細


マテリアライズド ビューを変更する(例:自動更新を60分ごとにする)
ALTER MATERIALIZED VIEW `PROJECT_ID.DATASET_ID.TABLE_MV_ID`
SET OPTIONS (enable_refresh = true, refresh_interval_minutes = 60)
  • デフォルトではマテリアライズド ビューは以下2つのタイミングで自動更新される
    1) 30 分ごとに更新
    2) ベーステーブルの変更から 5 分以内に自動的に更新。ただし前の更新から 30 分以内に更新されることはない。
    変更の例としては、行の挿入や行の削除があります。

  • 自動更新をオフにするには、enable_refresh を false に設定します


マテリアライズド ビューを削除する
DROP MATERIALIZED VIEW `PROJECT_ID.DATASET_ID.TABLE_MV_ID`


ルーティン

ルーティンを作成する (プロシージャ例:変数idを設定し、INSERT を実行して、テキスト文字列として結果を表示する)
CREATE OR REPLACE PROCEDURE `PROJECT_ID.DATASET_ID`.create_customer()
BEGIN
  DECLARE id STRING;
  # 36文字のランダム英数字を作成
  SET id = GENERATE_UUID();
  INSERT INTO `PROJECT_ID.DATASET_ID.TABLE_ID` (customer_id)
    VALUES (id);
  SELECT FORMAT("Created customer %s", id);
END

ルーティン:ストアド プロシージャ、ユーザー定義関数(UDF)、テーブル関数のいずれか


作成したルーティン(プロシージャ)を呼び出す
CALL `PROJECT_ID.DATASET_ID`.create_customer();


データセット内のルーティンを一覧表示する
SELECT
  routine_name, routine_type
FROM
   `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.ROUTINES`


ルーティンの本文を表示する
SELECT
  routine_definition
FROM
  `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.ROUTINES`
WHERE
  routine_name = 'create_customer';


ルーティンを削除する
DROP PROCEDURE IF EXISTS `PROJECT_ID.DATASET_ID`.create_customer
  • ストアド プロシージャの削除: DROP PROCEDURE
  • ユーザー定義関数: DROP FUNCTION
  • テーブル関数: DROP TABLE FUNCTION


ユーザー定義関数(UDF)

SQLでUDFを作成(例:第1引数に4を足して第2引数で割った値をFLOAT型で返却する)
CREATE TEMP FUNCTION AddFourAndDivide(x INT64, y INT64)
RETURNS FLOAT64
AS (
  (x + 4) / y
);

-- UDF関数の呼び出し
SELECT
  val, AddFourAndDivide(val, 2)
FROM
  UNNEST([2,3,5,8]) AS val;
  • SQL式または JavaScriptコードを使用して関数を作成できます。
  • 永続的または一時的のいずれかとして定義できます。永続的な関数の場合ROUTINEとして作成可能。
  • UNNEST():配列を受け取り各要素を一行ずつ取り出す


JavascriptでUDFを作成する
CREATE TEMP FUNCTION multiplyInputs(x FLOAT64, y FLOAT64)
RETURNS FLOAT64
LANGUAGE js
AS r"""
  m = x*y;
  return m;
""";

SELECT multiplyInputs(2,3);

raw文字列(r""" ... """)でjavascriptを記載する


テーブル関数

テーブル関数の作成 (例: customer_idが一致するレコードを抽出するテーブル関数 table_function)
CREATE OR REPLACE TABLE FUNCTION `PROJECT_ID.DATASET_ID`.table_function (
  id STRING
)
AS (
  SELECT * FROM `PROJECT_ID.DATASET_ID_TABLE_ID`
  WHERE customer_id = id
)

テーブル関数:パラメータ付きビューのこと


テーブル関数の使用
SELECT * FROM `PROJECT_ID.DATASET_ID`.table_function("1")


リモート関数

リモート関数を作成する
# 作成例:Cloud Functionsにデプロイしたリモート関数の引数の合計を返す e.g. calls = [[1,2,3], [2,3,4]] →[[6], [9]]
# Cloud Function(ver2)で以下関数をデプロイ
import functions_framework

@functions_framework.http
def batch_add(request):
  try:
    return_value = []
    request_json = request.get_json()
    print('request_json', request_json)
    calls = request_json['calls']
    for call in calls:
      return_value.append(sum([int(x) for x in call if x is not None]))
    # callsの要素数と同じreturn_valueリストを返却
    response = { "replies":  return_value } 
    return response

  except Exception as e:
    return { "errorMessage": str(e) }

--- リモート関数の作成(事前にCloud Functionsと同じリージョンの接続を作成する必要あり)

CREATE FUNCTION `PROJECT_ID.DATASET_ID`.remote_add(x INT64, y INT64) RETURNS INT64
REMOTE WITH CONNECTION `PROJECT_ID.LOCATION.CONNECTION_NAME`
OPTIONS (
  endpoint = 'ENDPOINT_URL'
)
  • リモート関数:Cloud Functions, Cloud Runにデプロイされた関数
  • Cloud FunctionsはHTTPトリガーで作成
  • 接続はcloud Functionsと同じリージョンで作成し、IAMから"Cloud Run Invoker"の権限を与える(ver2の場合)


検索インデックスの管理

検索インデックスを作成 (例:Logsテーブルを作成し、全列にLOG_ANALYZERの検索インデックスを作成する)
-- テストテーブルの作成
CREATE TABLE `PROJECT_ID.DATASET_ID`.Logs (Level STRING, Source STRING, Message STRING)
AS (
  SELECT 'INFO' as Level, '65.177.8.234' as Source, 'Entry Foo-Bar created' as Message
  UNION ALL
  SELECT 'WARNING', '132.249.240.10', 'Entry Foo-Bar already exists, created by 65.177.8.234'
  UNION ALL
  SELECT 'INFO', '94.60.64.181', 'Entry Foo-Bar deleted'
  UNION ALL
  SELECT 'SEVERE', '4.113.82.10', 'Entry Foo-Bar does not exist, deleted by 94.60.64.181'
  UNION ALL
  SELECT 'INFO', '181.94.60.64', 'Entry Foo-Baz created'
);

-- テーブル全列に検索インデックスを作成
CREATE SEARCH INDEX my_index ON `PROJECT_ID.DATASET_ID`.Logs(ALL COLUMNS);
  • 検索インデックスはSEARCH()関数とともに使用することで、テキストの検索が大幅に高速化されます
  • 検索インデックスは、大きなテーブルを想定して設計されています。10 GB 未満のテーブルに検索インデックスを作成した場合、インデックスは入力されません。
  • SEARCH(text,query,analyzer)関数のデフォルトのテキスト分析ツールのタイプはLOG_ANALYZER(厳密な一致を求める場合はNO_OP_ANALYZERを指定).


検索インデックスを使用してテキストを検索する
-- テーブル全体から文字列'baz'を含むレコードを抽出
SELECT * FROM `PROJECT_ID.DATASET_ID`.Logs WHERE SEARCH(Logs, 'baz');

search関数では、トークンの区切りに日本語の句読点は含まれないので、日本語テキストの検索は不向き 参考


検索インデックスに関する情報を取得
SELECT table_name, index_name, ddl, index_status
FROM `PROJECT_ID.DATASET_ID`.INFORMATION_SCHEMA.SEARCH_INDEXES

テーブルサイズが10G未満の場合、statusはdisabled


検索インデックスを削除する
DROP SEARCH INDEX my_index ON  `PROJECT_ID.DATASET_ID`.Logs;


Cloud Storageにクエリを行う

BigLake テーブルを作成する
CREATE EXTERNAL TABLE `PROJECT_ID.DATASET_ID.BIGLAKE_TABLE_ID`
  WITH CONNECTION `PROJECT_ID.DATASET_ID.CONNECTION_ID`
  OPTIONS (
    format ="CSV",
    # GCSバケットパス
    uris = ['BUCKET_PATH'[,...]],
    );
  • 事前に接続の作成必要(bigquery connection APIの認証も必要)
  • テーブルをクリックするとコンソール上部にBigLakeマークが表示される
  • BigLakeテーブルは行アクセスポリシー編集可能


外部テーブルを作成する
CREATE EXTERNAL TABLE `PROJECT_ID.DATASET_ID.EXTERNAL_TABLE_ID`
  OPTIONS (
    format ="CSV",
    uris = ['BUCKET_PATH'[,...]]
    );

Kendra+LLM+LangChainでRAG実装する

この記事はAWSブログ記事の内容をまとめたものです:高精度な生成系 AI アプリケーションを Amazon Kendra、LangChain、大規模言語モデルを使って作る


記事の概要

LLMとAWS Kendraを連携させることで、

  • LLMからの応答をKendraに保存されたドキュメントのみに絞ることができる

  • 応答の元になったドキュメントのURLを提示してくれる

単語の説明

RAGとは LLM出力のアップデートが可能(最新検索結果に対応させる、企業データに限定するなど)

LLMとは 大規模言語モデルのこと

Kendraとは 単語の列ではなく自然な言語で資料検索できるAWSサービス

LangChainとは LLMとKendraを簡単に統合できるフレームワーク LangChain詳細


RAGサンプルアプリを動かしてみる

構成図


対象LLM

Hugging Face、AI21 Labs、Cohere、Anthropic、OpenAI. (今後, Bedrock (Amazon Titan等)も含まれる予定).

実装手順
  1. 公開されている AWS CloudFormationの テンプレートを利用して、新しい Amazon Kendra インデックスを作成します。


2. AWS SDK for Python (boto3) をインストール pip install boto3


3. LangChain をインストール pip install langchain


4. 興味のある LLM の API キーの取得 例:OpenAI https://openai.com/


5. GitHub repoから、KendraIndexRetriever インターフェイスとサンプルアプリケーションをインストール


6. 環境変数を以下のように設定
export AWS_REGION=""<YOUR-AWS-REGION>""
export KENDRA_INDEX_ID=""<YOUR-KENDRA-INDEX-ID>""
export FLAN_XL_ENDPOINT=""<YOUR-SAGEMAKER-ENDPOINT-FOR-FLAN-T-XL>""
export FLAN_XXL_ENDPOINT=""<YOUR-SAGEMAKER-ENDPOINT-FOR-FLAN-T-XXL>""
export OPENAI_API_KEY=""<YOUR-OPEN-AI-API-KEY>""
export ANTHROPIC_API_KEY=""<YOUR-ANTHROPIC-API-KEY>"""


7. GitHub リポジトリをCloneした場所の samples サブディレクトリに移動


8. コマンドラインpython sample-file-name.py の実行
出力例(Flan-T-XXLモデル):

"Amazon SageMaker is a machine learning service that lets you train and deploy models in the cloud.
Sources: https://docs.aws.amazon.com/sagemaker/latest/dg/data-parallel-intro.html https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-whatis.html https://docs.aws.amazon.com/sagemaker/latest/dg/whatis.html"


引用元へのリンクとともに回答してくれる。


以上

GCP外部からBigQueryに接続する方法

1. IAM管理画面からサービスアカウントを作成する(付与ロール例: BigQuery編集者)

 

2. サービスアカウントのキーを作成し、秘密鍵(.json)をダウンロードする

 

3. ダウンロードした秘密鍵のパスを環境変数GOOGLE_APPLICATION_CREDENTIALS として保存する

(例:GOOGLE_APPLICATION_CREDENTIALS = C:\Users\yourname\yourproject-12345-fef892f8w8.json

 

4. BigQueryライブラリのインストール

pip install google-cloud-bigquery

備考:サポートされているPythonのバージョン >= 3.7, < 3.11(2023/03/24時点)

 

5. クエリを実行する

(例:データセット内テーブルの全レコードを取得)

 

from google.cloud import bigquery

# クライアントの作成

client = bigquery.Client()

# クエリ作成

query_job = client.query(
    """
    SELECT
      *
    FROM `projectname.dataset.sometable`
    """
)

results = query_job.result() 

# 実行結果の表示

for i,row in enumerate(results):
    print('{0}:'.format(i+1),row)

 

 

注意事項:

"google.api_core.exceptions.Forbidden: 403 Access Denied:"というエラーが出たら以下2点を確認

1. サービスアカウントに適切なロールが付与されているか(例:"BigQueryデータ閲覧者"ロールでテーブルを作成しようとしている)

2. (Spread sheetから作成されたテーブルの場合) Spread sheetがサービスアカウントと共有されているか。 また以下コードをクライアントのインスタンス化の直前に挿入する

credentials, project = google.auth.default(
    scopes=[
        "https://www.googleapis.com/auth/drive",
        "https://www.googleapis.com/auth/cloud-platform",
    ]
)
client = bigquery.Client(credentials=credentials, project=project)

 

参考

cloud.google.com

discourse.metabase.com

 

cloud.google.com

Dataformとは何者か?

概要

BigQuery用のSQLワークフローの開発、テスト、バージョン管理、スケジュール設定ができるサービス

具体的に何ができるの?

  • テーブル定義や集計処理を、SQLXというSQLの拡張言語で記述することで、テーブル間の依存関係を元にワークフローを実行できる
  • レコードの空欄(Null値)チェックや重複データが存在していないかなどのカスタム検証処理を実行できる
  • ファイルのバージョン管理にGitを使用。Dataformリポジトリを作成すれば、GitHubやGitLabリポジトリに接続できる。
  • ワークフロー構成(Dataformの機能の一つ)やWorkflows, Cloud Composerを使用して、ワークフローの実行をスケジュールできる
  • 依存関係を含んだワークフローを、インタラクティブ有向非巡回グラフ(DAG)として表示できる


DAG表示例:

DAG例. second_viewがfirst_viewを依存関係に持つ

Dataform実行の流れ

  1. リポジトリを作成
  2. 開発用のワークスペースを作成
  3. ワークスペースでワークフローを開発(SQLXあるいはJavascript)
  4. コードを(リアルタイムで)SQLクエリにコンパイル
  5. テーブルの依存関係に従ってワークフローを実施

SQLXファイルの構成

SQLXファイルはconfigブロック、jsブロック、本文ブロックの3つから構成される。

  • configブロック:作成するテーブルタイプ(通常のテーブル、増分テーブル、ビュー)やテスト項目、依存関係などを記述する
  • jsブロック:Javascriptを記述する。定義された定数はbodyブロックで使用することができる。
  • bodyブロック:実行するSQLコードを記述する。Dataformコア*の組み込み関数が使用できる(依存関係を参照するref関数など)



*Dataformコア:Dataform内部で使用される、テーブルとワークフローを作成するためのメタ言語(SQL拡張言語)。依存関係管理システム、テスト機能、ドキュメント(テーブルや列の説明など)を提供する。 @Dataform/core npm


SQLX記述例:

//configブロック
config : {
  type: "table",
  assertions: {
    nonNull: ["user_id", "customer_id"] // Null値がないことのテスト
  }
}

//jsブロック
js {
  const someDate = require("includes/sample.js"); // Javascript定数をインポート
}

//bodyブロック
SELECT 
  user_id,
  customer_id,
  session_date
FROM
  ${ref("some_table")} // 依存テーブルの参照
WHERE 
  session_date > ${someDate} // Javascript定数の参照


ワークフローの実行

SQLXファイルを作成したら、definitions/ フォルダに配置し、ファイル編集ペイン上部にある「実行を開始」から作成したワークフローが実行できます。


Javascriptのパッケージをインストールして使用する

上述のコードのように、Javascriptの定数や関数は自作してSQLXファイルのjsブロックで呼び出すことができますが、npmJSやGitHubで公開されているNPMパッケージをインストールして使うこともできます。 その場合、package.jsonファイルのdependenciesブロックに "パッケージ名":"URL" の形で記載します。

{
  "name": "your-repository",
  "dependencies": {
    "@dataform/core": "2.0.1",
    "dataform-scd": "https://github.com/dataform-co/dataform-scd/archive/0.4.tar.gz" // "ゆっくり変化するディメンション" パッケージのインストール 
  }
}

参照:ゆっくり変化するディメンション


検証処理(テスト項目)の記載方法

Dataformでは、Null値や重複レコードのチェック、またはカスタム作成したテスト項目をSQLXファイルのconfigブロック内のassertionsブロックに記載することで、テーブルの作成前に検証処理が可能になります。(もしテストが不合格であればテーブルの作成はされません)

config {
  type:"table",
  assertions: {
    nonNull : ["user_id"], // Nullではないことのテスト. 複数列記載可能
    uniqueKey : ["customer_id"], // 一意であることのテスト
    rowConditions : [
      'session_date is null or session_date > "2022-08-01"', // カスタムSQL式
    ]
  }
}    

参考:アサーションでテーブルを検証する