diff --git a/pySC/core/simulated_commissioning.py b/pySC/core/simulated_commissioning.py index 2fdf4ef..c956754 100644 --- a/pySC/core/simulated_commissioning.py +++ b/pySC/core/simulated_commissioning.py @@ -330,6 +330,7 @@ def register_magnets(self, ords: ndarray, **kwargs): # TODO docstring is too lo if ind not in self.SIG.Magnet.keys(): self.SIG.Magnet[ind] = DotDict() self.SIG.Magnet[ind].update(nvpairs) + # TODO Correctors for ab in AB: order = len(getattr(self.RING[ind], f"Polynom{ab}")) for field in ("NomPolynom", "SetPoint", "CalError"): @@ -948,6 +949,15 @@ def _dipole_compensation(self, ord, setpoint): (setpoint - self.RING[ord].SetPointB[1]) / self.RING[ord].NomPolynomB[1] * self.RING[ord].BendingAngle, skewness=False, method=SETTING_ADD) + def very_deep_copy(self): + copied_structure = copy.deepcopy(self) + copied_structure.RING = self.RING.deepcopy() + copied_structure.IDEALRING = self.IDEALRING.deepcopy() + for ind, element in enumerate(self.RING): + copied_structure.RING[ind] = element.deepcopy() + copied_structure.IDEALRING[ind] = element.deepcopy() + return copied_structure + def verify_structure(self): """ Verifies the integrity of SimilatedCommissionining class instance and warns if things look fishy. diff --git a/pySC/correction/bba.py b/pySC/correction/bba.py index a262963..75dc1e4 100644 --- a/pySC/correction/bba.py +++ b/pySC/correction/bba.py @@ -1,374 +1,365 @@ import matplotlib.pyplot as plt import numpy as np -from pySC.utils.at_wrapper import findspos, atgetfieldvalues -from pySC.correction.orbit_trajectory import SCfeedbackRun from pySC.core.beam import bpm_reading, all_elements_reading from pySC.utils.sc_tools import SCrandnc -from pySC.core.lattice_setting import set_cm_setpoints, set_magnet_setpoints, get_cm_setpoints from pySC.utils import logging_tools from pySC.core.classes import DotDict +from pySC.core.constants import TRACK_TBT, NUM_TO_AB, SETTING_REL, SETTING_ABS +from pySC.utils.stats import weighted_mean, weighted_error, effective_sample_size, weights_from_errors +from pySC.utils.at_wrapper import atgetfieldvalues, findspos +from pySC.correction.orbit_trajectory import SCfeedbackRun + LOGGER = logging_tools.get_logger(__name__) +CONFIDENCE_LEVEL_SIGMAS = 2.5 +OUTLIER_LIMIT = 1.5e-3 -""" -Pieces of found function calls from ALS-U SR, i.e. nothing in PETRA IV -qOrd = SCgetOrds(SC.RING,'QF') -SC,errorFlags = SCBBA(SC, repmat(SC.ORD.BPM,2,1),QuadOrds, mode='TBT', fakeMeasForFailures=True, - quadOrdPhaseAdvance=qOrd[0], quadStrengthPhaseAdvance=[0.95 0.8 1.05], magSPvec=magSPvec, plotResults=SC.plot) +def trajectory_bba(SC, bpm_ords, mag_ords, **kwargs): + par = DotDict(dict(n_steps=10, fit_order=1, magnet_order=1, skewness=False, setpoint_method=SETTING_REL, + q_ord_phase=np.array([], dtype=int), q_ord_setpoints=np.ones(1), magnet_strengths=np.array([0.95, 1.05]), + num_downstream_bpms=len(SC.ORD.BPM), max_injection_pos_angle=np.array([0.9E-3, 0.9E-3]), + dipole_compensation=True, plot_results=False)) + par.update(**kwargs) + par = _check_input(bpm_ords, mag_ords, par) + if SC.INJ.trackMode != TRACK_TBT or SC.INJ.nTurns != 2: + raise ValueError('Beam-trajectory-based alignment works in TBT mode with 2 turns. ' + 'Please set: SC.INJ.nTurns = 2 and SC.INJ.trackMode = "TBT"') + + q0 = atgetfieldvalues(SC.RING, par.q_ord_phase, "SetPointB", index=1) + bba_offsets = np.full(bpm_ords.shape, np.nan) + bba_offset_errors = np.full(bpm_ords.shape, np.nan) + for n_dim in range(bpm_ords.shape[0]): # TODO currently assumes either horizontal or both planes + last_bpm_ind = np.where(bpm_ords[n_dim, -1] == SC.ORD.BPM)[0][0] + quads_strengths, scalings, bpm_ranges = _phase_advance_injection_scan(SC, n_dim, last_bpm_ind, par) + LOGGER.info(f"Scanned plane {n_dim}") + for j_bpm in range(bpm_ords.shape[1]): # j_bpm: Index of BPM adjacent to magnet for BBA + LOGGER.info(f"BPM number {j_bpm}") + bpm_ind = np.where(bpm_ords[n_dim, j_bpm] == SC.ORD.BPM)[0][0] + m_ord = mag_ords[n_dim, j_bpm] + set_ind = np.argmax(bpm_ranges[:, bpm_ind]) + SC.set_magnet_setpoints(par.q_ord_phase, quads_strengths[set_ind], False, 1, method=SETTING_REL, dipole_compensation=True) + bpm_pos, downstream_trajectories = _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, scalings[set_ind], par) + if len(q0): + SC.set_magnet_setpoints(par.q_ord_phase, q0, False, 1, method=SETTING_ABS, dipole_compensation=True) + bba_offsets[n_dim, j_bpm], bba_offset_errors[n_dim, j_bpm] = _data_evaluation(SC, bpm_pos, downstream_trajectories, par.magnet_strengths[n_dim, j_bpm], n_dim, m_ord, par) + SC = apply_bpm_offsets(SC, bpm_ords, bba_offsets, bba_offset_errors) + if par.plot_results: + plot_bba_results(SC, bpm_ords, bba_offsets, bba_offset_errors) + return SC, bba_offsets, bba_offset_errors + + +def orbit_bba(SC, bpm_ords, mag_ords, **kwargs): + par = DotDict(dict(n_steps=10, fit_order=1, magnet_order=1, skewness=False, setpoint_method=SETTING_REL, + magnet_strengths=np.array([0.95, 1.05]), RMstruct=[], orbBumpWindow=5, BBABPMtarget=1E-3, + maxNumOfDownstreamBPMs=len(SC.ORD.BPM), dipole_compensation=True, + use_bpm_reading_for_orbit_bump_ref=False, plotResults=False)) + par.update(**kwargs) + par = _check_input(bpm_ords, mag_ords, par) + if SC.INJ.trackMode == TRACK_TBT: + raise ValueError('Beam-orbit-based alignment does not work in TBT mode. ' + 'Please set: SC.INJ.trackMode to "ORB" or "PORB".') + bba_offsets = np.full(bpm_ords.shape, np.nan) + bba_offset_errors = np.full(bpm_ords.shape, np.nan) + + for j_bpm in range(bpm_ords.shape[1]): # j_bpm: Index of BPM adjacent to magnet for BBA + LOGGER.info(f"BPM number {j_bpm}") + for n_dim in range(bpm_ords.shape[0]): + LOGGER.debug(f'BBA-BPM {j_bpm}/{bpm_ords.shape[1]}, n_dim = {n_dim}') + bpm_ind = np.where(bpm_ords[n_dim, j_bpm] == SC.ORD.BPM)[0][0] + m_ord = mag_ords[n_dim, j_bpm] + SC0 = SC.very_deep_copy + bpm_pos, orbits = _data_measurement_orb(SC, m_ord, bpm_ind, j_bpm, n_dim, par, + *_get_orbit_bump(SC, m_ord, bpm_ords[n_dim, j_bpm], n_dim, par)) + bba_offsets[n_dim, j_bpm], bba_offset_errors[n_dim, j_bpm] = _data_evaluation(SC, bpm_pos, orbits, par.magnet_strengths[n_dim, j_bpm], n_dim, m_ord, par) + SC = apply_bpm_offsets(SC, bpm_ords, bba_offsets, bba_offset_errors) + if par.plot_results: + plot_bba_results(SC, bpm_ords, bba_offsets, bba_offset_errors) + return SC, bba_offsets, bba_offset_errors + + +def fake_bba(SC, bpm_ords, mag_ords, errors=None, fake_offset=None): + """Example use: + SC = fake_bba(SC, bpm_ords, mag_ords, is_bba_errored(bba_offsets, bba_offset_errors))""" + if errors is None and fake_offset is None: + raise ValueError("At least one of error_flags or fake_offset has to be given") + if errors is None: + errors = np.ones(bpm_ords.shape, dtype=bool) + if fake_offset is None: + final_offset_errors = _get_bpm_offset_from_mag(SC.RING, bpm_ords, mag_ords) + final_offset_errors[errors != 0] = np.nan + fake_offset = np.sqrt(np.nanmean(final_offset_errors ** 2, axis=1)) + + LOGGER.info(f"Final offset error is {1E6 * fake_offset} um (hor|ver)" + f" with {np.sum(errors, axis=1)} measurement failures -> being re-calculated now.\n") + for inds in np.argwhere(errors): # TODO get to the form such that apply_bpm_offsets can be used + fake_bpm_offset = (SC.RING[mag_ords[inds[0], inds[1]]].MagnetOffset[inds[0]] + + SC.RING[mag_ords[inds[0], inds[1]]].SupportOffset[inds[0]] + - SC.RING[bpm_ords[inds[0], inds[1]]].SupportOffset[inds[0]] + + fake_offset[inds[0]] * SCrandnc()) + if not np.isnan(fake_bpm_offset): + SC.RING[bpm_ords[inds[0], inds[1]]].Offset[inds[0]] = fake_bpm_offset + else: + LOGGER.info('BPM offset not reassigned, NaN.\n') + return SC -SC = SCBBA(SC, BPMordsQuadBBA, QuadOrds, mode='ORB',fakeMeasForFailures=True, outlierRejectionAt=200E-6, RMstruct=RMstruct, - plotResults=SC.plot, magSPvec=magSPvec, minSlopeForFit=0.005, minBPMrangeAtBBABBPM=1*20E-6, - BBABPMtarget=5*50E-6, minBPMrangeOtherBPM=0) -# Orbit correction (without weights) -SC = performOrbitCorr_ALSU_SR(SC,RMstruct,'weight=[]) +def _check_input(bpm_ords, mag_ords, par): + if bpm_ords.shape != mag_ords.shape: # both in shape 2 x N + raise ValueError('Input arrays for BPMs and magnets must be same size.') + if par.magnet_strengths.ndim < 2: + par.magnet_strengths = np.tile(par.magnet_strengths, mag_ords.shape + (1,)) + if par.fit_order > 2: + raise ValueError("At most second order fit is supported.") + return par + + +def apply_bpm_offsets(SC, bpm_ords, bba_offsets, bba_offset_errors): + errors = is_bba_errored(bba_offsets, bba_offset_errors) + # bpm_ords, bba_offsets, bba_offset_errors should have same shape + for inds in np.argwhere(~errors): + SC.RING[bpm_ords[inds[0], inds[1]]].Offset[inds[0]] += bba_offsets[inds[0], inds[1]] + for inds in np.argwhere(errors): + LOGGER.info(f"Poor resolution for BPM {inds[1]} in plane {inds[0]}: " + f"{bba_offsets[inds[0], inds[1]]}+-{bba_offset_errors[inds[0], inds[1]]}") + return SC + -# BBA on sextupoles (quad trim coils) -SC = SCBBA(SC, BPMordsSextBBA, sextOrds, mode='ORB',fakeMeasForFailures=True, outlierRejectionAt=200E-6, - RMstruct=RMstruct, plotResults=SC.plot, switchOffSext=True, magSPflag='abs', magSPvec=magSPvecSext, - minSlopeForFit=0.005, minBPMrangeAtBBABBPM=1*10E-6, BBABPMtarget=5*50E-6, minBPMrangeOtherBPM=0) -""" +def is_bba_errored(bba_offsets, bba_offset_errors): + return np.logical_or(np.logical_or(np.isnan(bba_offsets), np.abs(bba_offsets) > OUTLIER_LIMIT), + np.abs(bba_offsets) < CONFIDENCE_LEVEL_SIGMAS * bba_offset_errors) -def SCBBA(SC, bpm_ords, mag_ords, **kwargs): - par = DotDict(dict(mode=SC.INJ.trackMode, outlierRejectionAt=np.inf, nSteps=10, fitOrder=1, magOrder=2, - magSPvec=[0.95, 1.05], magSPflag='rel', RMstruct=[], orbBumpWindow=5, BBABPMtarget=1E-3, - minBPMrangeAtBBABBPM=500E-6, minBPMrangeOtherBPM=100E-6, maxStdForFittedCenters=600E-6, - nXPointsNeededAtMeasBPM=3, maxNumOfDownstreamBPMs=len(SC.ORD.BPM), minSlopeForFit=0.03, - maxTrajChangeAtInjection=[.9E-3, .9E-3], quadOrdPhaseAdvance=[], - quadStrengthPhaseAdvance=[0.95, 1.05], fakeMeasForFailures=False, dipCompensation=True, - skewQuadrupole=False, switchOffSext=False, useBPMreadingsForOrbBumpRef=False, - plotLines=False, plotResults=False)) - par.update(**kwargs) - if bpm_ords.shape != mag_ords.shape: # both in shape 2 x N - raise ValueError('Input arrays for BPMs and magnets must be same size.') - if not isinstance(par.magSPvec, list): - par.magSPvec = [par.magSPvec] * len(mag_ords) - if par.mode not in ("TBT", "ORB"): - raise ValueError(f"Unknown mode {par.mode}.") - if par.mode == 'TBT' and SC.INJ.nTurns != 2: - raise ValueError('Beam-based alignment in TBT mode works with 2 turns. Please set: SC.INJ.nTurns = 2') - init_offset_errors = _get_bpm_offset_from_mag(SC, bpm_ords, mag_ords) - error_flags = np.full(bpm_ords.shape, np.nan) - kickVec0 = par.maxTrajChangeAtInjection.reshape(2, 1) * np.linspace(-1, 1, par.nSteps) - - for jBPM in range(bpm_ords.shape[1]): # jBPM: Index of BPM adjacent to magnet for BBA - for nDim in range(bpm_ords.shape[0]): - LOGGER.debug(f'BBA-BPM {jBPM}/{bpm_ords.shape[1]}, nDim = {nDim}') - SC0 = SC - BPMind = np.where(bpm_ords[nDim, jBPM] == SC.ORD.BPM)[0][0] - mOrd = mag_ords[nDim, jBPM] - if par.switchOffSext: - SC = set_magnet_setpoints(SC, mOrd, setpoints=np.zeros(1), skewness=False, order=2, method='abs') - SC = SCfeedbackRun(SC, par.RMstruct.MinvCO, BPMords=par.RMstruct.BPMords, CMords=par.RMstruct.CMords, - target=0, maxsteps=50, scaleDisp=par.RMstruct.scaleDisp, eps=1E-6) - if par.mode == 'ORB': - BPMpos, tmpTra = _data_measurement(SC, mOrd, BPMind, jBPM, nDim, par, - _get_orbit_bump(SC, mOrd, bpm_ords[nDim, jBPM], nDim, par)) - else: - kickVec, BPMrange = _scale_injection_to_reach_bpm(SC, BPMind, nDim, kickVec0) - if par.quadOrdPhaseAdvance and BPMrange < par.BBABPMtarget: - SC, kickVec = _scan_phase_advance(SC, BPMind, nDim, kickVec0, par) - BPMpos, tmpTra = _data_measurement(SC, mOrd, BPMind, jBPM, nDim, par, kickVec) - OffsetChange, error_flags[nDim, jBPM] = _data_evaluation(SC, bpm_ords, jBPM, BPMpos, tmpTra, nDim, mOrd, - par) - SC = SC0 - if not np.isnan(OffsetChange): - SC.RING[bpm_ords[nDim, jBPM]].Offset[nDim] = SC.RING[bpm_ords[nDim, jBPM]].Offset[nDim] + OffsetChange - if par.plotResults: - plot_bba_results(SC, init_offset_errors, error_flags, jBPM, bpm_ords, mag_ords) - if par.fakeMeasForFailures: - SC = _fake_measurement(SC, bpm_ords, mag_ords, error_flags) - return SC, error_flags - - -def _get_bpm_offset_from_mag(SC, BPMords, magOrds): - offset = np.full(BPMords.shape, np.nan) - for n_dim in range(2): - offset[n_dim, :] = (atgetfieldvalues(SC.RING, BPMords[n_dim, :], 'Offset', n_dim) - + atgetfieldvalues(SC.RING, BPMords[n_dim, :], 'SupportOffset', n_dim) - - atgetfieldvalues(SC.RING, magOrds[n_dim, :], 'MagnetOffset', n_dim) - - atgetfieldvalues(SC.RING, magOrds[n_dim, :], 'SupportOffset', n_dim)) +def _get_bpm_offset_from_mag(ring, bpm_ords, mag_ords): + offset = np.full(bpm_ords.shape, np.nan) + for n_dim in range(bpm_ords.shape[0]): + offset[n_dim, :] = (atgetfieldvalues(ring, bpm_ords[n_dim, :], 'Offset', n_dim) + + atgetfieldvalues(ring, bpm_ords[n_dim, :], 'SupportOffset', n_dim) + - atgetfieldvalues(ring, mag_ords[n_dim, :], 'MagnetOffset', n_dim) + - atgetfieldvalues(ring, mag_ords[n_dim, :], 'SupportOffset', n_dim)) return offset -def _fake_measurement(SC, BPMords, magOrds, errorFlags): - finOffsetErrors = _get_bpm_offset_from_mag(SC, BPMords, magOrds) - finOffsetErrors[errorFlags != 0] = np.nan - LOGGER.info(f"Final offset error is {1E6 * np.sqrt(np.nanmean(finOffsetErrors ** 2, axis=1))}" - f" um (hor|ver) with {np.sum(errorFlags != 0, axis=1)} measurement failures -> being re-calculated now.\n") - - for nBPM in range(BPMords.shape[1]): - for nDim in range(2): - if errorFlags[nDim, nBPM] != 0: - fakeBPMoffset = (SC.RING[magOrds[nDim, nBPM]].MagnetOffset[nDim] - + SC.RING[magOrds[nDim, nBPM]].SupportOffset[nDim] - - SC.RING[BPMords[nDim, nBPM]].SupportOffset[nDim] - + np.sqrt(np.nanmean(np.square(finOffsetErrors[nDim, :]))) * SCrandnc(2)) - if not np.isnan(fakeBPMoffset): - SC.RING[BPMords[nDim, nBPM]].Offset[nDim] = fakeBPMoffset - else: - LOGGER.info('BPM offset not reasigned, NaN.\n') - return SC +def _data_evaluation(SC, bpm_pos, trajectories, mag_vec, n_dim, m_ord, par): + x = np.mean(bpm_pos, axis=1) + x_mask = ~np.isnan(x) + err = np.mean(np.std(bpm_pos[x_mask, :], axis=1)) + x = x[x_mask] + new_tmp_tra = trajectories[x_mask, :, :] + + tmp_slope = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan) + tmp_slope_err = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan) + center = np.full((new_tmp_tra.shape[2]), np.nan) + center_err = np.full((new_tmp_tra.shape[2]), np.nan) + for i in range(new_tmp_tra.shape[0]): + for j in range(new_tmp_tra.shape[2]): + y = new_tmp_tra[i, :, j] + y_mask = ~np.isnan(y) + if np.sum(y_mask) < min(len(mag_vec), 3): + continue + # TODO once the position errors are calculated and propagated, should be used + p, pcov = np.polyfit(mag_vec[y_mask], y[y_mask], 1, w=np.ones(int(np.sum(y_mask))) / err, cov='unscaled') + tmp_slope[i, j], tmp_slope_err[i, j] = p[0], pcov[0, 0] + + for j in range(min(new_tmp_tra.shape[2], par.num_downstream_bpms)): + y = tmp_slope[:, j] + y_err = tmp_slope_err[:, j] + y_mask = ~np.isnan(y) + if np.sum(y_mask) <= par.fit_order + 1: + continue + # TODO here do odr as the x values have also measurement errors + p, pcov = np.polyfit(x[y_mask], y[y_mask], par.fit_order, w=1 / y_err[y_mask], cov='unscaled') + if np.abs(p[0]) < 2 * np.sqrt(pcov[0, 0]): + continue + center[j] = -p[1] / (par.fit_order * p[0]) # zero-crossing if linear, minimum is quadratic + center_err[j] = np.sqrt(center[j] ** 2 * (pcov[0,0]/p[0]**2 + pcov[1,1]/p[1]**2 - 2 * pcov[0, 1] / p[0] / p[1])) + mask = ~np.isnan(center) + offset_change = weighted_mean(center[mask], center_err[mask]) + offset_change_error = weighted_error(center[mask]-offset_change, center_err[mask]) / np.sqrt(effective_sample_size(center[mask], weights_from_errors(center_err[mask]))) + if not par.dipole_compensation and n_dim == 0 and SC.RING[m_ord].NomPolynomB[1] != 0: + offset_change += getattr(SC.RING[m_ord], 'BendingAngle', 0) / SC.RING[m_ord].NomPolynomB[1] / SC.RING[m_ord].Length + return offset_change, offset_change_error + + +def plot_bba_results(SC, bpm_ords, bba_offsets, bba_offset_errors): + to_um = 1E6 + errors = is_bba_errored(bba_offsets, bba_offset_errors) + plt.rcParams.update({'font.size': 10}) + f, ax = plt.subplots(nrows=2, num=90, figsize=(8, 8), facecolor="w") + plabels = ("Horizontal", "Vertical") + for n_dim in range(bpm_ords.shape[0]): + x = np.where(np.in1d(SC.ORD.BPM, bpm_ords[n_dim, :]))[0] + mask = errors[n_dim, :] + ax[n_dim].errorbar(x[~mask], to_um * bba_offsets[n_dim, ~mask], yerr=to_um * bba_offset_errors[n_dim, ~mask], fmt="bo", + label=plabels[n_dim]) + ax[n_dim].errorbar(x[mask], to_um * bba_offsets[n_dim, mask], yerr=to_um * bba_offset_errors[n_dim, mask], fmt="rX", + label=f"{plabels[n_dim]} failed") + ax[n_dim].set_ylabel(r' Offset $\Delta$ [$\mu$m]') + ax[n_dim].set_xlabel('Index of BPM') + ax[n_dim].set_xlim([0, len(SC.ORD.BPM)]) + ax[n_dim].legend() + f.show() + + +def plot_bpm_offsets_from_magnets(SC, bpm_ords, mag_ords, error_flags): + plt.rcParams.update({'font.size': 10}) + fom = _get_bpm_offset_from_mag(SC.RING, bpm_ords, mag_ords) + n_steps = 1 if bpm_ords.shape[1] == 1 else 1.1 * np.max(np.abs(fom)) * np.linspace(-1, 1, int(np.floor(bpm_ords.shape[1] / 3))) + f, ax = plt.subplots(nrows=3, num=91, figsize=(8, 11), facecolor="w") + colors = ['#1f77b4', '#ff7f0e'] + for n_dim in range(bpm_ords.shape[0]): + a, b = np.histogram(fom[n_dim, :], n_steps) + ax[0].plot(1E6 * b[1:], a, linewidth=2) + if bpm_ords.shape[0] > 1: + rmss = 1E6 * np.sqrt(np.nanmean(np.square(fom), axis=1)) + legends = [f"Horizontal rms: {rmss[0]:.0f}$\\mu m$", + f"Vertical rms: {rmss[1]:.0f}$\\mu m$"] + ax[0].legend(legends) + ax[0].set_xlabel(r'BPM offset w.r.t. magnet [$\mu$m]') + ax[0].set_ylabel('Occurrences') -def _data_measurement(SC, mOrd, BPMind, jBPM, nDim, par, varargin): - sPos = findspos(SC.RING) - measDim = 1 - nDim if par.skewQuadrupole else nDim - initialZ0 = SC.INJ.Z0.copy() - if par.mode == 'ORB': - CMords, CMvec = varargin - nMsteps = CMvec[nDim].shape[0] - tmpTra = np.nan((nMsteps, len(par.magSPvec[nDim, jBPM]), len(SC.ORD.BPM))) - else: - kickVec = varargin.copy() - nMsteps = kickVec.shape[1] - tmpTra = np.nan((nMsteps, len(par.magSPvec[nDim, jBPM]), par.maxNumOfDownstreamBPMs)) - - BPMpos = np.nan((nMsteps, len(par.magSPvec[nDim, jBPM]))) - if par.plotLines: - f, ax = plt.subplots(nrows=len(par.magSPvec[nDim, jBPM]), num=99) - for nQ, setpointQ in enumerate(par.magSPvec[nDim, jBPM]): - SC = set_magnet_setpoints(SC, mOrd, setpointQ, par.skewQuadrupole, par.magOrder, method=par.magSPflag, - dipole_compensation=par.dipCompensation) - for nKick in range(nMsteps): - if par.mode == 'ORB': - for nD in range(2): - SC = set_cm_setpoints(SC, CMords[nD], CMvec[nD][nKick, :], bool(nD), method='abs') - else: - SC.INJ.Z0[2 * nDim:2 * nDim + 2] = initialZ0[2 * nDim:2 * nDim + 2] + kickVec[:, nKick] - B = bpm_reading(SC) - if par.plotLines: - ax[nQ] = _plot_bba_step(SC, ax[nQ], BPMind, nDim) - BPMpos[nKick, nQ] = B[nDim, BPMind] - tmpTra[nKick, nQ, :] = B[measDim, :] if par.mode == 'ORB' else B[measDim, (BPMind + 1):( - BPMind + par.maxNumOfDownstreamBPMs)] - - if par.plotLines: - ax[nQ].rectangle([sPos[mOrd], -1, sPos[mOrd + 1] - sPos[mOrd], 1], facecolor=[0, 0.4470, 0.7410]) - ax[nQ].set_xlim(sPos[mOrd] + np.array([-10, 10])) - ax[nQ].set_ylim(1.3 * np.array([-1, 1])) - plt.show() - SC.INJ.Z0 = initialZ0 - return BPMpos, tmpTra - - -def _data_evaluation(SC, BPMords, jBPM, BPMpos, tmpTra, nDim, mOrd, par): - if par.plotLines: - plt.rcParams.update({'font.size': 18}) - fig, ax = plt.subplots(num=56, facecolor="w", projection="3d") - p1 = ax.plot(0, 1E6 * SC.RING[mOrd].T2[2 * nDim - 1], 0, 'rD', MarkerSize=40, MarkerFaceColor='b') - OffsetChange = np.nan - Error = 5 - tmpCenter = np.nan((1, (tmpTra.shape[1] - 1) * par.maxNumOfDownstreamBPMs)) - tmpNorm = np.nan((1, (tmpTra.shape[1] - 1) * par.maxNumOfDownstreamBPMs)) - tmpRangeX = np.zeros((1, (tmpTra.shape[1] - 1) * par.maxNumOfDownstreamBPMs)) - tmpRangeY = np.zeros((1, (tmpTra.shape[1] - 1) * par.maxNumOfDownstreamBPMs)) - i = 0 - for nBPM in range(par.maxNumOfDownstreamBPMs): - y0 = np.diff(tmpTra[:, :, nBPM], 1, 1) - x0 = np.tile(np.mean(BPMpos, 1), (y0.shape[1], 1)).T - for nKick in range(y0.shape[1]): - i = i + 1 - y = y0[:, nKick] - x = x0[:, nKick] - x = x[~np.isnan(y)] - y = y[~np.isnan(y)] - if len(x) == 0 or len(y) == 0: - continue - tmpRangeX[i] = abs(np.min(x) - np.max(x)) - tmpRangeY[i] = abs(np.min(y) - np.max(y)) - sol = np.nan((1, 2)) - if len(x) >= par.nXPointsNeededAtMeasBPM and tmpRangeX[i] > par.minBPMrangeAtBBABBPM and tmpRangeY[ - i] > par.minBPMrangeOtherBPM: - if par.fitOrder == 1: - sol = np.linalg.lstsq(np.vstack((np.ones(x.shape), x)).T, y)[0] - sol = sol[[1, 0]] - if abs(sol[0]) < par.minSlopeForFit: - sol[0] = np.nan - tmpCenter[i] = -sol[1] / sol[0] - tmpNorm[i] = 1 / np.sqrt(np.sum((sol[0] * x + sol[1] - y) ** 2)) - else: - sol = np.polyfit(x, y, par.fitOrder) - if par.fitOrder == 2: - tmpCenter[i] = - (sol[1] / (2 * sol[0])) - else: - tmpCenter[i] = min(abs(np.roots(sol))) - tmpNorm[i] = 1 / np.linalg.norm(np.polyval(sol, x) - y) - if par.plotLines: - p2 = ax.plot(np.tile(jBPM + nBPM, x.shape), 1E6 * x, 1E3 * y, 'ko') - tmp = ax.plot(np.tile(jBPM + nBPM, x.shape), 1E6 * x, 1E3 * np.polyval(sol, x), 'k-') - p3 = tmp[0] - p4 = plt.plot(jBPM + nBPM, 1E6 * tmpCenter[nBPM], 0, 'Or', MarkerSize=10) - if np.max(tmpRangeX) < par.minBPMrangeAtBBABBPM: - Error = 1 - elif np.max(tmpRangeY) < par.minBPMrangeOtherBPM: - Error = 2 - elif np.std(tmpCenter, ddof=1) > par.maxStdForFittedCenters: - Error = 3 - elif len(np.where(~np.isnan(tmpCenter))[0]) == 0: - Error = 4 - else: - OffsetChange = np.sum(tmpCenter * tmpNorm) / np.sum(tmpNorm) - Error = 0 - if not par.dipCompensation and nDim == 1 and SC.RING[mOrd].NomPolynomB[1] != 0: - if 'BendingAngle' in SC.RING[mOrd].keys(): - B = SC.RING[mOrd].BendingAngle - else: - B = 0 - K = SC.RING[mOrd].NomPolynomB[1] - L = SC.RING[mOrd].Length - OffsetChange = OffsetChange + B / L / K - if OffsetChange > par.outlierRejectionAt: - OffsetChange = np.nan - Error = 6 - if par.plotLines: - p5 = plt.plot(0, 1E6 * OffsetChange, 0, 'kD', MarkerSize=30, MarkerFaceColor='r') - p6 = plt.plot(0, 1E6 * (SC.RING[BPMords[nDim, jBPM]].Offset[nDim] + SC.RING[BPMords[nDim, jBPM]].SupportOffset[ - nDim] + OffsetChange), 0, 'kD', MarkerSize=30, MarkerFaceColor='g') - ax.title( - f'BBA-BPM: {jBPM:d} \n mOrd: {mOrd:d} \n mFam: {SC.RING[mOrd].FamName} \n nDim: {nDim:d} \n FinOffset = {1E6 * np.abs(SC.RING[BPMords[nDim, jBPM]].Offset[nDim] + SC.RING[BPMords[nDim, jBPM]].SupportOffset[nDim] + OffsetChange - SC.RING[mOrd].MagnetOffset[nDim] - SC.RING[mOrd].SupportOffset[nDim]):3.0f} $\\mu m$') - ax.legend((p1, p2, p3, p4, p5, p6), ( - 'Magnet center', 'Measured offset change', 'Line fit', 'Fitted BPM offset (individual)', - 'Fitted BPM offset (mean)', 'Predicted magnet center')) - ax.set_xlabel('Index of BPM') - ax.set_ylabel('BBA-BPM offset [$\mu$m]') - ax.set_zlabel('Offset change [mm]') - plt.show() - return OffsetChange, Error - - -def _scale_injection_to_reach_bpm(SC, BPMind, nDim, kickVec0): - initialZ0 = SC.INJ.Z0.copy() - for scaling_factor in (1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1): - tmp_bpm_pos = np.full(kickVec0.shape[1], np.nan) - for nK in range(kickVec0.shape[1]): - SC.INJ.Z0[2 * nDim:2 * nDim + 2] = initialZ0[2 * nDim:2 * nDim + 2] + scaling_factor * kickVec0[:, nK] - tmp_bpm_pos[nK] = bpm_reading(SC, SC.ORD.BPM[BPMind])[nDim, 0] - SC.INJ.Z0 = initialZ0.copy() - - if np.sum(np.isnan(tmp_bpm_pos)) == 0: - BPMrange = np.max(tmp_bpm_pos) - np.min(tmp_bpm_pos) - kickVec = scaling_factor * kickVec0 - LOGGER.debug(f'Initial trajectory variation scaled to [{100 * (kickVec[0] / kickVec0[0]):.0f}| ' - f'{100 * (kickVec[-1] / kickVec0[-1]):.0f}]% of its initial value, ' - f'BBA-BPM range {1E6 * BPMrange:.0f} um.') - return kickVec, BPMrange + plabels = ("Horizontal", "Vertical") + for n_dim in range(bpm_ords.shape[0]): + x = np.where(np.in1d(SC.ORD.BPM, bpm_ords[n_dim, :]))[0] + mask = ~error_flags[n_dim, :] + if not np.all(np.isnan(fom[n_dim, mask])): + ax[n_dim + 1].plot(x[mask], 1E6 * fom[n_dim, mask], 'bo', label=plabels[n_dim]) + if not np.all(np.isnan(fom[n_dim, ~mask])): + ax[n_dim + 1].plot(x[~mask], 1E6 * fom[n_dim, ~mask], 'rX', label=f"{plabels[n_dim]} failed") + + ax[n_dim + 1].set_ylabel(r'Offset [$\mu$m]') + ax[n_dim + 1].set_xlabel('Index of BPM') + ax[n_dim + 1].set_xlim((0, len(SC.ORD.BPM))) + ax[n_dim + 1].legend() + f.tight_layout() + f.show() + + + +# trajectory BBA helper functions + + +def _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, scaling, par): + kick_vec = scaling * par.max_injection_pos_angle.reshape(2, 1) * np.linspace(-1, 1, par.n_steps) + meas_dim = 1 - n_dim if par.skewness else n_dim + initial_z0 = SC.INJ.Z0.copy() + trajectories = np.full((par.n_steps, len(par.magnet_strengths[n_dim, j_bpm]), par.num_downstream_bpms), np.nan) + bpm_pos = np.full((par.n_steps, len(par.magnet_strengths[n_dim, j_bpm])), np.nan) + init_setpoint = getattr(SC.RING[m_ord], f"SetPoint{NUM_TO_AB[int(par.skewness)]}")[par.magnet_order] + for n_q, setpoint_q in enumerate(par.magnet_strengths[n_dim, j_bpm]): + SC.set_magnet_setpoints(m_ord, setpoint_q, par.skewness, par.magnet_order, + method=par.setpoint_method, dipole_compensation=par.dipole_compensation) + for step in range(par.n_steps): + SC.INJ.Z0[2 * n_dim:2 * n_dim + 2] = initial_z0[2 * n_dim:2 * n_dim + 2] + kick_vec[:, step] + bpm_readings = bpm_reading(SC)[0] + bpm_pos[step, n_q] = bpm_readings[n_dim, bpm_ind] + trajectories[step, n_q, :] = bpm_readings[meas_dim, bpm_ind:(bpm_ind + par.num_downstream_bpms)] + + SC.INJ.Z0 = initial_z0 + SC.set_magnet_setpoints(m_ord, init_setpoint, par.skewness, par.magnet_order, + method=SETTING_ABS, dipole_compensation=par.dipole_compensation) + return bpm_pos, trajectories + + +def _phase_advance_injection_scan(SC, n_dim, last_bpm_ind, par): + q0 = atgetfieldvalues(SC.RING, par.q_ord_phase, "SetPointB", index=1) + n_setpoints = len(par.q_ord_setpoints) + bpm_ranges = np.zeros((n_setpoints, len(SC.ORD.BPM))) + scalings = np.zeros(n_setpoints) + for i, q_scale in enumerate(par.q_ord_setpoints): + SC.set_magnet_setpoints(par.q_ord_phase, q_scale, False, 1, method=SETTING_REL, dipole_compensation=True) + scalings[i], bpm_ranges[i] = _scale_injection_to_reach_bpms(SC, n_dim, last_bpm_ind, par.max_injection_pos_angle) + if len(q0): + SC.set_magnet_setpoints(par.q_ord_phase, q0, False, 1, method=SETTING_ABS, dipole_compensation=True) + return par.q_ord_setpoints, scalings, bpm_ranges + + +def _scale_injection_to_reach_bpms(SC, n_dim, last_bpm_ind, max_injection_pos_angle): + initial_z0, initial_nturns = SC.INJ.Z0.copy(), SC.INJ.nTurns + SC.INJ.nTurns = 1 + scaling_factor = 1.0 + mask = np.ones(len(SC.ORD.BPM), dtype=bool) + if last_bpm_ind + 1 < len(SC.ORD.BPM): + mask[last_bpm_ind + 1:] = False + + # trying the largest kicks with both signs, if fails, scale down and try again + for _ in range(20): + LOGGER.info(f"{scaling_factor=}") + tmp_bpm_pos = np.full((2, len(SC.ORD.BPM)), np.nan) + for sign_ind in range(2): + SC.INJ.Z0[2 * n_dim:2 * n_dim + 2] = initial_z0[2 * n_dim:2 * n_dim + 2] + (-1) ** sign_ind * scaling_factor * max_injection_pos_angle + tmp_bpm_pos[sign_ind, :] = bpm_reading(SC)[0][n_dim, :] + SC.INJ.Z0 = initial_z0.copy() + + if np.sum(np.isnan(tmp_bpm_pos[:, mask])) == 0: + bpm_ranges = np.max(tmp_bpm_pos, axis=0) - np.min(tmp_bpm_pos, axis=0) + LOGGER.debug(f'Initial trajectory variation scaled to [{100 * scaling_factor}| ' + f'{100 * scaling_factor}]% of its initial value, ' + f'BBA-BPM range from {1E6 * np.min(bpm_ranges):.0f} um to {1E6 * np.max(bpm_ranges):.0f}.') + SC.INJ.nTurns = initial_nturns + return scaling_factor, bpm_ranges + scaling_factor *= 0.8 else: LOGGER.warning('Something went wrong. No beam transmission at all(?)') - return kickVec0, 0 - - -def _scan_phase_advance(SC, BPMind, nDim, kickVec0, par): - mOrd = par.quadOrdPhaseAdvance - qVec = par.quadStrengthPhaseAdvance - q0 = SC.RING[mOrd].SetPointB[1] - allBPMRange = np.zeros(len(qVec)) - for nQ in range(len(qVec)): - LOGGER.debug(f'BBA-BPM range to small, try to change phase advance with quad ord {par.quadOrdPhaseAdvance} ' - f'to {qVec[nQ]:.2f} of nom. SP.') - SC = set_magnet_setpoints(SC, mOrd, qVec[nQ], False, 1, method='rel', dipole_compensation=True) - kickVec, BPMrange = _scale_injection_to_reach_bpm(SC, BPMind, nDim, kickVec0) - - if BPMrange >= par.BBABPMtarget: - LOGGER.debug( - f'Change phase advance with quad ord {mOrd} successful. BBA-BPM range = {1E6 * BPMrange:.0f}um.') - return SC, kickVec - allBPMRange[nQ] = BPMrange - - if BPMrange < np.max(allBPMRange): - LOGGER.debug(f'Changing phase advance of quad with ord {mOrd} NOT succesfull, ' - f'returning to best value with BBA-BPM range = {1E6 * max(allBPMRange):.0f}um.') - return set_magnet_setpoints(SC, mOrd, np.max(qVec), False, 1, method='rel', dipole_compensation=True), kickVec - LOGGER.debug(f'Changing phase advance of quad with ord {mOrd} NOT succesfull, returning to initial setpoint.') - return set_magnet_setpoints(SC, mOrd, q0, False, 1, method='abs', dipole_compensation=True), kickVec - - -def _get_orbit_bump(SC, mOrd, BPMord, nDim, par): - tmpCMind = np.where(par.RMstruct.CMords[0] == mOrd)[0] + SC.INJ.nTurns = initial_nturns + return scaling_factor, np.zeros(len(SC.ORD.BPM)) + +# orbit BBA helper functions + + +def _data_measurement_orb(SC, m_ord, bpm_ind, j_bpm, n_dim, par, cm_ords, cm_vec): + meas_dim = 1 - n_dim if par.skewness else n_dim + initial_z0 = SC.INJ.Z0.copy() + n_msteps = cm_vec[n_dim].shape[0] + orbits = np.full((n_msteps, len(par.magnet_strengths[n_dim, j_bpm]), len(SC.ORD.BPM)), np.nan) + bpm_pos = np.full((n_msteps, len(par.magnet_strengths[n_dim, j_bpm])), np.nan) + for n_q, setpoint_q in enumerate(par.magnet_strengths[n_dim, j_bpm]): + SC.set_magnet_setpoints(m_ord, setpoint_q, par.skewness, par.magnet_order, method=par.setpoint_method, + dipole_compensation=par.dipole_compensation) + for step in range(n_msteps): + for n_d in range(2): + SC.set_cm_setpoints(cm_ords[n_d], cm_vec[n_d][step, :], bool(n_d), method=SETTING_ABS) + bpm_readings = bpm_reading(SC)[0] + bpm_pos[step, n_q] = bpm_readings[n_dim, bpm_ind] + orbits[step, n_q, :] = bpm_readings[meas_dim, :] + + SC.INJ.Z0 = initial_z0 + return bpm_pos, orbits + + +def _get_orbit_bump(SC, cm_ord, bpm_ord, n_dim, par): # TODO + tmpCMind = np.where(par.RMstruct.CMords[0] == cm_ord)[0] if len(tmpCMind): par.RMstruct.RM = np.delete(par.RMstruct.RM, tmpCMind, 1) # TODO not nice par.RMstruct.CMords[0] = np.delete(par.RMstruct.CMords[0], tmpCMind) - tmpBPMind = np.where(BPMord == par.RMstruct.BPMords)[0] + tmpBPMind = np.where(bpm_ord == par.RMstruct.BPMords)[0] - R0 = bpm_reading(SC) if par.useBPMreadingsForOrbBumpRef else np.zeros((2, len(par.RMstruct.BPMords))) - R0[nDim, tmpBPMind] += par.BBABPMtarget - CMords = par.RMstruct.CMords + R0 = bpm_reading(SC) if par.use_bpm_reading_for_orbit_bump_ref else np.zeros((2, len(par.RMstruct.BPMords))) + R0[n_dim, tmpBPMind] += par.BBABPMtarget + cm_ords = par.RMstruct.CMords W0 = np.ones((2, len(par.RMstruct.BPMords))) # TODO weight for SCFedbackRun - W0[nDim, max(1, tmpBPMind - par.orbBumpWindow):(tmpBPMind - 1)] = 0 - W0[nDim, (tmpBPMind + 1):min(len(par.RMstruct.BPMords), tmpBPMind + par.orbBumpWindow)] = 0 + W0[n_dim, max(1, tmpBPMind - par.orbBumpWindow):(tmpBPMind - 1)] = 0 + W0[n_dim, (tmpBPMind + 1):min(len(par.RMstruct.BPMords), tmpBPMind + par.orbBumpWindow)] = 0 - CUR = SCfeedbackRun(SC, par.RMstruct.MinvCO, reference=R0, CMords=CMords, BPMords=par.RMstruct.BPMords, eps=1E-6, + CUR = SCfeedbackRun(SC, par.RMstruct.MinvCO, reference=R0, CMords=cm_ords, BPMords=par.RMstruct.BPMords, eps=1E-6, target=0, maxsteps=50, scaleDisp=par.RMstruct.scaleDisp, ) - CMvec = [] - factor = np.linspace(-1, 1, par.nSteps) - for nDim in range(2): - vec0 = get_cm_setpoints(SC, CMords[nDim], skewness=bool(nDim)) - vec1 = get_cm_setpoints(CUR, CMords[nDim], skewness=bool(nDim)) - CMvec.append(vec0 + np.outer(factor, vec0 - vec1)) + cm_vec = [] + factor = np.linspace(-1, 1, par.n_steps) + for n_dim in range(2): + vec0 = SC.get_cm_setpoints(cm_ords[n_dim], skewness=bool(n_dim)) + vec1 = CUR.get_cm_setpoints(cm_ords[n_dim], skewness=bool(n_dim)) + cm_vec.append(vec0 + np.outer(factor, vec0 - vec1)) - return CMords, CMvec + return cm_ords, cm_vec -def _plot_bba_step(SC, ax, BPMind, nDim): +def _plot_bba_step(SC, ax, bpm_ind, n_dim): s_pos = findspos(SC.RING) - B, T = all_elements_reading(SC) - ax.plot(s_pos[SC.ORD.BPM], 1E3 * B[nDim, :], marker='o') - ax.plot(s_pos[SC.ORD.BPM[BPMind]], 1E3 * B[nDim, BPMind], marker='o', markersize=10, markerfacecolor='k') - ax.plot(s_pos, 1E3 * T[nDim, 0, :, 0], linestyle='-') # TODO 5D + bpm_readings, all_elements_positions = all_elements_reading(SC) + ax.plot(s_pos[SC.ORD.BPM], 1E3 * bpm_readings[n_dim, :len(SC.ORD.BPM)], marker='o') + ax.plot(s_pos[SC.ORD.BPM[bpm_ind]], 1E3 * bpm_readings[n_dim, bpm_ind], marker='o', markersize=10, markerfacecolor='k') + ax.plot(s_pos, 1E3 * all_elements_positions[n_dim, 0, :, 0, 0], linestyle='-') return ax - - -def plot_bba_results(SC, initOffsetErrors, errorFlags, jBPM, BPMords, magOrds): - plt.rcParams.update({'font.size': 18}) - fom0 = initOffsetErrors - fom = _get_bpm_offset_from_mag(SC, BPMords, magOrds) - fom[:, jBPM + 1:] = np.nan - if BPMords.shape[1] == 1: - - nSteps = 1 - else: - nSteps = 1.1 * np.max(np.abs(fom0)) * np.linspace(-1, 1, np.floor(BPMords.shape[1] / 3)) - f, ax = plt.subplots(nrows=3, num=90, facecolor="w") - colors = ['#1f77b4', '#ff7f0e'] - for nDim in range(BPMords.shape[0]): - a, b = np.histogram(fom[nDim, :], nSteps) - ax[0].plot(1E6 * b, a, linewidth=2) - a, b = np.histogram(fom0, nSteps) - ax[0].plot(1E6 * b, a, 'k-', linewidth=2) - if BPMords.shape[0] > 1: - rmss = 1E6 * np.sqrt(np.nanmean(np.square(fom), axis=1)) - init_rms = 1E6 * np.sqrt(np.nanmean(np.square(fom))) - legends = [f"Horizontal rms: {rmss[0]:.0f}$\\mu m$", - f"Vertical rms: {rmss[1]:.0f}$\\mu m$", - f"Initial rms: {init_rms:.0f}$\\mu m$"] - ax[0].legend(legends) - ax[0].set_xlabel(r'Final BPM offset w.r.t. magnet [$\mu$m]') - ax[0].set_ylabel('Number of counts') - ax[0].set(box="on") - - mask_errors = errorFlags == 0 - plabels = ("Horizontal", "Vertical") - for nDim in range(BPMords.shape[0]): - x = np.where(np.in1d(SC.ORD.BPM, BPMords[nDim, :])) - mask = mask_errors[nDim, :] - if not np.all(np.isnan(fom[nDim, mask])): - ax[1].plot(x[mask], 1E6 * np.abs(fom[nDim, mask]), label=plabels[nDim], marker='O', linewidth=2, color=colors[nDim]) - if not np.all(np.isnan(fom[nDim, ~mask])): - ax[1].plot(x[~mask], 1E6 * np.abs(fom[nDim, ~mask]), label=f"{plabels[nDim]} failed", marker='X', linewidth=2, color=colors[nDim]) - ax[2].plot(x, 1E6 * (fom0[nDim, :] - fom[nDim, :]), label=plabels[nDim], marker='d', linewidth=2) - - ax[1].set_ylabel(r'Final offset [$\mu$m]') - ax[1].set_xlabel('Index of BPM') - ax[1].set(xlim=(1, len(SC.ORD.BPM)), box='on') - ax[1].legend() - - ax[2].set_ylabel(r'Offsets change [$\mu$m]') - ax[2].set_xlabel('Index of BPM') - ax[2].set(xlim=(1, len(SC.ORD.BPM)), box='on') - ax[2].legend() - - plt.show() diff --git a/pySC/example.py b/pySC/example.py index 4fa4f46..3dc820d 100644 --- a/pySC/example.py +++ b/pySC/example.py @@ -1,10 +1,11 @@ import at import numpy as np +from pySC.correction.bba import trajectory_bba, fake_bba + from at import Lattice from pySC.utils.at_wrapper import atloco from pySC.core.simulated_commissioning import SimulatedCommissioning -from pySC.correction.orbit_trajectory import SCfeedbackFirstTurn, SCfeedbackStitch, SCfeedbackRun, SCfeedbackBalance, \ - SCpseudoBBA +from pySC.correction.orbit_trajectory import SCfeedbackFirstTurn, SCfeedbackStitch, SCfeedbackRun, SCfeedbackBalance from pySC.core.beam import bpm_reading, beam_transmission from pySC.correction.tune import tune_scan from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion @@ -126,6 +127,16 @@ def _marker(name): # SC = SCfeedbackRun(SC, Minv2, target=300E-6, maxsteps=30, eps=eps) SC = SCfeedbackBalance(SC, Minv2, maxsteps=32, eps=eps) + # plot_cm_strengths(SC) + # Performing trajectory BBA + SC.INJ.nParticles = 1 + quadOrds = np.tile(SCgetOrds(SC.RING, 'QF|QD'), (2, 1)) + BPMords = np.tile(SC.ORD.BPM, (2, 1)) + SC, bba_offsets, bba_offset_errors = trajectory_bba(SC, BPMords, quadOrds, q_ord_phase=SCgetOrds(SC.RING, 'QF|QD')[0], + q_ord_setpoints=np.array([0.8, 0.9, 1.0, 1.1, 1.2]), + magnet_strengths=np.array([0.8, 0.9, 1.0, 1.1, 1.2]), + dipole_compensation=True, plot_results=True) + # Turning on the sextupoles for rel_setting in np.linspace(0.1, 1, 5): SC.set_magnet_setpoints(sextOrds, rel_setting, False, 2, method='rel') @@ -140,22 +151,21 @@ def _marker(name): plot_phase_space(SC, nParticles=10, nTurns=100) # RF cavity correction - SC.INJ.nTurns = 5 for nIter in range(2): + SC.INJ.nTurns = 5 SC = correct_rf_phase(SC, n_steps=25, plot_results=False, plot_progress=False) - - SC = correct_rf_frequency(SC, f_range=40E3 * np.array([-1, 1]), # Frequency range [kHz] - n_steps=15, # Number of frequency steps - plot_results=False, plot_progress=False) + SC.INJ.nTurns = 15 + SC = correct_rf_frequency(SC, n_steps=15, f_range=4E3 * np.array([-1, 1]), plot_results=False, + plot_progress=False) # Plot phasespace after RF correction plot_phase_space(SC, nParticles=10, nTurns=100) [maxTurns, lostCount] = beam_transmission(SC, nParticles=100, nTurns=10) - # Performing pseudo-BBA - quadOrds = np.tile(SCgetOrds(SC.RING, 'QF|QD'), (2,1)) - BPMords = np.tile(SC.ORD.BPM, (2,1)) - SC = SCpseudoBBA(SC, BPMords, quadOrds, np.array([50E-6])) + # Faking-BBA + quadOrds = np.tile(SCgetOrds(SC.RING, 'QF|QD'), (2, 1)) + BPMords = np.tile(SC.ORD.BPM, (2, 1)) + SC = fake_bba(SC, BPMords, quadOrds, fake_offset=np.array([50E-6, 50E-6])) # Orbit correction SC.INJ.trackMode = 'ORB' diff --git a/pySC/matlab_index.py b/pySC/matlab_index.py index b7a04f4..10562aa 100644 --- a/pySC/matlab_index.py +++ b/pySC/matlab_index.py @@ -19,10 +19,10 @@ from pySC.core.classes import DotDict from pySC.core.lattice_setting import switch_cavity_and_radiation -from pySC.correction.bba import SCBBA as bba +from pySC.correction.bba import trajectory_bba, orbit_bba, fake_bba from pySC.correction.injection_fit import fit_injection_trajectory, fit_injection_drift from pySC.correction.orbit_trajectory import SCfeedbackFirstTurn as first_turn, SCfeedbackStitch as stitch, \ - SCfeedbackRun as frun, SCfeedbackBalance as fbalance, SCpseudoBBA as pseudo_bba + SCfeedbackRun as frun, SCfeedbackBalance as fbalance from pySC.correction.ramp_errors import SCrampUpErrors as ramp_up_errors from pySC.correction.rf import correct_rf_phase, correct_rf_frequency from pySC.correction.tune import tune_scan @@ -51,7 +51,12 @@ def SCapplyErrors(SC: SimulatedCommissioning, nsigmas: float = 2) -> SimulatedCo def SCBBA(SC, BPMords, magOrds, **kwargs): - return bba(SC, BPMords, magOrds, **kwargs) + if kwargs["mode"] == "TBT": + LOGGER.warning("Performing trajectory BBA, please check the actual function to resolve parameter changes.") + return trajectory_bba(SC, BPMords, magOrds, **kwargs) + if kwargs["mode"] == "ORB": + LOGGER.warning("Performing orbit BBA, please check the actual function to resolve parameter changes.") + return orbit_bba(SC, BPMords, magOrds, **kwargs) def SCcronoff(ring: Lattice, *args: str) -> Lattice: @@ -224,7 +229,7 @@ def SCplotSupport(SC: SimulatedCommissioning, /, *, fontSize: int = 8, xLim: Tup def SCpseudoBBA(SC, BPMords, MagOrds, postBBAoffset, /, *, sigma=2): - return pseudo_bba(SC, BPMords, MagOrds, postBBAoffset, sigma=sigma) + return fake_bba(SC, BPMords, MagOrds, fake_offset=postBBAoffset) def SCrampUpErrors(SC, /, *, nStepsRamp=10, eps=1e-5, target=0, alpha=10, maxsteps=30, verbose=0): diff --git a/tests/test_example.py b/tests/test_example.py index 1f18fdd..ef42833 100644 --- a/tests/test_example.py +++ b/tests/test_example.py @@ -2,10 +2,10 @@ from tests.test_at_wrapper import at_lattice import numpy as np from pySC.core.simulated_commissioning import SimulatedCommissioning -from pySC.correction.orbit_trajectory import SCfeedbackFirstTurn, SCfeedbackStitch, SCfeedbackRun, SCfeedbackBalance, \ - SCpseudoBBA +from pySC.correction.orbit_trajectory import SCfeedbackFirstTurn, SCfeedbackStitch, SCfeedbackRun, SCfeedbackBalance from pySC.core.beam import bpm_reading, beam_transmission from pySC.correction.tune import tune_scan +from pySC.correction.bba import trajectory_bba, fake_bba, _get_bpm_offset_from_mag from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion from pySC.utils.sc_tools import SCgetOrds, SCgetPinv from pySC.core.lattice_setting import switch_cavity_and_radiation @@ -13,7 +13,7 @@ def test_example(at_lattice): - np.random.seed(1234567) + np.random.seed(12345678) sc = SimulatedCommissioning(at_lattice) sc.register_bpms(SCgetOrds(sc.RING, 'BPM'), CalError=5E-2 * np.ones(2), @@ -82,6 +82,20 @@ def test_example(at_lattice): sc = SCfeedbackStitch(sc, minv2, nBPMs=3, maxsteps=20) sc = SCfeedbackBalance(sc, minv2, maxsteps=32, eps=eps) + # Performing BBA + sc.INJ.nParticles = 1 + quadOrds = np.tile(SCgetOrds(sc.RING, 'QF|QD'), (2, 1)) + BPMords = np.tile(sc.ORD.BPM, (2, 1)) + init_offsets = _get_bpm_offset_from_mag(sc.RING, BPMords, quadOrds) + sc, bba_offsets, bba_offset_errors = trajectory_bba(sc, np.tile(sc.ORD.BPM, (2, 1)), np.tile(SCgetOrds(sc.RING, 'QF|QD'), (2, 1)), + q_ord_phase=SCgetOrds(sc.RING, 'QF|QD')[0], + q_ord_setpoints=np.array([0.8, 0.9, 1.0, 1.1, 1.2]), + plot_results=True, + magnet_strengths=np.array([0.8, 0.9, 1.0, 1.1, 1.2]), + dipole_compensation=False) + final_offsets = _get_bpm_offset_from_mag(sc.RING, BPMords, quadOrds) + assert np.std(init_offsets) > 2 * np.std(final_offsets) + # Turning on the sextupoles for rel_setting in np.linspace(0.1, 1, 5): sc.set_magnet_setpoints(sext_ords, rel_setting, False, 2, method='rel') @@ -92,9 +106,11 @@ def test_example(at_lattice): # RF cavity correction sc.INJ.nTurns = 5 sc = correct_rf_phase(sc, n_steps=15) + sc.INJ.nTurns = 15 sc = correct_rf_frequency(sc, n_steps=15, f_range=4E3 * np.array([-1, 1])) - sc = SCpseudoBBA(sc, np.tile(sc.ORD.BPM, (2, 1)), np.tile(SCgetOrds(sc.RING, 'QF|QD'), (2, 1)), np.array([50E-6])) + sc = fake_bba(sc, np.tile(sc.ORD.BPM, (2, 1)), np.tile(SCgetOrds(sc.RING, 'QF|QD'), (2, 1)), + fake_offset=np.array([50E-6, 50E-6])) # Orbit correction sc.INJ.trackMode = 'ORB' @@ -116,7 +132,7 @@ def test_example(at_lattice): max_turns, fraction_survived = beam_transmission(sc, nParticles=100, nTurns=200, plot=True) sc, _, _, _ = tune_scan(sc, np.vstack((SCgetOrds(sc.RING, 'QF'), SCgetOrds(sc.RING, 'QD'))), np.outer(np.ones(2), 1 + np.linspace(-0.01, 0.01, 51)), do_plot=False, nParticles=50, - nTurns=100, target=0.85) + nTurns=100, target=0.95) max_turns, fraction_survived = beam_transmission(sc, nParticles=100, nTurns=200, plot=True) assert max_turns == 200 - assert fraction_survived[-1] == 1 + assert fraction_survived[-1] >= 0.95 diff --git a/tests/test_lattice_setting.py b/tests/test_lattice_setting.py index a9cd02f..cfafe5a 100644 --- a/tests/test_lattice_setting.py +++ b/tests/test_lattice_setting.py @@ -59,6 +59,14 @@ def test_check_input_and_setpoints(): ords1d, setps1d = check_input_and_setpoints(SETTING_ABS, os, ss) assert_equal(ords1d, wanted_ords) assert_equal(setps1d, wanted_setpoints) + # check also empty ord + wanted_ord, wanted_setpoint = np.array([], dtype=int), np.array([]) + ord1d, setp1d = check_input_and_setpoints(SETTING_REL, [], 1.0) + assert_equal(ord1d, wanted_ord) + assert_equal(setp1d, wanted_setpoint) + +def test_set_magnet_setpoints_empty(unit_sc): + unit_sc.set_magnet_setpoints([], 1.0, False, 1, method=SETTING_REL) def test_set_magnet_setpoints(unit_sc):