EmbeddingBagレイヤーを用いて感情分析する
今回はPytorchのEmbeddingBagレイヤーを使い、日本語の入力文章がポジティブかネガティブか判定する簡易的な分類モデルを作成します。
単語の埋め込みベクトル化にはword2vecを用います。main関数内のパラメータを変えることでポジティブ、ニュートラル、ネガティブの3クラスなど、多クラス分類にも容易にカスタマイズできます。
EmbeddingBagを使う利点は、入力シーケンスがすべて同じ長さになるように、\<pad>でパディングする必要がなくなることです。
まずは以下のような教師データ20個をtsvファイルで準備します。
ラベル0はネガティブ、1はポジティブな文章です。
label text 0 これはひどい映画でした。 1 この映画が気に入りました! 強くお勧めします。 0 ただひどい 1 良い映画と演技です 0 時間を無駄にしないでください - 本当にダメです 0 ひどい 1 素晴らしい映画。 0 これは才能の無駄遣いでした。 1 この映画がとても気に入りました。 見てください。 1 ここ数年で見た中で最高の映画。 0 演技も下手、脚本も下手。 1 この映画をみんなにお勧めします 1 面白くて楽しい。 0 私はこの映画がまったく好きではありませんでした。 1 古き良き時代の物語 0 その話は私には意味が分かりませんでした。 0 最初から最後まで素人っぽい。 1 この動きがとても気に入りました。 多くの楽しみ。 0 私はこの映画が嫌いで出て行きました。 1 すべての年齢層向けのスリル満点の冒険。
単語のベクトル化にはword2vecを事前にダウンロードしたものを使います。
from torchtext.vocab import Vectors v=Vectors(name='japanese_word2vec_vectors.vec')
教師データを読み込み、label, textのボキャブラリーを構築します
import pandas as pd from torchtext.vocab import vocab, build_vocab_from_iterator df = pd.read_csv('sample_train.tsv', names=['label','text'], header=0, sep='\t') # label df['label'] = df['label'].astype(str) label_vocab = build_vocab_from_iterator(df['label']) # text text_vocab = vocab(v.stoi, min_freq=0, specials=('<unk>',), special_first=True) text_vocab.set_default_index(0)
今回は単語分割にはjanomeを使用します。正規化などは行わず、単純にわかち書きのみ行います。
from janome.tokenizer import Tokenizer j_t = Tokenizer() def tokenizer_janome(text): return [tok for tok in j_t.tokenize(text, wakati=True)]
テキスト変換を行うtransformを定義します。文章の長さを整えるpaddingは行いません。
import torchtext.transforms as T label_transform = T.Sequential( T.VocabTransform(label_vocab), T.ToTensor() ) text_transform = T.Sequential( T.VocabTransform(text_vocab), T.ToTensor() )
Dataloaderに渡すCollate_fnを定義します。EmbeddingBagに文章の区切り位置を示すoffsetを渡す必要があるので、ここで返却できるようにします。
def collate_batch(batch): label_l = [] text_l = [] offset_l = [0] for label, text in batch: label_l.append(label) tokens = text_transform(tokenizer_janome(text)) offset_l.append(len(tokens)) text_l.append(tokens) label_l = label_transform(label_l) offset_l = torch.tensor(offset_l[:-1]).cumsum(dim=0) text_l = torch.cat(text_l) return label_l, offset_l, text_l
EmbeddingBagレイヤーを使用した分類クラスを定義します。
import torch.nn as nn class TextClassificationModel(nn.Module): def __init__(self, pretrained_embeddings, num_class, freeze_embeddings = False): super().__init__() self.embedding = nn.EmbeddingBag.from_pretrained(pretrained_embeddings, freeze = freeze_embeddings, ) # EmbeggingBag で各文が埋め込みベクトルで表されます。 # 単語の連続情報が失われるため、以下単純なニューラル ネットワークを使用します。 self.fc = nn.Linear(pretrained_embeddings.shape[1], num_class) self.init_weights() def init_weights(self): initrange = 0.5 self.fc.weight.data.uniform_(-initrange, initrange) self.fc.bias.data.zero_() def forward(self, text, offsets): embedded = self.embedding(text, offsets) return self.fc(embedded)
train関数を定義します。最適化手法にはSGD、損失関数はCrossEntropyLoss()を使用しています。
import torch def train(net, ldr, bs, me, le, lr): # パラメータ: network, Dataloader, batch_size, max_epochs, log_every, learn_rate net.train() opt = torch.optim.SGD(net.parameters(), lr=lr) loss_func = torch.nn.CrossEntropyLoss() # 内部でsoftmaxが行われる print("\nStarting training") for epoch in range(0, me): epoch_loss = 0.0 for bix, (labels, offsets, texts) in enumerate(ldr): opt.zero_grad() oupt = net(texts, offsets) loss_val = loss_func(oupt, labels) loss_val.backward() epoch_loss += loss_val.item() opt.step() if epoch % le == 0: print("epoch = %4d loss = %0.4f" % (epoch, epoch_loss)) print("Done ")
ここまで準備できたら、main関数を定義します。
from torch.utils.data import DataLoader def main(): # dataloader作成 bat_size = 2 train_ldr = DataLoader(df.values, batch_size=bat_size, shuffle=False, collate_fn=collate_batch) # model作成 num_class=2 net = TextClassificationModel(pretrained_embeddings, num_class) # 訓練 max_epochs = 150 log_interval = 30 lrn_rate = 0.01 train(net, train_ldr, bat_size, max_epochs, log_interval, lrn_rate) # 予測 text = "この映画めちゃ面白い" net.eval() with torch.no_grad(): oupt = net(text_transform(tokenizer_janome(text)), torch.tensor([0])) predict = torch.softmax(oupt, dim=1) print("text: {}".format(text)) print("Sentiment prediction probabilities [neg, pos]: ") print(predict)
出力
... text: この映画めちゃ面白い Sentiment prediction probabilities [neg, pos]: tensor([[0.0188, 0.9812]])
参考記事:
visualstudiomagazine.com
分散表現の事前学習済みモデル(fastText等)からボキャブラリーを作成する(pytorch)
古いバージョンのtorchtext(~0.12)ではtorchtext.data.Field.build_vocab(vectors = ...) でベクトル化したボキャブラリーが簡単に作れましたが、新しいバージョンではこの機能がなくなってしまい、Gloveやword2vec, fastTextなど単語分散表現の学習済みモデルからどのようにボキャブラリーを構築すればいいのかわかりづらかったので、以下にまとめ
環境
torchtext : 0.16
from torchtext.vocab import FastText,vocab # FastText単語ベクトル読み込み myvec = FastText() # ボキャブラリー作成 myvocab = vocab(ordered_dict = myvec.stoi, min_freq = 0, specials = ('<unk>',), special_first = True) # 出力 myvocab.get_itos() > ['<unk>, ',', 'the', '.', 'and', 'of',...] len(myvocab) > 999995
備考:
- vocab()引数のordered_dict に myvec.stoi (単語:インデックス にマッピングされた辞書) を渡す
- min_freq(ボキャブラリーに含める最小出現数) はデフォルトで1になっているため、0を指定する。(fastTextやGloVeから辞書を作る場合、myvec.stoi = {',': 0, 'the': 1, '.': 2, 'and': 3, 'of': 4,...}となり、0番目の単語が無視されてしまう)
参考
BigQuery SQLコマンド集-2023年度版
- データセットの管理
- テーブルの管理
- パーティーション分割
- クラスタリング
- テーブルスナップショット
- ビュー, マテリアライズド ビュー
- ルーティン
- ユーザー定義関数(UDF)
- テーブル関数
- リモート関数
- 検索インデックスの管理
- Cloud Storageにクエリを行う
データセットの管理
データセットを作成する
CREATE SCHEMA `PROJECT_ID.DATASET_ID` OPTIONS ( description = 'データセット説明', location = 'US' )
location例:
US: マルチリージョン
asia-northeast1: 東京
BigQuery のロケーション | Google Cloud
プロジェクトの全データセット名をリスト表示
SELECT schema_name FROM `PROJECT_ID`.INFORMATION_SCHEMA.SCHEMATA
INFORMATION_SCHEMA.SCHEMATA ビューから全データセット名、作成日時、更新日時等が取得できる
データセットの削除
DROP SCHEMA IF EXISTS `PROJECT_ID.DATASET_ID`
削除前にデータセット内のテーブルとビューを削除すること
テーブルの管理
テーブルの作成
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` ( x INT64 OPTIONS (description = '列説明'), y STRING ) OPTIONS ( description = 'テーブル説明' )
テーブルのメタデータの表示
SELECT * FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`;
テーブル名、テーブルタイプ、作成日時、作成したコマンド(DDL)などが表示される
テーブルの削除
DROP TABLE `PROJECT_ID.DATASET_ID.TABLE_ID`
テーブルの外部エクスポート
EXPORT DATA OPTIONS ( # 出力先URI uri = 'gs://bucket/folder/*.csv', format = 'CSV', overwrite = true, header = true, field_delimiter = ';') AS ( SELECT * FROM `PROJECT_ID.DATASET_ID.TABLE_ID` );
- エクスポート先はGoogle Cloud Storageのみ
- CSV、JSON、Avro、Parquet(プレビュー)形式対応
- 1ファイル最大 1 GB 。1 GB を超えるデータをエクスポートする場合は複数ファイルに分割される
- Cloud Storageに書き込む権限必要 参考:https://cloud.google.com/bigquery/docs/exporting-data?hl=ja
ネストされた列と繰り返し列を含むテーブルを作成する
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` ( id INT64, addresses ARRAY < STRUCT < address STRING, status STRING > > )
- 補足: 各IDごとに複数住所(address)とステータス(status)を格納できるテーブルを定義。
ネストデータ:STRUCT
繰り返しデータ:ARRAY - BigQueryでは多くの場合、ネストされた繰り返しフィールドを使用して単一のテーブルにクエリを実行すると、複数のテーブルを結合する場合よりも効率的に処理されます。
ネストされたレコードの挿入
INSERT INTO `PROJECT_ID.DATASET_ID.TABLE_ID` (id, addresses) VALUES ("1",[("past","Tokyo"),("now","Kanagawa")]), ("2",[("past","Chiba"),("now","Saitama")])
ネストされた繰り返し列(addresses)に対して、特定の位置にある ARRAYの値を取得する
SELECT # 1番目の値 addresses[offset(0)].address FROM `PROJECT_ID.DATASET_ID.TABLE_ID` ORDER BY id
- offset(index): インデックスは 0 から始まります。範囲外の場合はエラーが発生します。
- safe_offset(index): インデックスは 0 から始まります。範囲外の場合にNULLを返します。
- ordinal(index): インデックスは 1 から始まります。範囲外の場合はエラーが発生します。
- safe_ordinal(index): インデックスは 1 から始まります。範囲外の場合にNULLを返します。
テーブルに空の列を追加する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` ADD COLUMN new_column STRING;
テーブルの列の名前を変更する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` RENAME COLUMN old_column TO renamed_column;
テーブルの列のデフォルト値を変更する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` ALTER COLUMN x SET DEFAULT 0;
列のデフォルト値を削除する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` ALTER COLUMN x DROP DEFAULT;
パーティーション分割
(時間単位列)パーティション分割テーブルを作成する
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` (transaction_id INT64, transaction_date DATE) PARTITION BY # transaction_date列で分割 transaction_date OPTIONS ( require_partition_filter = TRUE )
- パーティション:特定の範囲でテーブルを分割する。パーティション作成するとテーブルの一部のみスキャン→コスト削減、パフォーマンス向上
参考:パーティション列からレコードを絞り込むことをプルーニングと呼ぶ。条件式が定数式であること等、プルーニングが動作するためにはいくつか条件あり。詳細 - 整数範囲、時間単位列、取り込み時間パーティショニングの3種類
- オプションでrequire_partition_filter = TRUE にすると、SELECT時のフィルター指定(where句)を強制できる→where句を指定しないとエラー
(取り込み時間)パーティション分割テーブルを作成する
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` (transaction_id INT64) PARTITION BY _PARTITIONDATE
PARTITIONTIME (あるいはPARTITIONDATE) 列は、レコードの取り込み時間/日付がレコード挿入時に自動的に格納される。
パーティション フィルタ要件を更新する
ALTER TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` SET OPTIONS ( require_partition_filter = FALSE);
クラスタリング
クラスタ化テーブルを作成する
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` ( customer_id STRING, product_id STRING, transaction_amount NUMERIC ) CLUSTER BY # クラスタリング列 customer_id,product_id;
- 最大 4 つのクラスタリング列を指定可能。
- 指定した列の順序によって、データの並べ替え順序が決まる。最も頻繁にフィルタリングまたは集計される列を最初に置く
クラスタリング列をWHERE句でフィルタリングする
SELECT SUM(x) FROM `PROJECT_ID.DATASET_ID.TABLE_ID` WHERE customer_id = "1" AND product_id = "002"
- フィルタリング順序はクラスタリング指定列の順序になっている必要あり(例:上記customer_idとproduct_idの順序を逆にするとクエリが最適化されない)
- パーティション同様、複雑な(動的に値が決まる)フィルタ式ではクラスタ化列を使用しない
テーブルクローンを作成する
CREATE TABLE `PROJECT_ID.DATASET_ID.CRONED_TABLE_ID` CLONE `PROJECT_ID.DATASET_ID.BASE_TABLE_ID`;
- テーブルクローン:ベーステーブルの軽量で書き込み可能なコピー
- 標準テーブル同様にクエリ、パーティション等可能
- ビューや外部テーブルのクローンは不可(コピーと同じ)
- ベーステーブルorクローンのデータを変更しても互いに反映されない(独立したテーブル)
- コピーとの違いはストレージ料金。クローンは新たに変更・追加されたデータ分のみ課金される
テーブルクローン概要
テーブルスナップショット
24 時間後に期限切れになるテーブルのスナップショットを作成
CREATE SNAPSHOT TABLE `PROJECT_ID.DATASET_ID.SNAPSHOT_TABLE_ID` CLONE `PROJECT_ID.DATASET_ID.BASE_TABLE_ID` OPTIONS ( expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) );
- テーブルスナップショット:別のテーブル(ベーステーブル)の軽量で読み取り専用のコピー
- ベーステーブルと異なるデータセット内に作成することで、ベーステーブルのデータセットが誤って削除された場合でも、テーブル スナップショットからベーステーブルを復元できます
- 過去 7 日間の任意の時点におけるテーブルのスナップショットを作成することが可能
テーブル スナップショットを新しいテーブルに復元
CREATE TABLE `PROJECT_ID.DATASET_ID.TABLE_ID` CLONE `PROJECT_ID.DATASET_ID.SNAPSHOT_TABLE_ID`
テーブル スナップショットのメタデータを取得する
select * from `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLE_SNAPSHOTS` where table_name = 'some_table_snapshot';
テーブル スナップショットをコピーする
CREATE SNAPSHOT TABLE `PROJECT_ID.DATASET_ID.COPIED_SNAPSHOT_TABLE_ID` CLONE `PROJECT_ID.DATASET_ID.SNAPSHOT_TABLE_ID`;
テーブル スナップショットを作成するプロセスと似ています。相違点は、ソーステーブルとしてコピー元のテーブル スナップショットを指定することです
テーブル スナップショットを削除する
DROP SNAPSHOT TABLE `PROJECT_ID.DATASET_ID.SNAPSHOT_TABLE_ID`;
ビュー, マテリアライズド ビュー
ビューを作成する
CREATE VIEW `PROJECT_ID.DATASET_ID.TABLE_VIEW_ID` ( customer_id, transaction_amount_sum) AS ( SELECT customer_id, SUM(transaction_amount) FROM `PROJECT_ID.DATASET_ID.TABLE_ID` GROUP BY customer_id )
ビューを照会するたびに、ビューを定義しているクエリが実行されます。大規模なビューや計算負荷の高いビューを頻繁にクエリする場合は、マテリアライズド ビューの作成を検討
ビューを実行する
SELECT * FROM `PROJECT_ID.DATASET_ID.TABLE_VIEW_ID`
テーブルと同じ方法でビューにクエリを実行します。
マテリアライズド ビューを作成する
CREATE MATERIALIZED VIEW `PROJECT_ID.DATASET_ID.TABLE_MV_ID` AS ( SELECT customer_id as id, SUM(transaction_amount) as amount_sum FROM `PROJECT_ID.DATASET_ID.TABLE_ID` GROUP BY customer_id )
・マテリアライズド ビューはクエリの結果を定期的にキャッシュに保存するため、同じデータをベーステーブルから取得するクエリよりも高速で、消費するリソースも少なくて済みます 詳細
マテリアライズド ビューを変更する(例:自動更新を60分ごとにする)
ALTER MATERIALIZED VIEW `PROJECT_ID.DATASET_ID.TABLE_MV_ID` SET OPTIONS (enable_refresh = true, refresh_interval_minutes = 60)
デフォルトではマテリアライズド ビューは以下2つのタイミングで自動更新される
1) 30 分ごとに更新
2) ベーステーブルの変更から 5 分以内に自動的に更新。ただし前の更新から 30 分以内に更新されることはない。
変更の例としては、行の挿入や行の削除があります。自動更新をオフにするには、enable_refresh を false に設定します
マテリアライズド ビューを削除する
DROP MATERIALIZED VIEW `PROJECT_ID.DATASET_ID.TABLE_MV_ID`
ルーティン
ルーティンを作成する (プロシージャ例:変数idを設定し、INSERT を実行して、テキスト文字列として結果を表示する)
CREATE OR REPLACE PROCEDURE `PROJECT_ID.DATASET_ID`.create_customer() BEGIN DECLARE id STRING; # 36文字のランダム英数字を作成 SET id = GENERATE_UUID(); INSERT INTO `PROJECT_ID.DATASET_ID.TABLE_ID` (customer_id) VALUES (id); SELECT FORMAT("Created customer %s", id); END
ルーティン:ストアド プロシージャ、ユーザー定義関数(UDF)、テーブル関数のいずれか
作成したルーティン(プロシージャ)を呼び出す
CALL `PROJECT_ID.DATASET_ID`.create_customer();
データセット内のルーティンを一覧表示する
SELECT routine_name, routine_type FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.ROUTINES`
ルーティンの本文を表示する
SELECT routine_definition FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.ROUTINES` WHERE routine_name = 'create_customer';
ルーティンを削除する
DROP PROCEDURE IF EXISTS `PROJECT_ID.DATASET_ID`.create_customer
ユーザー定義関数(UDF)
SQLでUDFを作成(例:第1引数に4を足して第2引数で割った値をFLOAT型で返却する)
CREATE TEMP FUNCTION AddFourAndDivide(x INT64, y INT64) RETURNS FLOAT64 AS ( (x + 4) / y ); -- UDF関数の呼び出し SELECT val, AddFourAndDivide(val, 2) FROM UNNEST([2,3,5,8]) AS val;
- SQL式または JavaScriptコードを使用して関数を作成できます。
- 永続的または一時的のいずれかとして定義できます。永続的な関数の場合ROUTINEとして作成可能。
- UNNEST():配列を受け取り各要素を一行ずつ取り出す
JavascriptでUDFを作成する
CREATE TEMP FUNCTION multiplyInputs(x FLOAT64, y FLOAT64) RETURNS FLOAT64 LANGUAGE js AS r""" m = x*y; return m; """; SELECT multiplyInputs(2,3);
raw文字列(r""" ... """)でjavascriptを記載する
テーブル関数
テーブル関数の作成 (例: customer_idが一致するレコードを抽出するテーブル関数 table_function)
CREATE OR REPLACE TABLE FUNCTION `PROJECT_ID.DATASET_ID`.table_function ( id STRING ) AS ( SELECT * FROM `PROJECT_ID.DATASET_ID_TABLE_ID` WHERE customer_id = id )
テーブル関数:パラメータ付きビューのこと
テーブル関数の使用
SELECT * FROM `PROJECT_ID.DATASET_ID`.table_function("1")
リモート関数
リモート関数を作成する
# 作成例:Cloud Functionsにデプロイしたリモート関数の引数の合計を返す e.g. calls = [[1,2,3], [2,3,4]] →[[6], [9]] # Cloud Function(ver2)で以下関数をデプロイ import functions_framework @functions_framework.http def batch_add(request): try: return_value = [] request_json = request.get_json() print('request_json', request_json) calls = request_json['calls'] for call in calls: return_value.append(sum([int(x) for x in call if x is not None])) # callsの要素数と同じreturn_valueリストを返却 response = { "replies": return_value } return response except Exception as e: return { "errorMessage": str(e) } --- リモート関数の作成(事前にCloud Functionsと同じリージョンの接続を作成する必要あり) CREATE FUNCTION `PROJECT_ID.DATASET_ID`.remote_add(x INT64, y INT64) RETURNS INT64 REMOTE WITH CONNECTION `PROJECT_ID.LOCATION.CONNECTION_NAME` OPTIONS ( endpoint = 'ENDPOINT_URL' )
- リモート関数:Cloud Functions, Cloud Runにデプロイされた関数
- Cloud FunctionsはHTTPトリガーで作成
- 接続はcloud Functionsと同じリージョンで作成し、IAMから"Cloud Run Invoker"の権限を与える(ver2の場合)
検索インデックスの管理
検索インデックスを作成 (例:Logsテーブルを作成し、全列にLOG_ANALYZERの検索インデックスを作成する)
-- テストテーブルの作成 CREATE TABLE `PROJECT_ID.DATASET_ID`.Logs (Level STRING, Source STRING, Message STRING) AS ( SELECT 'INFO' as Level, '65.177.8.234' as Source, 'Entry Foo-Bar created' as Message UNION ALL SELECT 'WARNING', '132.249.240.10', 'Entry Foo-Bar already exists, created by 65.177.8.234' UNION ALL SELECT 'INFO', '94.60.64.181', 'Entry Foo-Bar deleted' UNION ALL SELECT 'SEVERE', '4.113.82.10', 'Entry Foo-Bar does not exist, deleted by 94.60.64.181' UNION ALL SELECT 'INFO', '181.94.60.64', 'Entry Foo-Baz created' ); -- テーブル全列に検索インデックスを作成 CREATE SEARCH INDEX my_index ON `PROJECT_ID.DATASET_ID`.Logs(ALL COLUMNS);
- 検索インデックスはSEARCH()関数とともに使用することで、テキストの検索が大幅に高速化されます。
- 検索インデックスは、大きなテーブルを想定して設計されています。10 GB 未満のテーブルに検索インデックスを作成した場合、インデックスは入力されません。
- SEARCH(text,query,analyzer)関数のデフォルトのテキスト分析ツールのタイプはLOG_ANALYZER(厳密な一致を求める場合はNO_OP_ANALYZERを指定).
検索インデックスを使用してテキストを検索する
-- テーブル全体から文字列'baz'を含むレコードを抽出 SELECT * FROM `PROJECT_ID.DATASET_ID`.Logs WHERE SEARCH(Logs, 'baz');
search関数では、トークンの区切りに日本語の句読点は含まれないので、日本語テキストの検索は不向き 参考
検索インデックスに関する情報を取得
SELECT table_name, index_name, ddl, index_status FROM `PROJECT_ID.DATASET_ID`.INFORMATION_SCHEMA.SEARCH_INDEXES
テーブルサイズが10G未満の場合、statusはdisabled
検索インデックスを削除する
DROP SEARCH INDEX my_index ON `PROJECT_ID.DATASET_ID`.Logs;
Cloud Storageにクエリを行う
BigLake テーブルを作成する
CREATE EXTERNAL TABLE `PROJECT_ID.DATASET_ID.BIGLAKE_TABLE_ID` WITH CONNECTION `PROJECT_ID.DATASET_ID.CONNECTION_ID` OPTIONS ( format ="CSV", # GCSバケットパス uris = ['BUCKET_PATH'[,...]], );
- 事前に接続の作成必要(bigquery connection APIの認証も必要)
- テーブルをクリックするとコンソール上部にBigLakeマークが表示される
- BigLakeテーブルは行アクセスポリシー編集可能
外部テーブルを作成する
CREATE EXTERNAL TABLE `PROJECT_ID.DATASET_ID.EXTERNAL_TABLE_ID` OPTIONS ( format ="CSV", uris = ['BUCKET_PATH'[,...]] );
- BigLakeテーブルとの違いはサービスアカウントによる接続(WITH CONNECTION)があるかないか
- パーティション分割テーブルに外部テーブルを作成するには、gcs上のファイルがHive パーティション分割レイアウトに従う必要がある
参考:
外部テーブルを作成する
外部でパーティションに分割されたデータを使用する
Kendra+LLM+LangChainでRAG実装する
この記事はAWSブログ記事の内容をまとめたものです:高精度な生成系 AI アプリケーションを Amazon Kendra、LangChain、大規模言語モデルを使って作る
記事の概要
LLMとAWS Kendraを連携させることで、
LLMからの応答をKendraに保存されたドキュメントのみに絞ることができる
応答の元になったドキュメントのURLを提示してくれる
単語の説明
RAGとは LLM出力のアップデートが可能(最新検索結果に対応させる、企業データに限定するなど)
LLMとは 大規模言語モデルのこと
Kendraとは 単語の列ではなく自然な言語で資料検索できるAWSサービス
LangChainとは LLMとKendraを簡単に統合できるフレームワーク LangChain詳細
RAGサンプルアプリを動かしてみる
構成図
対象LLM
Hugging Face、AI21 Labs、Cohere、Anthropic、OpenAI. (今後, Bedrock (Amazon Titan等)も含まれる予定).
実装手順
- 公開されている AWS CloudFormationの テンプレートを利用して、新しい Amazon Kendra インデックスを作成します。
2. AWS SDK for Python (boto3) をインストール
pip install boto3
3. LangChain をインストール
pip install langchain
4. 興味のある LLM の API キーの取得
例:OpenAI https://openai.com/
5. GitHub repoから、KendraIndexRetriever インターフェイスとサンプルアプリケーションをインストール
6. 環境変数を以下のように設定
export AWS_REGION=""<YOUR-AWS-REGION>""
export KENDRA_INDEX_ID=""<YOUR-KENDRA-INDEX-ID>""
export FLAN_XL_ENDPOINT=""<YOUR-SAGEMAKER-ENDPOINT-FOR-FLAN-T-XL>""
export FLAN_XXL_ENDPOINT=""<YOUR-SAGEMAKER-ENDPOINT-FOR-FLAN-T-XXL>""
export OPENAI_API_KEY=""<YOUR-OPEN-AI-API-KEY>""
export ANTHROPIC_API_KEY=""<YOUR-ANTHROPIC-API-KEY>"""
7. GitHub リポジトリをCloneした場所の samples サブディレクトリに移動
8. コマンドラインで python sample-file-name.py
の実行
出力例(Flan-T-XXLモデル):
"Amazon SageMaker is a machine learning service that lets you train and deploy models in the cloud.
Sources: https://docs.aws.amazon.com/sagemaker/latest/dg/data-parallel-intro.html https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-whatis.html https://docs.aws.amazon.com/sagemaker/latest/dg/whatis.html"
引用元へのリンクとともに回答してくれる。
以上
GCP外部からBigQueryに接続する方法
1. IAM管理画面からサービスアカウントを作成する(付与ロール例: BigQuery編集者)
2. サービスアカウントのキーを作成し、秘密鍵(.json)をダウンロードする
3. ダウンロードした秘密鍵のパスを環境変数名 GOOGLE_APPLICATION_CREDENTIALS として保存する
(例:GOOGLE_APPLICATION_CREDENTIALS = C:\Users\yourname\yourproject-12345-fef892f8w8.json
4. BigQueryライブラリのインストール
pip install google-cloud-bigquery
備考:サポートされているPythonのバージョン >= 3.7, < 3.11(2023/03/24時点)
5. クエリを実行する
(例:データセット内テーブルの全レコードを取得)
from google.cloud import bigquery
# クライアントの作成
client = bigquery.Client()
# クエリ作成
query_job = client.query(
"""
SELECT
*
FROM `projectname.dataset.sometable`
"""
)
results = query_job.result()
# 実行結果の表示
for i,row in enumerate(results):
print('{0}:'.format(i+1),row)
注意事項:
"google.api_core.exceptions.Forbidden: 403 Access Denied:"というエラーが出たら以下2点を確認
1. サービスアカウントに適切なロールが付与されているか(例:"BigQueryデータ閲覧者"ロールでテーブルを作成しようとしている)
2. (Spread sheetから作成されたテーブルの場合) Spread sheetがサービスアカウントと共有されているか。 また以下コードをクライアントのインスタンス化の直前に挿入する
credentials, project = google.auth.default(
scopes=[
"https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/cloud-platform",
]
)
client = bigquery.Client(credentials=credentials, project=project)
参考
Dataformとは何者か?
概要
BigQuery用のSQLワークフローの開発、テスト、バージョン管理、スケジュール設定ができるサービス
具体的に何ができるの?
- テーブル定義や集計処理を、SQLXというSQLの拡張言語で記述することで、テーブル間の依存関係を元にワークフローを実行できる。
- レコードの空欄(Null値)チェックや重複データが存在していないかなどのカスタム検証処理を実行できる。
- ファイルのバージョン管理にGitを使用。Dataformリポジトリを作成すれば、GitHubやGitLabリポジトリに接続できる。
- ワークフロー構成(Dataformの機能の一つ)やWorkflows, Cloud Composerを使用して、ワークフローの実行をスケジュールできる。
- 依存関係を含んだワークフローを、インタラクティブな有向非巡回グラフ(DAG)として表示できる。
DAG表示例:
Dataform実行の流れ
- リポジトリを作成
- 開発用のワークスペースを作成
- ワークスペースでワークフローを開発(SQLXあるいはJavascript)
- コードを(リアルタイムで)SQLクエリにコンパイル
- テーブルの依存関係に従ってワークフローを実施
SQLXファイルの構成
SQLXファイルはconfigブロック、jsブロック、本文ブロックの3つから構成される。
- configブロック:作成するテーブルタイプ(通常のテーブル、増分テーブル、ビュー)やテスト項目、依存関係などを記述する
- jsブロック:Javascriptを記述する。定義された定数はbodyブロックで使用することができる。
- bodyブロック:実行するSQLコードを記述する。Dataformコア*の組み込み関数が使用できる(依存関係を参照するref関数など)
*Dataformコア:Dataform内部で使用される、テーブルとワークフローを作成するためのメタ言語(SQL拡張言語)。依存関係管理システム、テスト機能、ドキュメント(テーブルや列の説明など)を提供する。 @Dataform/core npm
SQLX記述例:
//configブロック config : { type: "table", assertions: { nonNull: ["user_id", "customer_id"] // Null値がないことのテスト } } //jsブロック js { const someDate = require("includes/sample.js"); // Javascript定数をインポート } //bodyブロック SELECT user_id, customer_id, session_date FROM ${ref("some_table")} // 依存テーブルの参照 WHERE session_date > ${someDate} // Javascript定数の参照
ワークフローの実行
SQLXファイルを作成したら、definitions/ フォルダに配置し、ファイル編集ペイン上部にある「実行を開始」から作成したワークフローが実行できます。
Javascriptのパッケージをインストールして使用する
上述のコードのように、Javascriptの定数や関数は自作してSQLXファイルのjsブロックで呼び出すことができますが、npmJSやGitHubで公開されているNPMパッケージをインストールして使うこともできます。
その場合、package.jsonファイルのdependenciesブロックに "パッケージ名":"URL" の形で記載します。
{ "name": "your-repository", "dependencies": { "@dataform/core": "2.0.1", "dataform-scd": "https://github.com/dataform-co/dataform-scd/archive/0.4.tar.gz" // "ゆっくり変化するディメンション" パッケージのインストール } }
検証処理(テスト項目)の記載方法
Dataformでは、Null値や重複レコードのチェック、またはカスタム作成したテスト項目をSQLXファイルのconfigブロック内のassertionsブロックに記載することで、テーブルの作成前に検証処理が可能になります。(もしテストが不合格であればテーブルの作成はされません)
config { type:"table", assertions: { nonNull : ["user_id"], // Nullではないことのテスト. 複数列記載可能 uniqueKey : ["customer_id"], // 一意であることのテスト rowConditions : [ 'session_date is null or session_date > "2022-08-01"', // カスタムSQL式 ] } }