Commit 2dbb0e9a authored by Gros Gabriel

first commit

parent e434d258
# VS Code folder
.vscode/
# Dependencies (example for Node.js)
node_modules/
# Build files
dist/
build/
Source diff could not be displayed: it is too large.
import numpy as np
import ot
from problem import Parameters
def cost_matrix(tau, tau_pred):
"""
parameters
----------
tau: double[r]
ground truth delay times vector
tau_pred: double[s]
estimated delay times vector
"""
r = tau.shape[0]
s = tau_pred.shape[0]
return tau_pred.reshape(s, 1) @ np.ones((1, r)) - np.ones((s, 1)) @ tau.reshape(1, r)
def optimal_transport_matrix(a, a_pred, D):
"""
parameters
----------
a: double[r]
ground truth amplitudes
a_pred: double[s]
estimated amplitudes
D: double[s, r]
distance matrix
"""
return ot.emd(a_pred, a, D)
def wasserstein1(theta, theta_pred):
"""
parameters
----------
theta: Parameters
grand truth values
theta_pred: Parameters
estimated values
"""
Dk = cost_matrix(theta.tau, theta_pred.tau)
Gamma = optimal_transport_matrix(theta.a, theta_pred.a, np.abs(Dk))
return np.sum(Gamma * np.abs(Dk))
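# --- Usage sketch (illustrative, not part of the original commit) ---
# A minimal check of the two building blocks on plain arrays; the amplitude vectors are
# normalized to the same total mass, which ot.emd requires of its two histograms.
# The array names below are made up for this example.
if __name__ == "__main__":
    tau_true = np.array([0.0, 5.0, 10.0])
    a_true = np.array([0.25, 0.5, 0.25])
    tau_hat = np.array([0.2, 4.8, 10.1])
    a_hat = np.array([0.3, 0.45, 0.25])
    D = np.abs(cost_matrix(tau_true, tau_hat))          # |tau_hat[i] - tau_true[j]|, shape (3, 3)
    Gamma = optimal_transport_matrix(a_true, a_hat, D)  # transport plan between the two spike trains
    print(np.sum(Gamma * D))                            # same quantity wasserstein1 would return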
This diff is collapsed.
import numpy as np
import matplotlib.pyplot as plt
import scipy.linalg
from tqdm import tqdm # progress bar
from scipy.special import kv, gamma
import math as mt
from itertools import permutations
def generate_data_multi(n, means, scale, weights, distribution='gaussian', kernel=None):
"""
Generate n samples from a mixture with r components,
where each component i has mean means[i] and weight weights[i].
The parameter 'distribution' can be 'gaussian', 'lorentz', 'uniform' or 'student'.
For 'student', the degrees of freedom is taken from kernel.nu.
This version works even when n == 1.
"""
#means correspond aux tau_i
#weights correspond aux a_i, qu'on doit normaliser pour avoir une distribution de probabilité
#scale correspond à l'écart-type, on le fixera à 1
r = len(means)
# Special case: n == 1
if n == 1:
comp = np.random.choice(r, p=weights)
if distribution == 'gaussian':
return np.array([np.random.normal(means[comp], scale)])
elif distribution == 'lorentz':
sample = np.random.standard_cauchy() * scale + means[comp]
return np.array([np.clip(sample, -50, 50)])
elif distribution == 'uniform':
return np.array([np.random.uniform(means[comp] - scale, means[comp] + scale)])
elif distribution == 'student':
df = kernel.nu if (kernel is not None and hasattr(kernel, 'nu')) else 3
return np.array([np.random.standard_t(df) * scale + means[comp]])
else:
raise ValueError("Unknown distribution type")
# For n > 1: use a multinomial draw to exactly split n samples.
counts = np.random.multinomial(n, weights)
data = []
for i in range(r):
n_i = counts[i]
if distribution == 'gaussian':
samples = np.random.normal(means[i], scale, n_i)
elif distribution == 'lorentz':
samples = np.random.standard_cauchy(n_i) * scale + means[i]
samples = np.clip(samples, -50, 50)
elif distribution == 'uniform':
samples = np.random.uniform(means[i] - scale, means[i] + scale, n_i)
elif distribution == 'student':
df = kernel.nu if (kernel is not None and hasattr(kernel, 'nu')) else 3
samples = np.random.standard_t(df, n_i) * scale + means[i]
else:
raise ValueError("Unknown distribution type")
data.append(samples)
data = np.concatenate(data)
np.random.shuffle(data)
return data
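# --- Usage sketch for generate_data_multi (illustrative, not part of the original file) ---
# `_NuStub` is a made-up placeholder for whatever kernel object normally exposes `.nu`;
# with kernel=None the Student case silently falls back to 3 degrees of freedom.
class _NuStub:
    nu = 4
_demo_samples = generate_data_multi(500, [0.0, 5.0, 10.0], 1.0, [0.25, 0.5, 0.25],
                                    distribution='student', kernel=_NuStub())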
def generate_data(m, taus, weights, sigma=1):
    # Return m samples drawn from the mixture density: sum_i weights[i] * N(taus[i], sigma^2)
    if sum(weights) != 1:
        weights = np.array(weights) / sum(weights)
    r = len(taus)
    quelle_cloche = np.random.choice(r, size=m, p=weights)
    # quelle_cloche[i] is the index of the mixture component data[i] is drawn from
    # taus[quelle_cloche][i] is the mean of the Gaussian that data[i] comes from
    data = np.random.normal(np.array(taus)[quelle_cloche], sigma)
    return data
def genere_hist(weights, means):
    data = generate_data(100000, means, weights)
    plt.hist(data, bins=50, density=True, alpha=0.6, color='b')
    plt.xlabel("Values")
    plt.ylabel("Density")
    plt.title("Histogram of a mixture of normal distributions")
    plt.show()
def FCE(data, freqs):
    # Compute the empirical characteristic function at the frequencies in freqs.
    # The idea is to summarize the data through its empirical characteristic function
    # evaluated at N frequencies, which reduces the problem size from m to N.
    # Using NumPy (np.exp in particular) vectorizes the computation, saves time,
    # and handles complex numbers.
    phi = np.array([np.mean(np.exp(-2j * np.pi * f * data)) for f in freqs])
    return phi.reshape(-1, 1)  # keep it as a column vector for later use
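# A fully vectorized variant of FCE, sketched here as an aside (the name FCE_vectorized is
# introduced for this example), assuming `data` is a 1-D array and `freqs` a column vector
# as in the calls above; it avoids the Python loop by building the (N_freq, m) phase matrix.
def FCE_vectorized(data, freqs):
    # exp(-2i*pi*f*x) for every (frequency, sample) pair, then average over the samples
    E = np.exp(-2j * np.pi * freqs.reshape(-1, 1) * np.asarray(data).reshape(1, -1))
    return E.mean(axis=1, keepdims=True)  # column vector, same shape as FCE's output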
def sample_uniform_frequencies(n):
    # N = 2n + 1 is the new problem size: frequencies k/N for k = -n, ..., n
    N = 2 * n + 1
    return np.arange(-n, n + 1).reshape(-1, 1) / N
def fourier(x,sigma=1):
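    # Fourier transform of a centered Gaussian kernel with standard deviation sigma,
    # evaluated at frequency x: exp(-2 * pi^2 * sigma^2 * x^2).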
return np.exp(-2 * (x * sigma * np.pi)**2)
def ESPRIT_(x, r, my_fourier, freqs1, true_freqs):
    # Estimate r delays and amplitudes from the (filtered) empirical characteristic
    # function x, sampled on a uniform frequency grid, via the ESPRIT algorithm.
    x_vec = x.flatten()
    N = len(x_vec)
    L = N // 2
    M = N - L + 1
    # Hankel matrix built from the samples
    X = np.zeros((L, M), dtype=complex)
    for i in range(L):
        X[i, :] = x_vec[i:i+M]
    # Signal subspace from the truncated SVD
    U, s, Vh = np.linalg.svd(X, full_matrices=False)
    U_r = U[:, :r]
    # Shift invariance: U2 ~= U1 @ Psi, whose eigenvalues carry the delays
    U1 = U_r[:-1, :]
    U2 = U_r[1:, :]
    Psi = np.linalg.pinv(U1) @ U2
    eigvals, _ = np.linalg.eig(Psi)
    f_est = N * np.angle(eigvals) / (2 * np.pi)
    # With the exp(-2i*pi*f*tau) convention of FCE, f_est ~= -tau, so negate (the sort only fixes an ordering)
    t0_est = -np.sort(f_est)
    # Recover the amplitudes by least squares on the dictionary of modulated atoms
    V0 = my_fourier(true_freqs).reshape(-1, 1) * np.exp(-2j * np.pi * np.outer(freqs1.flatten(), t0_est))
    a0 = np.linalg.lstsq(V0, x_vec, rcond=None)[0]
    return np.concatenate((a0, t0_est))
def S_err(theta_est, r, theta_star):
# Assumes theta_est and theta_star are concatenations: [amplitudes, centroids]
centroids_est = np.sort(theta_est[r:])
centroids_true = np.sort(theta_star[r:])
return np.linalg.norm(centroids_est - centroids_true)
def permute_and_compute_norm(a, b, k):
    # Split a into its first k components (amplitudes) and the remaining ones (delays/means);
    # b = [true weights, true means] stays fixed.
    a_amps, a_taus = a[:k], a[k:]
    min_norm = np.inf  # initialize the minimal norm to a large value
    best_a = np.concatenate([np.array(a_amps), np.array(a_taus)])  # identity permutation
    # Brute-force search over the permutations of the amplitude part and of the delay part
    for perm_amps in permutations(a_amps):
        for perm_taus in permutations(a_taus):
            # Rebuild the permuted vector in the same ordering as b: [amplitudes, means]
            a_permuted = np.concatenate([np.array(perm_amps), np.array(perm_taus)])
            # Euclidean norm of the difference with the ground truth
            norm = np.linalg.norm(a_permuted - b)
            if norm < min_norm:
                min_norm = norm
                best_a = a_permuted
    # Return the minimal norm found and the corresponding permuted copy of a
    return min_norm, best_a
def permute_and_compute_norm2(a, b, k):  # Obsolete: does not modify a
    # Split a and b into 2 parts: the first k components and the remaining ones
    a_means, a_weights = a[:k], a[k:]
    b_means, b_weights = b[:k], b[k:]
    # Enumerate the permutations of the means parts and of the weights parts of a and b
    a_means_permuted, b_means_permuted = list(permutations(a_means)), list(permutations(b_means))
    a_weights_permuted, b_weights_permuted = list(permutations(a_weights)), list(permutations(b_weights))
    min_norm = np.inf  # initialize the minimal norm to a large value
    # Try every combination of permutations of means and weights
    for perm_means_a in a_means_permuted:
        for perm_means_b in b_means_permuted:
            for perm_weights_a in a_weights_permuted:
                for perm_weights_b in b_weights_permuted:
                    # Rebuild the permuted vectors
                    a_permuted = np.concatenate([np.array(perm_means_a), np.array(perm_weights_a)])
                    b_permuted = np.concatenate([np.array(perm_means_b), np.array(perm_weights_b)])
                    # Euclidean norm of the difference
                    norm = np.linalg.norm(a_permuted - b_permuted)
                    if norm < min_norm:
                        min_norm = norm
                        best_perm = (perm_means_a, perm_means_b, perm_weights_a, perm_weights_b)
    return min_norm  # , best_perm
def batch_algo_gradient_descent(m, n, means, weights, num_steps=100, eta=0.1):
    """
    First, perform an ESPRIT estimation using a batch of m samples.
    Then, run a batch (offline) preconditioned gradient descent refinement for num_steps,
    using the fixed empirical characteristic function computed from the m samples.
    n sets the frequency grid: N = 2n + 1 frequencies are used.
    """
r = len(means)
# Generate batch data and compute its FCE
data = generate_data(m, means, weights)
freqs = sample_uniform_frequencies(n) # shape (N_freq, 1)
baseline_freqs = sample_uniform_frequencies(n)
# Compute the empirical characteristic function once for the batch
Y = FCE(data, freqs) # shape (N_freq, 1)
# Initial ESPRIT estimation: theta_est = [a_est (r,), tau_est (r,)]
theta_est = ESPRIT_(Y, r, fourier, baseline_freqs, freqs)
a_est = theta_est[:r].reshape(-1, 1)
tau_est = theta_est[r:]
# Precompute the "G" matrix: diagonal with entries given by kernel's Fourier transform at freqs.
G_diag = fourier(freqs).flatten() # shape (N_freq,)
G_mat = np.diag(G_diag) # shape (N_freq, N_freq)
    # Dictionary function Phi: for a given tau vector (r,), returns a matrix (N_freq x r)
    # using the same exp(-2i*pi*f*tau) convention as FCE.
    def Phi(tau):
        return np.exp(-2j * np.pi * np.outer(freqs.flatten(), tau))
    # Model function: Y_model = G * Phi(tau) * a.
    def Y_model(a, tau):
        return G_mat @ (Phi(tau) @ a)
# Compute K''(0): for K(t)=sum_f |ĝ(f)|^2 exp(2i*pi*f*t), we have
# K''(0)= - (2*pi)^2 * sum_f |ĝ(f)|^2 * f^2.
f_vals = freqs.flatten()
Kpp0 = - (2 * np.pi)**2 * np.sum(np.abs(fourier(freqs).flatten())**2 * (f_vals**2))
# Run batch gradient descent updates on the fixed FCE Y
for step in range(num_steps):
# Compute the current model prediction
Y_estimated = Y_model(a_est, tau_est)
# Residual: difference between model prediction and observed FCE
r_vec = Y_estimated - Y # shape (N_freq, 1)
# Gradient with respect to amplitudes: grad_a = (G*Phi(tau))^H * r
grad_a = (G_mat @ Phi(tau_est)).conj().T @ r_vec # shape (r, 1)
        # Gradient with respect to tau (derivative of the exp(-2i*pi*f*tau_i) atoms):
        grad_tau = np.zeros_like(tau_est, dtype=complex)
        for i in range(r):
            dPhi_dtau = -2j * np.pi * freqs.flatten() * np.exp(-2j * np.pi * freqs.flatten() * tau_est[i])
            grad_tau[i] = np.real(a_est[i, 0] * ((G_mat @ dPhi_dtau).conj().T @ r_vec)[0])
# Preconditioner for tau-update:
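        # (1/|K''(0)|) / |a_i|^2 approximates the inverse curvature of the objective in tau_i,
        # so each delay gets a step size adapted to its amplitude.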
P_tau = (1/(-Kpp0)) / (np.abs(a_est.flatten())**2)
# Gradient descent update with learning rate eta
a_est = a_est - eta * grad_a
tau_est = tau_est - eta * (P_tau * grad_tau)
# Final estimated parameters (concatenated amplitudes and centroids)
theta_est_final = np.concatenate((a_est.flatten(), tau_est))
# True parameters vector: [weights, means]
theta_star = np.concatenate((weights, means))
norm , theta_est_final_prime = permute_and_compute_norm(theta_est_final,theta_star,r)
return norm , theta_est_final_prime , theta_star , theta_est_final
print(batch_algo_gradient_descent(1000,20,[0,10],[2/3,1/3]))
print(batch_algo_gradient_descent(1000,20,[0,5,10],[1/4,1/2,1/4]))
print(batch_algo_gradient_descent(1000,100,[0,5,10],[1/4,1/2,1/4]))
print(batch_algo_gradient_descent(1000,200,[0,5,10],[1/4,1/2,1/4]))
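# --- Illustrative check of the ESPRIT initialization alone (sketch, not in the original file) ---
# This mirrors the first steps of batch_algo_gradient_descent: draw a batch, compress it with
# FCE, and print the raw ESPRIT estimate [amplitudes, delays] before any gradient refinement;
# the names demo_freqs and demo_Y are introduced only for this example.
demo_freqs = sample_uniform_frequencies(20)
demo_Y = FCE(generate_data(1000, [0, 5, 10], [1/4, 1/2, 1/4]), demo_freqs)
print(ESPRIT_(demo_Y, 3, fourier, demo_freqs, demo_freqs))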
This diff is collapsed.