Module pylars.processing.peakprocessor

Expand source code
import numpy as np
import pandas as pd
from tqdm import tqdm
from pylars.utils.common import get_deterministic_hash, load_ADC_config
from pylars.utils.input import raw_data

from .peaks import peak_processing
from .waveforms import waveform_processing


class peak_processor():
    """Define the functions for a simple peak processor.

    Defines a processor object to process waveforms from summing all the
    channels in the photosensor array. The expected call order is
    ``define_ADC_config`` and ``load_raw_data`` first (they create the
    ``ADC_config`` and ``raw_data_list`` attributes) and then
    ``process_all_waveforms``.
    """

    version = '0.0.1'
    processing_method = 'peaks_simple'

    # for run6
    # Row permutation applied to the concatenated channel stack in
    # ``get_stacked_waveforms``; afterwards the rows follow ``list_of_tiles``.
    index_reorder_channels = [5, 6, 7, 8, 9, 10, 11, 0, 1, 2, 3, 4]
    list_of_tiles = [
        'A',
        'B',
        'C',
        'D',
        'E',
        'F',
        'G',
        'H',
        'J',
        'K',
        'L',
        'M']

    def __init__(self, sigma_level: float, baseline_samples: int,
                 gains_tag: str, gains_path: str) -> None:
        """Store the processing parameters and load the channel gains.

        Args:
            sigma_level (float): forwarded as ``sigma_level`` to
                ``waveform_processing.process_waveform``.
            baseline_samples (int): forwarded as ``baseline_samples`` to
                ``waveform_processing.process_waveform``.
            gains_tag (str): tag of the gain file; the file that is read is
                ``{gains_path}/gains_{gains_tag}.csv``.
            gains_path (str): directory holding the gain files.
        """

        self.sigma_level = sigma_level
        self.baseline_samples = baseline_samples
        # Built from the method name, version and processing parameters.
        self.hash = get_deterministic_hash(f"{self.processing_method}" +
                                           f"{self.version}" +
                                           f"{self.sigma_level}" +
                                           f"{self.baseline_samples:.2f}")
        self.show_loadbar_channel = True
        self.show_tqdm_channel = True

        self.gains_tag = gains_tag
        self.gains_path = gains_path
        """Path to where the gains are saved.

        In the future this should be defined in a config file.
        """
        self.gains = self.load_gains()
        """Array of gains [e/pe], one entry per tile, sorted by tile name.
        """

    def __hash__(self) -> str:
        """Return the processing hash stored in ``self.hash``.

        NOTE(review): the value is whatever ``get_deterministic_hash``
        returned. If that is a string, as the annotation states, the builtin
        ``hash()`` raises ``TypeError`` on this object. The behaviour is kept
        because callers may call ``__hash__()`` directly to get the tag.
        """
        return self.hash

    def load_gains(self) -> np.ndarray:
        """Load gains of photosensors based on the defined path and tag.

        Reads ``{gains_path}/gains_{gains_tag}.csv``, which has to provide
        the columns ``tile`` and ``gain``, and sorts the rows by ``tile``.

        Returns:
            np.ndarray: array with the gains [e/pe], ordered by ascending
                tile name.
        """

        gain_df = pd.read_csv(f'{self.gains_path}/gains_{self.gains_tag}.csv')
        gain_df = gain_df.sort_values(by=['tile'],
                                      ignore_index=True)
        # Unit heuristic: when the first gain is below 1e4 the whole column
        # is scaled by 1e6 (presumably the file stores gains in units of
        # 1e6 - TODO confirm).
        if gain_df.iloc[0]['gain'] < 1e4:
            gain_df['gain'] = gain_df['gain'] * 1e6

        return np.array(gain_df['gain'])

    def set_tqdm_channel(self, bar: bool, show: bool):
        """Change the tqdm config

        Args:
            bar (bool): show or not the tqdm bar.
            show (bool): use tqdm if true, disable if false
        """
        self.show_loadbar_channel = bar
        self.show_tqdm_channel = show

    def define_ADC_config(self, F_amp: float, model: str = 'v1724') -> None:
        """Load the ADC related quantities for the dataset.

        The result of ``load_ADC_config`` is stored in ``self.ADC_config``.

        Args:
            F_amp (float): signal amplification from the sensor (pre-amp *
                external amplification on the rack).
            model (str): model of the digitizer
        """

        self.ADC_config = load_ADC_config(model, F_amp)

    def load_raw_data(self, path_to_raw_both_modules: list,
                      modules: list, V: float, T: float,
                      ignore_channels: list = None):
        """Raw data loader to pass to the processing scripts.

        One ``raw_data`` object is created per module, its ROOT file is
        loaded and the ignored channels are removed from ``raw.channels``.
        The objects are stored, in order, in ``self.raw_data_list``.

        Args:
            path_to_raw_both_modules (list): list with the path to the raw
                file of each module.
            modules (list): module identifiers, one per path.
            V (float): forwarded to ``raw_data``.
            T (float): forwarded to ``raw_data``.
            ignore_channels (list): list of channels to ignore. Each element
                should be a list corresponding to each module, in order.
                ``None`` (default) ignores no channel. A dict keyed by
                module identifier is accepted as well.

        Raises:
            ValueError: if the number of paths, modules and ignore lists
                differ.
        """

        # A mutable default would be shared between calls, and an empty list
        # could never pass the length check below.
        if ignore_channels is None:
            ignore_channels = [[] for _ in modules]

        if not (len(path_to_raw_both_modules) == len(modules)
                == len(ignore_channels)):
            raise ValueError(
                "Number of paths, modules and ignore_channels entries "
                "needs to be the same.")

        keyed_by_module = isinstance(ignore_channels, dict)

        self.raw_data_list = []
        for position, (module, path_to_raw) in enumerate(
                zip(modules, path_to_raw_both_modules)):
            # Lists are positional ("in order"); indexing a list with the
            # module identifier breaks as soon as modules != [0, 1, ...].
            if keyed_by_module:
                ignored = ignore_channels[module]
            else:
                ignored = ignore_channels[position]

            raw = raw_data(path_to_raw, V, T, module)
            raw.load_root()
            raw.get_available_channels()
            raw.channels = [ch for ch in raw.channels if ch not in ignored]

            self.raw_data_list.append(raw)

    def get_stacked_waveforms(self):
        """Get the waveforms from all channels stacked.
        FROM HERE THE WAVEFORMS ARE STACKED IN THE ORDER OF THE TILES,
        FROM A TO M!

        The channels of every loaded module are stacked along axis 0, the
        modules are concatenated and the rows are permuted with
        ``index_reorder_channels``. That index list addresses rows 0-11, so
        ignoring channels in ``load_raw_data`` makes the lookup fail.

        Returns:
            np.ndarray: waveforms of all channels stacked.
        """
        _stacked_channel_data_list = []
        for raw in self.raw_data_list:
            _stacked_channel_data_list.append(np.stack(
                [raw.get_channel_data(ch) for ch in raw.channels],
                axis=0))
        stacked_waveforms = np.concatenate(_stacked_channel_data_list, axis=0)
        stacked_waveforms = stacked_waveforms[self.index_reorder_channels]

        return stacked_waveforms

    def make_sum_waveforms_all_channels(self):
        """Transform the stacked waveforms and build the summed waveform.

        Requires ``define_ADC_config`` and ``load_raw_data`` to have been
        called, since ``self.ADC_config`` and ``self.raw_data_list`` are
        read here.

        Returns:
            tuple: ``(waveforms_pe, sum_waveform)`` as returned by
                ``peak_processing.apply_waveforms_transform`` and
                ``peak_processing.get_sum_waveform``.
        """

        waveforms = self.get_stacked_waveforms()

        # NOTE(review): the baseline window is fixed to 50 samples here and
        # does not follow ``self.baseline_samples`` - confirm this is wanted.
        baselines = np.apply_along_axis(
            func1d=waveform_processing.get_baseline_rough,
            axis=2,
            arr=waveforms,
            baseline_samples=50)

        # The per-waveform standard deviation was computed here before but
        # never used; the costly pass over all waveforms was dropped.

        waveforms_pe = peak_processing.apply_waveforms_transform(
            waveforms, baselines, self.gains, self.ADC_config)

        sum_waveform = peak_processing.get_sum_waveform(waveforms_pe)

        return waveforms_pe, sum_waveform

    def process_waveform_set(self, waveforms_pe_single: np.ndarray,
                             sum_waveform_single: np.ndarray):
        """Process an array of waveforms from all channels.

        The waveforms are assumed to be synchronized and each row of the
        array is a channel. The summed waveform is searched for peaks with
        the same function as the pulse processing; each peak window
        ``[position, position + length]`` is then handed to
        ``peak_processing.get_area_of_single_waveform_each_channel``.

        Args:
            waveforms_pe_single (np.ndarray): transformed waveforms of all
                channels for one waveform index.
            sum_waveform_single (np.ndarray): summed waveform for the same
                waveform index.

        Returns:
            tuple: ``(areas, lengths, positions, amplitudes,
                areas_individual_channels)``; the last element is a list
                with one per-channel area result per peak.
        """

        areas, lengths, positions, amplitudes = (
            waveform_processing.process_waveform(
                waveform=sum_waveform_single,
                baseline_samples=self.baseline_samples,
                sigma_level=self.sigma_level,
                negative_polarity=False,
                baseline_subtracted=True))

        areas_individual_channels = [
            peak_processing.get_area_of_single_waveform_each_channel(
                waveforms_pe_single, start, start + lengths[peak_i])
            for peak_i, start in enumerate(positions)]

        return areas, lengths, positions, amplitudes, areas_individual_channels

    def process_all_waveforms(self):
        """Find the peaks in every summed waveform and tabulate them.

        Returns:
            tuple: ``(results, areas_individual_channels_results)``, two
                ``pd.DataFrame`` objects with one row per peak. The first
                holds the peak properties of the summed waveform, the
                second one column per tile of ``list_of_tiles``. Both are
                also stored in ``self.results_df`` and
                ``self.areas_individual_channels_results_df``.
        """

        waveforms_pe, sum_waveform = self.make_sum_waveforms_all_channels()

        n_wfs = sum_waveform.shape[0]

        results = {'wf_number': [],
                   'peak_number': [],
                   'n_peaks': [],
                   'area': [],
                   'length': [],
                   'position': [],
                   'amplitude': [],
                   }

        # The tile columns come from ``list_of_tiles`` so the table cannot
        # drift away from the class-level tile definition.
        areas_individual_channels_results = {'wf_number': [],
                                             'peak_number': []}
        areas_individual_channels_results.update(
            {tile_name: [] for tile_name in self.list_of_tiles})

        for wf_i in tqdm(range(n_wfs), total=n_wfs,
                         desc='Processing waveforms: '):

            (areas, lengths, positions, amplitudes,
             areas_individual_channels) = self.process_waveform_set(
                waveforms_pe_single=waveforms_pe[:, wf_i, :],
                sum_waveform_single=sum_waveform[wf_i, :])

            n_peaks = len(areas)
            wf_number = np.full(n_peaks, wf_i, dtype=int)
            peak_number = np.arange(n_peaks)

            results['wf_number'].extend(wf_number)
            results['peak_number'].extend(peak_number)
            results['n_peaks'].extend(np.full(n_peaks, n_peaks, dtype=int))
            results['area'].extend(areas)
            results['length'].extend(lengths)
            results['position'].extend(positions)
            results['amplitude'].extend(amplitudes)

            areas_individual_channels_results['wf_number'].extend(wf_number)
            areas_individual_channels_results['peak_number'].extend(
                peak_number)

            # One entry per peak; each entry is indexed by tile position.
            for peak_areas in areas_individual_channels:
                for tile_i, tile_name in enumerate(self.list_of_tiles):
                    areas_individual_channels_results[tile_name].append(
                        peak_areas[tile_i])

        results = pd.DataFrame(results)
        areas_individual_channels_results = pd.DataFrame(
            areas_individual_channels_results)
        self.results_df = results
        self.areas_individual_channels_results_df = (
            areas_individual_channels_results)
        return results, areas_individual_channels_results

Classes

class peak_processor (sigma_level: float, baseline_samples: int, gains_tag: str, gains_path: str)

Define the functions for a simple peak processor.

Defines a processor object to process waveforms from summing all the channels in the photosensor array.

Expand source code
class peak_processor():
    """Define the functions for a simple peak processor.

    Defines a processor object to process waveforms from summing all the
    channels in the photosensor array. The expected call order is
    ``define_ADC_config`` and ``load_raw_data`` first (they create the
    ``ADC_config`` and ``raw_data_list`` attributes) and then
    ``process_all_waveforms``.
    """

    version = '0.0.1'
    processing_method = 'peaks_simple'

    # for run6
    # Row permutation applied to the concatenated channel stack in
    # ``get_stacked_waveforms``; afterwards the rows follow ``list_of_tiles``.
    index_reorder_channels = [5, 6, 7, 8, 9, 10, 11, 0, 1, 2, 3, 4]
    list_of_tiles = [
        'A',
        'B',
        'C',
        'D',
        'E',
        'F',
        'G',
        'H',
        'J',
        'K',
        'L',
        'M']

    def __init__(self, sigma_level: float, baseline_samples: int,
                 gains_tag: str, gains_path: str) -> None:
        """Store the processing parameters and load the channel gains.

        Args:
            sigma_level (float): forwarded as ``sigma_level`` to
                ``waveform_processing.process_waveform``.
            baseline_samples (int): forwarded as ``baseline_samples`` to
                ``waveform_processing.process_waveform``.
            gains_tag (str): tag of the gain file; the file that is read is
                ``{gains_path}/gains_{gains_tag}.csv``.
            gains_path (str): directory holding the gain files.
        """

        self.sigma_level = sigma_level
        self.baseline_samples = baseline_samples
        # Built from the method name, version and processing parameters.
        self.hash = get_deterministic_hash(f"{self.processing_method}" +
                                           f"{self.version}" +
                                           f"{self.sigma_level}" +
                                           f"{self.baseline_samples:.2f}")
        self.show_loadbar_channel = True
        self.show_tqdm_channel = True

        self.gains_tag = gains_tag
        self.gains_path = gains_path
        """Path to where the gains are saved.

        In the future this should be defined in a config file.
        """
        self.gains = self.load_gains()
        """Array of gains [e/pe], one entry per tile, sorted by tile name.
        """

    def __hash__(self) -> str:
        """Return the processing hash stored in ``self.hash``.

        NOTE(review): the value is whatever ``get_deterministic_hash``
        returned. If that is a string, as the annotation states, the builtin
        ``hash()`` raises ``TypeError`` on this object. The behaviour is kept
        because callers may call ``__hash__()`` directly to get the tag.
        """
        return self.hash

    def load_gains(self) -> np.ndarray:
        """Load gains of photosensors based on the defined path and tag.

        Reads ``{gains_path}/gains_{gains_tag}.csv``, which has to provide
        the columns ``tile`` and ``gain``, and sorts the rows by ``tile``.

        Returns:
            np.ndarray: array with the gains [e/pe], ordered by ascending
                tile name.
        """

        gain_df = pd.read_csv(f'{self.gains_path}/gains_{self.gains_tag}.csv')
        gain_df = gain_df.sort_values(by=['tile'],
                                      ignore_index=True)
        # Unit heuristic: when the first gain is below 1e4 the whole column
        # is scaled by 1e6 (presumably the file stores gains in units of
        # 1e6 - TODO confirm).
        if gain_df.iloc[0]['gain'] < 1e4:
            gain_df['gain'] = gain_df['gain'] * 1e6

        return np.array(gain_df['gain'])

    def set_tqdm_channel(self, bar: bool, show: bool):
        """Change the tqdm config

        Args:
            bar (bool): show or not the tqdm bar.
            show (bool): use tqdm if true, disable if false
        """
        self.show_loadbar_channel = bar
        self.show_tqdm_channel = show

    def define_ADC_config(self, F_amp: float, model: str = 'v1724') -> None:
        """Load the ADC related quantities for the dataset.

        The result of ``load_ADC_config`` is stored in ``self.ADC_config``.

        Args:
            F_amp (float): signal amplification from the sensor (pre-amp *
                external amplification on the rack).
            model (str): model of the digitizer
        """

        self.ADC_config = load_ADC_config(model, F_amp)

    def load_raw_data(self, path_to_raw_both_modules: list,
                      modules: list, V: float, T: float,
                      ignore_channels: list = None):
        """Raw data loader to pass to the processing scripts.

        One ``raw_data`` object is created per module, its ROOT file is
        loaded and the ignored channels are removed from ``raw.channels``.
        The objects are stored, in order, in ``self.raw_data_list``.

        Args:
            path_to_raw_both_modules (list): list with the path to the raw
                file of each module.
            modules (list): module identifiers, one per path.
            V (float): forwarded to ``raw_data``.
            T (float): forwarded to ``raw_data``.
            ignore_channels (list): list of channels to ignore. Each element
                should be a list corresponding to each module, in order.
                ``None`` (default) ignores no channel. A dict keyed by
                module identifier is accepted as well.

        Raises:
            ValueError: if the number of paths, modules and ignore lists
                differ.
        """

        # A mutable default would be shared between calls, and an empty list
        # could never pass the length check below.
        if ignore_channels is None:
            ignore_channels = [[] for _ in modules]

        if not (len(path_to_raw_both_modules) == len(modules)
                == len(ignore_channels)):
            raise ValueError(
                "Number of paths, modules and ignore_channels entries "
                "needs to be the same.")

        keyed_by_module = isinstance(ignore_channels, dict)

        self.raw_data_list = []
        for position, (module, path_to_raw) in enumerate(
                zip(modules, path_to_raw_both_modules)):
            # Lists are positional ("in order"); indexing a list with the
            # module identifier breaks as soon as modules != [0, 1, ...].
            if keyed_by_module:
                ignored = ignore_channels[module]
            else:
                ignored = ignore_channels[position]

            raw = raw_data(path_to_raw, V, T, module)
            raw.load_root()
            raw.get_available_channels()
            raw.channels = [ch for ch in raw.channels if ch not in ignored]

            self.raw_data_list.append(raw)

    def get_stacked_waveforms(self):
        """Get the waveforms from all channels stacked.
        FROM HERE THE WAVEFORMS ARE STACKED IN THE ORDER OF THE TILES,
        FROM A TO M!

        The channels of every loaded module are stacked along axis 0, the
        modules are concatenated and the rows are permuted with
        ``index_reorder_channels``. That index list addresses rows 0-11, so
        ignoring channels in ``load_raw_data`` makes the lookup fail.

        Returns:
            np.ndarray: waveforms of all channels stacked.
        """
        _stacked_channel_data_list = []
        for raw in self.raw_data_list:
            _stacked_channel_data_list.append(np.stack(
                [raw.get_channel_data(ch) for ch in raw.channels],
                axis=0))
        stacked_waveforms = np.concatenate(_stacked_channel_data_list, axis=0)
        stacked_waveforms = stacked_waveforms[self.index_reorder_channels]

        return stacked_waveforms

    def make_sum_waveforms_all_channels(self):
        """Transform the stacked waveforms and build the summed waveform.

        Requires ``define_ADC_config`` and ``load_raw_data`` to have been
        called, since ``self.ADC_config`` and ``self.raw_data_list`` are
        read here.

        Returns:
            tuple: ``(waveforms_pe, sum_waveform)`` as returned by
                ``peak_processing.apply_waveforms_transform`` and
                ``peak_processing.get_sum_waveform``.
        """

        waveforms = self.get_stacked_waveforms()

        # NOTE(review): the baseline window is fixed to 50 samples here and
        # does not follow ``self.baseline_samples`` - confirm this is wanted.
        baselines = np.apply_along_axis(
            func1d=waveform_processing.get_baseline_rough,
            axis=2,
            arr=waveforms,
            baseline_samples=50)

        # The per-waveform standard deviation was computed here before but
        # never used; the costly pass over all waveforms was dropped.

        waveforms_pe = peak_processing.apply_waveforms_transform(
            waveforms, baselines, self.gains, self.ADC_config)

        sum_waveform = peak_processing.get_sum_waveform(waveforms_pe)

        return waveforms_pe, sum_waveform

    def process_waveform_set(self, waveforms_pe_single: np.ndarray,
                             sum_waveform_single: np.ndarray):
        """Process an array of waveforms from all channels.

        The waveforms are assumed to be synchronized and each row of the
        array is a channel. The summed waveform is searched for peaks with
        the same function as the pulse processing; each peak window
        ``[position, position + length]`` is then handed to
        ``peak_processing.get_area_of_single_waveform_each_channel``.

        Args:
            waveforms_pe_single (np.ndarray): transformed waveforms of all
                channels for one waveform index.
            sum_waveform_single (np.ndarray): summed waveform for the same
                waveform index.

        Returns:
            tuple: ``(areas, lengths, positions, amplitudes,
                areas_individual_channels)``; the last element is a list
                with one per-channel area result per peak.
        """

        areas, lengths, positions, amplitudes = (
            waveform_processing.process_waveform(
                waveform=sum_waveform_single,
                baseline_samples=self.baseline_samples,
                sigma_level=self.sigma_level,
                negative_polarity=False,
                baseline_subtracted=True))

        areas_individual_channels = [
            peak_processing.get_area_of_single_waveform_each_channel(
                waveforms_pe_single, start, start + lengths[peak_i])
            for peak_i, start in enumerate(positions)]

        return areas, lengths, positions, amplitudes, areas_individual_channels

    def process_all_waveforms(self):
        """Find the peaks in every summed waveform and tabulate them.

        Returns:
            tuple: ``(results, areas_individual_channels_results)``, two
                ``pd.DataFrame`` objects with one row per peak. The first
                holds the peak properties of the summed waveform, the
                second one column per tile of ``list_of_tiles``. Both are
                also stored in ``self.results_df`` and
                ``self.areas_individual_channels_results_df``.
        """

        waveforms_pe, sum_waveform = self.make_sum_waveforms_all_channels()

        n_wfs = sum_waveform.shape[0]

        results = {'wf_number': [],
                   'peak_number': [],
                   'n_peaks': [],
                   'area': [],
                   'length': [],
                   'position': [],
                   'amplitude': [],
                   }

        # The tile columns come from ``list_of_tiles`` so the table cannot
        # drift away from the class-level tile definition.
        areas_individual_channels_results = {'wf_number': [],
                                             'peak_number': []}
        areas_individual_channels_results.update(
            {tile_name: [] for tile_name in self.list_of_tiles})

        for wf_i in tqdm(range(n_wfs), total=n_wfs,
                         desc='Processing waveforms: '):

            (areas, lengths, positions, amplitudes,
             areas_individual_channels) = self.process_waveform_set(
                waveforms_pe_single=waveforms_pe[:, wf_i, :],
                sum_waveform_single=sum_waveform[wf_i, :])

            n_peaks = len(areas)
            wf_number = np.full(n_peaks, wf_i, dtype=int)
            peak_number = np.arange(n_peaks)

            results['wf_number'].extend(wf_number)
            results['peak_number'].extend(peak_number)
            results['n_peaks'].extend(np.full(n_peaks, n_peaks, dtype=int))
            results['area'].extend(areas)
            results['length'].extend(lengths)
            results['position'].extend(positions)
            results['amplitude'].extend(amplitudes)

            areas_individual_channels_results['wf_number'].extend(wf_number)
            areas_individual_channels_results['peak_number'].extend(
                peak_number)

            # One entry per peak; each entry is indexed by tile position.
            for peak_areas in areas_individual_channels:
                for tile_i, tile_name in enumerate(self.list_of_tiles):
                    areas_individual_channels_results[tile_name].append(
                        peak_areas[tile_i])

        results = pd.DataFrame(results)
        areas_individual_channels_results = pd.DataFrame(
            areas_individual_channels_results)
        self.results_df = results
        self.areas_individual_channels_results_df = (
            areas_individual_channels_results)
        return results, areas_individual_channels_results

Class variables

var index_reorder_channels
var list_of_tiles
var processing_method
var version

Instance variables

var gains

Array of gains [e/pe], one entry per tile, as returned by load_gains.

var gains_path

Path to where the gains are saved.

In the future this should be defined in a config file.

Methods

def define_ADC_config(self, F_amp: float, model: str = 'v1724') ‑> NoneType

Load the ADC related quantities for the dataset.

Args

model : str
model of the digitizer
F_amp : float
signal amplification from the sensor (pre-amp * external amplification on the rack).

Expand source code
def define_ADC_config(self, F_amp: float, model: str = 'v1724') -> None:
    """Fetch the ADC configuration and keep it on the processor.

    The object returned by ``load_ADC_config(model, F_amp)`` is stored in
    ``self.ADC_config``.

    Args:
        F_amp (float): signal amplification from the sensor (pre-amp *
            external amplification on the rack).
        model (str): model of the digitizer
    """
    adc_settings = load_ADC_config(model, F_amp)
    self.ADC_config = adc_settings
def get_stacked_waveforms(self)

Get the waveforms from all channels stacked. FROM HERE THE WAVEFORMS ARE STACKED IN THE ORDER OF THE TILES, FROM A TO M!

Returns

np.ndarray
waveforms of all channels stacked.
Expand source code
def get_stacked_waveforms(self):
    """Return the waveforms of every channel as one stacked array.

    The channels of each entry in ``self.raw_data_list`` are stacked along
    axis 0, the per-module stacks are concatenated along the same axis and
    the rows are finally permuted with ``self.index_reorder_channels``.
    FROM HERE THE WAVEFORMS ARE STACKED IN THE ORDER OF THE TILES,
    FROM A TO M!

    Returns:
        np.ndarray: waveforms of all channels stacked.
    """
    per_module_stacks = [
        np.stack([module_data.get_channel_data(channel)
                  for channel in module_data.channels], axis=0)
        for module_data in self.raw_data_list]

    all_channels = np.concatenate(per_module_stacks, axis=0)
    return all_channels[self.index_reorder_channels]
def load_gains(self) ‑> numpy.ndarray

Load gains of photosensors based on the defined path and tag.

Returns

np.ndarray
array with channel gains [e/pe], sorted by the tile column in ascending order
Expand source code
def load_gains(self) -> np.ndarray:
    """Read the photosensor gains selected by ``gains_path`` and ``gains_tag``.

    The file ``{gains_path}/gains_{gains_tag}.csv`` must provide the columns
    ``tile`` and ``gain``; rows are ordered by ``tile`` before the gains
    are extracted.

    Returns:
        np.ndarray: gains [e/pe] ordered by ascending tile name.
    """
    csv_file = f'{self.gains_path}/gains_{self.gains_tag}.csv'
    table = pd.read_csv(csv_file).sort_values(by=['tile'],
                                              ignore_index=True)

    # Unit heuristic: a first gain below 1e4 triggers a rescale of the whole
    # column by 1e6 (presumably the file is in units of 1e6 - TODO confirm).
    first_gain = table.iloc[0]['gain']
    if first_gain < 1e4:
        table['gain'] = table['gain'] * 1e6

    return np.array(table['gain'])
def load_raw_data(self, path_to_raw_both_modules: list, modules: list, V: float, T: float, ignore_channels: list = [])

Raw data loader to pass to the processing scripts.

Args

path_to_raw_both_modules : list
list with the paths to the raw files, one for each module. If len(path_to_raw_both_modules) = 1, then only one module is assumed.
V : float
description
T : float
description
ignore_channels : list
list of channels to ignore. Each element should be a list corresponding to each module, in order.

Returns

raw_data
description
Expand source code
def load_raw_data(self, path_to_raw_both_modules: list,
                  modules: list, V: float, T: float,
                  ignore_channels: list = None):
    """Raw data loader to pass to the processing scripts.

    One ``raw_data`` object is created per module, its ROOT file is loaded
    and the ignored channels are removed from ``raw.channels``. The objects
    are stored, in order, in ``self.raw_data_list``.

    Args:
        path_to_raw_both_modules (list): list with the path to the raw
            file of each module.
        modules (list): module identifiers, one per path.
        V (float): forwarded to ``raw_data``.
        T (float): forwarded to ``raw_data``.
        ignore_channels (list): list of channels to ignore. Each element
            should be a list corresponding to each module, in order.
            ``None`` (default) ignores no channel. A dict keyed by module
            identifier is accepted as well.

    Raises:
        ValueError: if the number of paths, modules and ignore lists
            differ.
    """

    # A mutable default would be shared between calls, and an empty list
    # could never pass the length check below.
    if ignore_channels is None:
        ignore_channels = [[] for _ in modules]

    if not (len(path_to_raw_both_modules) == len(modules)
            == len(ignore_channels)):
        raise ValueError(
            "Number of paths, modules and ignore_channels entries "
            "needs to be the same.")

    keyed_by_module = isinstance(ignore_channels, dict)

    self.raw_data_list = []
    for position, (module, path_to_raw) in enumerate(
            zip(modules, path_to_raw_both_modules)):
        # Lists are positional ("in order"); indexing a list with the
        # module identifier breaks as soon as modules != [0, 1, ...].
        if keyed_by_module:
            ignored = ignore_channels[module]
        else:
            ignored = ignore_channels[position]

        raw = raw_data(path_to_raw, V, T, module)
        raw.load_root()
        raw.get_available_channels()
        raw.channels = [ch for ch in raw.channels if ch not in ignored]

        self.raw_data_list.append(raw)
def make_sum_waveforms_all_channels(self)
Expand source code
def make_sum_waveforms_all_channels(self):
    """Transform the stacked waveforms and build the summed waveform.

    Reads ``self.gains`` and ``self.ADC_config`` and, through
    ``get_stacked_waveforms``, ``self.raw_data_list``, so the ADC
    configuration and the raw data have to be loaded beforehand.

    Returns:
        tuple: ``(waveforms_pe, sum_waveform)`` as returned by
            ``peak_processing.apply_waveforms_transform`` and
            ``peak_processing.get_sum_waveform``.
    """

    waveforms = self.get_stacked_waveforms()

    # NOTE(review): the baseline window is fixed to 50 samples here and does
    # not follow ``self.baseline_samples`` - confirm this is wanted.
    baselines = np.apply_along_axis(
        func1d=waveform_processing.get_baseline_rough,
        axis=2,
        arr=waveforms,
        baseline_samples=50)

    # The per-waveform standard deviation was computed here before but never
    # used; the costly pass over all waveforms was dropped.

    waveforms_pe = peak_processing.apply_waveforms_transform(
        waveforms, baselines, self.gains, self.ADC_config)

    sum_waveform = peak_processing.get_sum_waveform(waveforms_pe)

    return waveforms_pe, sum_waveform
def process_all_waveforms(self)
Expand source code
def process_all_waveforms(self):
    """Find the peaks in every summed waveform and tabulate them.

    Returns:
        tuple: ``(results, areas_individual_channels_results)``, two
            ``pd.DataFrame`` objects with one row per peak. The first holds
            the peak properties of the summed waveform, the second one
            column per tile of ``list_of_tiles``. Both are also stored in
            ``self.results_df`` and
            ``self.areas_individual_channels_results_df``.
    """

    waveforms_pe, sum_waveform = self.make_sum_waveforms_all_channels()

    n_wfs = sum_waveform.shape[0]

    results = {'wf_number': [],
               'peak_number': [],
               'n_peaks': [],
               'area': [],
               'length': [],
               'position': [],
               'amplitude': [],
               }

    # The tile columns come from ``list_of_tiles`` so the table cannot drift
    # away from the class-level tile definition.
    areas_individual_channels_results = {'wf_number': [],
                                         'peak_number': []}
    areas_individual_channels_results.update(
        {tile_name: [] for tile_name in self.list_of_tiles})

    for wf_i in tqdm(range(n_wfs), total=n_wfs,
                     desc='Processing waveforms: '):

        (areas, lengths, positions, amplitudes,
         areas_individual_channels) = self.process_waveform_set(
            waveforms_pe_single=waveforms_pe[:, wf_i, :],
            sum_waveform_single=sum_waveform[wf_i, :])

        n_peaks = len(areas)
        wf_number = np.full(n_peaks, wf_i, dtype=int)
        peak_number = np.arange(n_peaks)

        results['wf_number'].extend(wf_number)
        results['peak_number'].extend(peak_number)
        results['n_peaks'].extend(np.full(n_peaks, n_peaks, dtype=int))
        results['area'].extend(areas)
        results['length'].extend(lengths)
        results['position'].extend(positions)
        results['amplitude'].extend(amplitudes)

        areas_individual_channels_results['wf_number'].extend(wf_number)
        areas_individual_channels_results['peak_number'].extend(
            peak_number)

        # One entry per peak; each entry is indexed by tile position.
        for peak_areas in areas_individual_channels:
            for tile_i, tile_name in enumerate(self.list_of_tiles):
                areas_individual_channels_results[tile_name].append(
                    peak_areas[tile_i])

    results = pd.DataFrame(results)
    areas_individual_channels_results = pd.DataFrame(
        areas_individual_channels_results)
    self.results_df = results
    self.areas_individual_channels_results_df = (
        areas_individual_channels_results)
    return results, areas_individual_channels_results
def process_waveform_set(self, waveforms_pe_single: numpy.ndarray, sum_waveform_single: numpy.ndarray)

Process an array of waveforms from all channels.

The waveforms are assumed to be synchronized and each row of the array is a channel. Once a summed waveform is formed, uses the same functions as the pulse processing to find peaks and compute its properties.

Args

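waveforms_pe_single : np.ndarray
waveforms of all channels stacked, for one waveform index.
sum_waveform_single : np.ndarray
summed waveform for the same waveform index.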
Expand source code
def process_waveform_set(self, waveforms_pe_single: np.ndarray,
                         sum_waveform_single: np.ndarray):
    """Find the peaks of one summed waveform and their per-channel areas.

    The waveforms are assumed to be synchronized and each row of
    ``waveforms_pe_single`` is a channel. Peaks are searched in the summed
    waveform with the same function used by the pulse processing; every
    peak window ``[position, position + length]`` is then passed to
    ``peak_processing.get_area_of_single_waveform_each_channel``.

    Args:
        waveforms_pe_single (np.ndarray): waveforms of all channels
            stacked, for one waveform index.
        sum_waveform_single (np.ndarray): summed waveform for the same
            waveform index.

    Returns:
        tuple: ``(areas, lengths, positions, amplitudes,
            areas_individual_channels)``; the last element is a list with
            one per-channel area result per peak.
    """
    peak_properties = waveform_processing.process_waveform(
        waveform=sum_waveform_single,
        baseline_samples=self.baseline_samples,
        sigma_level=self.sigma_level,
        negative_polarity=False,
        baseline_subtracted=True)
    areas, lengths, positions, amplitudes = peak_properties

    areas_individual_channels = []
    for peak_i, start in enumerate(positions):
        stop = start + lengths[peak_i]
        areas_individual_channels.append(
            peak_processing.get_area_of_single_waveform_each_channel(
                waveforms_pe_single, start, stop))

    return areas, lengths, positions, amplitudes, areas_individual_channels
def set_tqdm_channel(self, bar: bool, show: bool)

Change the tqdm config

Args

bar : bool
show or not the tqdm bar.
show : bool
use tqdm if true, disable if false
Expand source code
def set_tqdm_channel(self, bar: bool, show: bool):
    """Update the two tqdm flags kept on the processor.

    Args:
        bar (bool): show or not the tqdm bar.
        show (bool): use tqdm if true, disable if false
    """
    self.show_loadbar_channel, self.show_tqdm_channel = bar, show