Initial Commit

Alex Selimov 2025-04-17 22:52:52 -04:00
commit 93dd342791
3 changed files with 221 additions and 0 deletions

README.md Normal file

@@ -0,0 +1,6 @@
# Active Learning Demo
This is a basic script I used to showcase an Active Learning approach using Gaussian Processes (GPs) for a presentation on improved sampling strategies for computationally expensive functions.
The actual Active Learning part is extremely simple: the next sampling point is selected based solely on the maximum standard deviation of the GP's predictions.
In previous roles I have implemented more robust approaches that better balance exploration against exploitation, such as maximum-entropy sampling or the mismatch-first farthest-traversal heuristic, to select the next sampling point.
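For reference, the acquisition step boils down to an argmax over the GP's predictive standard deviation. A minimal sketch with scikit-learn (the toy data and variable names here are illustrative, not taken from the script):

```python
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor

# Toy training set standing in for expensive function evaluations
x_train = np.array([[0.0], [5.0], [10.0]])
y_train = np.sin(x_train).ravel()

gp = GaussianProcessRegressor().fit(x_train, y_train)

# Score a candidate grid and pick the most uncertain point to sample next
X_cand = np.linspace(0.0, 10.0, 101).reshape(-1, 1)
_, y_std = gp.predict(X_cand, return_std=True)
next_x = X_cand[np.argmax(y_std)]
```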
Included in this repo is a simple video I made using `imagemagick` and the outputs of the `active_learning.py` script.

active_learning.mp4 Normal file

Binary file not shown.

active_learning.py Normal file

@@ -0,0 +1,215 @@
# This code is a demonstration of some Python data science techniques
# %% import modules
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import cm
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import ConstantKernel, RBF, WhiteKernel
import math
from scipy.interpolate import griddata
import itertools
import scipy.stats  # provides scipy.stats.qmc used for Latin hypercube sampling


def lhs_sampler(var_ranges: list[tuple[float, float]], n_samples: int):
"""Sample a number of variables using latin hypercube sampling
Args:
var_ranges: list of tuples where each tuple is (lower_bound, upper_bound)
        n_samples: Number of samples to draw

    Returns:
        Array of shape (n_samples, len(var_ranges)) scaled to var_ranges
    """
# Initialize sampler
dimensions = len(var_ranges)
sampler = scipy.stats.qmc.LatinHypercube(dimensions)
# Take samples and transform them to correct ranges
output = list()
for sample in sampler.random(n_samples):
output.append([x * (hi - lo) + lo for (lo, hi), x in zip(var_ranges, sample)])
return np.array(output)
# %% Test sampler
var_ranges = [(0.0, 1.0), (2.0, 4.0)]
samples = lhs_sampler(var_ranges, 4)
print(samples)
# %% Now compare a full-factorial DOE and Latin hypercube sampling
# def function(a, b):
# return math.sin(a) + math.sin(b)
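# A smooth cubic test surface, symmetric in x and y, standing in for an
# expensive-to-evaluate function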
def function(x, y):
return -x * x * x + 3 * x * x - 2 * x - y * y * y + 3 * y * y - 2 * y
x_range = [0, 4]
doe_vals = np.linspace(x_range[0], x_range[1], 3)
X_doe = np.array(list(itertools.product(*[doe_vals, doe_vals])))
y_doe = np.array([function(*point) for point in X_doe])
# Reuse the LHS helper defined above, sampling both variables over x_range
X_lhs = lhs_sampler([tuple(x_range), tuple(x_range)], 9)
y_lhs = np.array([function(*point) for point in X_lhs])
# %% Fit GP model to results
lhs_model = GaussianProcessRegressor().fit(X_lhs, y_lhs.reshape(-1, 1))
num_bin = 50
x_space = np.linspace(x_range[0], x_range[1], num_bin)
X_pred = np.array(list(itertools.product(*[x_space for _ in range(2)])))
y_lhs_pred = lhs_model.predict(X_pred)
doe_model = GaussianProcessRegressor().fit(X_doe, y_doe.reshape(-1, 1))
y_doe_pred = doe_model.predict(X_pred)
# %% Now plot the performance of the full-factorial DOE and Latin hypercube samples
y_real = np.array([function(*x) for x in X_pred])
# Interpolate here
# y_doe_pred = griddata(X_doe, y_doe, (X_pred[:, 0], X_pred[:, 1]), method="cubic")
# y_lhs_pred = griddata(X_lhs, y_lhs, (X_pred[:, 0], X_pred[:, 1]), method="cubic")
norm = cm.colors.Normalize(vmax=y_real.max(), vmin=y_real.min())
levels = np.linspace(y_real.min(), y_real.max(), 10)
cmap = cm.RdBu
fig, ax = plt.subplots(nrows=1, ncols=3)
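# Panels: [0] GP fit to the full-factorial DOE, [1] GP fit to the LHS
# samples, [2] the true function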
cset1 = ax[0].contourf(
x_space,
x_space,
y_doe_pred.reshape(num_bin, num_bin),
norm=norm,
levels=levels,
cmap=cmap,
)
ax[0].plot(X_doe[:, 0], X_doe[:, 1], "ko")
cset2 = ax[1].contourf(
x_space,
x_space,
y_lhs_pred.reshape(num_bin, num_bin),
norm=norm,
levels=levels,
cmap=cmap,
)
cset3 = ax[2].contourf(
x_space,
x_space,
y_real.reshape(num_bin, num_bin),
norm=norm,
levels=levels,
cmap=cmap,
)
ax[1].plot(X_lhs[:, 0], X_lhs[:, 1], "ko")
cbar = fig.colorbar(cset2, ax=ax)
cbar.set_ticks(ticks=[y_real.min(), y_real.max()])
plt.show()
# %%
# %% Active learning example
def function(x):
return np.sin(np.pi * x / 5) * np.cos(2 * np.pi * x / 3) + np.sqrt(
1 - (np.sin(np.pi * x / 4)) ** 2
)
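
# Noisy "oracle": the true function plus uniform noise, mimicking noisy
# evaluations of an expensive black-box function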
def function_with_noise(x):
return function(x) + np.random.uniform(-0.2, 0.2)
def gpPrediction(l, sigma_f, sigma_n, X_train, y_train, X_test):
    # Build the GP kernel, using the arguments as initial hyperparameter
    # guesses; the optimizer refines them during fitting.
    kernel = ConstantKernel(
        constant_value=sigma_f, constant_value_bounds=(1e-2, 1e3)
    ) * RBF(length_scale=l, length_scale_bounds=(1e-2, 1e3)) + WhiteKernel(
        noise_level=sigma_n, noise_level_bounds=(1e-10, 1e1)
    )
    gp = GaussianProcessRegressor(
        kernel=kernel,
        alpha=0.0,
        n_restarts_optimizer=10,
    )
    # Fit the GP to the training data
    gp.fit(X_train, y_train)
    # Predict on the test set
    y_pred = gp.predict(X_test)
return y_pred, gp
def fit_gp(i, x, y):
pos_x = np.linspace(0, 10, 50).reshape(-1, 1)
l_init = 1
sigma_f_init = 3
sigma_n = 1
y_pred, gp = gpPrediction(l_init, sigma_f_init, sigma_n, x, y, pos_x)
# Generate samples from posterior distribution.
y_hat_samples = gp.sample_y(pos_x, n_samples=50)
# Compute the mean of the sample.
y_hat = np.apply_over_axes(func=np.mean, a=y_hat_samples, axes=1).squeeze()
# Compute the standard deviation of the sample.
y_hat_sd = np.apply_over_axes(func=np.std, a=y_hat_samples, axes=1).squeeze()
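    # Note: gp.predict(pos_x, return_std=True) returns the posterior mean and
    # standard deviation directly; drawing posterior samples here is just for
    # illustration.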
_, ax = plt.subplots(figsize=(15, 8))
# Plotting the training data.
sns.scatterplot(x=x.flatten(), y=y.flatten(), label="training data", ax=ax)
    # Plot the true function
    sns.lineplot(
        x=pos_x.flatten(),
        y=np.array([function(xi) for xi in pos_x]).flatten(),
color="red",
label="f(x)",
ax=ax,
)
    # Plot the +/-2 standard deviation credible band
ax.fill_between(
x=pos_x.flatten(),
y1=(y_hat - 2 * y_hat_sd).flatten(),
y2=(y_hat + 2 * y_hat_sd).flatten(),
color="green",
alpha=0.3,
label="Credible Interval",
)
# Plot prediction.
sns.lineplot(x=pos_x.flatten(), y=y_pred.flatten(), color="green", label="pred")
    # Label the axes and add the legend
    ax.set(title="Gaussian Process Regression", xlabel="x", ylabel="")
    ax.legend(loc="lower left")
    # Active learning step: sample next where the posterior standard
    # deviation is largest
    idx = int(np.argmax(y_hat_sd))
    add_x = pos_x[idx]
new_y = function_with_noise(add_x[0])
plt.plot(add_x, new_y, "ro")
    plt.savefig(f"{i}.png")
    plt.close()  # close the figure so 25 iterations don't pile up in memory
return add_x[0], new_y
x = [0.0, 5.0, 10.0]
y = [function_with_noise(xi) for xi in x]
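# Each iteration refits the GP, saves a frame for the video, and appends the
# most uncertain point (and its noisy evaluation) to the training set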
for i in range(25):
new_x, new_y = fit_gp(i, np.array(x).reshape(-1, 1), np.array(y).reshape(-1, 1))
x.append(new_x)
y.append(new_y)
# %%