Module pylars.processing.peaks

Expand source code
from typing import List, Tuple

import numpy as np
from .waveforms import waveform_processing


class peak_processing():
    """All the things peaks. Peaks are sums of pulses found in waveforms.

    This class, by definition, is a collection of class methods related
    to peak processing to be used in `peakprocessor`, where a processor
    object is then constructed.
    """

    __version__ = '0.0.1'

    available_posrec_algos = ['CoG']

    @classmethod
    def apply_ADCcounts_to_e(cls, waveforms_subtracted: np.ndarray,
                             ADC_config: dict) -> np.ndarray:
        """Convert ADC counts/sample to charge.

        Applies the charge converting factor to waveforms to turn ADC counts
        (which are integrated over 1 sample) to charge. `waveforms_subtracted`
        can be one or more channels.

        Args:
            waveforms_subtracted (np.ndarray): value of ADC counts per sample
                above calculate local baseline
            ADC_config (dict): dictionary with the ADC config

        Raises:
            ValueError: If the parsed ADC_config dictionary does not have the
                required keys.

        Returns:
            np.ndarray: waveforms in charge.
        """

        try:
            ADC_range = ADC_config['ADC_range']
            ADC_impedance = ADC_config['ADC_impedance']
            F_amp = ADC_config['F_amp']
            ADC_res = ADC_config['ADC_res']
            q_e = ADC_config['q_e']
            dt = ADC_config['dt']
        except BaseException:
            raise ValueError('The ADC_config dictionary is probably missing ' +
                             'something.')

        to_e_constant = (ADC_range * dt / ADC_impedance / F_amp /
                         ADC_res / q_e)

        waveforms_charge = waveforms_subtracted * to_e_constant

        return waveforms_charge

    @classmethod
    def apply_e_to_pe(cls, waveforms_charge: np.ndarray,
                      gains: np.ndarray) -> np.ndarray:
        """Transform waveforms from charge to pe with gain per channel.

        Takes waveforms already converted to charge from ADC counts and an
        array with the gains for each channel in units of [e/pe]. The ammount
        of rows in the waveform array needs to be the same as the length of
        the gains array.

        The gains array is assumed to be on the correct order in respect to
        the order of channels in waveforms_charge.

        Args:
            waveforms_charge (np.ndarray): waveform array in charge units

        Returns:
            np.ndarray: waveform array in pe/sample
        """

        assert len(gains) == np.shape(waveforms_charge)[0], ('''Size of
        gains and channels in waveforms array do not match.''')

        waveforms_pe = (waveforms_charge.T / gains).T

        return waveforms_pe

    @classmethod
    def apply_baseline_subtract(cls, waveforms: np.ndarray,
                                baselines: np.ndarray) -> np.ndarray:
        """Apply baseline subtracting and flipping from negative to positive
        pulses.

        Args:
             waveforms (np.ndarray): waveforms, all channels stacked by rows.
            baselines (np.ndarray): computed baselines, all channels stacked
                by rows.

        Returns:
            np.ndarray: waveforms flipped and where 0 is local baseline.
        """

        assert len(baselines) == np.shape(waveforms)[0], ('''Size of
        baseines and channels in waveforms array do not match.''')

        baselines_expanded = baselines[:, :, np.newaxis]
        waveforms_subtracted = baselines_expanded - waveforms

        return waveforms_subtracted

    @classmethod
    def apply_waveforms_transform(cls, waveforms: np.ndarray,
                                  baselines: np.ndarray,
                                  gains: np.ndarray,
                                  ADC_config: dict) -> np.ndarray:
        """Converts waveforms from ADC counts/sample to pe/s.

        Takes the initials waveforms stacked for all channels and returns
        the waveforms in converted pe/s space.

        Args:
            waveforms (np.ndarray): waveforms, all channels stacked by rows.
            baselines (np.ndarray): computed baselines, all channels stacked
                by rows.
            gains (np.ndarray): gains, all channels stacked by rows.
            ADC_config (dict): dictionary with the specific digitizer configs.

        Returns:
            np.ndarray: waveforms with applied gain and e2pe transformation.
        """

        waveforms_subtracted = cls.apply_baseline_subtract(
            waveforms, baselines)
        waveforms_charge = cls.apply_ADCcounts_to_e(
            waveforms_subtracted, ADC_config)
        waveforms_pe = cls.apply_e_to_pe(waveforms_charge, gains)

        return waveforms_pe

    @classmethod
    def flip_and_apply_gains(cls,
                             waveforms: np.ndarray,
                             gains: np.ndarray,
                             ADC_config: dict,
                             baseline_samples: int = 50,
                             ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Prepare a set of waveforms for processing by:
          - determining and subtracting baseline;
          - inverting polarity to positive peak
          - applying gains, converting to PE/s
          - computing the std of the waveform
          - computing the sum waveform


        Args:
            waveforms (np.ndarray): all the waveforms of a file, from all the channels
            gains (np.ndarray): gains for each channel **in alphabetical order**
            ADC_config (dict): ADCconfig dictionary
            baseline_samples (int, optional): How many samples used to
                calculate the baseline. Defaults to 50.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: waveforms_pe, stds_pe,
                sum_waveform
        """

        baselines = np.apply_along_axis(
            func1d=waveform_processing.get_baseline_rough,
            axis=2,
            arr=waveforms,
            baseline_samples=baseline_samples)

        waveforms_pe = cls.apply_waveforms_transform(
            waveforms, baselines, gains, ADC_config)

        stds_pe = np.apply_along_axis(
            func1d=waveform_processing.get_std_rough,
            axis=2,
            arr=waveforms_pe,
            baseline_samples=baseline_samples)

        sum_waveform = peak_processing.get_sum_waveform(waveforms_pe)

        return waveforms_pe, stds_pe, sum_waveform

    @classmethod
    def get_area_of_single_waveform_each_channel(cls,
                                                 single_waveforms_pe: np.ndarray,
                                                 peak_start: int,
                                                 peak_end: int,
                                                 dt: int = 10) -> np.ndarray:
        """Get the area of identified peak in each channel.
        """

        area_individual_channels = np.sum(
            single_waveforms_pe[:, peak_start:peak_end], axis=1) * dt

        return area_individual_channels

    @classmethod
    def reorder_channel(cls, data_array: np.ndarray,
                        index_reorder: list) -> np.ndarray:
        """Reorder columns of an ndarray, corresponding to changing the order
        of channels to match the order in the sensor layout.

        Based on the following stack overflow thread: https://stackoverflow.
        com/questions/20265229/rearrange-columns-of-numpy-2d-array

        Args:
            data_array (np.ndarray): array where columns are different
                channels
            index_reorder (list): list of indexes where the change in
                `data_array` is i->index_reorder[i].

        Returns:
            np.ndarray: the original array with reordered collumns following
                `index_reorder`.
        """

        if len(np.shape(data_array)) == 1:
            data_array = np.array(data_array).reshape(1, len(data_array))

        idx = np.empty_like(index_reorder)
        idx[index_reorder] = np.arange(len(index_reorder))
        data_array[:] = data_array[:, idx]  # in-place modification of array

        return data_array

    @classmethod
    def get_sum_waveform(cls, waveforms_pe: np.ndarray) -> np.ndarray:
        """Sums the (transformed to pe/s) waveforms of all channels.

        Args:
            waveforms_pe (np.ndarray): array with waveforms from all the
                channels.

        Returns:
            np.ndarray: Summed waveform.
        """

        summed_waveform = np.sum(waveforms_pe, axis=0)

        return summed_waveform

    @classmethod
    def get_sum_peak_start_end_above_min_area(cls, areas: List[float],
                                              positions: List[int], lengths: List[int],
                                              area_min: float) -> Tuple[List[int], List[int]]:
        """Determine the indexes of start and end of a peak, considering
        only peaks with area above `area_min`.

        Returns:
            Tuple[List, List]: lists with the indexes of begin of peaks and
                end of peaks.
        """
        good_peaks_start = []
        good_peaks_end = []
        for _area, _position, _length in zip(areas, positions, lengths):
            if _area > area_min:
                good_peaks_start.append(_position)
                good_peaks_end.append(_position + _length)
        return (good_peaks_start, good_peaks_end)

    @classmethod
    def get_area_per_sensor(cls, waveforms_pe: np.ndarray,
                            peaks_start: List[int],
                            peaks_end: List[int]) -> np.ndarray:
        """Computes the area per sensor of a given waveform set to be used
        for hitpattern needs.

        Args:
            waveforms_pe (np.ndarray): waveforms in pe, one row per channel.
            peaks_start (List[int]): list with the start of peaks in the
                summed waveform.
            peaks_end (List[int]): list with the end of peaks in the
                summed waveform.

        Returns:
            np.ndarray: array with the area per channel for the peaks, each
                row a peak.
        """

        area_per_sensor = np.zeros((len(peaks_start),
                                    np.shape(waveforms_pe)[0]))
        for i, (_p_start, _p_end) in enumerate(zip(peaks_start, peaks_end)):
            area_per_sensor[i, :] = np.sum(waveforms_pe[:, _p_start:_p_end],
                                           axis=1)
        return area_per_sensor

    @classmethod
    def reconstruct_xy_position(cls, area_per_sensor: np.ndarray,
                                sensor_layout: np.ndarray,
                                algo: str = 'CoG') -> np.ndarray:
        """Computes xy position of event given a hitpattern from
        area_per_sensor.

        Args:
            algo (str, optional): The algorithm to use in position
                reconstruction. Defaults to 'CoG'.

        Returns:
            np.ndarray: array with x,y reconstructed position. Each row a
                different peak. pos[:,0] is the list of x, pos[:,1] the
                list of y.
        """

        if algo != 'CoG':
            raise NotImplementedError(f'''The requested reconstruction
            algorithm ({algo}) is not yet implemented,
            try: {cls.available_posrec_algos}''')

        area_tot = np.sum(area_per_sensor, axis=1)
        # might not be exactly the same as calculated in `process_waveform`

        x = np.sum((area_per_sensor * sensor_layout[0, :].T)) / area_tot
        y = np.sum((area_per_sensor * sensor_layout[1, :].T)) / area_tot

        return np.vstack([x, y])


class peak():
    """This is a peak (gipfel).
    #TODO, NOT IN USE
    """

    def __init__(self, timestamp: int,
                 wf_number: int,
                 area: float,
                 length: int,
                 position: int,
                 amplitude: float,
                 area_per_sensor: np.ndarray):

        self.timestamp = timestamp
        self.wf_number = wf_number
        self.area = area
        self.length = length
        self.position = position
        self.amplitude = amplitude
        self.area_per_sensor = area_per_sensor

Classes

class peak (timestamp: int, wf_number: int, area: float, length: int, position: int, amplitude: float, area_per_sensor: numpy.ndarray)

This is a peak (gipfel).

TODO, NOT IN USE

Expand source code
class peak():
    """This is a peak (gipfel).
    #TODO, NOT IN USE
    """

    def __init__(self, timestamp: int,
                 wf_number: int,
                 area: float,
                 length: int,
                 position: int,
                 amplitude: float,
                 area_per_sensor: np.ndarray):

        self.timestamp = timestamp
        self.wf_number = wf_number
        self.area = area
        self.length = length
        self.position = position
        self.amplitude = amplitude
        self.area_per_sensor = area_per_sensor
class peak_processing

All the things peaks. Peaks are sums of pulses found in waveforms.

This class, by definition, is a collection of class methods related to peak processing to be used in peakprocessor, where a processor object is then constructed.

Expand source code
class peak_processing():
    """All the things peaks. Peaks are sums of pulses found in waveforms.

    This class, by definition, is a collection of class methods related
    to peak processing to be used in `peakprocessor`, where a processor
    object is then constructed.
    """

    __version__ = '0.0.1'

    available_posrec_algos = ['CoG']

    @classmethod
    def apply_ADCcounts_to_e(cls, waveforms_subtracted: np.ndarray,
                             ADC_config: dict) -> np.ndarray:
        """Convert ADC counts/sample to charge.

        Applies the charge converting factor to waveforms to turn ADC counts
        (which are integrated over 1 sample) to charge. `waveforms_subtracted`
        can be one or more channels.

        Args:
            waveforms_subtracted (np.ndarray): value of ADC counts per sample
                above calculate local baseline
            ADC_config (dict): dictionary with the ADC config

        Raises:
            ValueError: If the parsed ADC_config dictionary does not have the
                required keys.

        Returns:
            np.ndarray: waveforms in charge.
        """

        try:
            ADC_range = ADC_config['ADC_range']
            ADC_impedance = ADC_config['ADC_impedance']
            F_amp = ADC_config['F_amp']
            ADC_res = ADC_config['ADC_res']
            q_e = ADC_config['q_e']
            dt = ADC_config['dt']
        except BaseException:
            raise ValueError('The ADC_config dictionary is probably missing ' +
                             'something.')

        to_e_constant = (ADC_range * dt / ADC_impedance / F_amp /
                         ADC_res / q_e)

        waveforms_charge = waveforms_subtracted * to_e_constant

        return waveforms_charge

    @classmethod
    def apply_e_to_pe(cls, waveforms_charge: np.ndarray,
                      gains: np.ndarray) -> np.ndarray:
        """Transform waveforms from charge to pe with gain per channel.

        Takes waveforms already converted to charge from ADC counts and an
        array with the gains for each channel in units of [e/pe]. The ammount
        of rows in the waveform array needs to be the same as the length of
        the gains array.

        The gains array is assumed to be on the correct order in respect to
        the order of channels in waveforms_charge.

        Args:
            waveforms_charge (np.ndarray): waveform array in charge units

        Returns:
            np.ndarray: waveform array in pe/sample
        """

        assert len(gains) == np.shape(waveforms_charge)[0], ('''Size of
        gains and channels in waveforms array do not match.''')

        waveforms_pe = (waveforms_charge.T / gains).T

        return waveforms_pe

    @classmethod
    def apply_baseline_subtract(cls, waveforms: np.ndarray,
                                baselines: np.ndarray) -> np.ndarray:
        """Apply baseline subtracting and flipping from negative to positive
        pulses.

        Args:
             waveforms (np.ndarray): waveforms, all channels stacked by rows.
            baselines (np.ndarray): computed baselines, all channels stacked
                by rows.

        Returns:
            np.ndarray: waveforms flipped and where 0 is local baseline.
        """

        assert len(baselines) == np.shape(waveforms)[0], ('''Size of
        baseines and channels in waveforms array do not match.''')

        baselines_expanded = baselines[:, :, np.newaxis]
        waveforms_subtracted = baselines_expanded - waveforms

        return waveforms_subtracted

    @classmethod
    def apply_waveforms_transform(cls, waveforms: np.ndarray,
                                  baselines: np.ndarray,
                                  gains: np.ndarray,
                                  ADC_config: dict) -> np.ndarray:
        """Converts waveforms from ADC counts/sample to pe/s.

        Takes the initials waveforms stacked for all channels and returns
        the waveforms in converted pe/s space.

        Args:
            waveforms (np.ndarray): waveforms, all channels stacked by rows.
            baselines (np.ndarray): computed baselines, all channels stacked
                by rows.
            gains (np.ndarray): gains, all channels stacked by rows.
            ADC_config (dict): dictionary with the specific digitizer configs.

        Returns:
            np.ndarray: waveforms with applied gain and e2pe transformation.
        """

        waveforms_subtracted = cls.apply_baseline_subtract(
            waveforms, baselines)
        waveforms_charge = cls.apply_ADCcounts_to_e(
            waveforms_subtracted, ADC_config)
        waveforms_pe = cls.apply_e_to_pe(waveforms_charge, gains)

        return waveforms_pe

    @classmethod
    def flip_and_apply_gains(cls,
                             waveforms: np.ndarray,
                             gains: np.ndarray,
                             ADC_config: dict,
                             baseline_samples: int = 50,
                             ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Prepare a set of waveforms for processing by:
          - determining and subtracting baseline;
          - inverting polarity to positive peak
          - applying gains, converting to PE/s
          - computing the std of the waveform
          - computing the sum waveform


        Args:
            waveforms (np.ndarray): all the waveforms of a file, from all the channels
            gains (np.ndarray): gains for each channel **in alphabetical order**
            ADC_config (dict): ADCconfig dictionary
            baseline_samples (int, optional): How many samples used to
                calculate the baseline. Defaults to 50.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: waveforms_pe, stds_pe,
                sum_waveform
        """

        baselines = np.apply_along_axis(
            func1d=waveform_processing.get_baseline_rough,
            axis=2,
            arr=waveforms,
            baseline_samples=baseline_samples)

        waveforms_pe = cls.apply_waveforms_transform(
            waveforms, baselines, gains, ADC_config)

        stds_pe = np.apply_along_axis(
            func1d=waveform_processing.get_std_rough,
            axis=2,
            arr=waveforms_pe,
            baseline_samples=baseline_samples)

        sum_waveform = peak_processing.get_sum_waveform(waveforms_pe)

        return waveforms_pe, stds_pe, sum_waveform

    @classmethod
    def get_area_of_single_waveform_each_channel(cls,
                                                 single_waveforms_pe: np.ndarray,
                                                 peak_start: int,
                                                 peak_end: int,
                                                 dt: int = 10) -> np.ndarray:
        """Get the area of identified peak in each channel.
        """

        area_individual_channels = np.sum(
            single_waveforms_pe[:, peak_start:peak_end], axis=1) * dt

        return area_individual_channels

    @classmethod
    def reorder_channel(cls, data_array: np.ndarray,
                        index_reorder: list) -> np.ndarray:
        """Reorder columns of an ndarray, corresponding to changing the order
        of channels to match the order in the sensor layout.

        Based on the following stack overflow thread: https://stackoverflow.
        com/questions/20265229/rearrange-columns-of-numpy-2d-array

        Args:
            data_array (np.ndarray): array where columns are different
                channels
            index_reorder (list): list of indexes where the change in
                `data_array` is i->index_reorder[i].

        Returns:
            np.ndarray: the original array with reordered collumns following
                `index_reorder`.
        """

        if len(np.shape(data_array)) == 1:
            data_array = np.array(data_array).reshape(1, len(data_array))

        idx = np.empty_like(index_reorder)
        idx[index_reorder] = np.arange(len(index_reorder))
        data_array[:] = data_array[:, idx]  # in-place modification of array

        return data_array

    @classmethod
    def get_sum_waveform(cls, waveforms_pe: np.ndarray) -> np.ndarray:
        """Sums the (transformed to pe/s) waveforms of all channels.

        Args:
            waveforms_pe (np.ndarray): array with waveforms from all the
                channels.

        Returns:
            np.ndarray: Summed waveform.
        """

        summed_waveform = np.sum(waveforms_pe, axis=0)

        return summed_waveform

    @classmethod
    def get_sum_peak_start_end_above_min_area(cls, areas: List[float],
                                              positions: List[int], lengths: List[int],
                                              area_min: float) -> Tuple[List[int], List[int]]:
        """Determine the indexes of start and end of a peak, considering
        only peaks with area above `area_min`.

        Returns:
            Tuple[List, List]: lists with the indexes of begin of peaks and
                end of peaks.
        """
        good_peaks_start = []
        good_peaks_end = []
        for _area, _position, _length in zip(areas, positions, lengths):
            if _area > area_min:
                good_peaks_start.append(_position)
                good_peaks_end.append(_position + _length)
        return (good_peaks_start, good_peaks_end)

    @classmethod
    def get_area_per_sensor(cls, waveforms_pe: np.ndarray,
                            peaks_start: List[int],
                            peaks_end: List[int]) -> np.ndarray:
        """Computes the area per sensor of a given waveform set to be used
        for hitpattern needs.

        Args:
            waveforms_pe (np.ndarray): waveforms in pe, one row per channel.
            peaks_start (List[int]): list with the start of peaks in the
                summed waveform.
            peaks_end (List[int]): list with the end of peaks in the
                summed waveform.

        Returns:
            np.ndarray: array with the area per channel for the peaks, each
                row a peak.
        """

        area_per_sensor = np.zeros((len(peaks_start),
                                    np.shape(waveforms_pe)[0]))
        for i, (_p_start, _p_end) in enumerate(zip(peaks_start, peaks_end)):
            area_per_sensor[i, :] = np.sum(waveforms_pe[:, _p_start:_p_end],
                                           axis=1)
        return area_per_sensor

    @classmethod
    def reconstruct_xy_position(cls, area_per_sensor: np.ndarray,
                                sensor_layout: np.ndarray,
                                algo: str = 'CoG') -> np.ndarray:
        """Computes xy position of event given a hitpattern from
        area_per_sensor.

        Args:
            algo (str, optional): The algorithm to use in position
                reconstruction. Defaults to 'CoG'.

        Returns:
            np.ndarray: array with x,y reconstructed position. Each row a
                different peak. pos[:,0] is the list of x, pos[:,1] the
                list of y.
        """

        if algo != 'CoG':
            raise NotImplementedError(f'''The requested reconstruction
            algorithm ({algo}) is not yet implemented,
            try: {cls.available_posrec_algos}''')

        area_tot = np.sum(area_per_sensor, axis=1)
        # might not be exactly the same as calculated in `process_waveform`

        x = np.sum((area_per_sensor * sensor_layout[0, :].T)) / area_tot
        y = np.sum((area_per_sensor * sensor_layout[1, :].T)) / area_tot

        return np.vstack([x, y])

Class variables

var available_posrec_algos

Static methods

def apply_ADCcounts_to_e(waveforms_subtracted: numpy.ndarray, ADC_config: dict) ‑> numpy.ndarray

Convert ADC counts/sample to charge.

Applies the charge converting factor to waveforms to turn ADC counts (which are integrated over 1 sample) to charge. waveforms_subtracted can be one or more channels.

Args

waveforms_subtracted : np.ndarray
value of ADC counts per sample above calculate local baseline
ADC_config : dict
dictionary with the ADC config

Raises

ValueError
If the parsed ADC_config dictionary does not have the required keys.

Returns

np.ndarray
waveforms in charge.
Expand source code
@classmethod
def apply_ADCcounts_to_e(cls, waveforms_subtracted: np.ndarray,
                         ADC_config: dict) -> np.ndarray:
    """Convert ADC counts/sample to charge.

    Applies the charge converting factor to waveforms to turn ADC counts
    (which are integrated over 1 sample) to charge. `waveforms_subtracted`
    can be one or more channels.

    Args:
        waveforms_subtracted (np.ndarray): value of ADC counts per sample
            above calculate local baseline
        ADC_config (dict): dictionary with the ADC config

    Raises:
        ValueError: If the parsed ADC_config dictionary does not have the
            required keys.

    Returns:
        np.ndarray: waveforms in charge.
    """

    try:
        ADC_range = ADC_config['ADC_range']
        ADC_impedance = ADC_config['ADC_impedance']
        F_amp = ADC_config['F_amp']
        ADC_res = ADC_config['ADC_res']
        q_e = ADC_config['q_e']
        dt = ADC_config['dt']
    except BaseException:
        raise ValueError('The ADC_config dictionary is probably missing ' +
                         'something.')

    to_e_constant = (ADC_range * dt / ADC_impedance / F_amp /
                     ADC_res / q_e)

    waveforms_charge = waveforms_subtracted * to_e_constant

    return waveforms_charge
def apply_baseline_subtract(waveforms: numpy.ndarray, baselines: numpy.ndarray) ‑> numpy.ndarray

Apply baseline subtracting and flipping from negative to positive pulses.

Args

waveforms (np.ndarray): waveforms, all channels stacked by rows.
baselines : np.ndarray
computed baselines, all channels stacked by rows.

Returns

np.ndarray
waveforms flipped and where 0 is local baseline.
Expand source code
@classmethod
def apply_baseline_subtract(cls, waveforms: np.ndarray,
                            baselines: np.ndarray) -> np.ndarray:
    """Apply baseline subtracting and flipping from negative to positive
    pulses.

    Args:
         waveforms (np.ndarray): waveforms, all channels stacked by rows.
        baselines (np.ndarray): computed baselines, all channels stacked
            by rows.

    Returns:
        np.ndarray: waveforms flipped and where 0 is local baseline.
    """

    assert len(baselines) == np.shape(waveforms)[0], ('''Size of
    baseines and channels in waveforms array do not match.''')

    baselines_expanded = baselines[:, :, np.newaxis]
    waveforms_subtracted = baselines_expanded - waveforms

    return waveforms_subtracted
def apply_e_to_pe(waveforms_charge: numpy.ndarray, gains: numpy.ndarray) ‑> numpy.ndarray

Transform waveforms from charge to pe with gain per channel.

Takes waveforms already converted to charge from ADC counts and an array with the gains for each channel in units of [e/pe]. The ammount of rows in the waveform array needs to be the same as the length of the gains array.

The gains array is assumed to be on the correct order in respect to the order of channels in waveforms_charge.

Args

waveforms_charge : np.ndarray
waveform array in charge units

Returns

np.ndarray
waveform array in pe/sample
Expand source code
@classmethod
def apply_e_to_pe(cls, waveforms_charge: np.ndarray,
                  gains: np.ndarray) -> np.ndarray:
    """Transform waveforms from charge to pe with gain per channel.

    Takes waveforms already converted to charge from ADC counts and an
    array with the gains for each channel in units of [e/pe]. The ammount
    of rows in the waveform array needs to be the same as the length of
    the gains array.

    The gains array is assumed to be on the correct order in respect to
    the order of channels in waveforms_charge.

    Args:
        waveforms_charge (np.ndarray): waveform array in charge units

    Returns:
        np.ndarray: waveform array in pe/sample
    """

    assert len(gains) == np.shape(waveforms_charge)[0], ('''Size of
    gains and channels in waveforms array do not match.''')

    waveforms_pe = (waveforms_charge.T / gains).T

    return waveforms_pe
def apply_waveforms_transform(waveforms: numpy.ndarray, baselines: numpy.ndarray, gains: numpy.ndarray, ADC_config: dict) ‑> numpy.ndarray

Converts waveforms from ADC counts/sample to pe/s.

Takes the initials waveforms stacked for all channels and returns the waveforms in converted pe/s space.

Args

waveforms : np.ndarray
waveforms, all channels stacked by rows.
baselines : np.ndarray
computed baselines, all channels stacked by rows.
gains : np.ndarray
gains, all channels stacked by rows.
ADC_config : dict
dictionary with the specific digitizer configs.

Returns

np.ndarray
waveforms with applied gain and e2pe transformation.
Expand source code
@classmethod
def apply_waveforms_transform(cls, waveforms: np.ndarray,
                              baselines: np.ndarray,
                              gains: np.ndarray,
                              ADC_config: dict) -> np.ndarray:
    """Converts waveforms from ADC counts/sample to pe/s.

    Takes the initials waveforms stacked for all channels and returns
    the waveforms in converted pe/s space.

    Args:
        waveforms (np.ndarray): waveforms, all channels stacked by rows.
        baselines (np.ndarray): computed baselines, all channels stacked
            by rows.
        gains (np.ndarray): gains, all channels stacked by rows.
        ADC_config (dict): dictionary with the specific digitizer configs.

    Returns:
        np.ndarray: waveforms with applied gain and e2pe transformation.
    """

    waveforms_subtracted = cls.apply_baseline_subtract(
        waveforms, baselines)
    waveforms_charge = cls.apply_ADCcounts_to_e(
        waveforms_subtracted, ADC_config)
    waveforms_pe = cls.apply_e_to_pe(waveforms_charge, gains)

    return waveforms_pe
def flip_and_apply_gains(waveforms: numpy.ndarray, gains: numpy.ndarray, ADC_config: dict, baseline_samples: int = 50) ‑> Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]

Prepare a set of waveforms for processing by: - determining and subtracting baseline; - inverting polarity to positive peak - applying gains, converting to PE/s - computing the std of the waveform - computing the sum waveform

Args

waveforms : np.ndarray
all the waveforms of a file, from all the channels
gains : np.ndarray
gains for each channel in alphabetical order
ADC_config : dict
ADCconfig dictionary
baseline_samples : int, optional
How many samples used to calculate the baseline. Defaults to 50.

Returns

Tuple[np.ndarray, np.ndarray, np.ndarray]
waveforms_pe, stds_pe, sum_waveform
Expand source code
@classmethod
def flip_and_apply_gains(cls,
                         waveforms: np.ndarray,
                         gains: np.ndarray,
                         ADC_config: dict,
                         baseline_samples: int = 50,
                         ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Prepare a set of waveforms for processing by:
      - determining and subtracting baseline;
      - inverting polarity to positive peak
      - applying gains, converting to PE/s
      - computing the std of the waveform
      - computing the sum waveform


    Args:
        waveforms (np.ndarray): all the waveforms of a file, from all the channels
        gains (np.ndarray): gains for each channel **in alphabetical order**
        ADC_config (dict): ADCconfig dictionary
        baseline_samples (int, optional): How many samples used to
            calculate the baseline. Defaults to 50.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: waveforms_pe, stds_pe,
            sum_waveform
    """

    baselines = np.apply_along_axis(
        func1d=waveform_processing.get_baseline_rough,
        axis=2,
        arr=waveforms,
        baseline_samples=baseline_samples)

    waveforms_pe = cls.apply_waveforms_transform(
        waveforms, baselines, gains, ADC_config)

    stds_pe = np.apply_along_axis(
        func1d=waveform_processing.get_std_rough,
        axis=2,
        arr=waveforms_pe,
        baseline_samples=baseline_samples)

    sum_waveform = peak_processing.get_sum_waveform(waveforms_pe)

    return waveforms_pe, stds_pe, sum_waveform
def get_area_of_single_waveform_each_channel(single_waveforms_pe: numpy.ndarray, peak_start: int, peak_end: int, dt: int = 10) ‑> numpy.ndarray

Get the area of identified peak in each channel.

Expand source code
@classmethod
def get_area_of_single_waveform_each_channel(cls,
                                             single_waveforms_pe: np.ndarray,
                                             peak_start: int,
                                             peak_end: int,
                                             dt: int = 10) -> np.ndarray:
    """Get the area of identified peak in each channel.
    """

    area_individual_channels = np.sum(
        single_waveforms_pe[:, peak_start:peak_end], axis=1) * dt

    return area_individual_channels
def get_area_per_sensor(waveforms_pe: numpy.ndarray, peaks_start: List[int], peaks_end: List[int]) ‑> numpy.ndarray

Computes the area per sensor of a given waveform set to be used for hitpattern needs.

Args

waveforms_pe : np.ndarray
waveforms in pe, one row per channel.
peaks_start : List[int]
list with the start of peaks in the summed waveform.
peaks_end : List[int]
list with the end of peaks in the summed waveform.

Returns

np.ndarray
array with the area per channel for the peaks, each row a peak.
Expand source code
@classmethod
def get_area_per_sensor(cls, waveforms_pe: np.ndarray,
                        peaks_start: List[int],
                        peaks_end: List[int]) -> np.ndarray:
    """Computes the area per sensor of a given waveform set to be used
    for hitpattern needs.

    Args:
        waveforms_pe (np.ndarray): waveforms in pe, one row per channel.
        peaks_start (List[int]): list with the start of peaks in the
            summed waveform.
        peaks_end (List[int]): list with the end of peaks in the
            summed waveform.

    Returns:
        np.ndarray: array with the area per channel for the peaks, each
            row a peak.
    """

    area_per_sensor = np.zeros((len(peaks_start),
                                np.shape(waveforms_pe)[0]))
    for i, (_p_start, _p_end) in enumerate(zip(peaks_start, peaks_end)):
        area_per_sensor[i, :] = np.sum(waveforms_pe[:, _p_start:_p_end],
                                       axis=1)
    return area_per_sensor
def get_sum_peak_start_end_above_min_area(areas: List[float], positions: List[int], lengths: List[int], area_min: float) ‑> Tuple[List[int], List[int]]

Determine the indexes of start and end of a peak, considering only peaks with area above area_min.

Returns

Tuple[List, List]
lists with the indexes of begin of peaks and end of peaks.
Expand source code
@classmethod
def get_sum_peak_start_end_above_min_area(cls, areas: List[float],
                                          positions: List[int], lengths: List[int],
                                          area_min: float) -> Tuple[List[int], List[int]]:
    """Determine the indexes of start and end of a peak, considering
    only peaks with area above `area_min`.

    Returns:
        Tuple[List, List]: lists with the indexes of begin of peaks and
            end of peaks.
    """
    good_peaks_start = []
    good_peaks_end = []
    for _area, _position, _length in zip(areas, positions, lengths):
        if _area > area_min:
            good_peaks_start.append(_position)
            good_peaks_end.append(_position + _length)
    return (good_peaks_start, good_peaks_end)
def get_sum_waveform(waveforms_pe: numpy.ndarray) ‑> numpy.ndarray

Sums the (transformed to pe/s) waveforms of all channels.

Args

waveforms_pe : np.ndarray
array with waveforms from all the channels.

Returns

np.ndarray
Summed waveform.
Expand source code
@classmethod
def get_sum_waveform(cls, waveforms_pe: np.ndarray) -> np.ndarray:
    """Sums the (transformed to pe/s) waveforms of all channels.

    Args:
        waveforms_pe (np.ndarray): array with waveforms from all the
            channels.

    Returns:
        np.ndarray: Summed waveform.
    """

    summed_waveform = np.sum(waveforms_pe, axis=0)

    return summed_waveform
def reconstruct_xy_position(area_per_sensor: numpy.ndarray, sensor_layout: numpy.ndarray, algo: str = 'CoG') ‑> numpy.ndarray

Computes xy position of event given a hitpattern from area_per_sensor.

Args

algo : str, optional
The algorithm to use in position reconstruction. Defaults to 'CoG'.

Returns

np.ndarray
array with x,y reconstructed position. Each row a different peak. pos[:,0] is the list of x, pos[:,1] the list of y.
Expand source code
@classmethod
def reconstruct_xy_position(cls, area_per_sensor: np.ndarray,
                            sensor_layout: np.ndarray,
                            algo: str = 'CoG') -> np.ndarray:
    """Computes xy position of event given a hitpattern from
    area_per_sensor.

    Args:
        algo (str, optional): The algorithm to use in position
            reconstruction. Defaults to 'CoG'.

    Returns:
        np.ndarray: array with x,y reconstructed position. Each row a
            different peak. pos[:,0] is the list of x, pos[:,1] the
            list of y.
    """

    if algo != 'CoG':
        raise NotImplementedError(f'''The requested reconstruction
        algorithm ({algo}) is not yet implemented,
        try: {cls.available_posrec_algos}''')

    area_tot = np.sum(area_per_sensor, axis=1)
    # might not be exactly the same as calculated in `process_waveform`

    x = np.sum((area_per_sensor * sensor_layout[0, :].T)) / area_tot
    y = np.sum((area_per_sensor * sensor_layout[1, :].T)) / area_tot

    return np.vstack([x, y])
def reorder_channel(data_array: numpy.ndarray, index_reorder: list) ‑> numpy.ndarray

Reorder columns of an ndarray, corresponding to changing the order of channels to match the order in the sensor layout.

Based on the following stack overflow thread: https://stackoverflow. com/questions/20265229/rearrange-columns-of-numpy-2d-array

Args

data_array : np.ndarray
array where columns are different channels
index_reorder : list
list of indexes where the change in data_array is i->index_reorder[i].

Returns

np.ndarray
the original array with reordered collumns following index_reorder.
Expand source code
@classmethod
def reorder_channel(cls, data_array: np.ndarray,
                    index_reorder: list) -> np.ndarray:
    """Reorder columns of an ndarray, corresponding to changing the order
    of channels to match the order in the sensor layout.

    Based on the following stack overflow thread: https://stackoverflow.
    com/questions/20265229/rearrange-columns-of-numpy-2d-array

    Args:
        data_array (np.ndarray): array where columns are different
            channels
        index_reorder (list): list of indexes where the change in
            `data_array` is i->index_reorder[i].

    Returns:
        np.ndarray: the original array with reordered collumns following
            `index_reorder`.
    """

    if len(np.shape(data_array)) == 1:
        data_array = np.array(data_array).reshape(1, len(data_array))

    idx = np.empty_like(index_reorder)
    idx[index_reorder] = np.arange(len(index_reorder))
    data_array[:] = data_array[:, idx]  # in-place modification of array

    return data_array