# A demonstration of some Python data science techniques: Latin hypercube
# sampling vs. a full-factorial DOE, Gaussian process surrogate models, and
# active learning.

# %% Import modules
import itertools
import math
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats  # provides scipy.stats.qmc
import seaborn as sns
import sklearn
from matplotlib import cm
from scipy.interpolate import griddata
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import ConstantKernel, RBF, WhiteKernel


def lhs_sampler(var_ranges: list[tuple[float, float]], n_samples: int) -> np.ndarray:
    """Sample a set of variables using Latin hypercube sampling.

    Args:
        var_ranges: List of (lower_bound, upper_bound) tuples, one per variable.
        n_samples: Number of samples to draw.

    Returns:
        Array of shape (n_samples, len(var_ranges)), scaled to the requested ranges.
    """
    # Initialise the sampler with one dimension per variable
    dimensions = len(var_ranges)
    sampler = scipy.stats.qmc.LatinHypercube(d=dimensions)

    # Draw unit-cube samples and rescale them to the requested ranges
    output = []
    for sample in sampler.random(n_samples):
        output.append([x * (hi - lo) + lo for (lo, hi), x in zip(var_ranges, sample)])
    return np.array(output)


# %% Test the sampler
var_ranges = [(0.0, 1.0), (2.0, 4.0)]
samples = lhs_sampler(var_ranges, 4)
print(samples)

# %% Compare a full-factorial DOE with Latin hypercube sampling
# def function(a, b):
#     return math.sin(a) + math.sin(b)


def function(x, y):
    return -x * x * x + 3 * x * x - 2 * x - y * y * y + 3 * y * y - 2 * y


x_range = [0, 4]

# Full-factorial design: a 3 x 3 grid of evaluation points
doe_vals = np.linspace(x_range[0], x_range[1], 3)
X_doe = np.array(list(itertools.product(doe_vals, doe_vals)))
y_doe = np.array([function(*point) for point in X_doe])

# Latin hypercube design with the same budget of nine points
sampler = scipy.stats.qmc.LatinHypercube(d=2)
X_lhs = np.array(
    [
        [coord * (x_range[1] - x_range[0]) + x_range[0] for coord in point]
        for point in sampler.random(9)
    ]
)
y_lhs = np.array([function(*point) for point in X_lhs])

# %% Fit a GP model to each set of results
lhs_model = GaussianProcessRegressor().fit(X_lhs, y_lhs.reshape(-1, 1))

num_bin = 50
x_space = np.linspace(x_range[0], x_range[1], num_bin)
X_pred = np.array(list(itertools.product(*[x_space for _ in range(2)])))
y_lhs_pred = lhs_model.predict(X_pred)

doe_model = GaussianProcessRegressor().fit(X_doe, y_doe.reshape(-1, 1))
y_doe_pred = doe_model.predict(X_pred)

# %% Plot the full-DOE and Latin hypercube predictions against the true surface
y_real = np.array([function(*x) for x in X_pred])

# Alternative: interpolate the training points directly instead of using a GP
# y_doe_pred = griddata(X_doe, y_doe, (X_pred[:, 0], X_pred[:, 1]), method="cubic")
# y_lhs_pred = griddata(X_lhs, y_lhs, (X_pred[:, 0], X_pred[:, 1]), method="cubic")

norm = cm.colors.Normalize(vmax=y_real.max(), vmin=y_real.min())
levels = np.linspace(y_real.min(), y_real.max(), 10)
cmap = cm.RdBu

fig, ax = plt.subplots(nrows=1, ncols=3)
# X_pred was built with the first coordinate varying slowest, so the reshaped
# grids are transposed to put that coordinate on the horizontal axis.
cset1 = ax[0].contourf(
    x_space,
    x_space,
    y_doe_pred.reshape(num_bin, num_bin).T,
    norm=norm,
    levels=levels,
    cmap=cmap,
)
ax[0].plot(X_doe[:, 0], X_doe[:, 1], "ko")
cset2 = ax[1].contourf(
    x_space,
    x_space,
    y_lhs_pred.reshape(num_bin, num_bin).T,
    norm=norm,
    levels=levels,
    cmap=cmap,
)
cset3 = ax[2].contourf(
    x_space,
    x_space,
    y_real.reshape(num_bin, num_bin).T,
    norm=norm,
    levels=levels,
    cmap=cmap,
)
ax[1].plot(X_lhs[:, 0], X_lhs[:, 1], "ko")
cbar = fig.colorbar(cset2, ax=ax)
cbar.set_ticks(ticks=[y_real.min(), y_real.max()])
plt.show()


# %% Active learning example
def function(x):
    return np.sin(np.pi * x / 5) * np.cos(2 * np.pi * x / 3) + np.sqrt(
        1 - (np.sin(np.pi * x / 4)) ** 2
    )


def function_with_noise(x):
    return function(x) + np.random.uniform(-0.2, 0.2)


def gpPrediction(l, sigma_f, sigma_n, X_train, y_train, X_test):
    # GP model. NOTE: l, sigma_f and sigma_n are unused placeholders; the kernel
    # hyperparameters below are optimised during fitting.
    kernel = 1.0 * RBF(
        length_scale=1e-1, length_scale_bounds=(1e-2, 1e3)
    ) + WhiteKernel(noise_level=1e-2, noise_level_bounds=(1e-10, 1e1))
    gp = GaussianProcessRegressor(
        kernel=kernel,
        alpha=0.0,
        n_restarts_optimizer=10,
    )
    # Fit the GP model
    gp.fit(X_train, y_train)
    # Make the prediction on the test set
    y_pred = gp.predict(X_test)
    return y_pred, gp


def fit_gp(i, x, y):
    pos_x = np.linspace(0, 10, 50).reshape(-1, 1)
    l_init = 1
    sigma_f_init = 3
    sigma_n = 1
    y_pred, gp = gpPrediction(l_init, sigma_f_init, sigma_n, x, y, pos_x)
    # Generate samples from the posterior distribution.
    y_hat_samples = gp.sample_y(pos_x, n_samples=50)
    # Compute the mean of the samples.
    y_hat = np.apply_over_axes(func=np.mean, a=y_hat_samples, axes=1).squeeze()
    # Compute the standard deviation of the samples.
    y_hat_sd = np.apply_over_axes(func=np.std, a=y_hat_samples, axes=1).squeeze()

    _, ax = plt.subplots(figsize=(15, 8))
    # Plot the training data.
    sns.scatterplot(x=x.flatten(), y=y.flatten(), label="training data", ax=ax)
    # Plot the true function.
    sns.lineplot(
        x=pos_x.flatten(),
        y=np.array([function(xi) for xi in pos_x]).flatten(),
        color="red",
        label="f(x)",
        ax=ax,
    )
    # Plot the credible-interval corridor.
    ax.fill_between(
        x=pos_x.flatten(),
        y1=(y_hat - 2 * y_hat_sd).flatten(),
        y2=(y_hat + 2 * y_hat_sd).flatten(),
        color="green",
        alpha=0.3,
        label="Credible Interval",
    )
    # Plot the prediction.
    sns.lineplot(x=pos_x.flatten(), y=y_pred.flatten(), color="green", label="pred")
    # Label the axes.
    ax.set(title="Gaussian Process Regression")
    ax.legend(loc="lower left")
    ax.set(xlabel="x", ylabel="")

    # Pick the candidate with the highest posterior uncertainty and evaluate it.
    idx = int(np.argmax(y_hat_sd))
    add_x = pos_x[idx]
    new_y = function_with_noise(add_x[0])
    plt.plot(add_x, new_y, "ro")
    plt.savefig(f"{i}.png")
    return add_x[0], new_y


x = [0.0, 5.0, 10.0]
y = [function_with_noise(xi) for xi in x]
for i in range(25):
    new_x, new_y = fit_gp(i, np.array(x).reshape(-1, 1), np.array(y).reshape(-1, 1))
    x.append(new_x)
    y.append(new_y)

# %%
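
# %% Optional extra (not part of the original demo): a quick look at where the
# active-learning loop chose to sample. This is a minimal sketch that assumes
# the `x`, `y`, and `function` objects from the loop above are still in scope;
# `x_plot` is introduced here purely for plotting.
x_plot = np.linspace(0, 10, 200)
plt.figure()
plt.plot(x_plot, [function(xi) for xi in x_plot], "r-", label="f(x)")
plt.plot(x, y, "ko", label="sampled points")
plt.xlabel("x")
plt.title("Points chosen by the active-learning loop")
plt.legend()
plt.show()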