"""Code to calculate the 1D and 2D power spectrum of a lightcone."""
import warnings
from collections.abc import Callable
import astropy.units as un
import numpy as np
from powerbox.tools import (
_magnitude_grid,
above_mu_min_angular_generator,
angular_average,
get_power,
ignore_zero_ki,
power2delta,
regular_angular_generator,
)
from scipy.interpolate import RegularGridInterpolator
from ..units import validate
from .psclasses import CylindricalPS, SphericalPS
def get_chunk_indices(
lc_redshifts: np.ndarray,
chunk_size: int | np.ndarray,
ps_redshifts: np.ndarray | None = None,
chunk_skip: np.ndarray | None = None,
):
"""Get the start and end indices for each lightcone chunk."""
n_slices = lc_redshifts.shape[0]
if ps_redshifts is None:
if chunk_skip is None:
chunk_skip = chunk_size
if isinstance(chunk_size, int):
chunk_starts = list(range(0, n_slices - chunk_size, chunk_skip))
chunk_ends = np.array(chunk_starts) + chunk_size
if isinstance(chunk_size, np.ndarray):
raise ValueError(
"chunk_size should be an int or ps_redshifts should be provided."
)
else:
if not np.iterable(ps_redshifts):
ps_redshifts = np.array([ps_redshifts])
if np.min(np.round(ps_redshifts, 5)) < np.min(
np.round(lc_redshifts, 5)
) or np.max(np.round(ps_redshifts, 5)) > np.max(np.round(lc_redshifts, 5)):
raise ValueError("ps_redshifts should be within the range of lc_redshifts")
if isinstance(chunk_size, int):
chunk_size = np.array([chunk_size] * len(ps_redshifts))
chunk_starts = np.array(
[
np.max([np.argmin(abs(lc_redshifts - z)) - s // 2, 0])
for z, s in zip(ps_redshifts, chunk_size, strict=False)
],
dtype=np.int32,
)
chunk_ends = np.min(
[
np.array(chunk_starts) + chunk_size,
np.zeros_like(ps_redshifts) + n_slices,
],
axis=0,
)
chunk_starts = np.array(chunk_starts, dtype=np.int32)
chunk_ends = np.array(chunk_ends, dtype=np.int32)
return list(zip(chunk_starts, chunk_ends, strict=False))
[docs]
def calculate_ps(
chunk: un.Quantity,
box_length: un.Quantity,
*,
chunk_redshift: float | None = None,
calc_2d: bool | None = True,
kperp_bins: int | np.ndarray | None = None,
k_weights_2d: Callable | None = ignore_zero_ki,
log_bins: bool | None = True,
calc_1d: bool | None = False,
k_bins: int | None = None,
k_weights_1d: Callable | None = ignore_zero_ki,
bin_ave: bool | None = True,
interp: bool | None = None,
prefactor_fnc: Callable | None = power2delta,
interp_points_generator: Callable | None = None,
get_variance: bool | None = False,
) -> tuple[SphericalPS | None, CylindricalPS | None]:
r"""Calculate power spectra from a lightcone or coeval box.
Parameters
----------
chunk : un.Quantity
The 3D chunk whose power spectrum we want to calculate.
This can be either a coeval box or a lightcone chunk.
box_length : un.Quantity
The side length of the box.
Accepted units are: Mpc and Mpc/h.
chunk_redshift : float, optional
The central redshift of the lightcone chunk or coeval box.
calc_2d : bool, optional
If True, calculate the 2D power spectrum.
kperp_bins : int, optional
The number of bins to use for the kperp axis of the 2D PS.
k_weights : callable, optional
A function that takes a frequency tuple and returns
a boolean mask for the k values to ignore.
See powerbox.tools.ignore_zero_ki for an example
and powerbox.tools.get_power documentation for more details.
Default is powerbox.tools.ignore_zero_ki, which excludes
the power any k_i = 0 mode.
Typically, only the central zero mode |k| = 0 is excluded,
in which case use powerbox.tools.ignore_zero_absk.
calc_1d : bool, optional
If True, calculate the 1D power spectrum.
k_bins : int, optional
The number of bins on which to calculate 1D PS.
bin_ave : bool, optional
If True, return the center value of each kperp and kpar bin
i.e. len(kperp) = ps_2d.shape[0].
If False, return the left edge of each bin
i.e. len(kperp) = ps_2d.shape[0] + 1.
interp : str, optional
If True, use linear interpolation to calculate the PS
at the points specified by interp_points_generator.
Note that this significantly slows down the calculation.
prefactor_fnc : callable, optional
A function that takes a frequency tuple and returns the prefactor
to multiply the PS with.
Default is powerbox.tools.power2delta, which converts the power
P [mK^2 Mpc^{-3}] to the dimensionless power :math:`\\delta^2` [mK^2].
interp_points_generator : callable, optional
A function that generates the points at which to interpolate the PS.
See powerbox.tools.get_power documentation for more details.
get_variance : bool, optional
If True, compute the variance of the PS over the modes within each bin.
Default is False.
Returns
-------
ps1d : SphericalPS or None
The 1D power spectrum.
None if calc_1d is False.
ps2d : CylindricalPS or None
The 2D power spectrum.
None if calc_2d is False.
"""
if not calc_1d and not calc_2d:
raise ValueError("At least one of calc_1d or calc_2d must be True.")
if not interp:
interp = None
if not isinstance(chunk, un.Quantity):
raise TypeError("chunk should be a Quantity.")
if not isinstance(box_length, un.Quantity):
raise TypeError("box_length should be a Quantity.")
# Split the lightcone into chunks for each redshift bin
# Infer HII_DIM from lc side shape
box_side_shape = chunk.shape[0]
if get_variance and interp is not None:
raise NotImplementedError("Cannot get variance while interpolating.")
out = {}
if calc_1d:
out["ps_1d"] = {}
if calc_2d:
out["ps_2d"] = {}
if interp:
interp = "linear"
if prefactor_fnc is None:
ps_unit = chunk.unit**2 * box_length.unit**3
elif prefactor_fnc == power2delta:
ps_unit = chunk.unit**2
else:
warnings.warn(
"The prefactor function is not the default. PS unit may not be correct.",
stacklevel=2,
)
ps_unit = chunk.unit**2
ps2d = None
ps1d = None
if calc_2d:
results = get_power(
chunk.value,
(
box_length.value,
box_length.value,
box_length.value * chunk.shape[-1] / box_side_shape,
),
res_ndim=2,
bin_ave=bin_ave,
bins=kperp_bins,
log_bins=log_bins,
nthreads=1,
k_weights=k_weights_2d,
prefactor_fnc=prefactor_fnc,
interpolation_method=interp,
return_sumweights=True,
get_variance=get_variance,
bins_upto_boxlen=True,
)
if get_variance:
ps_2d, kperp, variance, nmodes, kpar = results
lc_var_2d = variance
else:
ps_2d, kperp, nmodes, kpar = results
kpar = np.array(kpar).squeeze()
lc_ps_2d = ps_2d[..., kpar > 0]
if get_variance:
lc_var_2d = lc_var_2d[..., kpar > 0]
kpar = kpar[kpar > 0]
ps2d = CylindricalPS(
ps=lc_ps_2d * ps_unit,
kperp=kperp.squeeze() / box_length.unit,
kpar=kpar / box_length.unit,
redshift=chunk_redshift,
n_modes=nmodes,
variance=lc_var_2d * ps_unit**2 if get_variance else None,
is_deltasq=prefactor_fnc is not None,
)
if calc_1d:
results = get_power(
chunk,
(
box_length.value,
box_length.value,
box_length.value * chunk.shape[-1] / box_side_shape,
),
bin_ave=bin_ave,
bins=k_bins,
log_bins=log_bins,
k_weights=k_weights_1d,
prefactor_fnc=prefactor_fnc,
interpolation_method=interp,
interp_points_generator=interp_points_generator,
return_sumweights=True,
get_variance=get_variance,
bins_upto_boxlen=True,
)
if get_variance:
ps_1d, k, var_1d, nmodes_1d = results
lc_var_1d = var_1d
else:
ps_1d, k, nmodes_1d = results
lc_ps_1d = ps_1d
ps1d = SphericalPS(
ps=lc_ps_1d * ps_unit,
k=k.squeeze() / box_length.unit,
redshift=chunk_redshift,
n_modes=nmodes_1d.squeeze(),
variance=lc_var_1d * ps_unit**2 if get_variance else None,
is_deltasq=prefactor_fnc is not None,
)
return ps1d, ps2d
[docs]
def calculate_ps_lc(
lc: un.Quantity,
box_length: un.Quantity,
lc_redshifts: np.ndarray,
*,
ps_redshifts: float | np.ndarray | None = None,
chunk_indices: list | None = None,
chunk_size: int | None = None,
chunk_skip: int | None = None,
calc_2d: bool = True,
kperp_bins: int | None = None,
k_weights_2d: Callable | None = ignore_zero_ki,
k_weights_1d: Callable | None = ignore_zero_ki,
log_bins: bool = True,
calc_1d: bool = True,
k_bins: int | None = None,
mu_min: float | None = None,
bin_ave: bool = True,
interp: bool | None = None,
deltasq: bool = True,
interp_points_generator: Callable | None = None,
get_variance: bool = False,
transform_ps1d: Callable | None = None,
transform_ps2d: Callable | None = None,
) -> tuple[list[SphericalPS] | None, list[CylindricalPS] | None]:
r"""
Calculate the PS by chunking a lightcone.
Parameters
----------
lc : un.Quantity
The lightcone with units of temperature or dimensionless.
box_length : un.Quantity
The side length of the box, accepted units are length.
lc_redshifts : np.ndarray
The redshift of each lightcone slice.
Array has same length as lc.shape[-1].
chunk_redshift : float, optional
The central redshift of the lightcone chunk or coeval box.
calc_2d : bool, optional
If True, calculate the 2D power spectrum.
kperp_bins : int, optional
The number of bins to use for the kperp axis of the 2D PS.
k_weights : callable, optional
A function that takes a frequency tuple and returns
a boolean mask for the k values to ignore.
See powerbox.tools.ignore_zero_ki for an example
and powerbox.tools.get_power documentation for more details.
Default is powerbox.tools.ignore_zero_ki, which excludes
the power any k_i = 0 mode.
Typically, only the central zero mode |k| = 0 is excluded,
in which case use powerbox.tools.ignore_zero_absk.
calc_1d : bool, optional
If True, calculate the 1D power spectrum.
k_bins : int, optional
The number of bins on which to calculate 1D PS.
mu_min : float, optional
The minimum value of
:math:`\\cos(\theta), \theta = \arctan (k_\\perp/k_\\parallel)`
for all calculated PS.
If None, all modes are included.
bin_ave : bool, optional
If True, return the center value of each kperp and kpar bin
i.e. len(kperp) = ps_2d.shape[0].
If False, return the left edge of each bin
i.e. len(kperp) = ps_2d.shape[0] + 1.
interp : str, optional
If True, use linear interpolation to calculate the PS
at the points specified by interp_points_generator.
Note that this significantly slows down the calculation.
delta : bool, optional
Whether to convert the power P [mK^2 Mpc^{-3}] to the dimensionless
power :math:`\\delta^2` [mK^2].
Default is True.
interp_points_generator : callable, optional
A function that generates the points at which to interpolate the PS.
See powerbox.tools.get_power documentation for more details.
transform_ps2d : Callable, optional
A function that takes in a CylindricalPS object and returns
a new CylindricalPS object.
transform_ps1d : Callable, optional
A function that takes in a SphericalPS object and returns
a new SphericalPS object.
get_variance : bool, optional
Whether to calculate the variance of the PS.
Default is False.
chunk_indices : list, optional
A list of tuples specifying the start and end indices of the lightcone
chunks for which power spectra are calculated.
Returns
-------
ps1d : list of SphericalPS or None
The 1D power spectrum for each chunk.
None if calc_1d is False.
ps2d : list of CylindricalPS or None
The 2D power spectrum for each chunk.
None if calc_2d is False.
"""
validate(lc, "temperature")
validate(box_length, "length")
if chunk_indices is None:
chunk_indices = get_chunk_indices(
lc_redshifts,
lc.shape[0] if chunk_size is None else chunk_size,
ps_redshifts=ps_redshifts,
chunk_skip=chunk_skip,
)
if mu_min is not None:
if interp is None:
k_weights_1d_input = k_weights_1d
def mask_fnc(freq, absk):
kz_mesh = np.zeros((len(freq[0]), len(freq[1]), len(freq[2])))
kz = freq[2]
for i in range(len(kz)):
kz_mesh[:, :, i] = kz[i]
phi = np.arccos(kz_mesh / absk)
mu_mesh = abs(np.cos(phi))
kmag = _magnitude_grid([c for i, c in enumerate(freq) if i < 2])
return np.logical_and(mu_mesh > mu_min, k_weights_1d_input(freq, kmag))
k_weights_1d = mask_fnc
if interp is not None:
k_weights_1d = ignore_zero_ki
interp_points_generator = above_mu_min_angular_generator(mu=mu_min)
else:
k_weights_1d = ignore_zero_ki
if interp is not None:
interp_points_generator = regular_angular_generator()
prefactor_fnc = power2delta if deltasq else None
ps2ds = []
ps1ds = []
for chunk in chunk_indices:
start = chunk[0]
end = chunk[1]
chunk = lc[..., start:end]
if lc_redshifts is not None:
chunk_z = lc_redshifts[(start + end) // 2]
ps1d, ps2d = calculate_ps(
chunk=chunk,
box_length=box_length,
chunk_redshift=chunk_z,
calc_2d=calc_2d,
kperp_bins=kperp_bins,
k_weights_2d=k_weights_2d,
k_weights_1d=k_weights_1d,
log_bins=log_bins,
calc_1d=calc_1d,
k_bins=k_bins,
bin_ave=bin_ave,
interp=interp,
prefactor_fnc=prefactor_fnc,
interp_points_generator=interp_points_generator,
get_variance=get_variance,
)
if ps1d is not None and transform_ps1d is not None:
ps1d = transform_ps1d(ps1d)
if ps2d is not None and transform_ps2d is not None:
ps2d = transform_ps2d(ps2d)
ps1ds.append(ps1d)
ps2ds.append(ps2d)
return ps1ds if calc_1d else None, ps2ds if calc_2d else None
[docs]
def calculate_ps_coeval(
box: un.Quantity,
box_length: un.Quantity,
*,
box_redshift: float | None = None,
calc_2d: bool | None = True,
kperp_bins: int | None = None,
k_weights_2d: Callable | None = ignore_zero_ki,
k_weights_1d: Callable | None = ignore_zero_ki,
log_bins: bool | None = True,
calc_1d: bool | None = True,
k_bins: int | None = None,
mu_min: float | None = None,
bin_ave: bool | None = True,
interp: bool | None = None,
deltasq: bool | None = True,
interp_points_generator: Callable | None = None,
get_variance: bool | None = False,
transform_ps1d: Callable | None = None,
transform_ps2d: Callable | None = None,
) -> tuple[SphericalPS | None, CylindricalPS | None]:
r"""
Calculate the PS by chunking a lightcone.
Parameters
----------
box : un.Quantity
The coeval box with units of temperature or dimensionless.
box_length : un.Quantity
The side length of the box, accepted units are length.
box_redshift : float, optional
The redshift value of the coeval box.
chunk_redshift : float, optional
The central redshift of the lightcone chunk or coeval box.
calc_2d : bool, optional
If True, calculate the 2D power spectrum.
kperp_bins : int, optional
The number of bins to use for the kperp axis of the 2D PS.
k_weights : callable, optional
A function that takes a frequency tuple and returns
a boolean mask for the k values to ignore.
See powerbox.tools.ignore_zero_ki for an example
and powerbox.tools.get_power documentation for more details.
Default is powerbox.tools.ignore_zero_ki, which excludes
the power any k_i = 0 mode.
Typically, only the central zero mode |k| = 0 is excluded,
in which case use powerbox.tools.ignore_zero_absk.
calc_1d : bool, optional
If True, calculate the 1D power spectrum.
k_bins : int, optional
The number of bins on which to calculate 1D PS.
mu_min : float, optional
The minimum value of
:math:`\\cos(\theta), \theta = \arctan (k_\\perp/k_\\parallel)`
for all calculated PS.
If None, all modes are included.
bin_ave : bool, optional
If True, return the center value of each kperp and kpar bin
i.e. len(kperp) = ps_2d.shape[0].
If False, return the left edge of each bin
i.e. len(kperp) = ps_2d.shape[0] + 1.
interp : str, optional
If True, use linear interpolation to calculate the PS
at the points specified by interp_points_generator.
Note that this significantly slows down the calculation.
delta : bool, optional
Whether to convert the power P [mK^2 Mpc^{-3}] to the dimensionless
power :math:`\\delta^2` [mK^2].
Default is True.
interp_points_generator : callable, optional
A function that generates the points at which to interpolate the PS.
See powerbox.tools.get_power documentation for more details.
get_variance : bool, optional
Whether to calculate the variance of the PS.
Default is False.
transform_ps2d : Callable, optional
A function that takes in a CylindricalPS object and returns
a new CylindricalPS object.
transform_ps1d : Callable, optional
A function that takes in a SphericalPS object and returns
a new SphericalPS object.
get_variance : bool, optional
Whether to calculate the variance of the PS.
Default is False.
interp : bool, optional
If True, use linear interpolation to calculate the PS
at the points specified by interp_points_generator.
Note that this significantly slows down the calculation.
deltasq : bool, optional
Whether to convert the power P [mK^2 Mpc^{-3}] to the dimensionless
power :math:`\\delta^2` [mK^2].
Default is True.
transform_ps1d : Callable, optional
A function that takes in a SphericalPS object and returns
a new SphericalPS object.
transform_ps2d : Callable, optional
A function that takes in a CylindricalPS object and returns
a new CylindricalPS object.
Returns
-------
ps1d : SphericalPS or None
The 1D power spectrum.
None if calc_1d is False.
ps2d : CylindricalPS or None
The 2D power spectrum.
None if calc_2d is False.
"""
validate(box, "temperature")
validate(box_length, "length")
if mu_min is not None:
if interp is None:
k_weights_1d_input = k_weights_1d
def mask_fnc(freq, absk):
kz_mesh = np.zeros((len(freq[0]), len(freq[1]), len(freq[2])))
kz = freq[2]
for i in range(len(kz)):
kz_mesh[:, :, i] = kz[i]
phi = np.arccos(kz_mesh / absk)
mu_mesh = abs(np.cos(phi))
kmag = _magnitude_grid([c for i, c in enumerate(freq) if i < 2])
return np.logical_and(mu_mesh > mu_min, k_weights_1d_input(freq, kmag))
k_weights_1d = mask_fnc
if interp is not None:
k_weights_1d = ignore_zero_ki
interp_points_generator = above_mu_min_angular_generator(mu=mu_min)
else:
k_weights_1d = ignore_zero_ki
if interp is not None:
interp_points_generator = regular_angular_generator()
prefactor_fnc = power2delta if deltasq else None
ps1d, ps2d = calculate_ps(
chunk=box,
box_length=box_length,
chunk_redshift=box_redshift,
calc_2d=calc_2d,
kperp_bins=kperp_bins,
k_weights_2d=k_weights_2d,
k_weights_1d=k_weights_1d,
log_bins=log_bins,
calc_1d=calc_1d,
k_bins=k_bins,
bin_ave=bin_ave,
interp=interp,
prefactor_fnc=prefactor_fnc,
interp_points_generator=interp_points_generator,
get_variance=get_variance,
)
if calc_1d and transform_ps1d is not None:
ps1d = transform_ps1d(ps1d)
if calc_2d and transform_ps2d is not None:
ps2d = transform_ps2d(ps2d)
return ps1d, ps2d
[docs]
def bin_kpar(
bins_kpar: int | un.Quantity | None = None,
log_kpar: bool | None = False,
interp_kpar: bool | None = False,
crop_kperp: tuple[int, int] | None = None,
crop_kpar: tuple[int, int] | None = None,
):
"""
Bins the kpar axis of a CylindricalPS object.
Parameters
----------
bins_kpar : int, astropy.units.Quantity, or None, optional
Number of bins (if int), array of bin edges (if array-like),
or None to use default binning (half the number of original kpar bins).
Default is None.
log_kpar : bool or None, optional
If True, use logarithmic binning for kpar.
If False or None, use linear binning. Default is False.
interp_kpar : bool or None, optional
If True, interpolate the power spectrum onto the new kpar bins.
If False or None, aggregate using bin means. Default is False.
crop_kperp : tuple of int or None, optional
Tuple specifying the (start, end) indices to crop the kperp axis after binning.
If None, no cropping is applied. Default is None.
crop_kpar : tuple of int or None, optional
Tuple specifying the (start, end) indices to crop the kpar axis after binning.
If None, no cropping is applied. Default is None.
Returns
-------
transform_ps : callable
A function that takes a CylindricalPS object and returns a new CylindricalPS
object with the binned kpar axis.
Raises
------
ValueError
If `bins_kpar` is not an int or a valid array of bin edges/centres.
Notes
-----
- If `interp_kpar` is True, the power spectrum and its variance (if present) are
interpolated onto the new kpar bins.
- If `interp_kpar` is False, the power spectrum and its variance are aggregated
using the mean within each bin.
- Cropping is applied after binning/interpolation.
"""
def transform_ps(ps: CylindricalPS):
if bins_kpar is None:
if log_kpar:
final_bins_kpar = (
np.logspace(
np.log10(ps.kpar.value[0]),
np.log10(ps.kpar.value[-1]),
len(ps.kpar) // 2 + 1,
)
* ps.kpar.unit
)
else:
final_bins_kpar = np.linspace(
ps.kpar[0], ps.kpar[-1], len(ps.kpar) // 2 + 1
)
elif isinstance(bins_kpar, int):
if log_kpar:
final_bins_kpar = (
np.logspace(
np.log10(ps.kpar.value[0]),
np.log10(ps.kpar.value[-1]),
bins_kpar,
)
* ps.kpar.unit
)
else:
final_bins_kpar = np.linspace(ps.kpar[0], ps.kpar[-1], bins_kpar)
else:
if not isinstance(bins_kpar, np.ndarray):
raise ValueError("bins_kpar must be an array of bin edges or centres.")
final_bins_kpar = bins_kpar
if interp_kpar:
mask = np.isnan(np.nanmean(ps.ps, axis=-1))
interp_fnc = RegularGridInterpolator(
(ps.kperp.value[~mask], ps.kpar.value),
ps.ps[~mask].squeeze(),
bounds_error=False,
fill_value=np.nan,
)
kperp_grid, kpar_grid = np.meshgrid(
ps.kperp, final_bins_kpar, indexing="ij", sparse=True
)
final_ps = interp_fnc((kperp_grid, kpar_grid)) * ps.ps.unit
if ps.variance is not None:
interp_fnc = RegularGridInterpolator(
(ps.kperp[~mask].value, ps.kpar.value),
ps.variance[~mask].squeeze(),
bounds_error=False,
fill_value=np.nan,
)
kperp_grid, kpar_grid = np.meshgrid(
ps.kperp, final_bins_kpar, indexing="ij", sparse=True
)
final_var = interp_fnc((kperp_grid, kpar_grid)) * ps.variance.unit
idxs = np.digitize(ps.kpar.value, final_bins_kpar.value) - 1
final_nmodes = np.zeros(len(final_bins_kpar))
for i in range(len(final_bins_kpar)):
final_nmodes[i] = np.sum(idxs == i)
else:
final_ps = np.zeros((len(ps.kperp), len(final_bins_kpar) - 1))
final_nmodes = np.zeros(len(final_bins_kpar) - 1)
idxs = np.digitize(ps.kpar.value, final_bins_kpar.value) - 1
if ps.variance is not None:
final_var = np.zeros((len(ps.kperp), len(final_bins_kpar)))
for i in range(len(final_bins_kpar) - 1):
m = idxs == i
final_ps[..., i] = np.nanmean(ps.ps.value[..., m], axis=-1)
final_nmodes[i] = np.sum(m)
if ps.variance is not None:
final_var[..., i] = np.nanmean(ps.variance.value[..., m], axis=-1)
if log_kpar:
final_bins_kpar = (
np.exp(
(
np.log(final_bins_kpar.value[1:])
+ np.log(final_bins_kpar.value[:-1])
)
/ 2
)
* ps.kpar.unit
)
else:
final_bins_kpar = (final_bins_kpar[1:] + final_bins_kpar[:-1]) / 2
final_ps = final_ps * ps.ps.unit
if ps.variance is not None:
final_var = final_var * ps.variance.unit
if crop_kperp is not None:
final_ps = final_ps[crop_kperp[0] : crop_kperp[1]]
if ps.variance is not None:
final_var = final_var[crop_kperp[0] : crop_kperp[1]]
if crop_kpar is not None:
final_ps = final_ps[:, crop_kpar[0] : crop_kpar[1]]
if ps.variance is not None:
final_var = final_var[:, crop_kpar[0] : crop_kpar[1]]
if ps.n_modes.ndim != 1 and np.all(ps.n_modes[:, :1] == ps.n_modes):
raise ValueError("Must provide only kperp n_modes in a 1D array.")
final_kperp_modes = (
ps.n_modes[crop_kperp[0] : crop_kperp[1]]
if crop_kperp is not None
else ps.n_modes
)
final_kpar_modes = (
final_nmodes[crop_kpar[0] : crop_kpar[1]]
if crop_kpar is not None
else final_nmodes
)
kpar_grid, kperp_grid = np.meshgrid(
final_kperp_modes, final_kpar_modes, indexing="ij"
)
final_nmodes = np.sqrt(kperp_grid**2 + kpar_grid**2)
return CylindricalPS(
ps=final_ps,
kperp=ps.kperp
if crop_kperp is None
else ps.kperp[crop_kperp[0] : crop_kperp[1]],
kpar=final_bins_kpar
if crop_kpar is None
else final_bins_kpar[crop_kpar[0] : crop_kpar[1]],
redshift=ps.redshift,
n_modes=final_nmodes,
variance=final_var if ps.variance is not None else None,
is_deltasq=ps.is_deltasq,
)
return transform_ps
[docs]
def cylindrical_to_spherical(
ps,
kperp,
kpar,
nbins=16,
weights=1,
interp=False,
mu_min=None,
generator=None,
bin_ave=True,
):
r"""
Angularly average 2D PS to 1D PS.
Parameters
----------
ps : np.ndarray
The 2D power spectrum of shape [len(kperp), len(kpar)].
kperp : np.ndarray
Values of kperp.
kpar : np.ndarray
Values of kpar.
nbins : int, optional
The number of bins on which to calculate 1D PS. Default is 16
weights : np.ndarray, optional
Weights to apply to the PS before averaging.
Note that to obtain a 1D PS from the 2D PS that is consistent with
the 1D PS obtained directly from the 3D PS, the weights should be
the number of modes in each bin of the 2D PS (`Nmodes`).
interp : bool, optional
If True, use linear interpolation to calculate the 1D PS.
mu_min : float, optional
The minimum value of
:math:`\\cos(\theta), \theta = \arctan (k_\\perp/k_\\parallel)`
for all calculated PS.
If None, all modes are included.
generator : callable, optional
A function that generates the points at which to interpolate the PS.
See powerbox.tools.get_power documentation for more details.
bin_ave : bool, optional
If True, return the center value of each k bin
i.e. len(k) = ps_1d.shape[0].
If False, return the left edge of each bin
i.e. len(k) = ps_1d.shape[0] + 1.
"""
if mu_min is not None and interp and generator is None:
generator = above_mu_min_angular_generator(mu=mu_min)
if mu_min is not None and not interp:
kpar_mesh, kperp_mesh = np.meshgrid(kpar, kperp)
theta = np.arctan(kperp_mesh / kpar_mesh)
mu_mesh = np.cos(theta)
weights = mu_mesh >= mu_min
ps_1d, k, sws = angular_average(
ps,
coords=[kperp, kpar],
bins=nbins,
weights=weights,
bin_ave=bin_ave,
log_bins=True,
return_sumweights=True,
interpolation_method="linear" if interp else None,
interp_points_generator=generator,
)
return ps_1d, k, sws