Source code for madgui.model.madx

"""
MAD-X backend for madgui.
"""

__all__ = [
    'Model',
    'reverse_sequence',
    'reverse_sequence_inplace',
    'TwissTable',
    '_guess_main_sequence',
    '_get_seq_model',
    '_get_twiss',
]

import os
from collections import defaultdict
from collections.abc import Mapping
from functools import partial, reduce
import itertools
from bisect import bisect_right
from contextlib import suppress
import logging
from numbers import Number

import numpy as np

from cpymad.madx import (
    Madx, AttrDict, ArrayAttribute, Command, Table, ExpandedElementList)
from cpymad.util import normalize_range_name
from cpymad.types import VAR_TYPE_DIRECT, VAR_TYPE_DEFERRED

from madgui.util.undo import UndoCommand
from madgui.util import yaml
from madgui.util.export import read_str_file, import_params
from madgui.util.misc import memoize, invalidate
from madgui.util.signal import Signal


[docs]class Model: """ Contains the whole global state of a MAD-X instance and (possibly) loaded metadata. :ivar Madx madx: CPyMAD interpreter :ivar dict Model.data: loaded model data :ivar str path: base folder """ updated = Signal() def __init__(self, madx, data, *, filename=None, undo_stack=None, interpolate=0): super().__init__() self.madx = madx self.data = data self.filename = filename and os.path.abspath(filename) self.path, self.name = filename and os.path.split(filename) self.undo_stack = undo_stack if undo_stack: self.undo_stack.model = self self._init_segment( sequence=data['sequence'], range=data['range'], beam=data['beam'], twiss_args=data['twiss'], ) self.interpolate = interpolate self.invalidate()
[docs] def invalidate(self): """Invalidate twiss and sectormap computations. Initiate recomputation.""" invalidate(self, 'twiss') invalidate(self, 'sector') invalidate(self, 'survey') self.updated.emit()
[docs] @classmethod def load_file(cls, filename, madx=None, *, undo_stack=None, interpolate=0, **madx_kwargs): """Load model from .madx or .yml file and pass additional arguments to the Model constructor.""" madx = madx or Madx(**madx_kwargs) madx.option(echo=False) filename = os.path.abspath(filename) path, name = os.path.split(filename) ext = os.path.splitext(name)[1].lower() if ext in ('.yml', '.yaml'): data = yaml.load_file(filename) path = os.path.join(path, data.get('path', '.')) _load_params(data, 'beam', path) _load_params(data, 'twiss', path) for fname in data.get('init-files', []): _call(madx, path, fname) else: _call(madx, path, filename) seqname = _guess_main_sequence(madx) data = _get_seq_model(madx, seqname) data['init-files'] = [filename] return cls( madx, data, undo_stack=undo_stack, filename=filename, interpolate=interpolate)
def __del__(self): self.destroy()
[docs] def destroy(self): """Annihilate current model. Stop interpreter.""" if self.madx is not None: with suppress(AttributeError, RuntimeError): self.madx._libmadx.finish() with suppress(AttributeError, RuntimeError): self.madx._service.close() with suppress(AttributeError, RuntimeError): self.madx._process.wait() self.madx = None
@property def twiss_args(self) -> dict: """Return the dictionary of parameters for the MAD-X TWISS command.""" return self._twiss_args @property def beam(self) -> dict: """Get the beam parameter dictionary.""" return self._beam @property def globals(self) -> dict: """Return dict-like global variables.""" return self.madx.globals
[docs] def get_element_by_position(self, pos): """Find optics element by longitudinal position.""" if pos is None: return None i0 = bisect_right(self.positions, pos) return self.elements[i0-1 if i0 > 0 else 0]
[docs] def el_pos(self, el): """Position for matching / output.""" return el.position + el.length
continuous_matching = False
[docs] def adjust_match_pos(self, el, pos): if not self.continuous_matching: return self.el_pos(el) at, l = el.position, el.length if pos <= at: return at if pos >= at+l: return at+l return pos
[docs] def get_best_match_pos(self, pos): """Find optics element by longitudinal position.""" return min([ (el, self.adjust_match_pos(el, pos)) for el in self.elements if self.can_match_at(el) ], key=lambda x: abs(x[1]-pos))
[docs] def can_match_at(self, element): return True
[docs] def set_element_attribute(self, elem, attr, value): self.update_element({attr: value}, self.elements[elem].index)
# curves ELEM_KNOBS = { 'sbend': ['angle', 'k0'], 'quadrupole': ['k1', 'k1s'], 'hkicker': ['kick'], 'vkicker': ['kick'], 'kicker': ['hkick', 'vkick'], 'solenoid': ['ks'], 'multipole': ['knl', 'ksl'], 'srotation': ['angle'], }
[docs] def get_elem_knobs(self, elem): return [ knob for attr in self.ELEM_KNOBS.get(elem.base_name.lower(), ()) if _is_property_defined(elem, attr) for knob in self._get_knobs(elem, attr) ]
[docs] def get_knobs(self): """Get list of knobs.""" return [ knob for elem in self.elements for knob in self.get_elem_knobs(elem) ]
@property def libmadx(self): """Access to the low level cpymad API.""" return self.madx and self.madx._libmadx
[docs] def call(self, name): old = self.globals.defs new = _call(self.madx, self.path, name) if self.undo_stack: if new is None: # Have to clear the stack because general MAD-X commands are not # necessarily reversible (sequence definition, makethin, loading # tables, etc)! self.undo_stack.clear() else: text = "CALL {!r}".format(name) self._update(old, new, self._update_globals, text) self.invalidate()
[docs] def load_strengths(self, filename): try: data = import_params(filename, data_key='globals') except ValueError as e: logging.error("Parser error in {!r}:\n{}".format(filename, e)) else: self.update_globals(data)
# Serialization # TODO: save reproducible state of workspace?
[docs] def save(self, filename): """Save model to file.""" yaml.save_file(filename, self.model_data())
[docs] def model_data(self): """Return model data as dictionary.""" return dict(self.data, **{ 'sequence': self.seq_name, 'range': list(self.range), 'beam': self.beam, 'twiss': self.twiss_args, })
def _init_segment(self, sequence, range, beam, twiss_args): """ :param str sequence: :param tuple range: """ self.sequence = self.madx.sequence[sequence] self.seq_name = self.sequence.name self.continuous_matching = True self._beam = beam = dict(beam, sequence=self.seq_name) self._twiss_args = twiss_args self.madx.command.beam(**beam) self.sequence.use() # Use `expanded_elements` rather than `elements` to have a one-to-one # correspondence with the data points of TWISS/SURVEY: self.elements = elems = ElementList(self.madx, self.seq_name) self.positions = self.sequence.expanded_element_positions() self.start, self.stop = self.parse_range(range) self.range = (normalize_range_name(self.start.name, elems), normalize_range_name(self.stop.name, elems))
[docs] def parse_range(self, range): """Convert a range str/tuple to a tuple of :class:`Element`.""" if isinstance(range, str): range = range.split('/') start_name, stop_name = range return (self.elements[start_name], self.elements[stop_name])
[docs] def export_globals(self, var_type=VAR_TYPE_DIRECT): return { k: p.value for k, p in self.globals.cmdpar.items() if p.inform and (p.var_type == var_type or var_type == any) }
[docs] def export_beam(self): return dict(self.beam)
[docs] def export_twiss(self): return dict(self.twiss_args)
[docs] def fetch_beam(self): from madgui.widget.params import ParamInfo beam = self.beam pars = [ ParamInfo(k.title(), v, inform=k in beam, mutable=k != 'sequence') for k, v in self.sequence.beam.items() ] ekin = (beam['energy'] - beam['mass']) / beam['mass'] idx = next(i for i, p in enumerate(pars) if p.name.lower() == 'energy') pars.insert(idx, ParamInfo('E_kin', ekin)) return pars
[docs] def fetch_twiss(self): from madgui.widget.params import ParamInfo blacklist = ('sequence', 'line', 'range', 'notable') twiss_args = self.twiss_args return [ ParamInfo(k.title(), twiss_args.get(k, v), inform=k in twiss_args, mutable=k not in blacklist) for k, v in self.madx.command.twiss.items() ]
def _update(self, old, new, write, text): old = {k.lower(): v for k, v in items(old)} new = {k.lower(): v for k, v in items(new)} # NOTE: This trims not only expressions (as intended) but also regular # string arguments (which is incorrect). However, this should be a # sufficiently rare use case, so we don't care for now… _new = {k: v for k, v in items(new) if trim(old.get(k)) != trim(v)} _old = {k: v for k, v in items(old) if k in _new} _old.update({k: None for k in _new.keys() - _old.keys()}) if _new: self._exec(UndoCommand( new, old, write, text.format(", ".join(_new)))) def _exec(self, action): if self.undo_stack: self.undo_stack.push(action) else: action.redo() return action
[docs] def update_globals(self, globals, text="Change knobs: {}"): return self._update( self.globals.defs, globals, self._update_globals, text)
[docs] def update_beam(self, beam, text="Change beam: {}"): return self._update( self.beam, beam, self._update_beam, text)
[docs] def update_twiss_args(self, twiss, text="Change twiss args: {}"): return self._update( self.twiss_args, twiss, self._update_twiss_args, text)
[docs] def update_element(self, data, elem_index, text=None): elem = self.elements[elem_index] return self._update( elem.defs, data, partial(self._update_element, elem_index=elem_index), text or "Change element {}: {{}}".format(elem.name))
def _update_globals(self, globals): with self.madx.batch(): for k, v in globals.items(): if v is None: v = 0 elif v == '': v = self.madx.globals[k] self.madx.globals[k] = v # TODO: invalidate only elements that depend updated variables? self.invalidate() def _update_beam(self, beam): new_beam = self.beam.copy() new_beam.update((k.lower(), v) for k, v in beam.items()) if 'e_kin' in beam: eval = self.madx.eval ekin = eval(new_beam.get('e_kin')) mass = eval(new_beam.get('mass', 1)) new_beam['energy'] = (ekin + 1) * mass new_beam.pop('e_kin', None) new_beam['sequence'] = self.seq_name self._beam = new_beam self.madx.command.beam(**new_beam) self.invalidate() def _update_twiss_args(self, twiss): new_twiss = self.twiss_args.copy() new_twiss.update((k.lower(), v) for k, v in twiss.items()) self._twiss_args = new_twiss self.invalidate() def _update_element(self, data, elem_index): # TODO: this crashes for many parameters # - update only changed values elem = self.elements[elem_index] name = elem.node_name d = {k.lower(): v for k, v in data.items() if k in elem.cmdpar} if 'kick' in d and elem.base_name == 'sbend': # FIXME: This assumes the definition `k0:=(angle+k0)/l` and # will deliver incorrect results if this is not the case! var, = (set(self._get_knobs(elem, 'k0')) - set(self._get_knobs(elem, 'angle'))) self.madx.globals[var] = d.pop('kick') self.madx.elements[name](**d) self.invalidate()
[docs] def get_twiss(self, elem, name, pos): """Return beam envelope at element.""" ix = self.elements.index(elem) twiss = self.twiss() s = twiss.s y = twiss[name] x = self.indices[ix] # shortcut for thin elements: if float(self.elements[ix].length) == 0: return y[ix] lo = x.start-1 if x.start > 0 else x.start hi = x.stop+1 i0 = bisect_right(s, pos, lo, hi) i1 = i0+1 # never look outside the interpolation domain: if pos <= s[i0]: return y[i0] if pos >= s[i1]: return y[i1] dx = pos - s[i0] return y[i0] + dx * (y[i1]-y[i0]) / (s[i1]-s[i0])
twiss_columns = [ 'alfx', 'alfy', 'betx', 'bety', 'gamx', 'gamy', 'ex', 'ey', 'x', 'y', 'px', 'py', 'envx', 'envy', ]
[docs] def get_elem_twiss(self, elem): tw = self.twiss() ix = self.elements.index(elem) i0 = self.indices[ix].stop return AttrDict({col: tw[col][i0] for col in self.twiss_columns})
[docs] def get_elem_sigma(self, elem): ix = self.elements.index(elem) i0 = self.indices[ix].stop tw = self.twiss().row(i0, 'all') return [ [tw['sig{}{}'.format(i+1, j+1)] for j in range(6)] for i in range(6) ]
[docs] def contains(self, element): return (self.start.index <= element.index and self.stop.index >= element.index)
def _get_twiss_args(self, **kwargs): twiss_args = { 'sequence': self.sequence.name, 'range': self.range, } twiss_args.update(self.twiss_args) twiss_args.update(kwargs) return twiss_args
[docs] def sectormap(self, elem_from, elem_to=None, interval=None): """ Return SECTORMAP|KICKS in the closed range [from,to] as 7x7 matrix. If only one parameter is given, return its transfer map. Elements can be specified by name or index. For a description of the ``interval`` parameter, see :meth:`~Model.get_transfer_maps`. """ if elem_to is None: elem_to = elem_from if interval is None: interval = (0, 1) return self.get_transfer_maps([elem_from, elem_to], interval)[0]
[docs] def get_transfer_maps(self, elems, interval=(1, 1)): """ Get the transfer matrices R(i,j) between the given elements. The ``interval`` parameter can be used to select open/closedness of the individual intervals between the elements by adding offsets to the first and second element in every interval, counted from the entry end of the element. For example, by setting the ``interval`` parameter, the call ``get_transfer_maps([e0, e1, e2], interval)`` will retrieve the transfer maps in the following intervals: - ``interval=(0, 0)`` retrieves ``[e0, e1)`` and ``[e1, e2)`` - ``interval=(0, 1)`` retrieves ``[e0, e1]`` and ``[e1, e2]`` - ``interval=(1, 0)`` retrieves ``(e0, e1)`` and ``(e1, e2)`` - ``interval=(1, 1)`` retrieves ``(e0, e1]`` and ``(e1, e2]`` """ table = self.sector() maps = self.madx.sectortable(table._name) indices = [self.elements.index(el) for el in elems] x0, x1 = interval return [ reduce(lambda a, b: np.dot(b, a), maps[max(0, i+x0):max(0, j+x1)], np.eye(7)) for i, j in zip(indices, indices[1:]) ]
# TODO: default values for knobs/monitors # TODO: pass entire optic (knob + delta)
[docs] def get_orbit_response_matrix( self, monitors, knobs, errors=(), values=()) -> np.array: """ Compute the orbit response matrix Δx/Δφ numerically (by varying knobs) and return as `M×2×K` matrix (monitors × x|y × knobs). """ from .errors import apply_errors, Param madx = self.madx madx.command.select(flag='interpolate', clear=True) # applying errors here is a temporary measure until we figure out a # better way to take into account combined error effects (e.g. # relative error on top of absolute error): with apply_errors(self, errors, values): tw0 = madx.twiss(**self._get_twiss_args(table='orm_tmp')) x0, y0 = tw0.x, tw0.y idx = [self.elements.index(m) for m in monitors] def get_knob_response(var, step): """Return `2×M` matrix with responses for specified variable.""" with apply_errors(self, [Param(var)], [step]): with apply_errors(self, errors, values): tw1 = madx.twiss(**self._get_twiss_args(table='orm_tmp')) x1, y1 = tw1.x, tw1.y return np.vstack(( (x1 - x0)[idx], (y1 - y0)[idx], )).T / step return np.dstack([ get_knob_response(knob, 2e-4) for knob in knobs ])
[docs] @memoize def survey(self): """Recalculate survey coordinates.""" return self.madx.survey()
[docs] def ex(self): return self.summary.ex
[docs] def ey(self): return self.summary.ey
[docs] @memoize def twiss(self, **kwargs): """Recalculate TWISS parameters.""" self.madx.command.select(flag='interpolate', clear=True) if self.interpolate: step = self.sequence.elements[-1].position/self.interpolate self.madx.command.select(flag='interpolate', step=step) results = self.madx.twiss(**self._get_twiss_args(**kwargs)) results = TwissTable(results._name, results._libmadx, _check=False) self.summary = results.summary # FIXME: this will fail if subsequent element have the same name. # Safer alternatives: # - do another twiss call without interpolate # - change the behaviour of MAD-X' interpolate option itself to make # it clear in the table which rows are 'interpolated' # - change MAD-X interpolate option to produce 2 tables # - extract information via cpymad (table now has 'node' attribute) groups = itertools.groupby(enumerate(results.name), lambda x: x[1]) self.indices = [ slice(l[0][0], l[-1][0]) for k, v in groups for l in [list(v)] ] assert len(self.indices) == len(self.elements) return results
[docs] @memoize def sector(self): """Compute sectormaps of all elements.""" # TODO: Ideally, we should compute sectormaps and twiss during the # same MAD-X TWISS command. But, since we don't need interpolated # sectormaps, this will require patching MAD-X first… self.madx.command.select(flag='interpolate', clear=True) # NOTE: we have to pass a different twiss table because madgui # currently fetches twiss columns only demand. Therefore, using the # same twiss table for both TWISS/SECTORMAP routines would lead to # inconsistent table lengths (interpolate vs no-interpolate!). self.madx.sectormap((), **self._get_twiss_args( table='sectortwiss', sectortable='sectortable')) return self.madx.table['sectortable']
[docs] def track_one(self, x=0, px=0, y=0, py=0, range='#s/#e', **kwargs): """ Track a particle through the sequence. Handles backward tracking (sort of) by specifying a negative range. Currently uses TWISS for tracking internally (to allow tracking through thick sequences). This might change. """ elems = self.elements start, end = range.split('/') if isinstance(range, str) else range start, end = normalize_range_name(start), normalize_range_name(end) kwargs.setdefault('betx', 1) kwargs.setdefault('bety', 1) if elems.index(end) < elems.index(start): # TODO: we should use a separate madx process with inplace # reversed sequence for backtracking in the future, this will # simplify all the complications with element names etc. flip = {'#s': '#e', '#e': '#s'} start = flip.get(start, start + '_reversed') end = flip.get(end, end + '_reversed') tw = self.backtrack( x=-x, y=y, px=px, py=-py, range=start+'/'+end, **kwargs) return AttrDict({ 's': tw.s[-1] - tw.s, 'x': -tw.x, 'px': tw.px, 'y': tw.y, 'py': -tw.py, }) return self.madx.twiss( x=x, px=px, y=y, py=py, range=range, **kwargs)
backseq = None
[docs] def backtrack(self, **twiss_init): """Backtrack final orbit through the reversed sequence.""" if self.backseq is None: with self.madx.batch(): self.backseq = self.seq_name + '_backseq' reverse_sequence(self.madx, self.backseq, self.elements) self.madx.command.select(flag='interpolate', clear=True) twiss_init.setdefault('betx', 1) twiss_init.setdefault('bety', 1) twiss_init.setdefault('table', 'backtrack') tw = self.madx.twiss(sequence=self.backseq, **twiss_init) tw = self.madx.table.backtrack self.invalidate() return tw
[docs] def reverse(self): reverse_sequence_inplace(self.madx, self.seq_name) if self.undo_stack: self.undo_stack.clear() self._init_segment( sequence=self.seq_name, range='#s/#e', beam=self.beam, twiss_args={'betx': 1, 'bety': 1}, )
[docs] def match(self, vary, constraints, mirror_mode=True, **kwargs): # list intermediate positions # NOTE: need list instead of set, because quantity is unhashable: elem_positions = defaultdict(list) for elem, pos, axis, val in constraints: if pos not in elem_positions[elem.node_name]: elem_positions[elem.node_name].append(pos) elem_positions = {name: sorted(positions) for name, positions in elem_positions.items()} # activate matching at specified positions self.madx.command.select(flag='interpolate', clear=True) for name, positions in elem_positions.items(): at = self.elements[name].position l = self.elements[name].length positions = [at+l if p is None else p for p in positions] if any(not np.isclose(p, at+l) for p in positions): x = [float((p-at)/l) for p in positions] self.madx.command.select( flag='interpolate', range=name, at=x) # create constraints list to be passed to Madx.match cons = {} for elem, pos, axis, val in constraints: key = (elem.node_name, elem_positions[elem.node_name].index(pos)) cons.setdefault(key, {})[axis] = val madx_constraints = [ dict(range=name, iindex=pos, **c) for (name, pos), c in cons.items()] # FIXME TODO: use position-dependent emittances… # NOTE: Not sure we can measure or if there is a relevant # emittance dilution in the HEBT ex = self.ex() ey = self.ey() weights = { 'sig11': 1/ex, 'sig12': 1/ex, 'sig21': 1/ex, 'sig22': 1/ex, 'sig33': 1/ey, 'sig34': 1/ey, 'sig43': 1/ey, 'sig44': 1/ey, } weights.update(kwargs.pop('weight', {})) used_cols = {axis.lower() for elem, pos, axis, val in constraints} weights = {k: v for k, v in weights.items() if k in used_cols} twiss_args = self.twiss_args.copy() twiss_args.update(kwargs) old_values = {v: self.read_param(v) for v in vary} self.madx.match(sequence=self.sequence.name, vary=vary, constraints=madx_constraints, weight=weights, method=('lmdif', {'calls': 2000, 'tolerance': 1e-8}), **twiss_args) new_values = {v: self.read_param(v) for v in vary} if mirror_mode: fitResidual = self.madx.globals['TAR'] isGoodFit = (fitResidual < 1e-6) if isGoodFit: self._update(old_values, new_values, self._update_globals, "Match: {}") else: logging.warning('Fit Residual too high!!!') else: self._update(old_values, new_values, self._update_globals, "Match: {}") # return corrections return new_values
[docs] def read_monitor(self, name): """Mitigates read access to a monitor.""" # TODO: handle split h-/v-monitor index = self.elements.index(name) twiss = self.twiss() return { 'envx': twiss.envx[index], 'envy': twiss.envy[index], 'posx': twiss.x[index], 'posy': twiss.y[index], }
def _get_knobs(self, elem, attr): """Return list of all knob names belonging to the given attribute.""" try: return _get_leaf_knobs(self.madx, elem.cmdpar[attr].expr) except IndexError: return []
[docs] def read_param(self, expr): """Read element attribute. Return numeric value.""" return self.madx.eval(expr)
write_params = update_globals
class ElementList(ExpandedElementList): def __init__(self, madx, seq_name): super().__init__(madx, seq_name) self.names = madx._libmadx.get_expanded_element_names(seq_name) self._indices = {k: i for i, k in enumerate(self.names)} def __getitem__(self, key): index = self.index(key) elem = super().__getitem__(index) if elem.base_name == 'sbend': # MAD-X uses the condition k0=0 to check whether the attribute # should be used (even though that means you can never have a kick # that exactly counteracts the bending angle): elem._attr['kick'] = elem.k0 and elem.k0 * elem.length - elem.angle return elem def index(self, key): if isinstance(key, int): return key + len(self) if key < 0 else key elif isinstance(key, str): name = key.lower() if len(self) != 0: if name == '#s': return 0 elif name == '#e': return len(self) - 1 try: return self._indices[key.lower()] except KeyError as e: raise ValueError(key) from e elif hasattr(key, 'index'): return key.index raise TypeError( "Unknown key: {!r} ({})".format(key, type(key))) # stuff for online control def _get_leaf_knobs(madx, exprs): """ Return knobs names for a given element attribute from MAD-X. >>> get_element_attribute(elements['r1qs1'], 'k1') ['kL_R1QS1'] """ cmdpar = madx.globals.cmdpar exprs = exprs if isinstance(exprs, list) else [exprs] exprs = [e for e in exprs if e] seen = set() vars = [] while exprs: expr = exprs.pop(0) new = set(madx.expr_vars(expr)) - seen pars = [cmdpar[v] for v in new] vars.extend([p.name for p in pars if p.var_type == VAR_TYPE_DIRECT]) exprs.extend([p.expr for p in pars if p.var_type == VAR_TYPE_DEFERRED]) seen.update(new) return vars def _is_property_defined(elem, attr): """Check if attribute of an element was defined.""" while elem.parent is not elem: try: cmdpar = elem.cmdpar[attr] if cmdpar.inform: return bool(cmdpar.expr) except (KeyError, IndexError): pass elem = elem.parent return False def _eval_expr(value): """Helper method that replaces :class:`Expression` by their values.""" # NOTE: This method will become unnecessary in cpymad 1.0. if isinstance(value, list): return [_eval_expr(v) for v in value] if isinstance(value, (dict, Command)): return {k: _eval_expr(v) for k, v in value.items()} if isinstance(value, ArrayAttribute): return list(value) return value def items(d): return d.items() if isinstance(d, Mapping) else d def trim(s): return s.replace(' ', '') if isinstance(s, str) else s _INVERT_ATTRS = { 'sbend': ['angle', 'k0'], 'hkicker': ['kick'], 'kicker': ['hkick'], 'translation': ['dy'], }
[docs]def reverse_sequence(madx, name, elements): """ Create a direction reversed copy of the sequence. :param Madx madx: :param str name: name of the reversed sequence :param list elements: list of elements """ last = elements[-1] length = last.position + last.length madx.command.sequence.clone(name, l=length, refer='exit') for elem in reversed(elements): if elem.occ_cnt == 0 or '$' in elem.name: continue pos = elem.position overrides = {'at': length-pos} overrides.update({ attr: "-{}->{}".format(elem.name, attr) for attr in _INVERT_ATTRS.get(elem.base_name, ()) }) if elem.base_name == 'sbend': overrides['e1'] = '-{}->e2'.format(elem.name) overrides['e2'] = '-{}->e1'.format(elem.name) elem.clone(elem.name + '_reversed', **overrides) madx.command.endsequence() madx.command.beam(sequence=name) madx.use(name)
[docs]def reverse_sequence_inplace(madx, seq_name): cmd = madx.command cmd.seqedit(sequence=seq_name) cmd.flatten() cmd.reflect() cmd.endedit() for elem in reversed(madx.sequence[seq_name].elements): if elem.occ_cnt == 0 or '$' in elem.name: continue overrides = {} overrides.update({ attr: negate_expr(elem.defs[attr]) for attr in _INVERT_ATTRS.get(elem.base_name, ()) }) if elem.base_name == 'sbend': overrides['e1'] = negate_expr(elem.defs.e2) overrides['e2'] = negate_expr(elem.defs.e1) if overrides: elem(**overrides)
def negate_expr(value): return "-({})".format(value) if isinstance(value, str) else -value
[docs]class TwissTable(Table): """ Provide the following features on top of :class:`cpymad.madx.Table`: - add columns for one-sigma beam envelopes: envx, envy - add columns for gamma optical function: gamx, gamy - add aliases for beam position: posx, posy - override twiss alfa/beta parameters and emittances from the sigma matrix, i.e. in the local reference rather than normal coordinates. """ _transform = { 'alfx': lambda self: -self.sig12 / self.ex, 'alfy': lambda self: -self.sig34 / self.ey, 'betx': lambda self: +self.sig11 / self.ex, 'bety': lambda self: +self.sig33 / self.ey, 'gamx': lambda self: +self.sig22 / self.ex, 'gamy': lambda self: +self.sig44 / self.ey, 'envx': lambda self: self.sig11**0.5, 'envy': lambda self: self.sig33**0.5, 'posx': lambda self: self.x, 'posy': lambda self: self.y, 'ex': lambda self: (self.sig11 * self.sig22 - self.sig12 * self.sig21)**0.5, 'ey': lambda self: (self.sig33 * self.sig44 - self.sig34 * self.sig43)**0.5, } def _query(self, column): """Retrieve the column data.""" transform = self._transform.get(column) if transform is None: return super()._query(column) else: return transform(self)
def _load_params(data, name, path): """Load parameter dict from file if necessary.""" vals = data.get(name, {}) if isinstance(vals, str): data[name] = yaml.load_file(os.path.join(path, vals)) if len(data[name]) == 1 and name in data[name]: data[name] = data[name][name] def _call(madx, path, name): """Load a MAD-X file into the current workspace.""" name = os.path.join(path, name) vals = read_str_file(name) madx.call(name, True) return vals def _guess_main_sequence(madx): """Try to guess the 'main' sequence to be viewed.""" sequence = madx.sequence() if sequence: return sequence.name sequences = madx.sequence if not sequences: raise ValueError("No sequences defined!") if len(sequences) != 1: # TODO: ask user which one to use raise ValueError("Multiple sequences defined, none active. " "Cannot uniquely determine which to use.") return next(iter(sequences)) def _get_seq_model(madx, sequence_name): """ Return a model as good as possible from the last TWISS statement used for the given sequence, if available. Note that it seems currently not possible to reliably access prior TWISS statements and hence the information required to guess the model is extracted from the TWISS tables associated with the sequences. This means that - twiss tables may accidentally be associated with the wrong sequence - there is no reliable way to tell which parameters were set in the twiss command and hence deduce the correct (expected) model - you have to make sure the twiss range starts with a zero-width element (e.g. MARKER), otherwise TWISS parameters at the start of the range can not be reliably extrapolated The returned model should be seen as a first guess/approximation. Some fields may be empty if they cannot reliably be determined. :raises RuntimeError: if the sequence is undefined """ try: sequence = madx.sequence[sequence_name] except KeyError: raise RuntimeError("The sequence is not defined.") try: beam = sequence.beam except RuntimeError: beam = {} try: range, twiss = _get_twiss(madx, sequence) except RuntimeError: range = (sequence_name+'$start', sequence_name+'$end') twiss = {} return { 'sequence': sequence_name, 'range': range, 'beam': _eval_expr(beam), 'twiss': _eval_expr(twiss), } def _get_twiss(madx, sequence): """ Try to determine (range, twiss) from the MAD-X state. :raises RuntimeError: if unable to make a useful guess """ table = sequence.twiss_table # raises RuntimeError try: first, last = table.range except ValueError: raise RuntimeError("TWISS table inaccessible or nonsensical.") if first not in sequence.expanded_elements or \ last not in sequence.expanded_elements: raise RuntimeError( "The TWISS table appears to belong to a different sequence.") mandatory = {'betx', 'bety', 'alfx', 'alfy'} defaults = madx.command.twiss # TODO: periodic lines -> only mux/muy/deltap # TODO: logical parameters like CHROM twiss = { key: float(val) for key, val in table[0].items() if isinstance(val, Number) and ( (key in mandatory) or (key in defaults and val != defaults.cmdpar[key].value) ) } return (first, last), twiss