I am using the standard packages numpy, theano, and matplotlib. Theano is compiled with GPU support.
import numpy as np
import theano
import theano.tensor as T
import matplotlib.pyplot as plt
%matplotlib inline
Define auxiliary variables:
ADAGRAD_EPS = 1e-10
rng = np.random.RandomState(1234)
floatX = theano.config.floatX
In a variational autoencoder the loss function consists of two parts: the KL divergence between the approximate posterior q(z|x) and the prior p(z), and the expected reconstruction log-likelihood of the data under the decoder.
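For a single data point x, the loss we minimize can be written in the standard form

$$ \mathcal{L}(x) = D_{KL}\!\left(q(z \mid x)\,\|\,p(z)\right) \;-\; \mathbb{E}_{q(z \mid x)}\!\left[\log p(x \mid z)\right], $$

where p(z) is the unit Gaussian prior. In the code below, kld_unit_mvn returns the negative of the first term, and log_diag_mvn returns the log-density needed for the second term.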
We define these two parts as separate functions:
def kld_unit_mvn(mu, var):
    # negative KL divergence between a diagonal Gaussian N(mu, var) and the unit Gaussian N(0, I)
    return (mu.shape[1] + T.sum(T.log(var), axis=1) - T.sum(T.square(mu), axis=1) - T.sum(var, axis=1)) / 2.0

def log_diag_mvn(mu, var):
    # log-density of x under a diagonal Gaussian N(mu, var), summed over dimensions
    def f(x):
        k = mu.shape[1]
        logp = (-k / 2.0) * np.log(2 * np.pi) - 0.5 * T.sum(T.log(var), axis=1) - T.sum(0.5 * (1.0 / var) * (x - mu) * (x - mu), axis=1)
        return logp
    return f
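As a quick sanity check (a minimal sketch; the names mu_t, var_t, x_t are only for illustration), both functions can be compiled and evaluated on trivial inputs: for mu = 0 and var = 1 the KL term is exactly zero, and the log-density of x = 0 in two dimensions equals -log(2*pi):
mu_t = T.matrix('mu_t', dtype=floatX)
var_t = T.matrix('var_t', dtype=floatX)
x_t = T.matrix('x_t', dtype=floatX)
kl_f = theano.function([mu_t, var_t], kld_unit_mvn(mu_t, var_t))
logp_f = theano.function([mu_t, var_t, x_t], log_diag_mvn(mu_t, var_t)(x_t))
mu0 = np.zeros((1, 2), dtype=floatX)
var1 = np.ones((1, 2), dtype=floatX)
print(kl_f(mu0, var1))         # [ 0.]  -- the KL term vanishes for mu = 0, var = 1
print(logp_f(mu0, var1, mu0))  # approximately [-1.8379] = -log(2*pi)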
The building block of the neural network is the class Layer, which holds the weight matrix W and the bias vector b of a single fully connected layer.
class Layer(object):
    def __init__(self, n_input, n_output, activation, x):
        # Glorot-style uniform initialization of the weight matrix
        W_init = np.asarray(
            rng.uniform(
                low=-np.sqrt(6. / (n_input + n_output)),
                high=np.sqrt(6. / (n_input + n_output)),
                size=(n_input, n_output)
            ),
            dtype=floatX
        )
        self.W = theano.shared(value=W_init.astype(floatX),
                               name='W',
                               borrow=True)
        self.b = theano.shared(value=np.zeros((n_output,), dtype=floatX),
                               name='b',
                               borrow=True)
        self.activation = activation
        self.params = [self.W, self.b]
        lin_output = T.dot(x, self.W) + self.b
        self.output = (lin_output if self.activation is None else self.activation(lin_output))

    def output2(self, x):
        # recompute the layer output for an arbitrary symbolic input x
        lin_output = T.dot(x, self.W) + self.b
        return (lin_output if self.activation is None else self.activation(lin_output))
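To see what a single Layer does on its own, here is a minimal sketch (the names x_sym and f_hidden are only for illustration): the layer simply computes activation(x.dot(W) + b).
x_sym = T.matrix('x_sym', dtype=floatX)
hidden = Layer(n_input=2, n_output=3, activation=T.tanh, x=x_sym)
f_hidden = theano.function([x_sym], hidden.output)
print(f_hidden(np.asarray([[0.1, 0.2]], dtype=floatX)).shape)  # (1, 3)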
To build the variational autoencoder I am using the class GaussianMLP:
class GaussianMLP(object):
    def __init__(self, layer_sizes, activations, x, y=None, eps=None):
        # add hidden layers to the network
        self.layers = []
        inp = x
        for n_input, n_output, activation in zip(layer_sizes[:-2], layer_sizes[1:-1], activations[1:-1]):
            self.layers.append(Layer(n_input, n_output, activation, inp))
            inp = self.layers[-1].output
        # output layers for the reparametrization trick: mean and log-variance
        self.mu_layer = Layer(layer_sizes[-2], layer_sizes[-1], None, inp)
        self.logvar_layer = Layer(layer_sizes[-2], layer_sizes[-1], None, inp)
        self.params = []
        for layer in self.layers:
            self.params += layer.params
        self.params = self.params + self.mu_layer.params + self.logvar_layer.params
        mu = self.mu_layer.output
        var = T.exp(self.logvar_layer.output)
        sigma = T.sqrt(var)
        # define the cost function
        if eps is not None:
            # encoder: sample z = mu + sigma * eps and pay the KL cost
            self.output = mu + sigma * eps
            self.cost = -T.sum(kld_unit_mvn(mu, var))
        else:
            # decoder: pay the negative reconstruction log-likelihood
            self.mu = mu
            self.var = var
            self.output = mu
            self.cost = -T.sum(log_diag_mvn(self.output, var)(y))
            self.detailed_cost = log_diag_mvn(self.output, var)(y)

    def output2(self, x):
        # propagate an arbitrary symbolic input x through the network and return the mean
        for layer in self.layers:
            x = layer.output2(x)
        x = self.mu_layer.output2(x)
        return x
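Here is a minimal sketch of how a GaussianMLP is used as an encoder (the layer sizes and the names x_sym, eps_sym, encode_f are only for illustration): it maps 2-dimensional points to 1-dimensional codes z = mu + sigma * eps via the reparametrization trick.
x_sym = T.matrix('x_sym', dtype=floatX)
eps_sym = T.matrix('eps_sym', dtype=floatX)
enc = GaussianMLP([2, 10, 1], [None, T.tanh, None], x_sym, eps=eps_sym)
encode_f = theano.function([x_sym, eps_sym], enc.output)
z = encode_f(np.random.rand(5, 2).astype(floatX), np.random.rand(5, 1).astype(floatX))
print(z.shape)  # (5, 1)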
The variational autoencoder is an object of the class VAE, which contains two GaussianMLP objects: the first represents the encoder neural network and the second the decoder neural network.
class VAE(object):
    def __init__(self, enc_layer_sizes, dec_layer_sizes, enc_activations, dec_activations, args):
        self.x = T.matrix('x', dtype=floatX)
        self.eps = T.matrix('eps', dtype=floatX)
        # encoder: maps x to the code z = mu + sigma * eps
        self.enc_mlp = GaussianMLP(enc_layer_sizes, enc_activations, self.x, eps=self.eps)
        # decoder: maps the code back to the data space and is trained to reconstruct x
        self.dec_mlp = GaussianMLP(dec_layer_sizes, dec_activations, self.enc_mlp.output, y=self.x)
        self.cost = (self.enc_mlp.cost + self.dec_mlp.cost) / args['batch_size']
        self.params = self.enc_mlp.params + self.dec_mlp.params
        self.gparams = [T.grad(self.cost, p) + args['lmbda'] * p for p in self.params]
        self.gaccums = [theano.shared(value=np.zeros(p.get_value().shape, dtype=floatX)) for p in self.params]
        # define AdaGrad updates for the optimization: each step is scaled by the accumulated squared gradients
        self.updates = [
            (param, param - args['lr'] * gparam / T.sqrt(gaccum + T.square(gparam) + ADAGRAD_EPS))
            for param, gparam, gaccum in zip(self.params, self.gparams, self.gaccums)
        ]
        self.updates += [
            (gaccum, gaccum + T.square(gparam))
            for gaccum, gparam in zip(self.gaccums, self.gparams)
        ]
        # helper functions for inspecting the two parts of the cost
        self.get_dec_cost = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.dec_mlp.cost
        )
        self.get_dec_mu = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.dec_mlp.mu
        )
        self.get_dec_var = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.dec_mlp.var
        )
        self.get_dec_detailed_cost = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.dec_mlp.detailed_cost
        )
        self.get_enc_cost = theano.function(
            inputs=[self.x],
            outputs=self.enc_mlp.cost
        )
        # define the train function
        self.train = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.cost,
            updates=self.updates
        )
        # get codes
        self.encode = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.enc_mlp.output
        )
        # decode given codes (the encoder output is treated as an input, so Theano cuts the graph there)
        self.decode = theano.function(
            inputs=[self.enc_mlp.output],
            outputs=self.dec_mlp.output
        )
        # decode codes sampled directly from the chosen distribution
        self.decode2 = theano.function(
            inputs=[self.eps],
            outputs=self.dec_mlp.output2(self.eps)
        )
The main code starts here. First, I create a toy dataset:
np.random.seed(0)
N = 1000
X = np.random.rand(N)
X = np.transpose(np.vstack((X, -1.0*np.sqrt(0.25 - (X - 0.5)**2) + 0.5 + 0.05*np.random.randn(N))).astype(floatX))
plt.figure(figsize = (8,4))
plt.scatter(X[:, 0], X[:, 1], lw=.3, s=3, cmap=plt.cm.cool)
plt.axis([0, 1, 0, 0.5])
plt.show()
The desired output of our autoencoder is the projection of the original points onto the semicircle. Another aim of the variational autoencoder is to generate points on the semicircle from a uniform distribution (or any other convenient distribution, for example the unit normal). I decided to use the uniform distribution here because of the structure of the toy dataset. The next step is to define the autoencoder:
args = {}
args['batch_size'] = 1000
args['lmbda'] = 0.0
args['lr'] = 0.1
args['epochs'] = 5000
num_batches = X.shape[0] / args['batch_size']
enc_layer_sizes = [X.shape[1], 10, 10, 1]
enc_activations = [None, T.tanh, None, None]
dec_layer_sizes = [enc_layer_sizes[-1], 10, 10, X.shape[1]]
dec_activations = [None, None, T.tanh, None]
vae = VAE(enc_layer_sizes, dec_layer_sizes, enc_activations, dec_activations, args)
I am using the nnvisual.py code to show what the autoencoder looks like. It is a modified version of the code from https://github.com/miloharper/visualise-neural-network (thanks to miloharper for it); I have only changed the orientation of the neural network.
from nnvisual import *
network = NeuralNetwork()
for l in enc_layer_sizes + dec_layer_sizes[1:]:
    network.add_layer(l)
network.draw()
The main training loop:
np.random.seed(232)
for epoch in xrange(args['epochs'] * num_batches):
    k = epoch % num_batches
    x = X[k * args['batch_size']:(k + 1) * args['batch_size'], :]
    eps = np.random.rand(x.shape[0], enc_layer_sizes[-1]).astype(floatX)
    cost = vae.train(x, eps)
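After training we can inspect the two parts of the loss separately, using the helper functions compiled in VAE (a minimal sketch; the exact numbers depend on the random seeds):
eps_check = np.random.rand(X.shape[0], enc_layer_sizes[-1]).astype(floatX)
print(vae.get_enc_cost(X))             # KL part of the cost
print(vae.get_dec_cost(X, eps_check))  # reconstruction part of the cost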
Let us see how well the autoencoder reconstructs the original data:
np.random.seed(2234)
eps = np.random.rand(X.shape[0], enc_layer_sizes[-1]).astype(floatX)
Z = vae.encode(X, eps)
X_projected = vae.decode(Z)
plt.figure(figsize = (8,4))
plt.scatter(X[:, 0], X[:, 1], c='blue', lw=.3, s=3)
plt.scatter(X_projected[:, 0], X_projected[:, 1], c='red', lw=.3, s=3)
plt.axis([0, 1, 0, 0.5])
plt.show()
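As a rough quantitative check of the reconstruction (a minimal sketch), we can also look at the mean squared distance between the original points and their projections:
print(np.mean(np.sum((X - X_projected) ** 2, axis=1)))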
And finally, let us see how the variational autoencoder generates points from the uniform distribution:
np.random.seed(25)
eps = 6.0*np.random.rand(X.shape[0], enc_layer_sizes[-1]).astype(floatX) - 3.0
X_produced = vae.decode2(eps)
plt.figure(figsize = (8,4))
plt.scatter(X_produced[:, 0], X_produced[:, 1], c='green', lw=.3, s=3)
plt.axis([0, 1, 0, 0.5])
plt.show()