active-learning-demo/active_learning.py
2025-04-17 22:52:52 -04:00

216 lines
5.8 KiB
Python

# This code is a demonstration of some python data science techniques
# %% import modules
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import cm
import pandas as pd
import numpy as np
import sklearn
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import ConstantKernel, RBF, WhiteKernel
import math
from scipy.interpolate import griddata
import itertools
import random
import scipy
def lhs_sampler(var_ranges: list[tuple[float, float]], n_samples: int):
"""Sample a number of variables using latin hypercube sampling
Args:
var_ranges: list of tuples where each tuple is (lower_bound, upper_bound)
n_samples: Number of samples to draw
"""
# Initialize sampler
dimensions = len(var_ranges)
sampler = scipy.stats.qmc.LatinHypercube(dimensions)
# Take samples and transform them to correct ranges
output = list()
for sample in sampler.random(n_samples):
output.append([x * (hi - lo) + lo for (lo, hi), x in zip(var_ranges, sample)])
return np.array(output)
# %% Test sampler
var_ranges = [(0.0, 1.0), (2.0, 4.0)]
samples = lhs_sampler(var_ranges, 4)
print(samples)
# %% Now compare full DOE and latin hypercube sampling
# def function(a, b):
# return math.sin(a) + math.sin(b)
def function(x, y):
return -x * x * x + 3 * x * x - 2 * x - y * y * y + 3 * y * y - 2 * y
x_range = [0, 4]
doe_vals = np.linspace(x_range[0], x_range[1], 3)
X_doe = np.array(list(itertools.product(*[doe_vals, doe_vals])))
y_doe = np.array([function(*point) for point in X_doe])
sampler = scipy.stats.qmc.LatinHypercube(2)
X_lhs = np.array(
[
[coord * (x_range[1] - x_range[0]) + x_range[0] for coord in point]
for point in sampler.random(9)
]
)
y_lhs = np.array([[function(*point) for point in X_lhs]])
# %% Fit GP model to results
lhs_model = GaussianProcessRegressor().fit(X_lhs, y_lhs.reshape(-1, 1))
num_bin = 50
x_space = np.linspace(x_range[0], x_range[1], num_bin)
X_pred = np.array(list(itertools.product(*[x_space for _ in range(2)])))
y_lhs_pred = lhs_model.predict(X_pred)
doe_model = GaussianProcessRegressor().fit(X_doe, y_doe.reshape(-1, 1))
y_doe_pred = doe_model.predict(X_pred)
# %% Now plot the performance of the full doe and latin hypercube sample
y_real = np.array([function(*x) for x in X_pred])
# Interpolate here
# y_doe_pred = griddata(X_doe, y_doe, (X_pred[:, 0], X_pred[:, 1]), method="cubic")
# y_lhs_pred = griddata(X_lhs, y_lhs, (X_pred[:, 0], X_pred[:, 1]), method="cubic")
norm = cm.colors.Normalize(vmax=y_real.max(), vmin=y_real.min())
levels = np.linspace(y_real.min(), y_real.max(), 10)
cmap = cm.RdBu
fig, ax = plt.subplots(nrows=1, ncols=3)
cset1 = ax[0].contourf(
x_space,
x_space,
y_doe_pred.reshape(num_bin, num_bin),
norm=norm,
levels=levels,
cmap=cmap,
)
ax[0].plot(X_doe[:, 0], X_doe[:, 1], "ko")
cset2 = ax[1].contourf(
x_space,
x_space,
y_lhs_pred.reshape(num_bin, num_bin),
norm=norm,
levels=levels,
cmap=cmap,
)
cset3 = ax[2].contourf(
x_space,
x_space,
y_real.reshape(num_bin, num_bin),
norm=norm,
levels=levels,
cmap=cmap,
)
ax[1].plot(X_lhs[:, 0], X_lhs[:, 1], "ko")
cbar = fig.colorbar(cset2, ax=ax)
cbar.set_ticks(ticks=[y_real.min(), y_real.max()])
plt.show()
# %%
# %% Active learning example
def function(x):
return np.sin(np.pi * x / 5) * np.cos(2 * np.pi * x / 3) + np.sqrt(
1 - (np.sin(np.pi * x / 4)) ** 2
)
def function_with_noise(x):
return function(x) + np.random.uniform(-0.2, 0.2)
def gpPrediction(l, sigma_f, sigma_n, X_train, y_train, X_test):
# GP model
kernel = 1.0 * RBF(
length_scale=1e-1, length_scale_bounds=(1e-2, 1e3)
) + WhiteKernel(noise_level=1e-2, noise_level_bounds=(1e-10, 1e1))
gp = GaussianProcessRegressor(
kernel=kernel,
alpha=0.0,
n_restarts_optimizer=10,
)
# Fitting in the gp model
gp.fit(X_train, y_train)
# Make the prediction on test set.
y_pred = gp.predict(X_test)
return y_pred, gp
def fit_gp(i, x, y):
pos_x = np.linspace(0, 10, 50).reshape(-1, 1)
l_init = 1
sigma_f_init = 3
sigma_n = 1
y_pred, gp = gpPrediction(l_init, sigma_f_init, sigma_n, x, y, pos_x)
# Generate samples from posterior distribution.
y_hat_samples = gp.sample_y(pos_x, n_samples=50)
# Compute the mean of the sample.
y_hat = np.apply_over_axes(func=np.mean, a=y_hat_samples, axes=1).squeeze()
# Compute the standard deviation of the sample.
y_hat_sd = np.apply_over_axes(func=np.std, a=y_hat_samples, axes=1).squeeze()
_, ax = plt.subplots(figsize=(15, 8))
# Plotting the training data.
sns.scatterplot(x=x.flatten(), y=y.flatten(), label="training data", ax=ax)
# Plot the functional evaluation
sns.lineplot(
x=pos_x.flatten(),
y=np.array([function(x) for x in pos_x]).flatten(),
color="red",
label="f(x)",
ax=ax,
)
# Plot corridor.
ax.fill_between(
x=pos_x.flatten(),
y1=(y_hat - 2 * y_hat_sd).flatten(),
y2=(y_hat + 2 * y_hat_sd).flatten(),
color="green",
alpha=0.3,
label="Credible Interval",
)
# Plot prediction.
sns.lineplot(x=pos_x.flatten(), y=y_pred.flatten(), color="green", label="pred")
# Labeling axes
ax.set(title="Gaussian Process Regression")
ax.legend(loc="lower left")
ax.set(xlabel="x", ylabel="")
idx = list(y_hat_sd).index(max(y_hat_sd))
add_x = pos_x[idx]
new_y = function_with_noise(add_x[0])
plt.plot(add_x, new_y, "ro")
plt.savefig(f"{i}.png")
return add_x[0], new_y
x = [0.0, 5.0, 10.0]
y = [function_with_noise(x) for x in x]
for i in range(25):
new_x, new_y = fit_gp(i, np.array(x).reshape(-1, 1), np.array(y).reshape(-1, 1))
x.append(new_x)
y.append(new_y)
# %%