Module pylars.analysis.ledwindow

Expand source code
import datetime
from typing import Tuple

import numpy as np
import pandas as pd
from scipy.optimize import curve_fit
from scipy.signal import find_peaks

import pylars


class LED_window():
    '''Class to hold analysis of LED ON data with integrating window.
    Use independently of BV datasets.
    '''

    def __init__(self,
                 led_window: Tuple[int, int],
                 led_data_path: str) -> None:
        self.led_window = led_window
        self.led_data_path = led_data_path
        self.baseline_samples = 50
        self.files_df = self.find_files()

    def find_files(self) -> pd.DataFrame:  # type: ignore
        '''Find the raw data file and the LED data file.
        TODO
        '''
        pass

        # files = glob.glob(self.led_data_path + '**/*.root', recursive=True)
        # _dfs = []
        # for f in tqdm(files, desc='Finding LED data files: '):
        #     try:
        #         _module = int(f.split('/')[-1].split('_')[4])
        #         _width, _voltage = self.get_LED_width_voltage(f)
        #         _dfs.append({'LEDwidth': _width,
        #                      'LEDvoltage': _voltage,
        #                      'module': _module,
        #                      'path': f})
        #     except:
        #         print(f'Failed to get info for file {f}')
        #         continue
        # files_df = pd.DataFrame(_dfs)
        # files_df.sort_values(by=['LEDwidth', 'LEDvoltage', 'module'],
        #              ignore_index = True, inplace=True)
        # del _dfs, files

        # return files_df

    @staticmethod
    def get_LED_width_voltage(path):
        """Read LED width and voltage from the file path."""

        root_name_list = path.split('/')[-1].split('_')
        pulser_width = int(root_name_list[0][:-2])
        voltage = float(root_name_list[1] + '.' + root_name_list[2][0])
        return pulser_width, voltage

    def process_dataset(self, data_path: str, module: int = 0) -> pd.DataFrame:
        '''Process a dataset with fixwindowprocessor. Requires a raw data file
        and a LED window specified.
        '''

        processor = pylars.processing.fixwindowprocessor.window_processor(
            baseline_samples=self.baseline_samples,
            led_window=(self.led_window[0], self.led_window[1]))

        processor.load_raw_data(path_to_raw=data_path, module=module)
        df_processed = processor.process_all_channels()
        return df_processed

    def process_all_datasets(self) -> None:
        '''Process all datasets in the LED data path.
        '''

        if not hasattr(self, 'files_df'):
            raise AssertionError('No files found. Run find_files first.')

        _dfs = []
        # for i, row in tqdm(self.files_df.iterrows(),  # type: ignore
        #                    total=len(self.files_df),  # type: ignore
        #                    desc='Processing LED data: '):^
        for i, row in self.files_df.iterrows():
            _df = self.process_dataset(row['path'], module=row['module'])
            _df['Vbias'] = row['Vbias']
            _df['LEDwidth'] = row['LEDwidth']
            _df['LEDvoltage'] = row['LEDvoltage']
            _df['module'] = row['module']
            _dfs.append(_df)
        df_processed = pd.concat(_dfs)
        del _dfs
        df_processed.sort_values(by=['Vbias', 'LEDvoltage',
                                     'LEDwidth', 'module',
                                     'channel', 'wf_number'])

        self.df_processed = df_processed

    def check_settings_available(self):
        """Set led_voltages, led_widths and sipms_voltages as object
        attributes.
        """

        self.led_voltages = self.files_df['LEDvoltage'].unique()
        self.led_widths = self.files_df['LEDwidth'].unique()
        self.sipms_voltages = self.files_df['Vbias'].unique()

    def get_1_pe_fit_led(self,
                         df_processed: pd.DataFrame,
                         module: int,
                         channel: str) -> Tuple[float, float,
                                                float, np.ndarray]:
        """Fit the SPE peak of an LED area hsitogram for a given module and
        channel. Uses scipy.curve_fit with a Gaussian function.

        Args:
            df_processed (pd.DataFrame): dataframe with processed data
            module (int): module to consider
            channel (str): channel to consider

        Returns:
            Tuple[float, float, float, np.ndarray]: A, mu, sigma,
                and cov resulting of the fit
        """

        df_processed_mask = (
            (df_processed['module'] == module) &
            (df_processed['channel'] == channel))

        hist = np.histogram(df_processed[df_processed_mask]['led_area'],
                            bins=np.linspace(-2000, 20000, 300))
        middle_bins = (hist[1][:-1] + hist[1][1:]) / 2

        try:
            peaks, properties = find_peaks(hist[0],
                                           prominence=100,
                                           distance=5)
            spe_rough = middle_bins[peaks[1]]
        except BaseException:
            spe_rough = 2500
        #if (spe_rough -2000) > 1000: spe_rough = 2000

        spe_mask = np.abs(middle_bins - spe_rough) < spe_rough * 0.5
        (A, mu, sigma), cov = curve_fit(pylars.utils.common.Gaussian,
                                        middle_bins[spe_mask],
                                        hist[0][spe_mask],
                                        p0=[2000, spe_rough,
                                            spe_rough * 0.05])
        return A, mu, sigma, cov

    def get_occupancy(self, results_one_channel: pd.DataFrame,
                      mu: float, mu_err: float,
                      method: str = 'mean') -> Tuple[float, float]:
        """Calculate the occupancy of an LED dataset for a given channel.

        Args:
            results_one_channel (pd.DataFrame): dataframe with the results
                of a single channel
            mu (float): the SPE fit mean in ADCcounts
            mu_err (float): error in mu in ADCcounts
            method (str, optional): Method to calculate the occupancy. Either
                'mean' or 'median'. Defaults to 'mean'.

        Returns:
            Tuple[float, float]: Occupancy and its error. Uses mean by default!
        """

        if method == 'mean':
            med = np.mean(results_one_channel['led_area'])
        elif method == 'median':
            med = np.median(results_one_channel['led_area'])
        else:
            raise ValueError(
                'Method for occupancy calculation neither mean nor median')

        med_err = np.std(
            results_one_channel['led_area']) / np.sqrt(
                len(results_one_channel['led_area']))

        occ = med / mu
        occ_err = ((med_err / mu)**2 + ((med / mu**2) * mu_err)**2)**0.5
        return occ, occ_err

    def calculate_gain_occ(self, processed_df_single_led: pd.DataFrame,
                           module: int, channel: str,
                           occ_method: str = 'mean') -> Tuple[float, float,
                                                              float, float]:
        """Calculates the gain, occupancy and their errors for a given module
        and channel from a dataframe with the processed LED data.

        Args:
            processed_df_single_led (pd.DataFrame): dataframe with the
                processed LED data
            module (int): module to consider
            channel (str): channel to consider
            occ_method (str, optional): Method to calculate the occupancy.
                'mean' or 'median'. Defaults to 'mean'.

        Returns:
            Tuple[float, float, float, float]: Return the gain, gain_err,
                occ, occ_err of the requested channel.
        """

        A, mu, sigma, cov = self.get_1_pe_fit_led(processed_df_single_led,
                                                  module,
                                                  channel)
        A_err, mu_err, sigma_err = np.sqrt(np.diag(cov))

        gain = pylars.utils.common.get_gain(F_amp=20, spe_area=mu) / 1e6
        gain_err = pylars.utils.common.get_gain(
            F_amp=20, spe_area=mu_err) / 1e6

        results_mask = ((processed_df_single_led['module'] == module) &
                        (processed_df_single_led['channel'] == channel)
                        )
        occ, occ_err = self.get_occupancy(processed_df_single_led[results_mask],
                                          mu, mu_err, method=occ_method)

        return gain, gain_err, occ, occ_err

    def calculate_all_gains_occ(self):
        '''Calculate gains and occupancies for all channels available in the
        processed data. The results are stored in self.results_df and a
        registry of failed processing in self.failed_calculation_df.
        '''

        # Check if processed data exists
        if not hasattr(self, 'df_processed'):
            raise AssertionError(
                'No processed data found. Run process_all_datasets first.')

        results_df = pd.DataFrame(columns=['Vbias', 'LEDvoltage',
                                           'LEDwidth', 'module',
                                           'channel', 'gain',
                                           'gain_err', 'occ',
                                           'occ_err'])
        failed_calculation_df = pd.DataFrame(columns=['Vbias', 'LEDvoltage',
                                                      'LEDwidth', 'module',
                                                      'channel'])

        # for i, row in tqdm(self.files_df.iterrows(),
        #                    total=len(self.files_df),
        #                    desc='Calculating gains and occupancies: '):
        for i, row in self.files_df.iterrows():
            _vbias = row['Vbias']
            _led_voltage = row['LEDvoltage']
            _led_width = row['LEDwidth']
            _module = row['module']

            _df_select_processed = self.df_processed[
                (self.df_processed['Vbias'] == _vbias) &
                (self.df_processed['LEDvoltage'] == _led_voltage) &
                (self.df_processed['LEDwidth'] == _led_width)]

            _channels = _df_select_processed[
                _df_select_processed['module'] == _module]['channel'].unique()

            for _channel in _channels:
                try:
                    _gain, _gain_err, _occ, _occ_err = self.calculate_gain_occ(
                        _df_select_processed, _module, _channel)

                    results_df = pd.concat(
                        (results_df,
                            pd.DataFrame({'Vbias': [_vbias],
                                          'LEDvoltage': [_led_voltage],
                                          'LEDwidth': [_led_width],
                                          'module': [_module],
                                          'channel': [_channel],
                                          'gain': [_gain],
                                          'gain_err': [_gain_err],
                                          'occ': [_occ],
                                          'occ_err': [_occ_err]}),
                         ), ignore_index=True)
                except BaseException:
                    failed_calculation_df = pd.concat(
                        (failed_calculation_df,
                         pd.DataFrame({'Vbias': [_vbias],
                                       'LEDvoltage': [_led_voltage],
                                       'LEDwidth': [_led_width],
                                       'module': [_module],
                                       'channel': [_channel]})),
                        ignore_index=True)

        self.failed_calculation_df = failed_calculation_df
        self.results_df = results_df

    def save_gain_results(self, name: str = ''):
        """Save the results of the gain and occupancy calculations to a csv
        file.

        Args:
            name (str, optional): name to give to the file. Defaults to '',
            saving a file with the current timestamp.

        Raises:
            AssertionError: raised if no results dataframe is found in the
                object.
        """

        # Check if results exist
        if not hasattr(self, 'results_df'):
            raise AssertionError(
                'No results found. Run calculate_all_gains_occ first.')

        # Save results
        if name == '':
            now = datetime.datetime.now().isoformat()
            self.results_df.to_csv(f'gain_results_{str(now)}.csv', index=False)
        else:
            self.results_df.to_csv(f'{name}.csv', index=False)

    def print_gains_occ_for_wiki(self):
        if not hasattr(self, 'df_gains'):
            raise AssertionError('No computed gains found. Run '
                                 'calculate_all_gains_occ first.')
        for i, row in self.df_gains.iterrows():  # type: ignore
            print(f"| {row['tile']} | {row['gain']:.3f} $\\pm$ "  # type: ignore
                  # type: ignore
                  f"{row['gain_err']:.3f} | {row['occ']:.3f} $\\pm$ "
                  f"{row['occ_err']:.3f} |")

    @staticmethod
    def export_gains(gain_evolution: pd.DataFrame,
                     method: str = 'mean',
                     tag: str = 'vx'):
        """Export the gains as a single number to a csv file. Method can be
            'mean', 'median' or 'last'. Default is 'mean'.

        Args:
            gain_evolution (pd.DataFrame): dataframe with the gain evolution
            method (str, optional): method to calculate the number to use as
                gain. Can be 'mean', 'median' or 'last'. Defaults to 'mean'.
            tag (str, optional): Tag to give the gains in the form v#
                (v0,v1,v2,...). Defaults to 'vx'.

        Raises:
            ValueError: Raised if the method is not recognized.
        """
        if method == 'mean':
            gains = gain_evolution.groupby(
                ['module', 'tile', 'channel']).mean()
        elif method == 'median':
            gains = gain_evolution.groupby(
                ['module', 'tile', 'channel']).median()
        elif method == 'last':
            gains = gain_evolution.groupby(
                ['module', 'tile', 'channel']).last()
        else:
            raise ValueError('Method not recognized')

        gains.to_csv(f'gains_{tag}.csv')

Classes

class LED_window (led_window: Tuple[int, int], led_data_path: str)

Class to hold analysis of LED ON data with integrating window. Use independently of BV datasets.

Expand source code
class LED_window():
    '''Class to hold analysis of LED ON data with integrating window.
    Use independently of BV datasets.
    '''

    def __init__(self,
                 led_window: Tuple[int, int],
                 led_data_path: str) -> None:
        self.led_window = led_window
        self.led_data_path = led_data_path
        self.baseline_samples = 50
        self.files_df = self.find_files()

    def find_files(self) -> pd.DataFrame:  # type: ignore
        '''Find the raw data file and the LED data file.
        TODO
        '''
        pass

        # files = glob.glob(self.led_data_path + '**/*.root', recursive=True)
        # _dfs = []
        # for f in tqdm(files, desc='Finding LED data files: '):
        #     try:
        #         _module = int(f.split('/')[-1].split('_')[4])
        #         _width, _voltage = self.get_LED_width_voltage(f)
        #         _dfs.append({'LEDwidth': _width,
        #                      'LEDvoltage': _voltage,
        #                      'module': _module,
        #                      'path': f})
        #     except:
        #         print(f'Failed to get info for file {f}')
        #         continue
        # files_df = pd.DataFrame(_dfs)
        # files_df.sort_values(by=['LEDwidth', 'LEDvoltage', 'module'],
        #              ignore_index = True, inplace=True)
        # del _dfs, files

        # return files_df

    @staticmethod
    def get_LED_width_voltage(path):
        """Read LED width and voltage from the file path."""

        root_name_list = path.split('/')[-1].split('_')
        pulser_width = int(root_name_list[0][:-2])
        voltage = float(root_name_list[1] + '.' + root_name_list[2][0])
        return pulser_width, voltage

    def process_dataset(self, data_path: str, module: int = 0) -> pd.DataFrame:
        '''Process a dataset with fixwindowprocessor. Requires a raw data file
        and a LED window specified.
        '''

        processor = pylars.processing.fixwindowprocessor.window_processor(
            baseline_samples=self.baseline_samples,
            led_window=(self.led_window[0], self.led_window[1]))

        processor.load_raw_data(path_to_raw=data_path, module=module)
        df_processed = processor.process_all_channels()
        return df_processed

    def process_all_datasets(self) -> None:
        '''Process all datasets in the LED data path.
        '''

        if not hasattr(self, 'files_df'):
            raise AssertionError('No files found. Run find_files first.')

        _dfs = []
        # for i, row in tqdm(self.files_df.iterrows(),  # type: ignore
        #                    total=len(self.files_df),  # type: ignore
        #                    desc='Processing LED data: '):^
        for i, row in self.files_df.iterrows():
            _df = self.process_dataset(row['path'], module=row['module'])
            _df['Vbias'] = row['Vbias']
            _df['LEDwidth'] = row['LEDwidth']
            _df['LEDvoltage'] = row['LEDvoltage']
            _df['module'] = row['module']
            _dfs.append(_df)
        df_processed = pd.concat(_dfs)
        del _dfs
        df_processed.sort_values(by=['Vbias', 'LEDvoltage',
                                     'LEDwidth', 'module',
                                     'channel', 'wf_number'])

        self.df_processed = df_processed

    def check_settings_available(self):
        """Set led_voltages, led_widths and sipms_voltages as object
        attributes.
        """

        self.led_voltages = self.files_df['LEDvoltage'].unique()
        self.led_widths = self.files_df['LEDwidth'].unique()
        self.sipms_voltages = self.files_df['Vbias'].unique()

    def get_1_pe_fit_led(self,
                         df_processed: pd.DataFrame,
                         module: int,
                         channel: str) -> Tuple[float, float,
                                                float, np.ndarray]:
        """Fit the SPE peak of an LED area hsitogram for a given module and
        channel. Uses scipy.curve_fit with a Gaussian function.

        Args:
            df_processed (pd.DataFrame): dataframe with processed data
            module (int): module to consider
            channel (str): channel to consider

        Returns:
            Tuple[float, float, float, np.ndarray]: A, mu, sigma,
                and cov resulting of the fit
        """

        df_processed_mask = (
            (df_processed['module'] == module) &
            (df_processed['channel'] == channel))

        hist = np.histogram(df_processed[df_processed_mask]['led_area'],
                            bins=np.linspace(-2000, 20000, 300))
        middle_bins = (hist[1][:-1] + hist[1][1:]) / 2

        try:
            peaks, properties = find_peaks(hist[0],
                                           prominence=100,
                                           distance=5)
            spe_rough = middle_bins[peaks[1]]
        except BaseException:
            spe_rough = 2500
        #if (spe_rough -2000) > 1000: spe_rough = 2000

        spe_mask = np.abs(middle_bins - spe_rough) < spe_rough * 0.5
        (A, mu, sigma), cov = curve_fit(pylars.utils.common.Gaussian,
                                        middle_bins[spe_mask],
                                        hist[0][spe_mask],
                                        p0=[2000, spe_rough,
                                            spe_rough * 0.05])
        return A, mu, sigma, cov

    def get_occupancy(self, results_one_channel: pd.DataFrame,
                      mu: float, mu_err: float,
                      method: str = 'mean') -> Tuple[float, float]:
        """Calculate the occupancy of an LED dataset for a given channel.

        Args:
            results_one_channel (pd.DataFrame): dataframe with the results
                of a single channel
            mu (float): the SPE fit mean in ADCcounts
            mu_err (float): error in mu in ADCcounts
            method (str, optional): Method to calculate the occupancy. Either
                'mean' or 'median'. Defaults to 'mean'.

        Returns:
            Tuple[float, float]: Occupancy and its error. Uses mean by default!
        """

        if method == 'mean':
            med = np.mean(results_one_channel['led_area'])
        elif method == 'median':
            med = np.median(results_one_channel['led_area'])
        else:
            raise ValueError(
                'Method for occupancy calculation neither mean nor median')

        med_err = np.std(
            results_one_channel['led_area']) / np.sqrt(
                len(results_one_channel['led_area']))

        occ = med / mu
        occ_err = ((med_err / mu)**2 + ((med / mu**2) * mu_err)**2)**0.5
        return occ, occ_err

    def calculate_gain_occ(self, processed_df_single_led: pd.DataFrame,
                           module: int, channel: str,
                           occ_method: str = 'mean') -> Tuple[float, float,
                                                              float, float]:
        """Calculates the gain, occupancy and their errors for a given module
        and channel from a dataframe with the processed LED data.

        Args:
            processed_df_single_led (pd.DataFrame): dataframe with the
                processed LED data
            module (int): module to consider
            channel (str): channel to consider
            occ_method (str, optional): Method to calculate the occupancy.
                'mean' or 'median'. Defaults to 'mean'.

        Returns:
            Tuple[float, float, float, float]: Return the gain, gain_err,
                occ, occ_err of the requested channel.
        """

        A, mu, sigma, cov = self.get_1_pe_fit_led(processed_df_single_led,
                                                  module,
                                                  channel)
        A_err, mu_err, sigma_err = np.sqrt(np.diag(cov))

        gain = pylars.utils.common.get_gain(F_amp=20, spe_area=mu) / 1e6
        gain_err = pylars.utils.common.get_gain(
            F_amp=20, spe_area=mu_err) / 1e6

        results_mask = ((processed_df_single_led['module'] == module) &
                        (processed_df_single_led['channel'] == channel)
                        )
        occ, occ_err = self.get_occupancy(processed_df_single_led[results_mask],
                                          mu, mu_err, method=occ_method)

        return gain, gain_err, occ, occ_err

    def calculate_all_gains_occ(self):
        '''Calculate gains and occupancies for all channels available in the
        processed data. The results are stored in self.results_df and a
        registry of failed processing in self.failed_calculation_df.
        '''

        # Check if processed data exists
        if not hasattr(self, 'df_processed'):
            raise AssertionError(
                'No processed data found. Run process_all_datasets first.')

        results_df = pd.DataFrame(columns=['Vbias', 'LEDvoltage',
                                           'LEDwidth', 'module',
                                           'channel', 'gain',
                                           'gain_err', 'occ',
                                           'occ_err'])
        failed_calculation_df = pd.DataFrame(columns=['Vbias', 'LEDvoltage',
                                                      'LEDwidth', 'module',
                                                      'channel'])

        # for i, row in tqdm(self.files_df.iterrows(),
        #                    total=len(self.files_df),
        #                    desc='Calculating gains and occupancies: '):
        for i, row in self.files_df.iterrows():
            _vbias = row['Vbias']
            _led_voltage = row['LEDvoltage']
            _led_width = row['LEDwidth']
            _module = row['module']

            _df_select_processed = self.df_processed[
                (self.df_processed['Vbias'] == _vbias) &
                (self.df_processed['LEDvoltage'] == _led_voltage) &
                (self.df_processed['LEDwidth'] == _led_width)]

            _channels = _df_select_processed[
                _df_select_processed['module'] == _module]['channel'].unique()

            for _channel in _channels:
                try:
                    _gain, _gain_err, _occ, _occ_err = self.calculate_gain_occ(
                        _df_select_processed, _module, _channel)

                    results_df = pd.concat(
                        (results_df,
                            pd.DataFrame({'Vbias': [_vbias],
                                          'LEDvoltage': [_led_voltage],
                                          'LEDwidth': [_led_width],
                                          'module': [_module],
                                          'channel': [_channel],
                                          'gain': [_gain],
                                          'gain_err': [_gain_err],
                                          'occ': [_occ],
                                          'occ_err': [_occ_err]}),
                         ), ignore_index=True)
                except BaseException:
                    failed_calculation_df = pd.concat(
                        (failed_calculation_df,
                         pd.DataFrame({'Vbias': [_vbias],
                                       'LEDvoltage': [_led_voltage],
                                       'LEDwidth': [_led_width],
                                       'module': [_module],
                                       'channel': [_channel]})),
                        ignore_index=True)

        self.failed_calculation_df = failed_calculation_df
        self.results_df = results_df

    def save_gain_results(self, name: str = ''):
        """Save the results of the gain and occupancy calculations to a csv
        file.

        Args:
            name (str, optional): name to give to the file. Defaults to '',
            saving a file with the current timestamp.

        Raises:
            AssertionError: raised if no results dataframe is found in the
                object.
        """

        # Check if results exist
        if not hasattr(self, 'results_df'):
            raise AssertionError(
                'No results found. Run calculate_all_gains_occ first.')

        # Save results
        if name == '':
            now = datetime.datetime.now().isoformat()
            self.results_df.to_csv(f'gain_results_{str(now)}.csv', index=False)
        else:
            self.results_df.to_csv(f'{name}.csv', index=False)

    def print_gains_occ_for_wiki(self):
        if not hasattr(self, 'df_gains'):
            raise AssertionError('No computed gains found. Run '
                                 'calculate_all_gains_occ first.')
        for i, row in self.df_gains.iterrows():  # type: ignore
            print(f"| {row['tile']} | {row['gain']:.3f} $\\pm$ "  # type: ignore
                  # type: ignore
                  f"{row['gain_err']:.3f} | {row['occ']:.3f} $\\pm$ "
                  f"{row['occ_err']:.3f} |")

    @staticmethod
    def export_gains(gain_evolution: pd.DataFrame,
                     method: str = 'mean',
                     tag: str = 'vx'):
        """Export the gains as a single number to a csv file. Method can be
            'mean', 'median' or 'last'. Default is 'mean'.

        Args:
            gain_evolution (pd.DataFrame): dataframe with the gain evolution
            method (str, optional): method to calculate the number to use as
                gain. Can be 'mean', 'median' or 'last'. Defaults to 'mean'.
            tag (str, optional): Tag to give the gains in the form v#
                (v0,v1,v2,...). Defaults to 'vx'.

        Raises:
            ValueError: Raised if the method is not recognized.
        """
        if method == 'mean':
            gains = gain_evolution.groupby(
                ['module', 'tile', 'channel']).mean()
        elif method == 'median':
            gains = gain_evolution.groupby(
                ['module', 'tile', 'channel']).median()
        elif method == 'last':
            gains = gain_evolution.groupby(
                ['module', 'tile', 'channel']).last()
        else:
            raise ValueError('Method not recognized')

        gains.to_csv(f'gains_{tag}.csv')

Static methods

def export_gains(gain_evolution: pandas.core.frame.DataFrame, method: str = 'mean', tag: str = 'vx')

Export the gains as a single number to a csv file. Method can be 'mean', 'median' or 'last'. Default is 'mean'.

Args

gain_evolution : pd.DataFrame
dataframe with the gain evolution
method : str, optional
method to calculate the number to use as gain. Can be 'mean', 'median' or 'last'. Defaults to 'mean'.
tag : str, optional
Tag to give the gains in the form v# (v0,v1,v2,…). Defaults to 'vx'.

Raises

ValueError
Raised if the method is not recognized.
Expand source code
@staticmethod
def export_gains(gain_evolution: pd.DataFrame,
                 method: str = 'mean',
                 tag: str = 'vx'):
    """Export the gains as a single number to a csv file. Method can be
        'mean', 'median' or 'last'. Default is 'mean'.

    Args:
        gain_evolution (pd.DataFrame): dataframe with the gain evolution
        method (str, optional): method to calculate the number to use as
            gain. Can be 'mean', 'median' or 'last'. Defaults to 'mean'.
        tag (str, optional): Tag to give the gains in the form v#
            (v0,v1,v2,...). Defaults to 'vx'.

    Raises:
        ValueError: Raised if the method is not recognized.
    """
    if method == 'mean':
        gains = gain_evolution.groupby(
            ['module', 'tile', 'channel']).mean()
    elif method == 'median':
        gains = gain_evolution.groupby(
            ['module', 'tile', 'channel']).median()
    elif method == 'last':
        gains = gain_evolution.groupby(
            ['module', 'tile', 'channel']).last()
    else:
        raise ValueError('Method not recognized')

    gains.to_csv(f'gains_{tag}.csv')
def get_LED_width_voltage(path)

Read LED width and voltage from the file path.

Expand source code
@staticmethod
def get_LED_width_voltage(path):
    """Read LED width and voltage from the file path."""

    root_name_list = path.split('/')[-1].split('_')
    pulser_width = int(root_name_list[0][:-2])
    voltage = float(root_name_list[1] + '.' + root_name_list[2][0])
    return pulser_width, voltage

Methods

def calculate_all_gains_occ(self)

Calculate gains and occupancies for all channels available in the processed data. The results are stored in self.results_df and a registry of failed processing in self.failed_calculation_df.

Expand source code
def calculate_all_gains_occ(self):
    '''Calculate gains and occupancies for all channels available in the
    processed data. The results are stored in self.results_df and a
    registry of failed processing in self.failed_calculation_df.
    '''

    # Check if processed data exists
    if not hasattr(self, 'df_processed'):
        raise AssertionError(
            'No processed data found. Run process_all_datasets first.')

    results_df = pd.DataFrame(columns=['Vbias', 'LEDvoltage',
                                       'LEDwidth', 'module',
                                       'channel', 'gain',
                                       'gain_err', 'occ',
                                       'occ_err'])
    failed_calculation_df = pd.DataFrame(columns=['Vbias', 'LEDvoltage',
                                                  'LEDwidth', 'module',
                                                  'channel'])

    # for i, row in tqdm(self.files_df.iterrows(),
    #                    total=len(self.files_df),
    #                    desc='Calculating gains and occupancies: '):
    for i, row in self.files_df.iterrows():
        _vbias = row['Vbias']
        _led_voltage = row['LEDvoltage']
        _led_width = row['LEDwidth']
        _module = row['module']

        _df_select_processed = self.df_processed[
            (self.df_processed['Vbias'] == _vbias) &
            (self.df_processed['LEDvoltage'] == _led_voltage) &
            (self.df_processed['LEDwidth'] == _led_width)]

        _channels = _df_select_processed[
            _df_select_processed['module'] == _module]['channel'].unique()

        for _channel in _channels:
            try:
                _gain, _gain_err, _occ, _occ_err = self.calculate_gain_occ(
                    _df_select_processed, _module, _channel)

                results_df = pd.concat(
                    (results_df,
                        pd.DataFrame({'Vbias': [_vbias],
                                      'LEDvoltage': [_led_voltage],
                                      'LEDwidth': [_led_width],
                                      'module': [_module],
                                      'channel': [_channel],
                                      'gain': [_gain],
                                      'gain_err': [_gain_err],
                                      'occ': [_occ],
                                      'occ_err': [_occ_err]}),
                     ), ignore_index=True)
            except BaseException:
                failed_calculation_df = pd.concat(
                    (failed_calculation_df,
                     pd.DataFrame({'Vbias': [_vbias],
                                   'LEDvoltage': [_led_voltage],
                                   'LEDwidth': [_led_width],
                                   'module': [_module],
                                   'channel': [_channel]})),
                    ignore_index=True)

    self.failed_calculation_df = failed_calculation_df
    self.results_df = results_df
def calculate_gain_occ(self, processed_df_single_led: pandas.core.frame.DataFrame, module: int, channel: str, occ_method: str = 'mean') ‑> Tuple[float, float, float, float]

Calculates the gain, occupancy and their errors for a given module and channel from a dataframe with the processed LED data.

Args

processed_df_single_led : pd.DataFrame
dataframe with the processed LED data
module : int
module to consider
channel : str
channel to consider
occ_method : str, optional
Method to calculate the occupancy. 'mean' or 'median'. Defaults to 'mean'.

Returns

Tuple[float, float, float, float]
Return the gain, gain_err, occ, occ_err of the requested channel.
Expand source code
def calculate_gain_occ(self, processed_df_single_led: pd.DataFrame,
                       module: int, channel: str,
                       occ_method: str = 'mean') -> Tuple[float, float,
                                                          float, float]:
    """Calculates the gain, occupancy and their errors for a given module
    and channel from a dataframe with the processed LED data.

    Args:
        processed_df_single_led (pd.DataFrame): dataframe with the
            processed LED data
        module (int): module to consider
        channel (str): channel to consider
        occ_method (str, optional): Method to calculate the occupancy.
            'mean' or 'median'. Defaults to 'mean'.

    Returns:
        Tuple[float, float, float, float]: Return the gain, gain_err,
            occ, occ_err of the requested channel.
    """

    A, mu, sigma, cov = self.get_1_pe_fit_led(processed_df_single_led,
                                              module,
                                              channel)
    A_err, mu_err, sigma_err = np.sqrt(np.diag(cov))

    gain = pylars.utils.common.get_gain(F_amp=20, spe_area=mu) / 1e6
    gain_err = pylars.utils.common.get_gain(
        F_amp=20, spe_area=mu_err) / 1e6

    results_mask = ((processed_df_single_led['module'] == module) &
                    (processed_df_single_led['channel'] == channel)
                    )
    occ, occ_err = self.get_occupancy(processed_df_single_led[results_mask],
                                      mu, mu_err, method=occ_method)

    return gain, gain_err, occ, occ_err
def check_settings_available(self)

Set led_voltages, led_widths and sipms_voltages as object attributes.

Expand source code
def check_settings_available(self):
    """Set led_voltages, led_widths and sipms_voltages as object
    attributes.
    """

    self.led_voltages = self.files_df['LEDvoltage'].unique()
    self.led_widths = self.files_df['LEDwidth'].unique()
    self.sipms_voltages = self.files_df['Vbias'].unique()
def find_files(self) ‑> pandas.core.frame.DataFrame

Find the raw data file and the LED data file. TODO

Expand source code
def find_files(self) -> pd.DataFrame:  # type: ignore
    '''Find the raw data file and the LED data file.
    TODO
    '''
    pass

    # files = glob.glob(self.led_data_path + '**/*.root', recursive=True)
    # _dfs = []
    # for f in tqdm(files, desc='Finding LED data files: '):
    #     try:
    #         _module = int(f.split('/')[-1].split('_')[4])
    #         _width, _voltage = self.get_LED_width_voltage(f)
    #         _dfs.append({'LEDwidth': _width,
    #                      'LEDvoltage': _voltage,
    #                      'module': _module,
    #                      'path': f})
    #     except:
    #         print(f'Failed to get info for file {f}')
    #         continue
    # files_df = pd.DataFrame(_dfs)
    # files_df.sort_values(by=['LEDwidth', 'LEDvoltage', 'module'],
    #              ignore_index = True, inplace=True)
    # del _dfs, files

    # return files_df
def get_1_pe_fit_led(self, df_processed: pandas.core.frame.DataFrame, module: int, channel: str) ‑> Tuple[float, float, float, numpy.ndarray]

Fit the SPE peak of an LED area hsitogram for a given module and channel. Uses scipy.curve_fit with a Gaussian function.

Args

df_processed : pd.DataFrame
dataframe with processed data
module : int
module to consider
channel : str
channel to consider

Returns

Tuple[float, float, float, np.ndarray]
A, mu, sigma, and cov resulting of the fit
Expand source code
def get_1_pe_fit_led(self,
                     df_processed: pd.DataFrame,
                     module: int,
                     channel: str) -> Tuple[float, float,
                                            float, np.ndarray]:
    """Fit the SPE peak of an LED area hsitogram for a given module and
    channel. Uses scipy.curve_fit with a Gaussian function.

    Args:
        df_processed (pd.DataFrame): dataframe with processed data
        module (int): module to consider
        channel (str): channel to consider

    Returns:
        Tuple[float, float, float, np.ndarray]: A, mu, sigma,
            and cov resulting of the fit
    """

    df_processed_mask = (
        (df_processed['module'] == module) &
        (df_processed['channel'] == channel))

    hist = np.histogram(df_processed[df_processed_mask]['led_area'],
                        bins=np.linspace(-2000, 20000, 300))
    middle_bins = (hist[1][:-1] + hist[1][1:]) / 2

    try:
        peaks, properties = find_peaks(hist[0],
                                       prominence=100,
                                       distance=5)
        spe_rough = middle_bins[peaks[1]]
    except BaseException:
        spe_rough = 2500
    #if (spe_rough -2000) > 1000: spe_rough = 2000

    spe_mask = np.abs(middle_bins - spe_rough) < spe_rough * 0.5
    (A, mu, sigma), cov = curve_fit(pylars.utils.common.Gaussian,
                                    middle_bins[spe_mask],
                                    hist[0][spe_mask],
                                    p0=[2000, spe_rough,
                                        spe_rough * 0.05])
    return A, mu, sigma, cov
def get_occupancy(self, results_one_channel: pandas.core.frame.DataFrame, mu: float, mu_err: float, method: str = 'mean') ‑> Tuple[float, float]

Calculate the occupancy of an LED dataset for a given channel.

Args

results_one_channel : pd.DataFrame
dataframe with the results of a single channel
mu : float
the SPE fit mean in ADCcounts
mu_err : float
error in mu in ADCcounts
method : str, optional
Method to calculate the occupancy. Either 'mean' or 'median'. Defaults to 'mean'.

Returns

Tuple[float, float]
Occupancy and its error. Uses mean by default!
Expand source code
def get_occupancy(self, results_one_channel: pd.DataFrame,
                  mu: float, mu_err: float,
                  method: str = 'mean') -> Tuple[float, float]:
    """Calculate the occupancy of an LED dataset for a given channel.

    Args:
        results_one_channel (pd.DataFrame): dataframe with the results
            of a single channel
        mu (float): the SPE fit mean in ADCcounts
        mu_err (float): error in mu in ADCcounts
        method (str, optional): Method to calculate the occupancy. Either
            'mean' or 'median'. Defaults to 'mean'.

    Returns:
        Tuple[float, float]: Occupancy and its error. Uses mean by default!
    """

    if method == 'mean':
        med = np.mean(results_one_channel['led_area'])
    elif method == 'median':
        med = np.median(results_one_channel['led_area'])
    else:
        raise ValueError(
            'Method for occupancy calculation neither mean nor median')

    med_err = np.std(
        results_one_channel['led_area']) / np.sqrt(
            len(results_one_channel['led_area']))

    occ = med / mu
    occ_err = ((med_err / mu)**2 + ((med / mu**2) * mu_err)**2)**0.5
    return occ, occ_err
def print_gains_occ_for_wiki(self)
Expand source code
def print_gains_occ_for_wiki(self):
    if not hasattr(self, 'df_gains'):
        raise AssertionError('No computed gains found. Run '
                             'calculate_all_gains_occ first.')
    for i, row in self.df_gains.iterrows():  # type: ignore
        print(f"| {row['tile']} | {row['gain']:.3f} $\\pm$ "  # type: ignore
              # type: ignore
              f"{row['gain_err']:.3f} | {row['occ']:.3f} $\\pm$ "
              f"{row['occ_err']:.3f} |")
def process_all_datasets(self) ‑> NoneType

Process all datasets in the LED data path.

Expand source code
def process_all_datasets(self) -> None:
    '''Process all datasets in the LED data path.
    '''

    if not hasattr(self, 'files_df'):
        raise AssertionError('No files found. Run find_files first.')

    _dfs = []
    # for i, row in tqdm(self.files_df.iterrows(),  # type: ignore
    #                    total=len(self.files_df),  # type: ignore
    #                    desc='Processing LED data: '):^
    for i, row in self.files_df.iterrows():
        _df = self.process_dataset(row['path'], module=row['module'])
        _df['Vbias'] = row['Vbias']
        _df['LEDwidth'] = row['LEDwidth']
        _df['LEDvoltage'] = row['LEDvoltage']
        _df['module'] = row['module']
        _dfs.append(_df)
    df_processed = pd.concat(_dfs)
    del _dfs
    df_processed.sort_values(by=['Vbias', 'LEDvoltage',
                                 'LEDwidth', 'module',
                                 'channel', 'wf_number'])

    self.df_processed = df_processed
def process_dataset(self, data_path: str, module: int = 0) ‑> pandas.core.frame.DataFrame

Process a dataset with fixwindowprocessor. Requires a raw data file and a LED window specified.

Expand source code
def process_dataset(self, data_path: str, module: int = 0) -> pd.DataFrame:
    '''Process a dataset with fixwindowprocessor. Requires a raw data file
    and a LED window specified.
    '''

    processor = pylars.processing.fixwindowprocessor.window_processor(
        baseline_samples=self.baseline_samples,
        led_window=(self.led_window[0], self.led_window[1]))

    processor.load_raw_data(path_to_raw=data_path, module=module)
    df_processed = processor.process_all_channels()
    return df_processed
def save_gain_results(self, name: str = '')

Save the results of the gain and occupancy calculations to a csv file.

Args

name : str, optional
name to give to the file. Defaults to '',

saving a file with the current timestamp.

Raises

AssertionError
raised if no results dataframe is found in the object.
Expand source code
def save_gain_results(self, name: str = ''):
    """Save the results of the gain and occupancy calculations to a csv
    file.

    Args:
        name (str, optional): name to give to the file. Defaults to '',
        saving a file with the current timestamp.

    Raises:
        AssertionError: raised if no results dataframe is found in the
            object.
    """

    # Check if results exist
    if not hasattr(self, 'results_df'):
        raise AssertionError(
            'No results found. Run calculate_all_gains_occ first.')

    # Save results
    if name == '':
        now = datetime.datetime.now().isoformat()
        self.results_df.to_csv(f'gain_results_{str(now)}.csv', index=False)
    else:
        self.results_df.to_csv(f'{name}.csv', index=False)