# Copyright (c) 2018-2024, Eduardo Rodrigues and Henry Schreiner.
# Distributed under the 3-clause BSD license, see accompanying file LICENSE
# or for details.

A class representing a set of decays. Can be subclassed to provide custom converters.

from __future__ import annotations

import os
import re
import sys
from copy import copy
from enum import Enum
from itertools import product
from typing import ClassVar

import attr
import numpy as np
import pandas as pd
from lark import Lark
from particle import Particle

from .. import data
from ..utils.particleutils import particle_from_string_name
from .ampgentransform import AmpGenTransformer, get_from_parser
from .decay import ModelDecay

[docs] class LS(Enum): "Line shapes supported (currently)" RBW = 0 GSpline = 1 kMatrix = 2 FOCUS = 3
[docs] @attr.s(slots=True) class AmplitudeChain(ModelDecay): 'This is a chain of decays (a "line")' lineshape = attr.ib(None) spinfactor = attr.ib(None) amp = attr.ib( 1 + 0j, eq=False, order=False, validator=attr.validators.instance_of(complex) ) err = attr.ib( 0 + 0j, eq=False, order=False, validator=attr.validators.instance_of(complex) ) fix = attr.ib( True, eq=False, order=False, validator=attr.validators.instance_of(bool) ) name = attr.ib(None) # Class members keep track of additions all_particles: ClassVar[Particle] = set() final_particles: ClassVar[Particle] = set() # This is set class-wide, and only used when a line is made cartesian = False
[docs] @classmethod def from_matched_line(cls, mat): """ This operates on an already-matched line. :param mat: The groupdict output of a match :return: A new amplitude chain instance """ getall = "all" if hasattr(Particle, "all") else "table" # Support 0.4.4 # Check to see if new particles loaded; if not, load them. if 998100 not in getattr(Particle, getall)(): data_dir = os.path.dirname(os.path.realpath(__file__)) special_filename = os.path.join( data_dir, "..", "data", "MintDalitzSpecialParticles.csv" ) Particle.load_table(special_filename, append=True) try: mat["particle"] = particle_from_string_name(mat["name"]) except Exception: print( "Failed to find particle", mat["name"], "with parsed dictionary:", mat, file=sys.stderr, ) raise if mat["particle"] not in cls.all_particles: cls.all_particles |= {mat["particle"]} if mat["daughters"]: mat["daughters"] = [cls.from_matched_line(d) for d in mat["daughters"]] # if master line only if "amp" in mat and not cls.cartesian: A = mat["amp"].real dA = mat["err"].real theta = mat["amp"].imag dtheta = mat["err"].imag mat["amp"] = A * np.exp(theta * 1j) mat["err"] = (dA * np.cos(theta) + A * np.sin(dtheta)) + ( dA * np.sin(theta) + A * np.cos(dtheta) ) * 1j return cls(**mat)
[docs] def expand_lines(self, linelist): """ Take a DecayTree -> list of DecayTrees with each dead-end daughter expanded to every possible combination. (recursive) """ # If the Tree has daughters, run on those if self.daughters: dlist = [d.expand_lines(linelist) for d in self.daughters] retlist = [] for ds in product(*dlist): newd = copy(self) newd.daughters = ds retlist.append(newd) return retlist # If the tree ends new_trees = [ ln for line in linelist if == for ln in line.expand_lines(linelist) ] if new_trees: return new_trees self.__class__.final_particles |= {self.particle} return [self]
@property def ls_enum(self): if not self.lineshape: return LS.RBW if self.lineshape == "GSpline.EFF": return LS.GSpline if self.lineshape.startswith("kMatrix"): return LS.kMatrix if self.lineshape.startswith("FOCUS"): return LS.FOCUS raise RuntimeError(f"Unimplemented lineshape {self.lineshape}") @property def full_amp(self): amp = self.amp for d in self.daughters: amp *= d.full_amp return amp def L_range(self, conserveParity=False): S = self.particle.J s1 = self[0].particle.J s2 = self[1].particle.J min_spin = abs(S - s1 - s2) min_spin = min(abs(S + s1 - s2), min_spin) min_spin = min(abs(S - s1 + s2), min_spin) max_spin = S + s1 + s2 if not conserveParity: return (min_spin, max_spin) raise RuntimeError("Strong decays not implemented yet") @property def L(self): if self.spinfactor: return "S P D F".split().index(self.spinfactor) min_L, _ = self.L_range() return min_L # Ground state unless specified def _get_html(self): name = self.particle.html_name name = re.sub( r'<SPAN STYLE="text-decoration:overline">(.*)</SPAN>', "\\1\u0305", name ) if self.spinfactor or self.lineshape: name += "<br/><br/>" if self.spinfactor: name += '<font color="blue">[' + self.spinfactor + "]</font>" if self.lineshape: name += '<font color="red">[' + self.lineshape + "]</font>" return name def __str__(self): name = str(self.particle) if self.lineshape and self.spinfactor: name += "[" + self.spinfactor + ";" + self.lineshape + "]" elif self.lineshape: name += "[" + self.lineshape + "]" elif self.spinfactor: name += "[" + self.spinfactor + "]" if self.daughters: name += "{" + ",".join(map(str, self.daughters)) + "}" return name
[docs] @classmethod def read_ampgen( cls, filename=None, text=None, grammar=None, parser="lalr", **kargs ): """ Read in an ampgen file :param filename: Filename to read :param text: Text to read (use instead of filename) :return: array of AmplitudeChains, parameters, constants, event type """ if grammar is None: grammar = data.basepath.joinpath("ampgen.lark").read_text() # Read the file in, ignore empty lines and comments if filename is not None: with open(filename, encoding="utf_8") as f: text = elif text is None: raise RuntimeError("Must have filename or text") lark = Lark(grammar, parser=parser, transformer=AmpGenTransformer(), **kargs) parsed = lark.parse(text) (event_type,) = get_from_parser(parsed, "event_type") # invert_lines = get_from_parser(parsed, "invert_line") cplx_decay_lines = get_from_parser(parsed, "cplx_decay_line") # cart_decay_lines = get_from_parser(parsed, "cart_decay_line") variables = get_from_parser(parsed, "variable") constants = get_from_parser(parsed, "constant") try: all_states = [particle_from_string_name(n) for n in event_type] except Exception: print("Did not find at least one of the state particles from", *event_type) raise fcs = get_from_parser(parsed, "fast_coherent_sum") if fcs: (fcs,) = fcs (fcs,) = fcs.children cls.cartesian = bool(fcs) # TODO: re-enable this # Combine dual line Cartesian lines into traditional cartesian lines # for a, b in combinations(cart_decay_lines, 2): # if a['name'] == b['name']: # if a['cart'] == 'Re' and b['cart'] == 'Im': # pass # elif a['cart'] == 'Im' and b['cart'] == 'Re': # a, b = b, a # else: # raise RuntimeError("Can't process a line with *both* components Re or Im") # new_string = "{a[name]} {a[fix]} {a[amp]} {a[err]} {b[fix]} {b[amp]} {b[err]}".format( # a=a, b=b) # real_lines.append(ampline.dual.match(new_string).groupdict()) # Make the partial lines and constants as dataframes parameters = pd.DataFrame( variables, columns="name fix value error".split() ).set_index("name") constants = pd.DataFrame(constants, columns="name value".split()).set_index( "name" ) # Convert the matches into AmplitudeChains line_arr = [cls.from_matched_line(c) for c in cplx_decay_lines] # Expand partial lines into complete lines new_line_arr = [ ln for line in line_arr if line.particle == all_states[0] for ln in line.expand_lines(line_arr) ] # Return return new_line_arr, parameters, constants, all_states