"""High-level interface for Zgoubi using sequences.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Any, List, Tuple, Mapping, Union
from dataclasses import dataclass
import numpy as _np
import pandas as _pd
from .. import particles as _particles
from ..particles import Proton as _Proton
from ..kinematics import Kinematics as _Kinematics
from .elements import Element as _Element
from .elements import ElementClass as _ElementClass
from .betablock import BetaBlock as _BetaBlock
from ..codes_io import load_madx_twiss_headers, load_madx_twiss_table, load_transport_input_file, \
transport_element_factory
from ..codes_io import BDSimOutput
from .. import ureg as _ureg
if TYPE_CHECKING:
from ..particles import ParticuleType as _ParticuleType
__all__ = ['SequenceException',
'SequenceMetadata',
'Sequence',
'PlacementSequence',
'TwissSequence',
'SurveySequence',
'TransportSequence',
'BDSIMSequence',
]
_BDSIM_TO_MAD_CONVENTION: Mapping[str, str] = {
'Rcol': 'RectangularCollimator',
'Ecol': 'EllipticalCollimator',
}
[docs]class SequenceException(Exception):
"""Exception raised for errors when using zgoubidoo.Sequence"""
def __init__(self, m):
self.message = m
class SequenceMetadataType(type):
"""TODO"""
pass
class SequenceType(type):
"""TODO"""
pass
[docs]class Sequence(metaclass=SequenceType):
"""Sequence.
"""
def __init__(self,
name: str = '',
data=None,
metadata: Optional[SequenceMetadata] = None,
element_keys: Optional[Mapping[str, str]] = None,
):
"""
Args:
name: the name of the physics
data:
metadata:
element_keys:
"""
self._name: str = name
self._data: Any = data
self._metadata = metadata or SequenceMetadata()
self._element_keys = element_keys or {
k: k for k in [
'L',
]
}
def __repr__(self):
return repr(self._data)
@property
def name(self) -> str:
"""Provides the name of the sequence."""
return self._name
@property
def metadata(self) -> SequenceMetadata:
"""Provides the metadata associated with the sequence."""
return self._metadata
@property
def kinematics(self) -> _Kinematics:
"""Provides the kinematics data associated with the sequence metadata."""
return self.metadata.kinematics
@property
def particle(self) -> _ParticuleType:
"""Provides the particle type associated with the sequence metadata."""
return self.metadata.particle
@property
def betablock(self) -> _BetaBlock:
"""TODO"""
return _BetaBlock()
[docs] def to_df(self, df: Optional[_pd.DataFrame] = None, strip_units: bool = False) -> _pd.DataFrame:
"""TODO"""
if self._data is None and df is None:
return _pd.DataFrame()
else:
df = df if df is not None else _pd.DataFrame(self._data)
if strip_units:
def safe_convert(unit: str):
def do(_):
if _np.isnan(_):
return _
else:
return _.m_as(unit)
return do
df['AT_ENTRY'] = df['AT_ENTRY'].apply(safe_convert('meter'))
df['AT_CENTER'] = df['AT_CENTER'].apply(safe_convert('meter'))
df['AT_EXIT'] = df['AT_EXIT'].apply(safe_convert('meter'))
try:
df['L'] = df['L'].apply(safe_convert('meter'))
except KeyError:
pass
try:
df['ANGLE'] = df['ANGLE'].apply(safe_convert('radian'))
except KeyError:
pass
try:
df['K1'] = df['K1'].apply(safe_convert('1/m**2'))
except KeyError:
pass
try:
df['K2'] = df['K2'].apply(safe_convert('1/m**3'))
except KeyError:
pass
try:
df['E1'] = df['E1'].apply(safe_convert('radian'))
except KeyError:
pass
try:
df['E2'] = df['E2'].apply(safe_convert('radian'))
except KeyError:
pass
try:
df['TILT'] = df['TILT'].apply(safe_convert('radian'))
except KeyError:
pass
return df
df = property(to_df)
[docs] def apply(self, func, axis=0):
"""
Args:
func:
axis:
Returns:
"""
return self.df.apply(func, axis)
[docs] @staticmethod
def from_madx_twiss(filename: str = 'twiss.outx',
path: str = '.',
columns: List = None,
from_element: str = None,
to_element: str = None, ) -> Sequence:
"""
TODO
Args:
filename: name of the Twiss table file
path: path to the Twiss table file
columns: the list of columns in the Twiss file
from_element:
to_element:
Returns:
Examples:
TODO
"""
return TwissSequence(filename=filename,
path=path,
columns=columns,
from_element=from_element,
to_element=to_element,
)
[docs] @staticmethod
def from_transport(filename: str = 'transport.txt',
path: str = '.',
):
"""
TODO
Args:
filename:
path:
Returns:
"""
return TransportSequence(filename=filename,
path=path)
[docs] @staticmethod
def from_survey():
"""
TODO
Returns:
"""
return SurveySequence()
[docs] @staticmethod
def from_bdsim(filename: str = 'output.root',
path: str = '.',
):
"""
TODO
Returns:
"""
return BDSIMSequence(filename=filename,
path=path,
from_element=None,
to_element=None
)
[docs]class PlacementSequence(Sequence):
"""Placement Sequence.
"""
def __init__(self,
name: str = '',
data: Optional[List[Tuple[_Element,
_ureg.Quantity,
_ureg.Quantity,
_ureg.Quantity]]] = None,
metadata: Optional[SequenceMetadata] = None,
reference_placement: str = 'ENTRY',
element_keys: Optional[Mapping[str, str]] = None,
):
"""
Args:
name: the name of the physics
data: the list of commands composing the physics
metadata:
reference_placement:
element_keys:
"""
super().__init__(name=name, data=data or [], metadata=metadata, element_keys=element_keys)
self._reference_placement = reference_placement
self._betablock: Optional[_BetaBlock] = None
self._expanded = False
@property
def expanded(self):
return self._expanded
@property
def betablock(self) -> _BetaBlock:
return self._betablock
@betablock.setter
def betablock(self, betablock: _BetaBlock):
self._betablock = betablock
[docs] def to_df(self, df: Optional[_pd.DataFrame] = None, strip_units: bool = False) -> _pd.DataFrame:
"""
Args:
df:
strip_units:
Returns:
"""
if len(self._data) == 0:
return _pd.DataFrame()
if df is None:
df = _pd.DataFrame([{**e[0].data, **{
'AT_ENTRY': e[1],
'AT_CENTER': e[2],
'AT_EXIT': e[3]
}} for e in self._data])
df.name = self.name
df.set_index('NAME', inplace=True)
return super().to_df(df, strip_units=strip_units)
df = property(to_df)
[docs] def add(self,
element_or_sequence: Union[_Element, Sequence]):
"""
Args:
element_or_sequence:
Returns:
"""
self.place(element_or_sequence,
at_entry=0,
after=self._data[-1][0])
[docs] def place(self,
element_or_sequence: Union[_Element, Sequence],
at: Optional[_ureg.Quantity] = None,
at_entry: Optional[_ureg.Quantity] = None,
at_center: Optional[_ureg.Quantity] = None,
at_exit: Optional[_ureg.Quantity] = None,
after: Optional[str] = None,
before: Optional[str] = None,
) -> PlacementSequence:
"""
Args:
element_or_sequence:
at:
at_center:
at_entry:
at_exit:
after:
before:
Returns:
"""
if before is not None and after is not None:
raise SequenceException("'preceeding' and 'following' cannot be defined at the same time.")
ats = locals()
if after is not None:
for e in self._data:
if e[0]['NAME'] == after:
for k in ats:
if k.startswith('at') and ats[k] is not None:
ats[k] += e[3]
if before is not None:
for e in self._data:
if e[0]['NAME'] == before:
for k in ats:
if k.startswith('at') and ats[k] is not None:
ats[k] *= -1
ats[k] += e[1] - element_or_sequence.data['L']
if ats['at'] is not None:
ats[f"at_{self._reference_placement.lower()}"] = ats['at']
def compute(d):
"""Compute placement quantities."""
if d['at_entry'] is None:
if d['at_center'] is not None:
d['at_entry'] = d['at_center'] - element_or_sequence.data[self._element_keys['L']] / 2.0
elif d['at_exit'] is not None:
d['at_entry'] = d['at_exit'] - element_or_sequence.data[self._element_keys['L']]
if d['at_center'] is None:
if d['at_entry'] is not None:
d['at_center'] = d['at_entry'] + element_or_sequence.data[self._element_keys['L']] / 2.0
elif d['at_exit'] is not None:
d['at_center'] = d['at_exit'] - element_or_sequence.data[self._element_keys['L']] / 2.0
if d['at_exit'] is None:
if d['at_entry'] is not None:
d['at_exit'] = d['at_entry'] + element_or_sequence.data[self._element_keys['L']]
elif d['at_center'] is not None:
d['at_exit'] = d['at_center'] + element_or_sequence.data[self._element_keys['L']] / 2.0
return d
tmp = ats
tmp2 = tmp
while True:
_ = compute(tmp)
tmp, tmp2 = tmp2, _
if tmp == tmp2:
break # Fixed point
ats = tmp2
self._data.append((element_or_sequence, ats['at_entry'], ats['at_center'], ats['at_exit']))
return self
[docs] def place_after_last(self,
element_or_sequence: Union[_Element, Sequence],
at: Optional[_ureg.Quantity] = None,
at_entry: Optional[_ureg.Quantity] = None,
at_center: Optional[_ureg.Quantity] = None,
at_exit: Optional[_ureg.Quantity] = None,
) -> PlacementSequence:
"""
Args:
element_or_sequence:
at:
at_center:
at_entry:
at_exit:
Returns:
"""
self._data.sort(key=lambda _: _[1])
offset = self._data[-1][3]
if at is None and at_entry is None and at_center is None and at_exit is None:
at = 0.0 * _ureg.m
if at is not None:
at += offset
if at_entry is not None:
at_entry += offset
if at_center is not None:
at_center += offset
if at_exit is not None:
at_exit += offset
return self.place(element_or_sequence=element_or_sequence,
at=at,
at_entry=at_entry,
at_center=at_center,
at_exit=at_exit)
[docs] def place_before_first(self,
element_or_sequence: Union[_Element, Sequence],
at: Optional[_ureg.Quantity] = None,
at_entry: Optional[_ureg.Quantity] = None,
at_center: Optional[_ureg.Quantity] = None,
at_exit: Optional[_ureg.Quantity] = None,
) -> PlacementSequence:
"""
Args:
element_or_sequence:
at:
at_center:
at_entry:
at_exit:
Returns:
"""
self._data.sort(key=lambda _: _[1])
offset = self._data[0][1]
if at is not None:
at = offset - at - element_or_sequence.data['L']
if at_entry is not None:
at_entry = offset - at_entry - element_or_sequence['L']
if at_center is not None:
at_center = offset - at_center - element_or_sequence['L']
if at_exit is not None:
at_exit = offset - at_exit - element_or_sequence['L']
return self.place(element_or_sequence=element_or_sequence,
at=at,
at_entry=at_entry,
at_center=at_center,
at_exit=at_exit)
[docs] def expand(self, drift_element: _ElementClass = _Element.Drift) -> PlacementSequence:
"""
TODO Use namedtuples
Args:
drift_element:
Returns:
"""
self._data.sort(key=lambda _: _[1])
at = 0 * _ureg.m
expanded = []
for e in self._data:
length = (e[1] - at).m_as('m')
if length > 1e-6:
expanded.append((drift_element(f"D_{e[0].NAME}", L=length * _ureg.m),
at,
at + length * _ureg.m / 2,
at + length * _ureg.m,
))
expanded.append(e)
at = e[3]
self._data = expanded
self._expanded = True
return self
[docs] def reverse(self) -> PlacementSequence:
"""
Returns:
"""
length = self._data[-1][3]
self._data = self._data[::-1]
self._data = [
(e, length - at_entry, length - at_center, length - at_exit) for e, at_entry, at_center, at_exit in
self._data
]
return self
[docs] def sort(self, reverse: bool = False) -> PlacementSequence:
"""
Args:
reverse:
Returns:
"""
self._data.sort(key=lambda e: e[2], reverse=reverse)
return self
[docs] def join(self, other):
pass
[docs]class TwissSequence(Sequence):
"""
TODO
"""
def __init__(self,
filename: str = 'twiss.outx',
path: str = '.',
*,
columns: List = None,
from_element: str = None,
to_element: str = None,
element_keys: Optional[Mapping[str, str]] = None,
):
"""
Args:
filename: the name of the physics
path:
columns:
from_element:
to_element:
element_keys:
"""
twiss_headers = load_madx_twiss_headers(filename, path)
twiss_table = load_madx_twiss_table(filename, path, columns).loc[from_element:to_element]
particle_name = twiss_headers['PARTICLE'].capitalize()
p = getattr(_particles, particle_name if particle_name != 'Default' else 'Proton')
k = _Kinematics(float(twiss_headers['PC']) * _ureg.GeV_c, particle=p)
super().__init__(name=twiss_headers['NAME'],
data=twiss_table,
metadata=SequenceMetadata(data=twiss_headers, kinematics=k, particle=p),
element_keys=element_keys
)
@property
def betablock(self) -> _BetaBlock:
"""TODO"""
try:
return _BetaBlock(
BETA11=self.df.iloc[0]['BETA11'] * _ureg.m,
ALPHA11=self.df.iloc[0]['ALPHA11'],
BETA22=self.df.iloc[0]['BETA22'] * _ureg.m,
ALPHA22=self.df.iloc[0]['ALPHA22'],
DISP1=self.df.iloc[0]['DISP1'] * _ureg.m,
DISP2=self.df.iloc[0]['DISP2'],
DISP3=self.df.iloc[0]['DISP3'] * _ureg.m,
DISP4=self.df.iloc[0]['DISP4'],
EMIT1=self.metadata['EX'],
EMIT2=self.metadata['EY'],
EMIT3=self.metadata['ET'],
)
except KeyError:
try:
return _BetaBlock(
BETA11=self.df.iloc[0]['BETX'] * _ureg.m,
ALPHA11=self.df.iloc[0]['ALFX'],
BETA22=self.df.iloc[0]['BETY'] * _ureg.m,
ALPHA22=self.df.iloc[0]['ALFY'],
DISP1=self.df.iloc[0]['DX'] * _ureg.m,
DISP2=self.df.iloc[0]['DPX'],
DISP3=self.df.iloc[0]['DY'] * _ureg.m,
DISP4=self.df.iloc[0]['DPY'],
EMIT1=self.metadata['EX'],
EMIT2=self.metadata['EY'],
EMIT3=self.metadata['ET'],
)
except KeyError:
return _BetaBlock()
[docs] def to_df(self) -> _pd.DataFrame:
"""TODO"""
return self._data
df = property(to_df)
[docs]class TransportSequence(Sequence):
"""
TODO
"""
from ..codes_io.transport import TransportInputFlavor, TransportInputOriginalFlavor
def __init__(self,
filename: str,
path: str = '.',
flavor: TransportInputFlavor = TransportInputOriginalFlavor,
):
"""
Args:
filename: the name of the physics
path:
flavor:
"""
transport_input = load_transport_input_file(filename, path)
data = []
sequence_metadata = SequenceMetadata()
for line in transport_input:
if len(line.strip()) == 0:
continue
d = line.rsplit(';', 1)[0].split()
if d[0].startswith('-'):
continue
try:
float(d[0])
except ValueError:
continue
data.append(transport_element_factory(d, sequence_metadata, flavor)[0])
super().__init__(name='TRANSPORT',
data=[d for d in data if d is not None],
metadata=sequence_metadata,
)
[docs] def to_df(self):
dicts = list(map(dict, self._data))
counters = {}
for d in dicts:
if d['NAME'] is None:
counters[d['KEYWORD']] = counters.get(d['KEYWORD'], 0) + 1
d['NAME'] = f"{d['KEYWORD']}_{counters[d['KEYWORD']]}"
return _pd.DataFrame(dicts).set_index('NAME')
df = property(to_df)
class SurveySequence(PlacementSequence):
def __init__(self,
filename: str,
path: str = '.',
):
"""
Args:
filename: the name of the physics
path:
"""
transport_input = load_transport_input_file(filename, path)
data = []
sequence_metadata = SequenceMetadata()
for line in transport_input:
if len(line.strip()) == 0:
continue
d = line.split()
if d[0].startswith('-'):
continue
data.append(transport_element_factory(d, sequence_metadata)[0])
if sequence_metadata.kinematics is None:
raise SequenceException("Invalid kinematics - Beam not found in input")
super().__init__(name='TRANSPORT',
data=[d for d in data if d is not None],
metadata=sequence_metadata,
)
def to_df(self):
dicts = list(map(dict, self._data))
counters = {}
for d in dicts:
if d['NAME'] is None:
counters[d['KEYWORD']] = counters.get(d['KEYWORD'], 0) + 1
d['NAME'] = f"{d['KEYWORD']}_{counters[d['KEYWORD']]}"
return _pd.DataFrame(dicts).set_index('NAME')
df = property(to_df)
[docs]class BDSIMSequence(Sequence):
def __init__(self,
filename: str = 'output.root',
path: str = '.',
from_element: str = None,
to_element: str = None,
):
"""
Args:
filename: the name of the physics
path:
from_element:
to_element:
"""
# Load the model
bdsim_data = BDSimOutput(filename=filename, path=path)
bdsim_model = bdsim_data.model.df.loc[from_element:to_element]
self.set_units(bdsim_model)
# Load the beam properties
bdsim_beam = bdsim_data.beam.beam_base.pandas(branches=['beamEnergy', 'particle'])
particle_name = bdsim_beam["particle"].values[0].decode('utf-8').capitalize()
particle_energy = bdsim_beam["beamEnergy"].values[0] * _ureg.GeV
p = getattr(_particles, particle_name if particle_name != 'Default' else 'Proton')
kin = _Kinematics(particle_energy, kinetic=False, particle=p)
# Load the beam distribution
beam_distribution = bdsim_data.event.primary.df.copy()
beam_distribution['dpp'] = beam_distribution['p'].apply(lambda e: ((e/kin.momentum.m_as("GeV/c"))-1))
beam_distribution = beam_distribution[["x", 'y', 'xp', 'yp', 'dpp']]
beam_distribution.rename(columns={
"xp": "px",
"yp": "py"
}, inplace=True)
beam_distribution.columns = map(str.upper, beam_distribution.columns)
beam_distribution['T'] = 0
beam_distribution.reset_index(inplace=True) # Remove the multi index
super().__init__(name="BDSIM",
data=bdsim_model,
metadata=SequenceMetadata(
data=_pd.Series({
'BEAM_DISTRIBUTION': beam_distribution[['X', 'PX', 'Y', 'PY', 'T', 'DPP']],
}),
kinematics=kin,
particle=p)
)
[docs] @staticmethod
def set_units(model: _pd.DataFrame = None):
# Specify the units
for c in model.columns:
try:
model[c] = model[c].apply(float)
except ValueError:
pass
model['CLASS'] = model['TYPE'].apply(str.capitalize)
model['CLASS'] = model['CLASS'].apply(lambda e: _BDSIM_TO_MAD_CONVENTION.get(e, e))
model.loc[model["CLASS"] == "RectangularCollimator", "APERTYPE"] = "rectangular"
model.loc[model["CLASS"] == "Dump", "APERTYPE"] = "rectangular"
model.loc[model["CLASS"] == "EllipticalCollimator", "APERTYPE"] = "elliptical"
model['L'] = model['L'].apply(lambda e: e * _ureg.m)
model['APERTURE1'] = model['APERTURE1'].apply(lambda e: e * _ureg.m)
model['APERTURE2'] = model['APERTURE2'].apply(lambda e: e * _ureg.m)
model['APERTURE'] = model[['APERTURE1', 'APERTURE2']].apply(list, axis=1)
model['K1'] = model['K1'].apply(lambda e: e * _ureg.m ** -2)
model['K1S'] = model['K1S'].apply(lambda e: e * _ureg.m ** -2)
model['K2'] = model['K2'].apply(lambda e: e * _ureg.m ** -3)
model['E1'] = model['E1'].apply(lambda e: e * _ureg.radian)
model['E2'] = model['E2'].apply(lambda e: e * _ureg.radian)
model['HGAP'] = model['HGAP'].apply(lambda e: e * _ureg.meter)
model['TILT'] = model['TILT'].apply(lambda e: e * _ureg.radian)
model['B'] = model['B'].apply(lambda e: e * _ureg.T)
[docs] def to_df(self, df: Optional[_pd.DataFrame] = None, strip_units: bool = False) -> _pd.DataFrame:
return self._data
df = property(to_df)