'''
A collection of functions built for the geostatistical analysis of galaxy data.
Created by: Benjamin Metha, Tree Smith, Jaime Blackwell
Last Updated: May 26, 2025
'''
__version__ = '0.1.8'
import numpy as np
import scipy
import emcee
from sklearn.metrics.pairwise import euclidean_distances
from astropy.wcs import WCS
from statsmodels.regression.linear_model import GLS
ASEC_PER_RAD = 206265.0
EPSILON = 1e-6
#########################
# Unit Conversions #
#########################
[docs]
def make_RA_DEC_grid(header):
'''
Given a hdu header, create a grid of RA//DEC for each pixel in that file.
'''
world = WCS(header)
x = np.arange(header['NAXIS1'])
y = np.arange(header['NAXIS2'])
X, Y = np.meshgrid(x, y)
RA_grid, DEC_grid = world.wcs_pix2world(X, Y, 0)
return RA_grid, DEC_grid
[docs]
def make_physical_lag_grid(header, meta):
'''
Given a hdu header, create a grid of RA//DEC for each pixel in that file.
Parameters
----------
header: hdu header file
Must contain wcs
meta: dict
Must contain RA, DEC of the galaxy centre, and PA, i, and D to get
the galaxy's absolute units (D should be in units of megaparsecs; PA and
i should be in units of degrees).
'''
# Generate lag matrices in pixel space
nx = header['NAXIS1']
ny = header['NAXIS2']
lag_x_pix = np.arange(2*nx - 1) - (nx - 1)
lag_y_pix = np.arange(2*ny - 1) - (ny - 1)
lag_X_pix, lag_Y_pix = np.meshgrid(lag_x_pix, lag_y_pix)
# Convert pixel lags into RA, DEC lags
world = WCS(header)
RA_grid, DEC_grid = world.wcs_pix2world(lag_X_pix, lag_Y_pix, 0)
centre_RA = RA_grid [ny - 1, nx - 1]
centre_DEC = DEC_grid[ny - 1, nx - 1]
delta_RA_deg = RA_grid - centre_RA
delta_DEC_deg = DEC_grid - centre_DEC
# Next, convert RA and DEC to physical pc using the meta dict:
PA = np.radians(meta['PA'])
i = np.radians(meta['i'])
# 1: Rotate RA, DEC by PA to get y (major axis direction) and x (minor axis direction)
x_deg = delta_RA_deg*np.cos(PA) - delta_DEC_deg*np.sin(PA)
y_deg = delta_DEC_deg*np.cos(PA) + delta_RA_deg*np.sin(PA)
# 2: Stretch x values to remove inclination effects
x_deg = x_deg / np.cos(i)
# 3: Convert units to kpc
x_rad = np.radians(x_deg)
y_rad = np.radians(y_deg)
x_kpc = x_rad * meta['D'] * 1000
y_kpc = y_rad * meta['D'] * 1000
return x_kpc, y_kpc
[docs]
def RA_DEC_to_XY(RA, DEC, meta):
'''
Takes in list of RA, DEC coordinates and transforms them into a
list of deprojected XY values, where X and Y are the distances from the
galaxy's centre in units of kpc
Parameters
----------
RA: ndarray like of shape (N,)
List of RA values
DEC: ndarray like of shape (N,)
List of DEC values
meta: dict
Must contain RA, DEC of the galaxy centre, and PA, i, and D to get
the galaxy's absolute units
Returns
-------
XY_kpc: (N,2) ndarray
Contains X and Y coords of all data points with units of kpc
'''
delta_RA_deg = RA - meta['RA']
delta_DEC_deg = DEC - meta['DEC']
PA = np.radians(meta['PA'])
i = np.radians(meta['i'])
# 1: Rotate RA, DEC by PA to get y (major axis direction) and x (minor axis direction)
x_deg = delta_RA_deg*np.cos(PA) - delta_DEC_deg*np.sin(PA)
y_deg = delta_DEC_deg*np.cos(PA) + delta_RA_deg*np.sin(PA)
# 2: Stretch x values to remove inclination effects
x_deg = x_deg / np.cos(i)
# 3: Convert units to kpc
x_rad = np.radians(x_deg)
y_rad = np.radians(y_deg)
x_kpc = x_rad * meta['D'] * 1000
y_kpc = y_rad * meta['D'] * 1000
XY_kpc = np.stack((x_kpc, y_kpc)).T
return XY_kpc
[docs]
def RA_DEC_to_radius(RA, DEC, meta):
'''
Converts a list of RA//DEC values to distances from a galaxy's centre,
using a supplied metadata dictionary.
Parameters
----------
RA: float or np.array
Right ascension of points
DEC: float or np.array
Declination of points
Returns
-------
r: np array
Distance from each point to the galaxy's centre
'''
return deprojected_distances(RA, DEC, meta['RA'], meta['DEC'], meta).T[0]
[docs]
def deprojected_distances(RA1, DEC1, RA2=None, DEC2=None, meta=dict()):
'''
Computes the deprojected distances between one set of RAs/DECs and
another, for a known galaxy.
Parameters
----------
RA1: float, list, or np array-like
List of (first) RA values. Must be in degrees.
DEC1: float, list, or np array-like
List of (first) DEC values. Must be in degrees.
RA2: float, list, or np array-like
(Optional) second list of RA values. Must be in degrees.
If no argument is provided, then the first list will be used again.
DEC2: float, list, or np array-like
(Optional) second list of DEC values. Must be in degrees.
If no argument is provided, then the first list will be used again.
meta: dict
Metadata used to calculate the distances. Must contain:
PA: float
Principle Angle of the galaxy, degrees.
i: float
inclination of the galaxy along this principle axis, degrees.
D: float
Distance from this galaxy to Earth, Mpc.
Returns
-------
dists: np array
Array of distances between all RA, DEC pairs provided.
Units: kpc.
'''
# Check parameters
try:
meta['PA']
except KeyError:
assert False, "Error: PA not defined for metadata"
try:
meta['i']
except KeyError:
assert False, "Error: i not defined for metadata"
try:
meta['D']
except KeyError:
assert False, "Error: D not defined for metadata"
# If RA1 and DEC1 are arrays, they must have the same length.
# If one of them is a float, they must both be floats.
# You can't supply only one of RA2 and DEC2
try:
assert len(RA1) == len(DEC1), "Error: len of RA1 must match len of DEC1"
RA1 = np.array(RA1)
DEC1 = np.array(DEC1)
except TypeError:
assert type(RA1) == type(DEC1), "Error: type of RA1 must match type of DEC1"
# Then cast them to arrays
RA1 = np.array([RA1])
DEC1 = np.array([DEC1])
if type(RA2) == type(None):
RA2 = RA1
if type(DEC2) == type(None):
DEC2 = DEC1
try:
assert len(RA2) == len(DEC2), "Error: len of RA2 must match len of DEC2"
RA2 = np.array(RA2)
DEC2 = np.array(DEC2)
except TypeError:
assert type(RA2) == type(DEC2), "Error: type of RA2 must match type of DEC2"
RA2 = np.array([RA2])
DEC2 = np.array([DEC2])
# Now onto the maths
PA = np.radians(meta['PA'])
i = np.radians(meta['i'])
# 1: Rotate RA, DEC by PA to get y (major axis direction) and x (minor axis direction)
x1 = RA1*np.cos(PA) - DEC1*np.sin(PA)
y1 = DEC1*np.cos(PA) + RA1*np.sin(PA)
x2 = RA2*np.cos(PA) - DEC2*np.sin(PA)
y2 = DEC2*np.cos(PA) + RA2*np.sin(PA)
# 2: Stretch x values to remove inclination effects
long_x1 = x1 /np.cos(i)
long_x2 = x2 /np.cos(i)
# 3: Compute Euclidean Distances between x1,y1 and x2,y2 to get angular offsets (degrees).
vec1 = np.stack((y1, long_x1)).T
vec2 = np.stack((y2, long_x2)).T
deg_dists = euclidean_distances(vec1, vec2)
rad_dists = np.radians(deg_dists)
# 4: Convert angular offsets to kpc distances using D, and the small-angle approximation.
Mpc_dists = rad_dists * meta['D']
kpc_dists = Mpc_dists * 1000
return kpc_dists
[docs]
def to_data_dict(header, Z, e_Z):
'''
Parameters
----------
header: hdu header file
Must contain wcs
Z: np array
Grid with values of our random field
e_Z: np array
Same shape as Z
Gives uncertainty of Z at each location
Returns
-------
data_dict: dict
Contains RA, DEC, Z and e_Z for every
'''
RA_grid, DEC_grid = make_RA_DEC_grid(header)
# Trim off nans
wanted_pixels = ~np.isnan(Z.flatten()) & ~np.isinf(Z.flatten())
data_dict = {
'RA': RA_grid.flatten()[wanted_pixels],
'DEC': DEC_grid.flatten()[wanted_pixels],
'Z': Z.flatten()[wanted_pixels],
'e_Z': e_Z.flatten()[wanted_pixels]
}
return data_dict
#########################
# Spatial Statistics #
#########################
[docs]
def fast_semivariogram(Z_grid, header=None, meta=None, bin_size=2, d_lim=None):
'''
A fast algorithm for computing the semivariogram of galaxy data.
Parameters
----------
Z_grid (2d np.array)
Random field for which we are computing the semivariogram
header: hdu header file
Must contain wcs.
If not supplied, semivariogram will be computed in units of pixels, with
no deprojection.
meta: dict
Metadata used to calculate the distances. Must be supplied if header is supplied. Must contain:
PA: float
Principle Angle of the galaxy, degrees.
i: float
inclination of the galaxy along this principle axis, degrees.
D: float
Distance from this galaxy to Earth, Mpc.
bin_size:
Size of bins for semivariogram.
Defaults to 2 (pixels) -- should be changed if using physical separations
d_lim: float, or None
Maximum distance up to which compute the semivariogram.
If not supplied, goes up to the maximum possible distance in the data.
Returns
-------
svg: numpy array
Semivariogram of the data at each separation
bc: numpy array
centres of each semivariogram bin
N: Number of pairs in each bin?? ( Tree to confirm)
'''
# set up steps (mask and padding)
nx, ny = Z_grid.shape # shape
pad_shape =(2*nx -1, 2*ny-1) #required padding
M_mask = (np.isnan(Z_grid)) # mask
Z_copy = np.zeros_like(Z_grid)
# Z_grid[M_mask] = 0 # set nans to 0
M = (~M_mask).astype(float) # 1 if non-nan
Z_copy[~M_mask] = Z_grid[~M_mask]
# lags
if header is not None:
if meta is None:
raise ValueError("Error: if header is supplied, metadata must also be supplied.")
# If header and meta are both given,
# Compute physical lag separations for all grid points
lag_X, lag_Y = make_physical_lag_grid(header, meta)
else:
# If no header is supplied, compute lags in units of pixels
lag_x = np.arange(pad_shape[1]) - (Z_grid.shape[1] - 1) # horizontal shift (cols)
lag_y = np.arange(pad_shape[0]) - (Z_grid.shape[0] - 1) # vertical shift (rows)
lag_X, lag_Y = np.meshgrid(lag_x, lag_y)
# want the norm of the separation:
r = (lag_X**2 + lag_Y**2)**0.5 #total lag distance
if d_lim is None:
d_lim = np.max(r)
gamma = scipy.signal.fftconvolve(M, (M*Z_copy**2)[::-1, ::-1], mode='full') + scipy.signal.fftconvolve((M*Z_copy**2), M[::-1, ::-1], mode='full') - 2*scipy.signal.fftconvolve((M*Z_copy), (M*Z_copy)[::-1, ::-1])
N = scipy.signal.fftconvolve(M, M[::-1, ::-1], mode='full')
svg_values = scipy.stats.binned_statistic(r.flatten(), gamma.flatten(), statistic=np.nansum, bins=int(d_lim/bin_size), range=(EPSILON, d_lim))
bin_edges = svg_values.bin_edges
bin_centres = (bin_edges[1:] + bin_edges[:-1])/2
svg_values = svg_values.statistic
N_values = scipy.stats.binned_statistic(r.flatten(), N.flatten(), statistic=np.nansum, bins=int(d_lim/bin_size), range=(EPSILON, d_lim)).statistic
# set N=0 values to nan, to avoid divide by zero warnings.
N_values = np.where(N_values > 0, N_values, np.nan)
svg_values = 0.5*(svg_values/N_values)
# Return these values to zero now.
N_values = np.where(~np.isnan(N_values), N_values, 0)
return svg_values, bin_centres, N_values
#########################
# Model Fitting #
#########################
[docs]
def fit_radial_linear_trend(data_dict, meta, return_covariances=False):
'''
Fits a radial trend to the galaxy data.
Designed for computing metallicity gradients -- other mean models may be
required for other galaxy data (e.g. velocities)
Does not account for small scale variations in the data
Parameters
----------
data_dict: dict
Contains RA, DEC, Z, e_Z for each measured value of our random field.
meta: dict
Metadata used to calculate the distances. Must contain:
RA: float
Right ascension of the centre of the galaxy.
DEC: float
Declination of the centre of the galaxy.
PA: float
Principle Angle of the galaxy, degrees.
i: float
inclination of the galaxy along this principle axis, degrees.
D: float
Distance from this galaxy to Earth, Mpc.
return_covariances: bool
If True, covariances of parameters will be returned as well.
Returns
-------
params: list
Central value and radial gradient of random field.
optional -- covariance (array)
Covariance matrix for returned parameters
'''
r = RA_DEC_to_radius(data_dict['RA'], data_dict['DEC'], meta)
covariates = np.array([np.ones(len(r)), r]).T
Z_grad_model = GLS(data_dict['Z'], covariates, sigma=data_dict['e_Z']).fit()
if return_covariances:
return Z_grad_model.params, Z_grad_model.normalized_cov_params
else:
return Z_grad_model.params
[docs]
def generate_residual_Z_grid(Z_grid, e_Z_grid, header, meta):
'''
Find and subtract a radial trend in Z_grid,
'''
RA_grid, DEC_grid = make_RA_DEC_grid(header)
r_list = RA_DEC_to_radius(RA_grid.flatten(), DEC_grid.flatten(), meta)
r_grid = r_list.reshape(RA_grid.shape)
# Strip out unwanted nans
wanted_spaxels = ~np.isnan(Z_grid.flatten())
wanted_r = r_list[wanted_spaxels]
wanted_Z = Z_grid.flatten()[wanted_spaxels]
wanted_e_Z = e_Z_grid.flatten()[wanted_spaxels]
covariates = np.array([np.ones(len(wanted_r)), wanted_r]).T
Z_grad_model = GLS(wanted_Z, covariates, sigma=wanted_e_Z).fit()
# Subtract radially varying mean
Z_c, gradZ = Z_grad_model.params
resid_Z_grid = Z_grid - Z_c - gradZ * r_grid
return resid_Z_grid
# Fit a model for the small-scale structure of a galaxy
[docs]
def fit_exp_cov_model(data_dict, meta, n_samples, n_walkers, backend_f, init_theta, init_unc_theta):
'''
Fit a geostatistical model to the supplied data using emcee,
accounting for (1) a radially linear mean trend, and (2) small fluctuations
that are exponentially correlated
Parameters
----------
data_dict: dict
Contains RA, DEC, Z, e_Z for each measured value of our random field.
meta: dict
Metadata for this galaxy. Must contain PA (position angle, degrees);
i (inclination, degrees) and D (distance, Mpc) for this galaxy, as well
as its central RA and DEC.
n_samples: int
Hyperparameter for emcee; controls how many samples are drawn for each
walker
n_walkers: int
Hyperparameter for emcee; controls how many walkers are used to sample
from the posterior.
backend_f: str
Filename for where to store emcee results (.hdf5)
init_theta: (4,) tuple
Initial values assumed for log_variance, correlation scale, central
value of random field, and radial gradient of random field.
init_unc_theta: (4,) tuple
Initial uncertainties assumed for the sample parameters
Returns
-------
f_acc: float
mean acceptance fraction over all chains.
'''
n_dim = 4 # number of parameters in our model
r = RA_DEC_to_radius(data_dict['RA'], data_dict['DEC'], meta)
dist_matrix = deprojected_distances(data_dict['RA'], data_dict['DEC'], meta=meta)
if 'PSF' in meta:
# Compute the observational error covariance matrix, accounting for
# correlation between pixels due to PSF smearing.
e_Z = build_correlated_error_covariance_matrix(dist_matrix, e_Z=data_dict['e_Z'], meta=meta)
else:
# assume all observational error is uncorrelated
e_Z = np.diag(data_dict['e_Z']**2)
Z = data_dict['Z']
# Convert variables that do not change between emcee loops to global variables
globalize_data(Z, e_Z, r, dist_matrix, init_theta, init_unc_theta)
# Create arrays of initial values
pos = np.outer(init_theta, np.ones(n_walkers)) + np.diag(init_unc_theta) @ np.random.randn(n_dim, n_walkers)
pos = pos.T
backend = emcee.backends.HDFBackend(backend_f)
# comment out the line below if you want to append results to an existing run.
backend.reset(n_walkers, n_dim)
sampler = emcee.EnsembleSampler(n_walkers, n_dim, log_likelihood_exp_model, backend=backend)
sampler.run_mcmc(pos, n_samples, progress=True, store=True)
return np.mean(sampler.acceptance_fraction)
[docs]
def globalize_data(loc_Z, loc_e_Z, loc_r, loc_dist_matrix, loc_init_theta, loc_init_unc_theta):
'''
Turn local variables into global ones
(not strictly necessary, but better for transparency)
'''
global Z
Z = loc_Z
global e_Z
e_Z = loc_e_Z
global r
r = loc_r
global dist_matrix
dist_matrix = loc_dist_matrix
global init_theta
init_theta = loc_init_theta
global init_unc_theta
init_unc_theta = loc_init_unc_theta
global n
n = len(Z)
global D
D = np.array([np.ones(n), r]).T
return 0
[docs]
def log_likelihood_exp_model(theta, priors = True):
'''
Function that is optimised by emcee to find the best parameters for our
model of the random field, with radially linear mean values for Z, and
exponentially-correlated random effects.
The following must be defined as global variables:
Z: (N,) np.array
Observations at N data points
r: (N,) np.array
Covariate that is each spaxel's distance from the galaxy center.
e_Z: (N,N) np.array
matrix of variance from observational error at all data points.
dist_matrix: (N,N) np.array
matrix of distance between all observed data points.
Parameters
----------
theta: 4-tuple
Contains:
log_A, phi: model parameters for spatial_cov
Z_c, gradZ: model parameters for the large scale gradient
priors: bool
If True, folds in the provided priors to the likelihood guess.
If False, just computes the likelihood.
Returns
-------
log_likelihood: float
The log likelihood of this model with the supplied parameters.
'''
log_A, phi, Z_c, gradZ = theta
A = 10**log_A
# infold priors
if priors:
ln_prior = log_prior(theta, init_theta, init_unc_theta)
if np.isinf(ln_prior):
return ln_prior
else:
ln_prior = 0
# Make variance matrix
r_on_phi = dist_matrix / phi
spatial_cov = A* np.exp( -1.0 * r_on_phi)
V = e_Z + spatial_cov
try:
L = np.linalg.cholesky(V)
except:
print('Linalg error. Parameter vals:' )
print(theta)
return -np.inf
log_det_L = np.sum(np.log(np.diag(L)))
log_det_V = 2*log_det_L
white_D = np.linalg.solve(L, D)
white_Z = np.linalg.solve(L, Z)
# subtract mean trend
beta = np.array([Z_c, gradZ])
resids = Z - D @ beta
white_resids = np.linalg.solve(L, resids)
chi_sq = n*np.log(2*np.pi) + log_det_V + white_resids.T @ white_resids
return -0.5*chi_sq + ln_prior
[docs]
def log_prior(theta, init_theta, init_unc_theta):
'''
A prior for the parameters of the model -- feel free to tweak or add your own!
'''
# Unpack theta
log_A, phi, Z_c, gradZ = theta
init_log_A, init_phi, init_Z_c, init_gradZ = init_theta
_, _, unc_Z_c, unc_gradZ = init_unc_theta
# Normal priors on Z_c, grad_Z
log_Z_c_prior = log_normal_prior(Z_c, mu=init_Z_c, sigma=unc_Z_c)
log_gradZ_prior = log_normal_prior(gradZ, mu=init_gradZ, sigma=unc_gradZ)
# Gamma priors on phi, A
log_A_prior = log_gamma_prior_tenth_to_ten(10**(log_A - init_log_A))
log_phi_prior = log_gamma_prior_tenth_to_ten(phi/init_phi)
return log_Z_c_prior + log_gradZ_prior + log_A_prior + log_phi_prior
[docs]
def log_normal_prior(x, mu, sigma):
return -0.5*( ((x-mu)/sigma)**2 ) - np.log(sigma) - 0.5*np.log(2*np.pi)
[docs]
def log_gamma_prior_tenth_to_ten(x):
'''Gamma distribution with 1% probability of being below 0.1 or above 10'''
if x<0:
return -np.inf
alpha = 1.494
labda = 0.5661
prob = np.power(labda, alpha) * np.power(x, alpha - 1) * np.exp(-1.0*labda*x) / scipy.special.gamma(alpha)
return np.log(prob)
########################################
# Subsampling and cross-validation #
########################################
[docs]
def get_subsample(data_dict, n_in_subsample):
'''
Selects n_in_subsample elements from a supplied data_dict
'''
n_dp = len(data_dict['Z'])
if n_dp < n_in_subsample:
raise ValueError
A = np.zeros(n_dp)
A[:n_in_subsample] = 1
np.random.shuffle(A)
sub_dict = dict()
for k in data_dict.keys():
sub_dict[k] = data_dict[k][A == 1]
return sub_dict
[docs]
def assign_IDs(n_dp, n_folds):
'''
Creates an array of length n_dp, with each element having a random integer
from 1 to n_folds, and an equal amount of each number.
If we can't get exactly even groups, the higher numbered groups will have
one less element than the lower numbers.
e.g. assign_IDs(5,3) may return [2,3,2,1,1].
Parameters
----------
n_dp: int
number of data points
n_folds: int
number of groups to split the data into
Returns
-------
group_IDs: np array
gives ID of each group element.
'''
IDs = [x for x in range(1, n_folds+1)]
long_IDs = IDs * int(n_dp/n_folds + 1)
group_IDs = np.array(long_IDs[:n_dp])
np.random.shuffle(group_IDs)
return group_IDs
#########################
# Kriging #
#########################
[docs]
def krig_exp_model(RA, DEC, Z_df, meta, theta, mode='grid'):
'''
Performs universal kriging on a model grid of RA and DEC
Uses my distance function, a choice of covariance function, the best fitting
value of f_d (no restrictions), and assumes Z ~ r + (random effects)
Uses equations presented in 'Spatio-Temporal Statistics with R', available
for free at https://spacetimewithr.org
Parameters
----------
RA: np array
Array of RA values. Must be in degrees.
DEC: (N,) np array
Array of DEC values. Must be in degrees.
Z_df: data dict, containing RA, DEC, Z, e_Z for each data point.
meta: dict
Metadata used to calculate the distances. Must contain:
PA: float
Principle Angle of the galaxy, degrees.
i: float
inclination of the galaxy along this principle axis, degrees.
D: float
Distance from this galaxy to Earth, Mpc.
theta: (4,) tuple
Contains model parameters (log_Var, phi, Zc, gradZ)
mode: str
Options include
'grid': make a grid of all possible combos of supplied RA and DEC
values; use kriging to estimate Z at each point on grid.
'list': just use pairs of RA and DEC values as they are given.
RA and DEC must have the same length.
'auto': Get RA and DEC values from the df itself.
Returns
-------
Z_pred_matrix: (M,N) np array
interpolated (kriged) values over the RA, DEC coords given.
var_matrix: (M,N) np array
variances for these predictions
'''
log_A, phi, Z_c, gradZ = theta
A = 10**log_A
if mode=='grid':
# Construct arrays for all pairs of RA and DEC values
if RA.shape == DEC.shape:
shape = RA.shape
else:
raise ValueError("RA_grid and DEC_grid must have the same shape! Input RA shape: {0} Input DEC shape: {1}".format((RA_grid.shape, DEC_grid.shape)))
RA_long = RA.flatten()
DEC_long = DEC.flatten()
elif mode=='list':
RA_long = RA
DEC_long = DEC
elif mode=='auto':
RA_long = Z_df['RA']
DEC_long = Z_df['DEC']
else:
print("Error: Bad argument given to `krig_model`.\nMode must be either 'grid', 'list', or 'auto'.")
return np.nan, np.nan
data_dists = deprojected_distances(Z_df['RA'], Z_df['DEC'], meta=meta)
r = RA_DEC_to_radius(Z_df['RA'], Z_df['DEC'], meta)
Z_resids = Z_df['Z'] - Z_c - gradZ*r
data_grid_dists = deprojected_distances(Z_df['RA'], Z_df['DEC'], RA_long, DEC_long, meta=meta)
# Find galactocentric radius of each grid point and each data point
grid_r = deprojected_distances(RA_long, DEC_long, meta['RA'], meta['DEC'], meta=meta).T[0]
covariates = np.array([np.ones(len(r)), r]).T
grid_covariates = np.array([np.ones(len(grid_r)), grid_r]).T
best_beta = np.array([Z_c, gradZ])
if 'PSF' in meta:
# Compute the observational error covariance matrix, accounting for
# correlation between pixels due to PSF smearing.
error_cov = build_correlated_error_covariance_matrix(dist_matrix, e_Z=Z_df['e_Z'], meta=meta)
else:
# assume all observational error is uncorrelated
error_cov = np.diag(Z_df['e_Z']**2)
spatial_cov_data = A*np.exp(-1.0*data_dists/phi)
spatial_cov_data_grid = A*np.exp(-1.0*data_grid_dists/phi)
tot_data_cov = spatial_cov_data + error_cov
# Use Universal Kriging to estimate residuals for each grid point
# SpaceTimeWithR, equation 4.6
c_factor = scipy.linalg.cho_factor(tot_data_cov)
white_resids = scipy.linalg.cho_solve(c_factor, Z_resids)
predicted_grid_resids = spatial_cov_data_grid.T @ white_resids
# Add this to model to predict metallicity at each point.
predicted_grid_Z = np.dot(best_beta, grid_covariates.T) + predicted_grid_resids
# Get uncertainty (eq. 4.10 of SpaceTimeWithR)
white_D = scipy.linalg.cho_solve(c_factor, covariates)
white_cov_data_grid = scipy.linalg.cho_solve(c_factor, spatial_cov_data_grid)
Cov_cvars = covariates.T @ white_D
inv_cov_cvars =np.linalg.inv(Cov_cvars) # don't be fancy, since it's a 2 by 2
kriged_cvar_resids = grid_covariates - (spatial_cov_data_grid.T @ white_D)
gls_uncertainty = np.diagonal( kriged_cvar_resids @ inv_cov_cvars @ kriged_cvar_resids.T)
grid_uncertainty = A - np.diagonal(spatial_cov_data_grid.T @ white_cov_data_grid) + gls_uncertainty
# If needed, reshape these to be an RA X DEC shaped, plottable matrix
if mode=='grid':
Z_pred_matrix = predicted_grid_Z.reshape(shape)
var_matrix = grid_uncertainty.reshape(shape)
return Z_pred_matrix, var_matrix
else:
return predicted_grid_Z, grid_uncertainty