Module pylars.utils.common

Expand source code
import base64
import hashlib
import itertools
import json
from typing import Dict, List, Union, Tuple

import numpy as np
import pandas as pd
import pylars
from scipy.signal import find_peaks
import scipy.ndimage


def Gaussian(x, A, mu, sigma):
    y = A * np.exp(-((x - mu) / sigma)**2 / 2) / np.sqrt(2 * np.pi * sigma**2)
    return y


def func_linear(x, a, b):
    return a * x + b


def get_deterministic_hash(id: str) -> str:
    """Return an hash with 7 characters from a string.

    In detail, returns a base32 lowercase string of length determined
    from hashing the configs. Based on
    https://github.com/AxFoundation/strax/blob/
    156254287c2037876a7040460b3551d590bf5589/strax/utils.py#L303

    Args:
        id (str): thing to hash

    Returns:
        str: hashed version of the thing
    """
    jsonned = json.dumps(id)
    digest = hashlib.sha1(jsonned.encode('ascii')).digest()
    readable_hash = base64.b32encode(digest)[:7].decode('ascii').lower()
    return readable_hash


def load_ADC_config(model: str, F_amp: float) -> Dict[str, Union[int, float]]:
    """Load the ADC related quantities depending on the model.

    Args:
        model (str): model of the digitizer
        F_amp (float): signal amplification from the sensor (pre-amp *
            external amplification on the rack).

    Raises:
        NotImplementedError: Raised if the requested model is not yet
            implemented

    Returns:
        dict: Python dictionary with the digitizer-related configs.
    """

    available_model_configs = ['v1724', 'v1730']

    if model == 'v1724':
        """ More info at https://www.caen.it/products/v1724/"""

        ADC_config = {'ADC_range': 2.25,  # V
                      'ADC_impedance': 50,  # ohm
                      'F_amp': F_amp,  # external amplification
                      'ADC_res': 2**14,  # bit-wise resolution
                      'q_e': 1.602176634e-19,  # electron charge
                      'dt': 10e-9}  # sampling time length

    elif model == 'v1730':
        """More info at https://www.caen.it/products/v1730/"""

        ADC_config = {'ADC_range': 2.00,  # V
                      'ADC_impedance': 50,  # ohm
                      'F_amp': F_amp,  # external amplification
                      'ADC_res': 2**14,  # bit-wise resolution
                      'q_e': 1.602176634e-19,  # electron charge
                      'dt': 2e-9}  # sampling time length
    else:
        raise NotImplementedError(f'''The requested model ({model}) is not
            implemented. Choose from {available_model_configs}.''')

    return ADC_config


def get_gain(F_amp: float,
             spe_area: float,
             ADC_range: float = 2.25,
             ADC_impedance: float = 50,
             ADC_res: float = 2**16,
             q_e: float = 1.602176634e-19,
             ) -> float:
    """Compute the gain given the value of the SPE area and the ADC
        paramenters.

    Args:
        F_amp (float): Total signal amplification factor.
        spe_area (float): mean area of spe (in ADC bins x ns).
        ADC_range (float, optional): Dynamic range of the ADC. Defaults
            to 2.25.
        ADC_impedance (float, optional): Impedance of the ADC. Defaults to 50.
        ADC_res (float, optional): bit.wise resolution of the ADC. Defaults
            to 2**16.
        q_e (float, optional): electron charge [C]. Defaults to 1.602176634e-19.

    Returns:
        float: the calculated gain.
    """

    gain = (ADC_range * spe_area * 1e-9 / ADC_impedance / F_amp /
            ADC_res / q_e)

    return gain


def find_minmax(array: np.ndarray) -> List[np.ndarray]:
    """Return local peaks and valeys of an 1d array.

    Args:
        array (np.ndarray): 1d array to compute peaks and valeys

    Returns:
        List[np.ndarray]: res[0] is an array with the indexes of where peaks
            were identified, valeys at res[1].
    """
    peaks = np.where(
        (array[1:-1] > array[0:-2]) * (array[1:-1] > array[2:]))[0] + 1
    valeys = np.where(
        (array[1:-1] < array[0:-2]) * (array[1:-1] < array[2:]))[0] + 1
    return [peaks, valeys]


def get_channel_list(process) -> List[Tuple[int, str]]:
    """Fetch the channels available for a dataset by reading the original
    ROOT file.

    Args:
        process (run_processor): initiated `run_processor` object.

    Returns:
        List[Tuple[int,str]]: list with the available channels of a given
            dataset in the format [(mod, ch)_i]
    """
    _datasets = process.datasets_df
    modules = np.unique(_datasets['module'])
    ch_list = []
    for mod in modules:
        raw = pylars.utils.input.raw_data(
            _datasets[_datasets['module'] == mod].sample(1)['path'].values[0],
            V=123,
            T=123,
            module=mod)

        raw.load_root()
        raw.get_available_channels()
        channels = raw.channels
        ch_list += list(itertools.product([mod], channels))
    return ch_list


def wstd(array: np.ndarray, waverage: float, weights: np.ndarray) -> float:
    """Compute weighted standard deviation.

    Args:
        array (np.ndarray): 1D array to compute wstd of
        waverage (float): weighted average value
        weights (np.ndarray): weights to consider

    Returns:
        float: value of the weighted standard deviation
    """

    N = np.count_nonzero(weights)

    wvar = N * np.sum(weights * (array - waverage)**2) / \
        (N - 1) / np.sum(weights)
    wstd = np.sqrt(wvar)

    return wstd


def get_peak_rough_positions(area_array: np.ndarray,
                             cut_mask,
                             bins: Union[int, np.ndarray, list] = 1000,
                             filt=scipy.ndimage.gaussian_filter1d,
                             filter_options=3,
                             plot: Union[bool, str] = False) -> tuple:
    '''Takes the area histogram (fingerplot) and looks for peaks
    and valeys. SPE should be the 2nd peak on most cases. Higher
    PE values might be properly unrecognized
    Returns two lists: list of the x value of the peaks, list of
    the x value of the valeys.
    Optional plot feature:
    - False: no plot
    - True: displays plot in notebook
    - string - sufix on the name of the file'''

    area_hist = np.histogram(
        area_array[cut_mask], bins=bins)

    area_x = area_hist[1]
    area_x = (area_x + (area_x[1] - area_x[0]) / 2)[:-1]
    area_y = area_hist[0]

    area_filt = filt(area_y, filter_options)
    area_peaks_x, peak_properties = find_peaks(area_filt,
                                               prominence=20,
                                               distance=50)

    if plot != False:
        from pylars.plotting.plotanalysis import plot_found_area_peaks
        plot_found_area_peaks(
            area_x, area_y, area_filt, area_peaks_x)

    return area_x[area_peaks_x], peak_properties


def apply_tile_labels(df: pd.DataFrame, label_map: dict):
    """Adds a column with the tile. Requires a label map of the form:
    {'mod#' : {'wf#' : [tile]}.

    Args:
        label_map (dict): tile label map

    Returns:
        pd.dataframe: the dataframe with a column for the tile
    """

    def map_label(row, label_map=label_map):
        return label_map[f"mod{row['module']}"][row['channel']]

    df['tile'] = df.apply(map_label, axis=1)
    df = df.sort_values('tile', ignore_index=True)
    return df


def get_summary_info(summary_path):
    with open(summary_path, 'r') as _summ_file:
        _summary = _summ_file.readlines()
    _t_stop = np.datetime64(int(_summary[0].strip().split(' ')[-1]), 's')
    _duration = np.timedelta64(int(_summary[1].strip().split(' ')[3]), 's')
    _n_events = int(_summary[2].strip().split(' ')[-1])

    return _t_stop, _duration, _n_events

Functions

def Gaussian(x, A, mu, sigma)
Expand source code
def Gaussian(x, A, mu, sigma):
    y = A * np.exp(-((x - mu) / sigma)**2 / 2) / np.sqrt(2 * np.pi * sigma**2)
    return y
def apply_tile_labels(df: pandas.core.frame.DataFrame, label_map: dict)

Adds a column with the tile. Requires a label map of the form: {'mod#' : {'wf#' : [tile]}.

Args

label_map : dict
tile label map

Returns

pd.dataframe
the dataframe with a column for the tile
Expand source code
def apply_tile_labels(df: pd.DataFrame, label_map: dict):
    """Adds a column with the tile. Requires a label map of the form:
    {'mod#' : {'wf#' : [tile]}.

    Args:
        label_map (dict): tile label map

    Returns:
        pd.dataframe: the dataframe with a column for the tile
    """

    def map_label(row, label_map=label_map):
        return label_map[f"mod{row['module']}"][row['channel']]

    df['tile'] = df.apply(map_label, axis=1)
    df = df.sort_values('tile', ignore_index=True)
    return df
def find_minmax(array: numpy.ndarray) ‑> List[numpy.ndarray]

Return local peaks and valeys of an 1d array.

Args

array : np.ndarray
1d array to compute peaks and valeys

Returns

List[np.ndarray]
res[0] is an array with the indexes of where peaks were identified, valeys at res[1].
Expand source code
def find_minmax(array: np.ndarray) -> List[np.ndarray]:
    """Return local peaks and valeys of an 1d array.

    Args:
        array (np.ndarray): 1d array to compute peaks and valeys

    Returns:
        List[np.ndarray]: res[0] is an array with the indexes of where peaks
            were identified, valeys at res[1].
    """
    peaks = np.where(
        (array[1:-1] > array[0:-2]) * (array[1:-1] > array[2:]))[0] + 1
    valeys = np.where(
        (array[1:-1] < array[0:-2]) * (array[1:-1] < array[2:]))[0] + 1
    return [peaks, valeys]
def func_linear(x, a, b)
Expand source code
def func_linear(x, a, b):
    return a * x + b
def get_channel_list(process) ‑> List[Tuple[int, str]]

Fetch the channels available for a dataset by reading the original ROOT file.

Args

process : run_processor
initiated run_processor object.

Returns

List[Tuple[int,str]]
list with the available channels of a given dataset in the format [(mod, ch)_i]
Expand source code
def get_channel_list(process) -> List[Tuple[int, str]]:
    """Fetch the channels available for a dataset by reading the original
    ROOT file.

    Args:
        process (run_processor): initiated `run_processor` object.

    Returns:
        List[Tuple[int,str]]: list with the available channels of a given
            dataset in the format [(mod, ch)_i]
    """
    _datasets = process.datasets_df
    modules = np.unique(_datasets['module'])
    ch_list = []
    for mod in modules:
        raw = pylars.utils.input.raw_data(
            _datasets[_datasets['module'] == mod].sample(1)['path'].values[0],
            V=123,
            T=123,
            module=mod)

        raw.load_root()
        raw.get_available_channels()
        channels = raw.channels
        ch_list += list(itertools.product([mod], channels))
    return ch_list
def get_deterministic_hash(id: str) ‑> str

Return an hash with 7 characters from a string.

In detail, returns a base32 lowercase string of length determined from hashing the configs. Based on https://github.com/AxFoundation/strax/blob/ 156254287c2037876a7040460b3551d590bf5589/strax/utils.py#L303

Args

id : str
thing to hash

Returns

str
hashed version of the thing
Expand source code
def get_deterministic_hash(id: str) -> str:
    """Return an hash with 7 characters from a string.

    In detail, returns a base32 lowercase string of length determined
    from hashing the configs. Based on
    https://github.com/AxFoundation/strax/blob/
    156254287c2037876a7040460b3551d590bf5589/strax/utils.py#L303

    Args:
        id (str): thing to hash

    Returns:
        str: hashed version of the thing
    """
    jsonned = json.dumps(id)
    digest = hashlib.sha1(jsonned.encode('ascii')).digest()
    readable_hash = base64.b32encode(digest)[:7].decode('ascii').lower()
    return readable_hash
def get_gain(F_amp: float, spe_area: float, ADC_range: float = 2.25, ADC_impedance: float = 50, ADC_res: float = 65536, q_e: float = 1.602176634e-19) ‑> float

Compute the gain given the value of the SPE area and the ADC paramenters.

Args

F_amp : float
Total signal amplification factor.
spe_area : float
mean area of spe (in ADC bins x ns).
ADC_range : float, optional
Dynamic range of the ADC. Defaults to 2.25.
ADC_impedance : float, optional
Impedance of the ADC. Defaults to 50.
ADC_res : float, optional
bit.wise resolution of the ADC. Defaults to 2**16.
q_e : float, optional
electron charge [C]. Defaults to 1.602176634e-19.

Returns

float
the calculated gain.
Expand source code
def get_gain(F_amp: float,
             spe_area: float,
             ADC_range: float = 2.25,
             ADC_impedance: float = 50,
             ADC_res: float = 2**16,
             q_e: float = 1.602176634e-19,
             ) -> float:
    """Compute the gain given the value of the SPE area and the ADC
        paramenters.

    Args:
        F_amp (float): Total signal amplification factor.
        spe_area (float): mean area of spe (in ADC bins x ns).
        ADC_range (float, optional): Dynamic range of the ADC. Defaults
            to 2.25.
        ADC_impedance (float, optional): Impedance of the ADC. Defaults to 50.
        ADC_res (float, optional): bit.wise resolution of the ADC. Defaults
            to 2**16.
        q_e (float, optional): electron charge [C]. Defaults to 1.602176634e-19.

    Returns:
        float: the calculated gain.
    """

    gain = (ADC_range * spe_area * 1e-9 / ADC_impedance / F_amp /
            ADC_res / q_e)

    return gain
def get_peak_rough_positions(area_array: numpy.ndarray, cut_mask, bins: Union[int, numpy.ndarray, list] = 1000, filt=<function gaussian_filter1d>, filter_options=3, plot: Union[bool, str] = False) ‑> tuple

Takes the area histogram (fingerplot) and looks for peaks and valeys. SPE should be the 2nd peak on most cases. Higher PE values might be properly unrecognized Returns two lists: list of the x value of the peaks, list of the x value of the valeys. Optional plot feature: - False: no plot - True: displays plot in notebook - string - sufix on the name of the file

Expand source code
def get_peak_rough_positions(area_array: np.ndarray,
                             cut_mask,
                             bins: Union[int, np.ndarray, list] = 1000,
                             filt=scipy.ndimage.gaussian_filter1d,
                             filter_options=3,
                             plot: Union[bool, str] = False) -> tuple:
    '''Takes the area histogram (fingerplot) and looks for peaks
    and valeys. SPE should be the 2nd peak on most cases. Higher
    PE values might be properly unrecognized
    Returns two lists: list of the x value of the peaks, list of
    the x value of the valeys.
    Optional plot feature:
    - False: no plot
    - True: displays plot in notebook
    - string - sufix on the name of the file'''

    area_hist = np.histogram(
        area_array[cut_mask], bins=bins)

    area_x = area_hist[1]
    area_x = (area_x + (area_x[1] - area_x[0]) / 2)[:-1]
    area_y = area_hist[0]

    area_filt = filt(area_y, filter_options)
    area_peaks_x, peak_properties = find_peaks(area_filt,
                                               prominence=20,
                                               distance=50)

    if plot != False:
        from pylars.plotting.plotanalysis import plot_found_area_peaks
        plot_found_area_peaks(
            area_x, area_y, area_filt, area_peaks_x)

    return area_x[area_peaks_x], peak_properties
def get_summary_info(summary_path)
Expand source code
def get_summary_info(summary_path):
    with open(summary_path, 'r') as _summ_file:
        _summary = _summ_file.readlines()
    _t_stop = np.datetime64(int(_summary[0].strip().split(' ')[-1]), 's')
    _duration = np.timedelta64(int(_summary[1].strip().split(' ')[3]), 's')
    _n_events = int(_summary[2].strip().split(' ')[-1])

    return _t_stop, _duration, _n_events
def load_ADC_config(model: str, F_amp: float) ‑> Dict[str, Union[int, float]]

Load the ADC related quantities depending on the model.

Args

model : str
model of the digitizer
F_amp : float
signal amplification from the sensor (pre-amp * external amplification on the rack).

Raises

NotImplementedError
Raised if the requested model is not yet implemented

Returns

dict
Python dictionary with the digitizer-related configs.
Expand source code
def load_ADC_config(model: str, F_amp: float) -> Dict[str, Union[int, float]]:
    """Load the ADC related quantities depending on the model.

    Args:
        model (str): model of the digitizer
        F_amp (float): signal amplification from the sensor (pre-amp *
            external amplification on the rack).

    Raises:
        NotImplementedError: Raised if the requested model is not yet
            implemented

    Returns:
        dict: Python dictionary with the digitizer-related configs.
    """

    available_model_configs = ['v1724', 'v1730']

    if model == 'v1724':
        """ More info at https://www.caen.it/products/v1724/"""

        ADC_config = {'ADC_range': 2.25,  # V
                      'ADC_impedance': 50,  # ohm
                      'F_amp': F_amp,  # external amplification
                      'ADC_res': 2**14,  # bit-wise resolution
                      'q_e': 1.602176634e-19,  # electron charge
                      'dt': 10e-9}  # sampling time length

    elif model == 'v1730':
        """More info at https://www.caen.it/products/v1730/"""

        ADC_config = {'ADC_range': 2.00,  # V
                      'ADC_impedance': 50,  # ohm
                      'F_amp': F_amp,  # external amplification
                      'ADC_res': 2**14,  # bit-wise resolution
                      'q_e': 1.602176634e-19,  # electron charge
                      'dt': 2e-9}  # sampling time length
    else:
        raise NotImplementedError(f'''The requested model ({model}) is not
            implemented. Choose from {available_model_configs}.''')

    return ADC_config
def wstd(array: numpy.ndarray, waverage: float, weights: numpy.ndarray) ‑> float

Compute weighted standard deviation.

Args

array : np.ndarray
1D array to compute wstd of
waverage : float
weighted average value
weights : np.ndarray
weights to consider

Returns

float
value of the weighted standard deviation
Expand source code
def wstd(array: np.ndarray, waverage: float, weights: np.ndarray) -> float:
    """Compute weighted standard deviation.

    Args:
        array (np.ndarray): 1D array to compute wstd of
        waverage (float): weighted average value
        weights (np.ndarray): weights to consider

    Returns:
        float: value of the weighted standard deviation
    """

    N = np.count_nonzero(weights)

    wvar = N * np.sum(weights * (array - waverage)**2) / \
        (N - 1) / np.sum(weights)
    wstd = np.sqrt(wvar)

    return wstd