Source code for georges_core.distribution

"""
Georges-core beam distribution module.

This module provides a collection of functions and classes to deal with a beam distribution. After loading or
generating a distribution, methods are available to compute beam properties, such as mean, standard deviation,
Twiss parameters, emittance or the beam halo.
"""

from __future__ import annotations

import logging
import os
from typing import Any, Dict, List, Optional, Union

import numpy as _np
import numpy.typing as _npt
import pandas as _pd
from numba import njit

from .units import Q_ as _Q
from .units import ureg as _ureg

PARTICLE_TYPES = {"proton", "antiproton", "electron", "positron"}
PHASE_SPACE_DIMENSIONS = ["X", "PX", "Y", "PY", "DPP"]
DEFAULT_N_PARTICLES = int(1e5)


# Define all methods to generate the beam
[docs] def load_from_file(path: str = "", filename: str = "", file_format: str = "csv") -> _pd.DataFrame: """Load a distribution from a file Args: path (str, optional): Path to the file. Defaults to "". filename (str, optional): Name of the file. Defaults to "". file_format (str, optional): Format of the file. Defaults to "csv". Returns: _pd.DataFrame: Pandas dataframe with the distributions. """ if file_format == "csv": return _pd.read_csv(os.path.join(path, filename)) if file_format == "parquet": return _pd.read_parquet(os.path.join(path, filename)) raise DistributionException("Format of the file is incorrect. Only csv or parquet are available.")
[docs] def generate_from_5d_sigma_matrix( n: int, x: float = 0, px: float = 0, y: float = 0, py: float = 0, dpp: float = 0, s11: float = 0, s12: float = 0, s13: float = 0, s14: float = 0, s15: float = 0, s22: float = 0, s23: float = 0, s24: float = 0, s25: float = 0, s33: float = 0, s34: float = 0, s35: float = 0, s44: float = 0, s45: float = 0, s55: float = 0, matrix: Optional[_npt.NDArray[_np.float_]] = None, ) -> _npt.NDArray[_np.float_]: """ Args: n: x: px: y: py: dpp: s11: s12: s13: s14: s15: s22: s23: s24: s25: s33: s34: s35: s44: s45: s55: matrix: Returns: """ # For performance considerations, see # https://software.intel.com/en-us/blogs/2016/06/15/faster-random-number-generation-in-intel-distribution-for-python try: import numpy.random_intel generator = numpy.random_intel.multivariate_normal except ModuleNotFoundError: import numpy.random generator = numpy.random.multivariate_normal s21 = s12 s31 = s13 s32 = s23 s41 = s14 s42 = s24 s43 = s34 s51 = s15 s52 = s25 s53 = s35 s54 = s45 if matrix is not None: assert matrix.shape == (5, 5) return generator([x, px, y, py, dpp], matrix, int(n)) # type: ignore[no-any-return] return generator( # type: ignore[no-any-return] [x, px, y, py, dpp], _np.array( [ [s11, s12, s13, s14, s15], [s21, s22, s23, s24, s25], [s31, s32, s33, s34, s35], [s41, s42, s43, s44, s45], [s51, s52, s53, s54, s55], ], ), int(n), )
[docs] class DistributionException(Exception): """Exception raised for errors in the Beam module.""" def __init__(self, m: str = ""): self.message = m
[docs] class Distribution: """Particle beam to be tracked in a beamline or accelerator model. The internal representation is essentially a `pandas` `DataFrame`. """ def __init__(self, distribution: Optional[_pd.DataFrame] = None): """ Initialize a beam object from various sources of particle beam distribution. Args: distribution: distribution of particles to initialize the beam with. Should be pandas.DataFrame() friendly. """ try: self.__initialize_distribution(distribution) self.__dims = self.__distribution.shape[1] # type: ignore[has-type] except DistributionException: self.__dims = len(PHASE_SPACE_DIMENSIONS) self.__distribution = _pd.DataFrame(_np.zeros((1, self.__dims))) self.__distribution.columns = PHASE_SPACE_DIMENSIONS[: self.__dims] self.__n_particles = self.__distribution.shape[0] if self.__n_particles <= 0: raise DistributionException("Error, no particles in the beam.") self._halo = None # To force the first computation of the halo @property def distribution(self) -> _pd.DataFrame: """Return a dataframe containing the beam's particles distribution.""" return self.__distribution @property def dims(self) -> int: """Return the dimensions of the beam's phase-space.""" return self.__dims # type: ignore[no-any-return] @property def n_particles(self) -> int: """Return the number of particles in the beam's distribution.""" return self.__n_particles # type: ignore[no-any-return] @property def mean(self) -> _pd.Series: """Return a dataframe containing the first order moments of each dimension.""" return self.__distribution.mean() @property def std(self) -> _pd.Series: """Return a dataframe containing the second order moments of each dimension.""" return self.__distribution.std() @property def emit(self) -> Dict[str, float]: """Return the emittance of the beam in both planes""" tw = njit(self.compute_twiss)(self.__distribution.values) return {"X": tw[0], "Y": tw[5]} @property def sigma(self) -> _pd.Series: """Return the sigma matrix of the beam""" return self.__distribution.cov() covariance = sigma @property def twiss(self) -> Dict[str, float]: """Return the Twiss parameters of the beam""" tw = njit(self.compute_twiss)(self.__distribution.values) return { "emit_x": tw[0], "beta_x": tw[1], "alpha_x": tw[2], "disp_x": tw[3], "disp_xp": tw[4], "emit_y": tw[5], "beta_y": tw[6], "alpha_y": tw[7], "disp_y": tw[8], "disp_yp": tw[9], } @property def halo(self, dimensions: Optional[Union[str, List[str]]] = None) -> _pd.DataFrame: """Return a dataframe containing the 1st, 5th, 95th and 99th percentiles of each dimensions.""" if dimensions is None: dimensions = ["X", "Y", "PX", "PY"] if self._halo is None: self._halo = _pd.concat( [ self.__distribution[dimensions].quantile(0.01), self.__distribution[dimensions].quantile(0.05), self.__distribution[dimensions].quantile(0.2), self.__distribution[dimensions].quantile(0.8), self.__distribution[dimensions].quantile(0.95), self.__distribution[dimensions].quantile(0.99), ], axis=1, ).rename(columns={0.01: "1%", 0.05: "5%", 0.2: "20%", 0.8: "80%", 0.95: "95%", 0.99: "99%"}) return self._halo def __getitem__(self, item: str) -> _pd.Series: if item not in PHASE_SPACE_DIMENSIONS[: self.__dims]: raise DistributionException("Trying to access an invalid data from a beam.") return self.__distribution[item] def __initialize_distribution(self, distribution: _pd.DataFrame = None) -> None: """Try setting the internal pandas.DataFrame with a distribution.""" if distribution is not None: self.__distribution = distribution else: logging.warning("Distribution is None: generate a default beam") raise DistributionException("") self.__dims = self.__distribution.shape[1] if self.__dims < 4 or self.__dims > len(PHASE_SPACE_DIMENSIONS): missing_key = list({"X", "Y", "PX", "PY"} - set(distribution.columns.values)) logging.warning( "Trying to initialize a beam distribution with invalid dimensions. " f"{missing_key} are missing. Generate a default beam", ) raise DistributionException("") self.__distribution[list(set(PHASE_SPACE_DIMENSIONS) - set(self.__distribution.columns.values))] = 0
[docs] @staticmethod def compute_twiss(beam: _npt.NDArray[_np.float_]) -> _npt.NDArray[_np.float_]: """Compute Twiss parameters of a beam From http://nicadd.niu.edu/~syphers/tutorials/analyzeTrack.html Args: beam (_np.ndarray): beam input distribution Returns: _np.array: An array with the Twiss parameters """ s11 = _np.var(beam[:, 0]) s22 = _np.var(beam[:, 1]) s33 = _np.var(beam[:, 2]) s44 = _np.var(beam[:, 3]) s55 = _np.var(beam[:, 4]) if s55 == 0: s12 = _np.cov(beam[:, 0], beam[:, 1])[0, 1] s34 = _np.cov(beam[:, 2], beam[:, 3])[0, 1] emit_x = _np.sqrt(_np.linalg.det(_np.cov(beam[:, 0], beam[:, 1]))) emit_y = _np.sqrt(_np.linalg.det(_np.cov(beam[:, 2], beam[:, 3]))) beta_x = s11 / emit_x alpha_x = -s12 / emit_x disp_x = 0 disp_xp = 0 beta_y = s33 / emit_y alpha_y = -s34 / emit_y disp_y = 0 disp_yp = 0 else: a_xxp = _np.mean((beam[:, 0] - _np.mean(beam[:, 0])) * (beam[:, 1] - _np.mean(beam[:, 1]))) a_xd = _np.mean((beam[:, 0] - _np.mean(beam[:, 0])) * (beam[:, 4] - _np.mean(beam[:, 4]))) a_xpd = _np.mean((beam[:, 1] - _np.mean(beam[:, 1])) * (beam[:, 4] - _np.mean(beam[:, 4]))) a_yyp = _np.mean((beam[:, 2] - _np.mean(beam[:, 2])) * (beam[:, 3] - _np.mean(beam[:, 3]))) a_yd = _np.mean((beam[:, 2] - _np.mean(beam[:, 2])) * (beam[:, 4] - _np.mean(beam[:, 4]))) a_ypd = _np.mean((beam[:, 3] - _np.mean(beam[:, 3])) * (beam[:, 4] - _np.mean(beam[:, 4]))) disp_x = a_xd / s55 disp_xp = a_xpd / s55 ebeta_x = s11 - a_xd**2 / s55 egamma_x = s22 - a_xpd**2 / s55 ealpha_x = -a_xxp + a_xpd * a_xd / s55 emit_x = _np.sqrt(ebeta_x * egamma_x - ealpha_x**2) beta_x = ebeta_x / emit_x alpha_x = ealpha_x / emit_x disp_y = a_yd / s55 disp_yp = a_ypd / s55 ebeta_y = s33 - a_yd**2 / s55 egamma_y = s44 - a_ypd**2 / s55 ealpha_y = -a_yyp + a_ypd * a_yd / s55 emit_y = _np.sqrt(ebeta_y * egamma_y - ealpha_y**2) beta_y = ebeta_y / emit_y alpha_y = ealpha_y / emit_y return _np.array([emit_x, beta_x, alpha_x, disp_x, disp_xp, emit_y, beta_y, alpha_y, disp_y, disp_yp])
[docs] @classmethod def from_csv(cls, path: str = "", filename: str = "") -> Distribution: """ Args: path (str): Path to the csv file filename (str): filename Returns: An instance of the class with the distribution """ return cls(distribution=load_from_file(path, filename, file_format="csv"))
[docs] @classmethod def from_parquet(cls, path: str = "", filename: str = "") -> Distribution: """ Args: path (str): path to the parquet file filename (str): filename Returns: An instance of the class with the distribution """ return cls(distribution=load_from_file(path, filename, file_format="parquet"))
[docs] @classmethod def from_5d_sigma_matrix( cls, n: int, x: _Q = 0 * _ureg.m, px: float = 0, y: _Q = 0 * _ureg.m, py: float = 0, dpp: float = 0, s11: _Q = 0 * _ureg.m**2, s12: float = 0, s13: float = 0, s14: float = 0, s15: float = 0, s22: float = 0, s23: float = 0, s24: float = 0, s25: float = 0, s33: _Q = 0 * _ureg.m**2, s34: float = 0, s35: float = 0, s44: float = 0, s45: float = 0, s55: float = 0, matrix: Optional[Any] = None, ) -> Distribution: """ Initialize a beam with a 5D particle distribution from a Sigma matrix. Args: n (int): number of particles x (_Q): Horizontal position [m] px (float): Horizontal component momentum of unit vector y (_Q): Vertical position [m] py (float): Vertical component momentum of unit vector dpp (float): Momentum spread s11 (): s12 (): s13 (): s14 (): s15 (): s22 (): s23 (): s24 (): s25 (): s33 (): s34 (): s35 (): s44 (): s45 (): s55 (): matrix (): Returns: An instance of the class with the distribution """ return cls( distribution=_pd.DataFrame( generate_from_5d_sigma_matrix( n=int(n), x=x.m_as("m"), px=px, y=y.m_as("m"), py=py, dpp=dpp, s11=s11.m_as("m**2"), s12=s12, s13=s13, s14=s14, s15=s15, s22=s22, s23=s23, s24=s24, s25=s25, s33=s33.m_as("m**2"), s34=s34, s35=s35, s44=s44, s45=s45, s55=s55, matrix=matrix, ), columns=["X", "PX", "Y", "PY", "DPP"], ), )
[docs] @classmethod def from_5d_multigaussian_distribution( cls, n: int = DEFAULT_N_PARTICLES, x: _Q = 0 * _ureg.m, px: float = 0, y: _Q = 0 * _ureg.m, py: float = 0, dpp: float = 0, xrms: _Q = 0 * _ureg.m, pxrms: float = 0, yrms: _Q = 0 * _ureg.m, pyrms: float = 0, dpprms: float = 0, ) -> Distribution: """ Initialize a beam with a 5D particle distribution from rms quantities. Args: n (int): number of particles x (_Q): Horizontal position [m] px (): Horizontal component momentum of unit vector y (_Q): Vertical position [m] py (float): Vertical component momentum of unit vector dpp (float): Momentum spread xrms (_Q): Horizontal Gaussian sigma [m] pxrms (float): Sigma of the horizontal component of unit momentum yrms (_Q): Vertical Gaussian sigma [m] pyrms (float): Sigma of the vertical component of unit momentum dpprms (float): Relative momentum spread Returns: An instance of the class with the distribution """ return cls( distribution=_pd.DataFrame( generate_from_5d_sigma_matrix( n=int(n), x=x.m_as("m"), px=px, y=y.m_as("m"), py=py, dpp=dpp, s11=xrms.m_as("m") ** 2, s12=0, s22=pxrms**2, s33=yrms.m_as("m") ** 2, s34=0, s44=pyrms**2, s55=dpprms**2, ), columns=["X", "PX", "Y", "PY", "DPP"], ), )
[docs] @classmethod def from_twiss_parameters( cls, n: int = DEFAULT_N_PARTICLES, x: _Q = 0 * _ureg.m, px: float = 0, y: _Q = 0 * _ureg.m, py: float = 0, dpp: float = 0, betax: _Q = 1 * _ureg.m, alphax: float = 0, betay: _Q = 1 * _ureg.m, alphay: float = 0, emitx: _Q = 1e-6 * _ureg.m * _ureg.radians, emity: _Q = 1e-6 * _ureg.m * _ureg.radians, dispx: _Q = 0 * _ureg.m, dispy: _Q = 0 * _ureg.m, dispxp: float = 0, dispyp: float = 0, dpprms: float = 0, ) -> Distribution: """ Initialize a beam with a 5D particle distribution from Twiss parameters. Args: n (int): number of particles x (_Q): Horizontal position [m] px (float): Horizontal component momentum of unit vector y (_Q): Vertical position [m] py (float): Vertical component momentum of unit vector dpp (float): Momentum spread betax (_Q): Horizontal beta function [m] alphax (): Horizontal alpha function betay (_Q): Vertical beta function [m] alphay (float): Vertical alpha function emitx (_Q): Horizontal emittance [m rad] emity (_Q): Vertical emittance [m rad] dispx (_Q): Horizontal dispersion function [m] dispy (_Q): Vertical dispersion function [m] dispxp (float): Horizontal angular dispersion function dispyp (float): Vertical angular dispersion function dpprms (float): Relative momentum spread Returns: An instance of the class with the distribution """ gammax = (1 + alphax**2) / betax.m_as("m") gammay = (1 + alphay**2) / betay.m_as("m") s11 = betax.m_as("m") * emitx.m_as("m rad") + (dispx.m_as("m") * dpprms) ** 2 s12 = -alphax * emitx.m_as("m rad") + dispx.m_as("m") * dispxp * dpprms**2 s22 = gammax * emitx.m_as("m rad") + (dispxp * dpprms) ** 2 s33 = betay.m_as("m") * emity.m_as("m rad") + (dispy.m_as("m") * dpprms) ** 2 s34 = -alphay * emity.m_as("m rad") + dispy.m_as("m") * dispyp * dpprms**2 s44 = gammay * emity.m_as("m rad") + (dispyp * dpprms) ** 2 s13 = dispx.m_as("m") * dispy.m_as("m") * dpprms**2 s23 = dispxp * dispy.m_as("m") * dpprms**2 s14 = dispx.m_as("m") * dispyp * dpprms**2 s24 = dispxp * dispyp * dpprms**2 s15 = dispx.m_as("m") * dpprms**2 s25 = dispxp * dpprms**2 s35 = dispy.m_as("m") * dpprms**2 s45 = dispyp * dpprms**2 s55 = dpprms**2 return cls( distribution=_pd.DataFrame( generate_from_5d_sigma_matrix( n=int(n), x=x.m_as("m"), px=px, y=y.m_as("m"), py=py, dpp=dpp, s11=s11, s12=s12, s15=s15, s25=s25, s22=s22, s33=s33, s34=s34, s35=s35, s44=s44, s45=s45, s13=s13, s23=s23, s14=s14, s24=s24, s55=s55, ), columns=["X", "PX", "Y", "PY", "DPP"], ), )