Spotifyで音楽を聴いたり、Instagramで友達の画像を見たり、YouTubeで新作の予告編を見たりと、今や私たちが使うほぼすべてのアプリケーションで膨大な量のデータを抱えています。
サーバーからあなたへ、常にデータが送信されているのです。
これは、一人のユーザーにとっては問題ないでしょう。
しかし、何千、何百万という大きなデータのリクエストを同時に処理することを想像してみてください。
このようなデータの流れは、物理的にユーザーに提供できるようにするために、何らかの方法で縮小する必要があります。
圧縮技術には様々なものがあり、その使い方や互換性も様々です。
例えば、MPEG-2 Audio Layer III (MP3)コーデックのように、オーディオファイルにしか使えない圧縮技術もある。
圧縮には、主に2つのタイプがあります。
- ロスレス。ロスレス:あまり「削らない」としても、データの完全性と正確性が優先されます。
- ロッシー。ロッシー:データの整合性と正確性は、いかに速く提供できるかということほど重要ではありません。リアルタイムビデオ転送を想像してみてください。
例えば、オートエンコーダーを使うと、この画像を分解し、以下のような32ベクトルコードとして表現することができます。
それを使って、画像を再構成することができます。
もちろん、これは非可逆圧縮の例で、かなりの情報が失われている。
しかし、同じ手法で、より多くの空間を表現に割り当てることで、より正確にこれを行うことができる。
オートエンコーダーとは?
オートエンコーダーとは、定義上、何かを自動的に符号化する技術である。
ニューラルネットワークを使うことで、オートエンコーダーはデータ(この場合は画像)をかなり小さなデータビットに分解する方法を学習し、その表現を使って元のデータをできるだけオリジナルに近い形で再構成することができる。
この作業には、2つの重要な要素があります。
- エンコーダー。エンコーダー:元の入力を小さなエンコーディングに圧縮する方法を学習する。
- デコーダ。エンコーダ:元のデータをエンコーダで生成された符号化方式から復元する方法を学習する。
この2つを共生させることで、元のデータをあまり失うことなく、最も効率的に復元できるデータ表現が得られる。
出典:ResearchGate
エンコーダー
エンコーダーの仕事は、保存できる最小のデータ表現を見つけることです。
元のデータの最も顕著な特徴を抽出し、デコーダーが理解できる方法でそれを表現します。
例えば、大きな数字を記憶するように、その中に記憶できるパターンを見つけ、そのパターンから数列全体を復元しようとする。
エンコーダの最も単純なものは、単純な人工ニューラルネットワーク(ANN)である。
ただし、ANNの中でも特殊なCNN(Convolutional Neural Networks)を利用したエンコーダーもある。
エンコーダーは入力データを受け取り、それを符号化したもの(圧縮データ)を生成する。
そして、その圧縮されたデータを使ってユーザーに送信し、そこでデコードと再構成を行うことができるのです。
LFWデータセットの例でエンコードを見てみましょう。
このエンコーディングは我々にとってはあまり意味をなさないが、デコーダにとっては十分なものである。
さて、ここで疑問が生じます。
という疑問が湧きます。
しかし、エンコーダーはどのようにしてこのような画像の圧縮を学習したのでしょうか?
という疑問はもっともです。
ここで、学習時の共生が活きてきます。
デコーダー
デコーダはエンコーダと似たような働きをするが、その逆である。
デコーダは、圧縮されたコード表現を生成するのではなく、読み取ることを学習し、その情報に基づいて画像を生成します。
当然ながら、再構成時の損失を最小限に抑えることが目的だ。
出力は、平均二乗誤差(MSE)を使って、再構成された画像と元の画像を比較することで評価されます(元の画像と似ているほど、誤差は小さくなります)。
このとき、デコーダからエンコーダまで、すべてのパラメータを逆伝播して更新する。
このように、入力画像と出力画像の差分をもとに、デコーダとエンコーダはそれぞれの仕事を評価され、よりよいものになるようにパラメータを更新していくのです。
オートエンコーダの構築
Kerasはニューラルネットワークの構築をより簡単にするPythonのフレームワークです。
異なるタイプのレイヤーを積み重ねて、深いニューラルネットワークを作ることができます – オートエンコーダを作るためにこれを行います。
まず、pipでKerasをインストールします。
$ pip install keras
データの前処理
今回もLFWデータセットを使用する。
いつものように、このようなプロジェクトでは、オートエンコーダーが仕事をしやすくするためにデータを前処理する。
そのために、まず使用するデータセットにつながるいくつかのパスを定義する。
# http://www.cs.columbia.edu/CAVE/databases/pubfig/download/lfw_attributes.txt
ATTRS_NAME = "lfw_attributes.txt"
# http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz
IMAGES_NAME = "lfw-deepfunneled.tgz"
# http://vis-www.cs.umass.edu/lfw/lfw.tgz
RAW_IMAGES_NAME = "lfw.tgz"
1つは生の行列を画像に変換し、カラーシステムをRGBに変更する関数です。
def decode_image_from_raw_bytes(raw_bytes):
img = cv2.imdecode(np.asarray(bytearray(raw_bytes), dtype=np.uint8), 1)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
return img
もうひとつは、実際にデータセットを読み込んで、それを我々のニーズに合わせて適合させるためのものです。
def load_lfw_dataset(
use_raw=False,
dx=80, dy=80,
dimx=45, dimy=45):
# Read attrs
df_attrs = pd.read_csv(ATTRS_NAME, sep=' ', skiprows=1)
df_attrs = pd.DataFrame(df_attrs.iloc[:, :-1].values, columns=df_attrs.columns[1:])
imgs_with_attrs = set(map(tuple, df_attrs[["person", "imagenum"]].values))
# Read photos
all_photos = []
photo_ids = []
# tqdm in used to show progress bar while reading the data in a notebook here, you can change
# tqdm_notebook to use it outside a notebook
with tarfile.open(RAW_IMAGES_NAME if use_raw else IMAGES_NAME) as f:
for m in tqdm.tqdm_notebook(f.getmembers()):
# Only process image files from the compressed data
if m.isfile() and m.name.endswith(".jpg"):
# Prepare image
img = decode_image_from_raw_bytes(f.extractfile(m).read())
# Crop only faces and resize it
img = img[dy:-dy, dx:-dx]
img = cv2.resize(img, (dimx, dimy))
# Parse person and append it to the collected data
fname = os.path.split(m.name)[-1]
fname_splitted = fname[:-4].replace('_', ' ').split()
person_id = ' '.join(fname_splitted[:-1])
photo_number = int(fname_splitted[-1])
if (person_id, photo_number) in imgs_with_attrs:
all_photos.append(img)
photo_ids.append({'person': person_id, 'imagenum': photo_number})
photo_ids = pd.DataFrame(photo_ids)
all_photos = np.stack(all_photos).astype('uint8')
# Preserve photo_ids order!
all_attrs = photo_ids.merge(df_attrs, on=('person', 'imagenum')).drop(["person", "imagenum"], axis=1)
return all_photos, all_attrs
オートエンコーダの実装
import numpy as np
X, attr = load_lfw_dataset(use_raw=True, dimx=32, dimy=32)
我々のデータは X
行列であり、3次元行列の形をしています。
これは RGB 画像のデフォルトの表現です。
赤、緑、青の3つの行列を用意し、この3つの組み合わせで画像の色を生成します。
これらの画像は、各ピクセルが0から255の範囲で大きな値を持つことになります。
一般に機械学習では、値を小さく、0を中心にする傾向があります。
この方がモデルの学習が速く、良い結果が得られるからです。
X = X.astype('float32') / 255.0 - 0.5
ここで、配列 X
の最小値と最大値をテストしてみると、 -.5
と .5
になります。
print(X.max(), X.min())
0.5 -0.5
画像を表示するために、show_image
関数を作成しましょう。
ピクセル値がマイナスになることはありえないので、画像に 0.5
を追加します。
import matplotlib.pyplot as plt
def show_image(x):
plt.imshow(np.clip(x + 0.5, 0, 1))
では、データを見てみましょう。
show_image(X[6])
では、データをトレーニングセットとテストセットに分割してみましょう。
from sklearn.model_selection import train_test_split
X_train, X_test = train_test_split(X, test_size=0.1, random_state=42)
sklearn の train_test_split()
関数は、テスト比率を与えてデータを分割することができ、残りはもちろんトレーニングサイズである。
機械学習でよく目にする random_state
は、何度実行しても同じ結果になるように使用されています。
さて、次はモデルです。
from keras.layers import Dense, Flatten, Reshape, Input, InputLayer
from keras.models import Sequential, Model
def build_autoencoder(img_shape, code_size):
# The encoder
encoder = Sequential()
encoder.add(InputLayer(img_shape))
encoder.add(Flatten())
encoder.add(Dense(code_size))
# The decoder
decoder = Sequential()
decoder.add(InputLayer((code_size,)))
decoder.add(Dense(np.prod(img_shape))) # np.prod(img_shape) is the same as 32*32*3, it's more generic than saying 3072
decoder.add(Reshape(img_shape))
return encoder, decoder
この関数は image_shape
(画像の大きさ) と code_size
(出力される表現の大きさ) をパラメータとして受けとります。
この例では,画像の形状は (32, 32, 3)
となり, 32
が幅と高さ, 3
が色チャンネルの行列を表します.つまり、私たちの画像の寸法は 3072
です。
論理的には、code_size
が小さいほど画像は圧縮されますが、保存される特徴量は少なくなり、再現される画像はそれだけオリジナルと異なるものになります。
Kerasの逐次モデルは、基本的に順次レイヤーを追加し、ネットワークを深化させるために使用されます。
各レイヤーは次のレイヤーにフィードされますが、ここでは単純に InputLayer
(入力のプレースホルダー)で、入力ベクトルのサイズ – image_shape
から始めています。
Flattenレイヤーの仕事は
(32,32,3)行列を 1 次元配列 (
3072`) に平らにすることです。
ネットワークアーキテクチャは 3D 行列を受け付けないためです。
エンコーダの最後のレイヤーは Dense
レイヤーで、これが実際のニューラルネットワークとなります。
この層は、最適な出力を得るための最適なパラメータを見つけようとします。
この場合はエンコーディングで、この層の出力サイズ(この層のニューロン数)を code_size
に設定します。
デコーダも逐次モデルである。
入力(エンコーディング)を受け取り、それを行の形で再構成しようとする。
そして、それを Dense
レイヤーを通して 32x32x3
の行列に積み重ねます。
最後の Reshape
レイヤーは、それを画像に再形成します。
では、これらをつなげてモデルを作ってみましょう。
# Same as (32,32,3), we neglect the number of instances from shape
IMG_SHAPE = X.shape[1:]
encoder, decoder = build_autoencoder(IMG_SHAPE, 32)
inp = Input(IMG_SHAPE)
code = encoder(inp)
reconstruction = decoder(code)
autoencoder = Model(inp,reconstruction)
autoencoder.compile(optimizer='adamax', loss='mse')
print(autoencoder.summary())
このコードはとても簡単です。
変数 code
はエンコーダの出力で、これをデコーダに渡して変数 reconstruction
を生成しています。
その後、inp
と reconstruction
のパラメータで Model
を作成し、adamax
オプティマイザーと mse
損失関数でコンパイルして、両者をリンクしています。
ここでいうモデルのコンパイルとは、その目的と到達方法を定義することを意味します。
この文脈での目的は mse
を最小化することであり、オプティマイザ(基本的にはグローバルミニマムを見つけるための微調整されたアルゴリズム)を使用することでそれに到達することができます。
この時点で、結果を要約することができます。
_________________________________________________________________
Layer (type) Output Shape Param #=================================================================
input_6 (InputLayer) (None, 32, 32, 3) 0
_________________________________________________________________
sequential_3 (Sequential) (None, 32) 98336
_________________________________________________________________
sequential_4 (Sequential) (None, 32, 32, 3) 101376
=================================================================
Total params: 199,712
Trainable params: 199,712
Non-trainable params: 0
_________________________________________________________________
ここで、入力は 32,32,3
であることがわかります。
ここでいう None
とはインスタンスのインデックスのことで、データをモデルに渡すと (m, 32,32,3)
という形になり、ここで m
はインスタンスの数なので None
のままにしておきます。
隠れ層は32で、これは我々が選んだ符号化サイズです。
そして最後に、ご覧のようにデコーダーの出力は(32,32,3)` となります。
では、モデルを交換しましょう。
history = autoencoder.fit(x=X_train, y=X_train, epochs=20,
validation_data=[X_test, X_test])
今回の場合、構築した画像と元の画像を比較するので、 x
と y
は共に X_train
と等しくなります。
理想的には、入力と出力が等しくなるようにします。
また、epochs
は学習データを何回モデルに通すかを定義し、validation_data
は学習後にモデルを評価するための検証セットです。
Train on 11828 samples, validate on 1315 samples
Epoch 1/20
11828/11828 [==============================] - 3s 272us/step - loss: 0.0128 - val_loss: 0.0087
Epoch 2/20
11828/11828 [==============================] - 3s 227us/step - loss: 0.0078 - val_loss: 0.0071
.
.
.
Epoch 20/20
11828/11828 [==============================] - 3s 237us/step - loss: 0.0067 - val_loss: 0.0066
エポック数に対する損失を可視化することで、エポック数の概要を把握することができます。
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
3回目のエポック以降、損失に大きな進展がないことがわかります。
このように可視化することで、実際に何回のエポックでモデルを学習させればよいのか、より良いアイデアを得ることができます。
この場合、20エポックまで学習する必要はなく、ほとんどの学習が冗長になります。
また、オーバーフィットになる可能性もあり、トレーニングデータセットとテストデータセット以外の新しいデータに対してのパフォーマンスが悪くなります。
さて、最も期待される部分、それは結果を可視化することです。
def visualize(img,encoder,decoder):
"""Draws original, encoded and decoded images"""
# img[None] will have shape of (1, 32, 32, 3) which is the same as the model input
code = encoder.predict(img[None])[0]
reco = decoder.predict(code[None])[0]
plt.subplot(1,3,1)
plt.title("Original")
show_image(img)
plt.subplot(1,3,2)
plt.title("Code")
plt.imshow(code.reshape([code.shape[-1]//2,-1]))
plt.subplot(1,3,3)
plt.title("Reconstructed")
show_image(reco)
plt.show()
for i in range(5):
img = X_test[i]
visualize(img,encoder,decoder)
結果はあまり良くないことがおわかりいただけると思います。
しかし、真ん中に見える32
という極めて小さなベクトルに画像全体が符号化されていることを考慮すれば、これは決して悪いことではありません。
画像デノイズ
オートエンコーダのもう一つの一般的な使い方はノイズ除去である。
画像にランダムなノイズを加えてみよう。
def apply_gaussian_noise(X, sigma=0.1):
noise = np.random.normal(loc=0.0, scale=sigma, size=X.shape)
return X + noise
ここでは、標準正規分布に基づくランダムなノイズを sigma
というスケールで追加します(デフォルトは 0.1
)。
参考までに、sigma
の値を変えた場合のノイズの見え方を示します。
plt.subplot(1,4,1)
show_image(X_train[0])
plt.subplot(1,4,2)
show_image(apply_gaussian_noise(X_train[:1],sigma=0.01)[0])
plt.subplot(1,4,3)
show_image(apply_gaussian_noise(X_train[:1],sigma=0.1)[0])
plt.subplot(1,4,4)
show_image(apply_gaussian_noise(X_train[:1],sigma=0.5)[0])
このように、シグマ
が0.5
まで増加すると、画像はほとんど見えなくなってしまいます。
そこで、シグマ値が 0.1
のノイズの多い画像から、元の画像を再生成してみることにします。
このために生成するモデルは先程のものと同じですが、学習方法は異なります。
今回は、元画像と対応するノイズの多い画像で学習させます。
code_size = 100
# We can use bigger code size for better quality
encoder, decoder = build_autoencoder(IMG_SHAPE, code_size=code_size)
inp = Input(IMG_SHAPE)
code = encoder(inp)
reconstruction = decoder(code)
autoencoder = Model(inp, reconstruction)
autoencoder.compile('adamax', 'mse')
for i in range(25):
print("Epoch %i/25, Generating corrupted samples..."%(i+1))
X_train_noise = apply_gaussian_noise(X_train)
X_test_noise = apply_gaussian_noise(X_test)
# We continue to train our model with new noise-augmented data
autoencoder.fit(x=X_train_noise, y=X_train, epochs=1,
validation_data=[X_test_noise, X_test])
では、モデル結果を見てみましょう。
X_test_noise = apply_gaussian_noise(X_test)
for i in range(5):
img = X_test_noise[i]
visualize(img,encoder,decoder)
オートエンコーダー応用例
オートエンコーダーには、これまで紹介した以外にも様々な使い道があります。
オートエンコーダーは、Deepfakesのような、異なるモデルのエンコーダーとデコーダーを持つアプリケーションで使用することができる。
例えば、「人物X」用のオートエンコーダーと「人物Y」用のオートエンコーダーがあるとします。
人物X」のエンコーダーと「人物Y」のデコーダーを使い、「人物X」の顕著な特徴を持つ「人物Y」の画像を生成することを止めるものは何もないのです。
Credit: AlanZucconi
オートエンコーダーは画像のセグメンテーションにも使用できる。
例えば、自律走行車のように、車両が判断を下すために異なるアイテムをセグメンテーションする必要がある場合などだ。
クレジット: PapersWithCode
結論
オートエンコーダーは、次元削減技術である主成分分析や画像のノイズ除去など、様々な用途に使用することができます。
MNISTデータセットなど、様々なデータセットで試してみて、どのような結果が得られるか見てみるとよいでしょう。