Module pylars.analysis.darkcount
This file contains classes and methods to conduct an analysis of an LED OFF
dataset/run. It is divided into:
- DCR_analysis
: the collection of methods required for the analysis (fits,
DCR, CTP, gain calculation, etc.)
- DCR_dataset
: the object of a LED OFF dataset, i.e., given temp, given
module and given channel, several bias voltages.
- DCR_run
: the object to collect and handle all the DCR_dataset
objects of a run.
Expand source code
"""This file contains classes and methods to conduct an analysis of an LED OFF
dataset/run. It is divided into:
- `DCR_analysis`: the collection of methods required for the analysis (fits,
DCR, CTP, gain calculation, etc.)
- `DCR_dataset`: the object of a LED OFF dataset, i.e., given temp, given
module and given channel, several bias voltages.
- `DCR_run`: the object to collect and handle all the `DCR_dataset`
objects of a run.
"""
import copy
import time
from typing import Tuple, Union
import numba
import numpy as np
import pandas as pd
import pylars
import pylars.plotting.plotanalysis
import pylars.utils.common
import pylars.utils.input
import pylars.utils.output
import scipy.interpolate as itp
from pylars.analysis.breakdown import compute_BV_df
from pylars.utils.common import Gaussian, get_gain
from scipy.optimize import curve_fit
from scipy.signal import find_peaks
from tqdm import tqdm
class DCR_analysis():
    """A class with all the methods needed for DCR analysis.

    Why a separate class? DCR_dataset was too messy with @classmethods.
    These were put in this parent class so that DCR_dataset and DCR_run
    hold only object-specific methods.
    """

    __version__ = 'v0.0.1'

    def __init__(self):
        pass

    @classmethod
    def get_1pe_value_fit(cls,
                          df: pd.DataFrame,
                          length_cut_min: int = 5,
                          length_cut_max: int = 80,
                          plot: Union[bool, str] = False,
                          use_scipy_find_peaks: bool = False) -> tuple:
        """Try to fit the SPE peak in the area histogram and return the
        Gaussian parameters.

        Args:
            df (pd.DataFrame): dataframe with the processed pulse data;
                needs the series "area" and "length".
            length_cut_min (int, optional): minimum peak length to accept,
                for noise suppression. Defaults to 5.
            length_cut_max (int, optional): maximum peak length to accept,
                for noise suppression. Defaults to 80.
            plot (bool or str, optional): if not False, produce diagnostic
                plots, using this value in the figure name. Defaults to
                False.
            use_scipy_find_peaks (bool, optional): locate the SPE bump with
                scipy.signal.find_peaks instead of the minimum of the
                derivative. Defaults to False.

        Returns:
            tuple: (A, mu, sigma), cov — Gaussian fit parameters and the
                covariance matrix from scipy.optimize.curve_fit.
        """
        # Rough SPE position from the DCR-vs-threshold curve and its
        # derivative.
        (area_hist_x, DCR_values, DCR_der_x_points,
         DCR_der_y_points, min_area_x) = cls.get_1pe_rough(
            df, length_cut_min, length_cut_max,
            use_scipy_find_peaks=use_scipy_find_peaks)

        if plot is not False:
            pylars.plotting.plotanalysis.plot_DCR_curve(
                plot, area_hist_x, DCR_values, DCR_der_x_points,
                DCR_der_y_points, min_area_x)

        # Fine-grained histogram in a +-50% window around the rough SPE
        # position, for the Gaussian fit.
        area_hist = np.histogram(df[(df['length'] > length_cut_min) &
                                    (df['length'] < length_cut_max)]['area'],
                                 bins=np.linspace(0.5 * min_area_x,
                                                  1.5 * min_area_x, 300))
        area_hist_x = area_hist[1]
        # Convert bin edges to bin centres.
        area_hist_x = (
            area_hist_x + (area_hist_x[1] - area_hist_x[0]) / 2)[:-1]
        area_hist_y = area_hist[0]

        (A, mu, sigma), cov = curve_fit(Gaussian, area_hist_x, area_hist_y,
                                        p0=(2000, min_area_x,
                                            0.05 * min_area_x))

        if plot is not False:
            pylars.plotting.plotanalysis.plot_SPE_fit(
                df, length_cut_min, length_cut_max, plot, area_hist_x,
                min_area_x, A, mu, sigma)

        return (A, mu, sigma), cov

    @classmethod
    def get_1pe_rough(cls, df: pd.DataFrame,
                      length_cut_min: int,
                      length_cut_max: int,
                      bins: Union[int, list] = 200,
                      use_scipy_find_peaks: bool = False) -> tuple:
        """From an event df (1 channel), find the rough position of the
        SPE from the DCR vs threshold curve and its derivative.

        Args:
            df (pd.DataFrame): dataframe with the processed data.
            length_cut_min (int): minimum value of length to consider.
            length_cut_max (int): maximum value of length to consider.
            bins (int or list, optional): number of bins to make the area
                histogram or list of bin edges to consider on the
                histogram. Defaults to 200.
                NOTE(review): when a list of edges is passed, the
                np.linspace call below still uses `bins` as a sample
                count — confirm intended use with list input.
            use_scipy_find_peaks (bool, optional): use
                scipy.signal.find_peaks on the (negated) derivative
                instead of its global minimum. Defaults to False.

        Returns:
            tuple: computed arrays: area_hist_x, DCR_values,
                DCR_der_x_points, DCR_der_y_points, min_area_x
        """
        (area_hist_x, DCR_values) = cls.get_DCR_above_threshold_values(
            df, length_cut_min, length_cut_max, bins, output='values')  # type: ignore

        # Smooth the derivative of the integral rate curve with a spline;
        # the SPE shows up as a dip (steepest descent) in the derivative.
        grad = np.gradient(DCR_values)
        grad_spline = itp.UnivariateSpline(area_hist_x, grad)
        # , s = len(area_hist_x)*3)
        DCR_der_x_points = np.linspace(
            area_hist_x[0], area_hist_x[-1], bins)  # type: ignore
        DCR_der_y_points = grad_spline(DCR_der_x_points)

        if use_scipy_find_peaks:
            # Peaks of the negated derivative are dips of the derivative.
            pks, props = find_peaks(-1 * DCR_der_y_points,
                                    prominence=20)
            peaks_area_values = DCR_der_x_points[pks]
            if len(peaks_area_values) == 0:
                print('Could not find any peaks')
                min_area_x = np.nan
            else:
                # If the first dip is already at a large area (threshold
                # 2000 * 200 in area units — presumably tuned empirically)
                # or it is the only one, take it; otherwise skip the first
                # dip (assumed to be noise) and take the second.
                if (peaks_area_values[0] > 2000 *
                        200) or (len(peaks_area_values) == 1):
                    min_area_x = peaks_area_values[0]
                else:
                    min_area_x = peaks_area_values[1]
        else:
            # Global minimum of the smoothed derivative.
            min_idx = np.where(DCR_der_y_points == min(DCR_der_y_points))
            min_area_x = DCR_der_x_points[min_idx][0]

        return (area_hist_x, DCR_values, DCR_der_x_points,
                DCR_der_y_points, min_area_x)  # type:ignore

    @classmethod
    def get_DCR_above_threshold_values(cls, df: pd.DataFrame,
                                       length_cut_min: int = 5,
                                       length_cut_max: int = 80,
                                       bins: Union[int, list] = 200,
                                       output: str = 'values',
                                       **kwargs) -> Union[
            tuple,
            itp.UnivariateSpline,
            itp.interp1d,
            None]:
        """Computes the event rate in a sweep of area thresholds and
        returns the pair [area thresholds, DCR values].

        Args:
            df (pd.DataFrame): a pd.DataFrame with the series "area" and
                "length".
            length_cut_min (int, optional): cut to impose on the minimum
                length of the peaks for noise suppression. Defaults to 5.
            length_cut_max (int, optional): cut to impose on the maximum
                length of the peaks for noise suppression. Defaults to 80.
            bins (int or list, optional): number of bins to make the area
                histogram or list of bin edges to consider on the
                histogram. Defaults to 200.
            output (str): type of output. Can be 'values', 'spline' or
                'interp1d'.
            **kwargs: forwarded to itp.UnivariateSpline when
                output='spline'.

        Raises:
            NotImplementedError: for an unrecognized `output` option.

        Returns:
            tuple: pair of np.ndarrays (area thresholds, DCR values) for
                output='values'; otherwise the requested interpolator.
        """
        if output not in ['values', 'spline', 'interp1d']:
            raise NotImplementedError("Specifiy a valid output. Options are "
                                      "'values', 'spline' or 'interp1d'.")

        area_hist = np.histogram(df[(df['length'] > length_cut_min) &
                                    (df['length'] < length_cut_max)]['area'],
                                 bins=bins)
        area_hist_x = area_hist[1]
        # Convert bin edges to bin centres.
        area_hist_x = (area_hist_x +
                       (area_hist_x[1] - area_hist_x[0]) / 2)[:-1]
        area_hist_y = area_hist[0]

        # Reverse cumulative sum: counts with area above each threshold.
        DCR_values = np.flip(np.cumsum(np.flip(area_hist_y)))

        if output == 'values':
            return (area_hist_x, DCR_values)
        elif output == 'spline':
            DCR_func = itp.UnivariateSpline(area_hist_x, DCR_values, **kwargs)
            return DCR_func
        elif output == 'interp1d':
            DCR_func = itp.interp1d(area_hist_x, DCR_values)
            return DCR_func

    @staticmethod
    def get_DCR(df: pd.DataFrame,
                length_cut_min: int,
                length_cut_max: int,
                spe_area: float,
                sensor_area: float,
                t: float) -> tuple:
        """Compute the dark count rate (DCR) and crosstalk probability
        (CTP) of a dataset, area based.

        DCR counts pulses above 0.5 pe; CTP is the fraction of those that
        are also above 1.5 pe. Counting errors are taken as Poissonian
        (sqrt(N)).

        Args:
            df (pd.DataFrame): DataFrame of the dataset (single ch).
            length_cut_min (int): low length cut to apply.
            length_cut_max (int): high length cut to apply.
            spe_area (float): SPE area.
            sensor_area (float): effective area of the sensor.
            t (float): livetime of the dataset.

        Returns:
            tuple: DCR value, error of DCR value, CTP value, error of
                CTP value.
        """
        pe_0_5 = spe_area * 0.5
        pe_1_5 = spe_area * 1.5

        DC_0_5 = len(df[(df['length'] > length_cut_min) &
                        (df['length'] < length_cut_max) &
                        (df['area'] > pe_0_5)])
        DC_1_5 = len(df[(df['length'] > length_cut_min) &
                        (df['length'] < length_cut_max) &
                        (df['area'] > pe_1_5)])
        DC_0_5_error = np.sqrt(DC_0_5)
        DC_1_5_error = np.sqrt(DC_1_5)

        DCR = DC_0_5 / sensor_area / t
        DCR_error = DC_0_5_error / sensor_area / t

        # Standard error propagation for the ratio DC_1_5 / DC_0_5.
        CTP = DC_1_5 / DC_0_5
        CTP_error = np.sqrt((DC_1_5_error / DC_0_5)**2 +
                            (DC_1_5 * DC_0_5_error / (DC_0_5)**2)**2)

        return DCR, DCR_error, CTP, CTP_error

    @staticmethod
    def get_DCR_amplitude(df: pd.DataFrame,
                          length_cut_min: int,
                          length_cut_max: int,
                          pe_amplitude: float,
                          pe_amplitude_std: float,
                          sensor_area: float,
                          t: float) -> tuple:
        """Compute the dark count rate (DCR) and crosstalk probability
        (CTP) of a dataset, amplitude based.

        Thresholds are placed at +-5 sigma around the SPE amplitude.
        Counting errors are taken as Poissonian (sqrt(N)).

        Args:
            df (pd.DataFrame): DataFrame of the dataset (single ch).
            length_cut_min (int): low length cut to apply.
            length_cut_max (int): high length cut to apply.
            pe_amplitude (float): SPE amplitude.
            pe_amplitude_std (float): standard deviation of the SPE
                amplitude distribution.
            sensor_area (float): effective area of the sensor.
            t (float): livetime of the dataset.

        Returns:
            tuple: DCR value, error of DCR value, CTP value, error of
                CTP value.
        """
        pe_m5sigma = pe_amplitude - 5 * pe_amplitude_std
        pe_p5sigma = pe_amplitude + 5 * pe_amplitude_std

        DC_0_5 = len(df[(df['length'] > length_cut_min) &
                        (df['length'] < length_cut_max) &
                        (df['amplitude'] < pe_p5sigma)])
        # BUG FIX: the original compared the 'area' series against an
        # amplitude threshold; an amplitude-based selection must cut on
        # the 'amplitude' series.
        DC_1_5 = len(df[(df['length'] > length_cut_min) &
                        (df['length'] < length_cut_max) &
                        (df['amplitude'] < pe_m5sigma)])
        DC_0_5_error = np.sqrt(DC_0_5)
        DC_1_5_error = np.sqrt(DC_1_5)

        DCR = DC_0_5 / sensor_area / t
        DCR_error = DC_0_5_error / sensor_area / t

        # BUG FIX: error propagation now matches the ratio actually
        # computed (DC_1_5 / DC_0_5), consistent with get_DCR; the
        # original propagated as if CTP were DC_0_5 / DC_1_5.
        CTP = DC_1_5 / DC_0_5
        CTP_error = np.sqrt((DC_1_5_error / DC_0_5)**2 +
                            (DC_1_5 * DC_0_5_error / (DC_0_5)**2)**2)

        return DCR, DCR_error, CTP, CTP_error

    @staticmethod
    def print_DCR_CTP(DCR: float, DCR_error: float,
                      CTP: float, CTP_error: float) -> None:
        """Print the dark count rate (DCR) and crosstalk probability
        (CTP) values in a nice formatted and welcoming message.

        Args:
            DCR (float): DCR value.
            DCR_error (float): Error on the DCR value.
            CTP (float): CTP value.
            CTP_error (float): Error on the CTP value.
        """
        print(f'Your lovely DCR is: ({DCR:.2f} +- {DCR_error:.2f}) Hz/mm^2')
        print(f'Your lovely CTP is: ({CTP*100:.2f} +- {CTP_error*100:.2f})%')
class DCR_dataset(DCR_analysis):
    """Object class to hold dark count related instances and
    methods. Collects all the data and properties of a single MPPC,
    meaning the pair (module, channel), for all the available voltages at
    a certain temperature.
    """

    __version__ = '0.0.2'

    def __init__(self, run: pylars.utils.input.run, temperature: float,
                 module: int, channel: str,
                 processor: pylars.processing.rawprocessor.run_processor):
        # Run and processor context this dataset belongs to.
        self.run = run
        self.temp = temperature
        self.module = module
        self.channel = channel
        self.process = processor
        # All bias voltages available for (temp, kind='DCR') in the run.
        self.voltages = self.get_voltages_available()
        self.plots_flag = False
        # Livetime (in units of the run's 'dt') per bias voltage.
        self.livetimes = self.get_livetimes()
        self.set_standard_cuts()
        # There are better ways to set these options but this is still
        # better than a lot of flags.
        self.use_scipy_find_peaks = False
        self.amplitude_based = False

    def set_plots_flag(self, flag: bool) -> None:
        """Set if computing properties makes plots (True) or not (False).
        Assumes a ./figures/ directory exists.

        Args:
            flag (bool): True for plotting stuff, False for not making nice
                pictures (plots) to hang on the wall. Yes, I hang plots on
                my bedroom wall and it looks nice.
        """
        self.plots_flag = flag

    def define_SiPM_config(self,
                           sensor_area: float = 12 * 12,
                           ) -> None:
        r"""Define the SiPM data related quantities for the dataset.

        Args:
            sensor_area (float, optional): Area of the photosensor (mm\*\*2).
                Defaults to 12\*12.
        """
        SiPM_config = {'sensor_area': sensor_area,
                       }
        self.SiPM_config = SiPM_config

    def get_voltages_available(self) -> np.ndarray:
        """Checks the loaded run for which voltages are available for the
        defined temperature.

        Returns:
            np.ndarray: sorted array of the unique available voltages.
        """
        voltages = []
        for _dataset in self.run.datasets:
            # Only DCR (LED OFF) datasets at this object's temperature.
            if (_dataset.temp == self.temp) and (_dataset.kind == 'DCR'):
                voltages.append(_dataset.vbias)
        # np.unique also sorts the values.
        voltages = np.unique(voltages)
        return voltages

    def get_livetimes(self) -> dict:
        """Fetch the livetimes of a dataset.

        Based on the properties of the DCR_dataset, creates its dataset
        object and computes the livetime for each vbias by multiplying the
        number of samples per waveform with the number of waveforms in the
        dataset and the duration of a sample (given by
        `self.run.ADC_config['dt']`). Provides the result in dict format
        where the keys are the vbias of the dataset.

        Returns:
            dict: livetime per bias voltage, keyed by vbias; np.nan when
                the dataset sizes could not be read.
        """
        livetimes = {}
        for v in self.voltages:
            # Need all this to find the correct path...
            self.datasets_df = self.run.get_run_df()
            selection = ((self.datasets_df['kind'] == 'DCR') &
                         (self.datasets_df['vbias'] == v) &
                         (self.datasets_df['temp'] == self.temp) &
                         # to end with just one of the modules.
                         # It's assumed both have the same number of
                         # entries and size of entries
                         (self.datasets_df['module'] == 0))
            ds_selected = self.datasets_df[selection]
            assert len(
                self.datasets_df[selection]) == 1, "Found more than 1 ds with the same config. Help."
            ds_temp = pylars.utils.input.dataset(path=str(ds_selected['path'].values[0]),
                                                 kind='DCR',
                                                 module=0,
                                                 temp=self.temp,
                                                 vbias=v)
            try:
                n_waveforms, n_samples = ds_temp.read_sizes()
                livetimes[v] = n_waveforms * \
                    n_samples * self.run.ADC_config['dt']
            except BaseException:
                # Best-effort: a dataset whose sizes cannot be read gets
                # an undefined livetime instead of aborting the loop.
                livetimes[v] = np.nan
        return livetimes

    def set_standard_cuts(self,
                          cut_area_min: float = 5,
                          cut_area_max: float = 1e6,
                          cut_length_min: int = 4,
                          cut_length_max: int = 70,
                          cut_n_pulses_min: float = 0,
                          cut_n_pulses_max: float = 2) -> None:
        """Sets the cuts to use in analysis as object variables.

        Args:
            cut_area_min (float, optional): Area minimum value. Defaults to 5.
            cut_area_max (float, optional): Area maximum value. Defaults
                to 1e6.
            cut_length_min (int, optional): Length minimum value.
                Defaults to 4.
            cut_length_max (int, optional): Length maximum value.
                Defaults to 70.
            cut_n_pulses_min (float, optional): Minimum number of pulses in
                the waveform. Defaults to 0.
            cut_n_pulses_max (float, optional): Maximum number of pulses in
                the waveform. Defaults to 2.
        """
        self.cut_area_min = cut_area_min
        self.cut_area_max = cut_area_max
        self.cut_length_min = cut_length_min
        self.cut_length_max = cut_length_max
        self.cut_n_pulses_min = cut_n_pulses_min
        self.cut_n_pulses_max = cut_n_pulses_max

    def load_processed_data(self, force_processing: bool = False) -> None:
        """For all the voltages of a DCR_dataset (same temperature) looks
        for already processed files to load. If force_processing=True and
        no saved file is found, processes the dataset with standard
        options (sigma = 5, N_baseline = 50).

        Fills `self.data`, a dict of DataFrames keyed by voltage, with the
        rows of this object's (module, channel) only.

        Args:
            force_processing (bool, optional): Flag to force processing
                of raw data in case the processed dataset is not found.
                Defaults to False.
        """
        self.data = {}
        for _voltage in tqdm(self.voltages,
                             desc=f'Loading processed data for DCR ' +
                             f'data at {self.temp}K: ',
                             total=len(self.voltages),
                             leave=False):
            # NOTE(review): path_processed is hard-coded to a specific
            # mount point — confirm this is intended outside that system.
            processed_data = pylars.utils.output.processed_dataset(
                run=self.run,
                kind='DCR',
                vbias=_voltage,
                temp=self.temp,
                path_processed=('/disk/gfs_atp/xenoscope/SiPMs/char_campaign/'
                                'processed_data/'),
                process_hash=self.process.hash)
            processed_data.load_data(force=force_processing)
            _df = processed_data.data
            # Keep only this object's module/channel.
            mask = ((_df['module'] == self.module) &
                    (_df['channel'] == self.channel))
            # Deep copy so later cuts don't mutate the loaded dataset.
            self.data[_voltage] = copy.deepcopy(_df[mask])

    def compute_properties_of_dataset(
            self,
            use_n_pulse_wf: bool = False,
            compute_BV_flag: bool = True,
            amplitude_based: bool = False) -> pd.DataFrame:
        """Calculate the gain, DCR, CTP and BV for the dataset in a single
        line!

        Args:
            use_n_pulse_wf (bool): apply the n_pulses-per-waveform cut
                before computing properties. Defaults to False.
            compute_BV_flag (bool): compute the breakdown voltage from the
                gain(V) curve and append OV info. Defaults to True.
            amplitude_based (bool): not implemented; raises if True.

        Raises:
            NotImplementedError: if amplitude_based is True.

        Returns:
            pd.DataFrame: dataframe with all the computed properties with
                the columns ['T', 'V', 'SPE_area','SPE_area_error', 'Gain',
                'Gain_error', 'SPE_res', 'SPE_res_error', 'DCR',
                'DCR_error', 'CTP', 'CTP_error']
        """
        assert isinstance(self.data, dict), ('Woops, no data found! Load'
                                             ' data into the dataset first')
        if amplitude_based:
            raise NotImplementedError
        voltage_list = self.voltages
        _results_dataset = pd.DataFrame(
            columns=['T', 'V', 'SPE_area', 'SPE_area_error', 'Gain',
                     'Gain_error', 'SPE_res', 'SPE_res_error', 'DCR',
                     'DCR_error', 'CTP', 'CTP_error'])
        for _volt in voltage_list:
            # select voltage
            df = self.data[_volt]
            if use_n_pulse_wf == True:
                df = df[(df['n_pulses'] > self.cut_n_pulses_min) &
                        (df['n_pulses'] <= self.cut_n_pulses_max)]
            if self.plots_flag == True:
                plot_name_1pe_fit = (f'{self.temp}K_{_volt}V_mod{self.module}_'
                                     f'ch{self.channel}')
            else:
                plot_name_1pe_fit = False
            try:
                # Get SPE value from Gaussian fit
                (A, mu, sigma), cov = self.get_1pe_value_fit(
                    df, plot=plot_name_1pe_fit,
                    length_cut_min=self.cut_length_min,
                    length_cut_max=self.cut_length_max,
                    use_scipy_find_peaks=self.use_scipy_find_peaks)
                A_err, mu_err, sigma_err = np.sqrt(np.diag(cov))

                # Calculate DCR and CTP
                DCR, DCR_error, CTP, CTP_error = self.get_DCR(
                    df=df,
                    length_cut_min=self.cut_length_min,
                    length_cut_max=self.cut_length_max,
                    spe_area=mu,
                    sensor_area=self.SiPM_config['sensor_area'],
                    t=self.livetimes[_volt])

                # Calculate gain
                gain = get_gain(F_amp=self.run.ADC_config['F_amp'],
                                spe_area=mu,
                                ADC_range=self.run.ADC_config['ADC_range'],
                                ADC_impedance=self.run.ADC_config['ADC_impedance'],
                                ADC_res=self.run.ADC_config['ADC_res'],
                                q_e=self.run.ADC_config['q_e'])
                # NOTE(review): gain error is obtained by pushing mu_err
                # through get_gain — valid only if gain is linear in
                # spe_area; confirm against get_gain's definition.
                gain_error = get_gain(F_amp=self.run.ADC_config['F_amp'],
                                      spe_area=mu_err,
                                      ADC_range=self.run.ADC_config['ADC_range'],
                                      ADC_impedance=self.run.ADC_config['ADC_impedance'],
                                      ADC_res=self.run.ADC_config['ADC_res'],
                                      q_e=self.run.ADC_config['q_e'])

                # SPE resolution: sigma/mu of the SPE Gaussian.
                SPE_res = np.abs(sigma / mu) * 100  # in %
                SPE_res_err = np.sqrt(
                    (sigma_err / mu)**2 + (sigma * mu_err / mu**2)**2) * 100  # in %

                # Merge into rolling dataframe (I know it's slow... make a PR,
                # pls)
                _results_dataset = pd.concat(
                    (_results_dataset,
                     pd.DataFrame({'T': [self.temp],
                                   'V': [_volt],
                                   'SPE_area': [mu],
                                   'SPE_area_error': [mu_err],
                                   'Gain': [gain],
                                   'Gain_error': [gain_error],
                                   'SPE_res': [SPE_res],
                                   'SPE_res_error': [SPE_res_err],
                                   'DCR': [DCR],
                                   'DCR_error': [DCR_error],
                                   'CTP': [CTP * 100],
                                   'CTP_error': [CTP_error * 100]})
                     ), ignore_index=True
                )
            except BaseException:
                # A failed voltage point is recorded as a NaN row so the
                # result keeps one row per voltage.
                print(f'Could not compute properties of module {self.module}, '
                      f'channel {self.channel}, {self.temp} K, {_volt} V. '
                      f'Skipping dataset.')
                _results_dataset = pd.concat(
                    (_results_dataset,
                     pd.DataFrame({'T': [self.temp],
                                   'V': [_volt],
                                   'SPE_area': [np.nan],
                                   'SPE_area_error': [np.nan],
                                   'Gain': [np.nan],
                                   'Gain_error': [np.nan],
                                   'SPE_res': [np.nan],
                                   'SPE_res_error': [np.nan],
                                   'DCR': [np.nan],
                                   'DCR_error': [np.nan],
                                   'CTP': [np.nan],
                                   'CTP_error': [np.nan]})
                     ), ignore_index=True
                )

        # Compute BV from gain(V) curve and update df
        if compute_BV_flag == True:
            if self.plots_flag == True:
                plot_BV = f'BV_mod{self.module}_ch{self.channel}'
            else:
                plot_BV = False
            _results_dataset, _a, _b, _b_err = compute_BV_df(
                _results_dataset, plot_BV)
        return _results_dataset

    @staticmethod
    def get_how_many_peaks_per_waveform(df: pd.DataFrame,
                                        verbose: bool = False) -> pd.DataFrame:
        """Finds how many peaks each waveform has.

        Only looks in waveforms with at least 1 peak.
        Since commit 6f8e0c72b8e3e0bf7c5307941b49bdf57879d554 this method
        is not needed as n_pulses is stored during processing.

        Args:
            df (pd.DataFrame): dataframe with the processed pulse data.
            verbose (bool): print timing info during processing.

        Returns:
            pd.DataFrame: dataframe with the pairs: wf_number - pulse_count.
        """
        t0 = time.time()
        if verbose:
            print('Starting counting peaks at:', t0)
        wf_number_arr = np.array(df['wf_number'].values)
        # Heavy lifting is delegated to the numba-compiled helper.
        waveforms_w_pulses, N_pulses = _get_how_many_peaks_per_waveform(
            wf_number_arr)
        pulse_count_df = pd.concat(
            [pd.Series(waveforms_w_pulses, name='wf_number'),
             pd.Series(N_pulses, name='pulse_count')], axis=1
        )
        t1 = time.time()
        if verbose:
            print('Finished conting peaks at: ', t1)
            print('Took time in calc: ', t1 - t0)
        return pulse_count_df

    def get_med_amplitude(self, df: pd.DataFrame,
                          cut_mask: np.ndarray) -> tuple:
        """Calculate the median and standard deviation of the distribution
        of amplitudes, applying a given mask.

        Args:
            df (pd.DataFrame): dataframe with the processed pulse data.
            cut_mask (np.ndarray): boolean cut mask applied to df.

        Returns:
            tuple: (median amplitude, standard deviation of amplitude).
        """
        med_amplitude = np.median(df[cut_mask]['amplitude'])
        std_amplitude = np.std(df[cut_mask]['amplitude'])
        return med_amplitude, std_amplitude

    def compute_properties_of_dataset_amplitude_based(self,
                                                      use_n_pulse_wf: bool = True) -> pd.DataFrame:
        """NOT IMPLEMENTED - Needs review and merge with other compute method

        Calculate the gain, DCR, CTP and BV for the dataset in a single
        line! This is an alternative method to the main one, using amplitude
        cuts instead of area cuts.

        Raises:
            NotImplementedError: always, until reviewed and merged.

        Returns:
            pd.DataFrame: dataframe with all the computed properties with
                the columns ['V','T','path','module','channel','Gain','DCR',
                'CTP','DCR_error','CTP_error','BV','OV']
        """
        raise NotImplementedError  # needs review and merging with normal
class DCR_run():
    """Collection of all the DCR_datasets results for a run, ie, for all
    the channels and modules, for every available temperatures and voltages.

    The results of every dataset (channel, V, T) is saved on the instance
    DCR_run.results_df .
    """

    __version__ = '0.0.1'

    def __init__(self, run: pylars.utils.input.run,
                 processor: pylars.processing.rawprocessor.run_processor,
                 use_n_pulse: bool = True):
        self.run = run
        self.process = processor
        # Whether per-dataset computations apply the n_pulses cut.
        self.use_n_pulse = use_n_pulse
        self.datasets = self.process.datasets_df
        self.temperatures = self.get_run_temperatures()
        self.plots_flag = False
        # Derived by replacing the tail of main_data_path; assumes a fixed
        # 9-character suffix on that path.
        self.analysis_path = (f'{self.run.main_data_path[:-9]}analysis_data'
                              f'/run{self.run.run_number}/')

    def set_plots_flag(self, flag: bool) -> None:
        """Set if computing properties makes plots (True) or not (False).
        Assumes a ./figures/ directory exists.

        Args:
            flag (bool): True for plotting stuff, False for not making nice
                pictures (plots) to hang on the wall. Yes, I hang plots on my
                bedroom wall and it looks nice.
        """
        self.plots_flag = flag

    def get_run_temperatures(self) -> np.ndarray:
        """Get all the temperatures available in the DCR run.

        Returns:
            np.ndarray: array with all the available temperatures.
        """
        temp_list = np.unique(self.datasets['temp'])
        return temp_list

    def initialize_results_df(self) -> None:
        """Initialize a clean results_df instance in the object.
        """
        results_df = pd.DataFrame(
            columns=['T', 'V', 'SPE_area', 'SPE_area_error', 'Gain',
                     'Gain_error', 'SPE_res', 'SPE_res_error', 'DCR',
                     'DCR_error', 'CTP', 'CTP_error'])
        self.results_df = results_df

    def define_run_SiPM_config(self, sensor_area: float = 12 * 12,
                               ) -> None:
        r"""Define the SiPM data related quantities for the run.

        Args:
            sensor_area (float, optional): Area of the photosensor (mm\*\*2).
                Defaults to 12\*12.
        """
        SiPM_config = {'sensor_area': sensor_area,
                       }
        self.SiPM_config = SiPM_config

    def set_standard_cuts_run(self,
                              cut_area_min: float = 5,
                              cut_area_max: float = 1e6,
                              cut_length_min: int = 4,
                              cut_length_max: int = 70,
                              cut_n_pulses_min: float = 0,
                              cut_n_pulses_max: float = 2) -> None:
        """Sets the cuts to use in analysis as (run) object variables.

        Args:
            cut_area_min (float, optional): Area minimum value. Defaults to 5.
            cut_area_max (float, optional): Area maximum value. Defaults
                to 1e6.
            cut_length_min (int, optional): Length minimum value.
                Defaults to 4.
            cut_length_max (int, optional): Length maximum value.
                Defaults to 70.
            cut_n_pulses_min (float, optional): Minimum number of pulses in
                the waveform. Defaults to 0.
            cut_n_pulses_max (float, optional): Maximum number of pulses in
                the waveform. Defaults to 2.
        """
        self.cut_area_min = cut_area_min
        self.cut_area_max = cut_area_max
        self.cut_length_min = cut_length_min
        self.cut_length_max = cut_length_max
        self.cut_n_pulses_min = cut_n_pulses_min
        self.cut_n_pulses_max = cut_n_pulses_max

    def load_dataset(self, temp: float,
                     module: int,
                     channel: str) -> DCR_dataset:
        """Create a DCR_dataset object for a (T, mod, ch) configuration and
        load the corresponding data into it.

        ! This assumes processed data is available for all the raw files of
        the DCR run datasets !

        Args:
            temp (float): temperature to consider.
            module (int): module to load.
            channel (str): channel in the module to select.

        Returns:
            DCR_dataset: dataset object with processed data loaded.
        """
        particular_DCR_dataset = DCR_dataset(run=self.run,
                                             temperature=temp,
                                             module=module,
                                             channel=channel,
                                             processor=self.process,
                                             )
        particular_DCR_dataset.load_processed_data()
        return particular_DCR_dataset

    def compute_properties_of_ds(self, temp: float,
                                 module: int,
                                 channel: str,
                                 amplitude_based: bool = False
                                 ) -> pd.DataFrame:
        """Loads and computes the properties of a single dataset (temp,
        module, channel) by creating a DCR_dataset object and calling its
        methods.

        Args:
            temp (float): temperature.
            module (int): module.
            channel (str): channel.
            amplitude_based (bool): if the computation method is based on
                amplitude instead of area.

        Returns:
            pd.DataFrame: per-voltage results with 'module' and 'channel'
                columns appended.
        """
        assert isinstance(self.run.ADC_config, dict), 'No ADC_config found!'
        ds = self.load_dataset(temp, module, channel)
        # Propagate run-level configuration and cuts into the dataset.
        ds.set_plots_flag(self.plots_flag)
        ds.SiPM_config = self.SiPM_config
        ds.cut_area_min = self.cut_area_min
        ds.cut_area_max = self.cut_area_max
        ds.cut_length_min = self.cut_length_min
        ds.cut_length_max = self.cut_length_max
        ds.cut_n_pulses_min = self.cut_n_pulses_min
        ds.cut_n_pulses_max = self.cut_n_pulses_max

        if amplitude_based:
            # NOTE(review): this path raises NotImplementedError in
            # DCR_dataset — currently unusable.
            ds_results = ds.compute_properties_of_dataset_amplitude_based(
                use_n_pulse_wf=self.use_n_pulse,
            )
        else:
            # BV is computed at run level, not per dataset.
            ds_results = ds.compute_properties_of_dataset(
                compute_BV_flag=False,
                use_n_pulse_wf=self.use_n_pulse)

        # Add module and channel columns
        module_Series = pd.Series([module] * len(ds_results), name='module')
        channel_Series = pd.Series([channel] * len(ds_results), name='channel')
        ds_results = pd.concat([ds_results, module_Series, channel_Series],
                               axis=1)
        return ds_results

    def compute_properties_of_run(self, amplitude_based: bool = False) -> None:
        """Loads and computes the properties of ALL the datasets.

        Results are accumulated in self.results_df (re-initialized first).

        Args:
            amplitude_based (bool): Turn True to compute SPE based on
                amplitude.
        """
        self.initialize_results_df()
        all_channels = pylars.utils.common.get_channel_list(self.process)
        for temperature in self.temperatures:
            # Second positional arg of tqdm is the progress-bar description.
            for (module, channel) in tqdm(all_channels,
                                          (f'Computing properties for '
                                           f'T={temperature}: ')):
                _ds_results = self.compute_properties_of_ds(
                    temp=temperature,
                    module=module,
                    channel=channel,
                    amplitude_based=amplitude_based)
                self.results_df = pd.concat(
                    [self.results_df, _ds_results],  # type: ignore
                    ignore_index=True)

    def save_results(self, custom_name: str) -> None:
        """Save dataframe of results to a hdf5 file. Saved files go to
        self.analysis_path.

        Args:
            custom_name (str): name to give the file (without extension);
                saved as 'DCR_results_<custom_name>.h5'.
        """
        assert isinstance(
            self.results_df, pd.DataFrame), ("Trying to save results that do "
                                             "not exist in the object, c'mon"
                                             ", you know better.")
        assert len(self.results_df) > 0, ("Results df is empty, please compute"
                                          "something before trying to save, "
                                          "otherwire it's just a waste of "
                                          "disk space")
        name = f'DCR_results_{custom_name}'
        self.results_df.to_hdf(self.analysis_path + name + '.h5', key='df')
        # NOTE(review): message is missing the saved path/filename.
        print('Saved results to ')

    def load_results(self, name: str) -> None:
        """Load dataframe of results from a hdf5 file. Looks for files in
        the standard analysis cache directory.

        Args:
            name (str): name of the file to load (without extension).
        """
        _df = pd.read_hdf(self.analysis_path + name + '.h5')
        self.results_df = _df
@numba.njit
def _get_how_many_peaks_per_waveform(wf_number_list: np.ndarray) -> Tuple:
    """Numbafied process of counting how many peaks there are in each
    waveform. Not used any longer since "n_pulses" is stored.

    Args:
        wf_number_list (np.ndarray): the wf number of each identified peak.

    Returns:
        tuple: (wf numbers with pulses, the number of pulses in each wf)
    """
    # np.unique returns the distinct waveform numbers in sorted order.
    unique_wfs = np.unique(wf_number_list)
    pulse_counts = np.zeros(len(unique_wfs))
    # One pass per unique waveform: count its occurrences in the full list.
    for idx in range(len(unique_wfs)):
        pulse_counts[idx] = np.count_nonzero(wf_number_list == unique_wfs[idx])
    return (unique_wfs, pulse_counts)
Classes
class DCR_analysis
-
A class with all the methods needed for DCR analysis.
Why a separate class? DCR_dataset was too messy with @classmethods. These will be put in this parent class, and DCR_dataset and DCR_run will have only object-specific methods.
Expand source code
class DCR_analysis(): """A class with all the methods needed for DCR analysis. Why a separate class? DCR_dataset was too messy with @classmethods. These we be put in this parent class and DCR_dataset and DCR_run will have only object-specific methods. """ __version__ = 'v0.0.1' def __init__(self): pass @classmethod def get_1pe_value_fit(cls, df: pd.DataFrame, length_cut_min: int = 5, length_cut_max: int = 80, plot: Union[bool, str] = False, use_scipy_find_peaks: bool = False) -> tuple: """Try to fit the SPE peak in the area histogram and return the Gaussian paramenters. Args: df (pd.DataFrame): _description_ length_cut (int, optional): cut to impose on the length of the peaks for noise suppression. Defaults to 5. plot (boolorstr, optional): _description_. Defaults to False. Returns: tuple: (A, mu, sigma), cov """ (area_hist_x, DCR_values, DCR_der_x_points, DCR_der_y_points, min_area_x) = cls.get_1pe_rough( df, length_cut_min, length_cut_max, use_scipy_find_peaks=use_scipy_find_peaks) if plot != False: pylars.plotting.plotanalysis.plot_DCR_curve( plot, area_hist_x, DCR_values, DCR_der_x_points, DCR_der_y_points, min_area_x) area_hist = np.histogram(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max)]['area'], bins=np.linspace(0.5 * min_area_x, 1.5 * min_area_x, 300)) area_hist_x = area_hist[1] area_hist_x = ( area_hist_x + (area_hist_x[1] - area_hist_x[0]) / 2)[:-1] area_hist_y = area_hist[0] (A, mu, sigma), cov = curve_fit(Gaussian, area_hist_x, area_hist_y, p0=(2000, min_area_x, 0.05 * min_area_x)) if plot != False: pylars.plotting.plotanalysis.plot_SPE_fit( df, length_cut_min, length_cut_max, plot, area_hist_x, min_area_x, A, mu, sigma) return (A, mu, sigma), cov @classmethod def get_1pe_rough(cls, df: pd.DataFrame, length_cut_min: int, length_cut_max: int, bins: Union[int, list] = 200, use_scipy_find_peaks: bool = False) -> tuple: """From an event df (1 channel), find the rough position of the SPE from the DCR vs threshold curve and its 
derivative Args: df (pd.DataFrame): dataframe with the processed data. length_cut_min (int): minimum value of lenght to consider length_cut_max (int): maximum value of lenght to consider bins (intorlist, optional): number of bins to make the are histogram or list of bin edges to consider on the histogram. Defaults to 200. Returns: tuple: computed arrays: area_hist_x, DCR_values, DCR_der_x_points, DCR_der_y_points, min_area_x """ (area_hist_x, DCR_values) = cls.get_DCR_above_threshold_values( df, length_cut_min, length_cut_max, bins, output='values') # type: ignore grad = np.gradient(DCR_values) grad_spline = itp.UnivariateSpline(area_hist_x, grad) # , s = len(area_hist_x)*3) DCR_der_x_points = np.linspace( area_hist_x[0], area_hist_x[-1], bins) # type: ignore DCR_der_y_points = grad_spline(DCR_der_x_points) if use_scipy_find_peaks: pks, props = find_peaks(-1 * DCR_der_y_points, prominence=20) peaks_area_values = DCR_der_x_points[pks] if len(peaks_area_values) == 0: print('Could not find any peaks') min_area_x = np.nan elif len(peaks_area_values) > 0: if (peaks_area_values[0] > 2000 * 200) or (len(peaks_area_values) == 1): min_area_x = peaks_area_values[0] else: min_area_x = peaks_area_values[1] else: min_idx = np.where(DCR_der_y_points == min(DCR_der_y_points)) min_area_x = DCR_der_x_points[min_idx][0] return (area_hist_x, DCR_values, DCR_der_x_points, DCR_der_y_points, min_area_x) # type:ignore @classmethod def get_DCR_above_threshold_values(cls, df: pd.DataFrame, length_cut_min: int = 5, length_cut_max: int = 80, bins: Union[int, list] = 200, output: str = 'values', **kwargs) -> Union[ tuple, itp.UnivariateSpline, itp.interp1d, None]: """Computes the event rate in a sweep of area thresholds and returns the pair [area thersholds, DCR values] Args: df (pd.DataFrame): a pd.DataFrame with the series "area" and "length". length_cut_min (int, optional): cut to impose on the minimum length of the peaks for noise suppression. Defaults to 5. 
length_cut_max (int, optional): cut to impose on the maximum length of the peaks for noise suppression. Defaults to 5. bins (intorlist, optional): number of bins to make the are histogram or list of bin edges to consider on the histogram. Defaults to 200. output (str): type of output. Can be 'values', 'spline' or 'interp1d' Returns: tuple: pair of np.ndarrays (area thersholds, DCR values) """ if output not in ['values', 'spline', 'interp1d']: raise NotImplementedError("Specifiy a valid output. Options are " "'values', 'spline' or 'interp1d'.") area_hist = np.histogram(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max)]['area'], bins=bins) area_hist_x = area_hist[1] area_hist_x = (area_hist_x + (area_hist_x[1] - area_hist_x[0]) / 2)[:-1] area_hist_y = area_hist[0] DCR_values = np.flip(np.cumsum(np.flip(area_hist_y))) if output == 'values': return (area_hist_x, DCR_values) elif output == 'spline': DCR_func = itp.UnivariateSpline(area_hist_x, DCR_values, **kwargs) return DCR_func elif output == 'interp1d': DCR_func = itp.interp1d(area_hist_x, DCR_values) return DCR_func @staticmethod def get_DCR(df: pd.DataFrame, length_cut_min: int, length_cut_max: int, spe_area: float, sensor_area: float, t: float) -> tuple: """Compute the dark count rate (DCR) and crosstalk probability (CTP) of a dataset. 
Args: df (pd.DataFrame): DataFrame of the dataset (single ch) length_cut_min (int): low length cut to apply length_cut_max (int): high length cut to apply spe_area (float): SPE area sensor_area (float): effective are of the sensor t (float): livetime of the dataset Returns: tuple: DCR value, error of DCR value, CTP value, error of CTP value """ pe_0_5 = spe_area * 0.5 pe_1_5 = spe_area * 1.5 DC_0_5 = len(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max) & (df['area'] > pe_0_5)]) DC_1_5 = len(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max) & (df['area'] > pe_1_5)]) DC_0_5_error = np.sqrt(DC_0_5) DC_1_5_error = np.sqrt(DC_1_5) DCR = DC_0_5 / sensor_area / t DCR_error = DC_0_5_error / sensor_area / t CTP = DC_1_5 / DC_0_5 CTP_error = np.sqrt((DC_1_5_error / DC_0_5)**2 + (DC_1_5 * DC_0_5_error / (DC_0_5)**2)**2) return DCR, DCR_error, CTP, CTP_error @staticmethod def get_DCR_amplitude(df: pd.DataFrame, length_cut_min: int, length_cut_max: int, pe_amplitude: float, pe_amplitude_std: float, sensor_area: float, t: float) -> tuple: """Compute the dark count rate (DCR) and crosstalk probability (CTP) of a dataset, amplitude based. 
Args: df (pd.DataFrame): DataFrame of the dataset (single ch) length_cut_min (int): low length cut to apply length_cut_max (int): high length cut to apply pe_amplitude (float): SPE amplitude sensor_area (float): effective are of the sensor t (float): livetime of the dataset Returns: tuple: DCR value, error of DCR value, CTP value, error of CTP value """ pe_0_5 = pe_amplitude * 0.5 pe_1_5 = pe_amplitude * 1.5 pe_m5sigma = pe_amplitude - 5 * pe_amplitude_std pe_p5sigma = pe_amplitude + 5 * pe_amplitude_std DC_0_5 = len(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max) & (df['amplitude'] < pe_p5sigma)]) DC_1_5 = len(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max) & (df['area'] < pe_m5sigma)]) DC_0_5_error = np.sqrt(DC_0_5) DC_1_5_error = np.sqrt(DC_1_5) DCR = DC_0_5 / sensor_area / t DCR_error = DC_0_5_error / sensor_area / t CTP = DC_1_5 / DC_0_5 CTP_error = np.sqrt((DC_0_5_error / DC_1_5)**2 + (DC_0_5 * DC_1_5_error / (DC_1_5)**2)**2) return DCR, DCR_error, CTP, CTP_error @staticmethod def print_DCR_CTP(DCR: float, DCR_error: float, CTP: float, CTP_error: float) -> None: """Print the dark count rate (DCR) and crosstalk probability (CTP) values in a nice formatted and welcoming message. Args: DCR (float): DCR value DCR_error (float): Error on the DCR value CTP (float): CTP value CTP_error (float): Error on the CTP value """ print(f'Your lovely DCR is: ({DCR:.2f} +- {DCR_error:.2f}) Hz/mm^2') print(f'Your lovely CTP is: ({CTP*100:.2f} +- {CTP_error*100:.2f})%')
Subclasses
Static methods
def get_1pe_rough(df: pandas.core.frame.DataFrame, length_cut_min: int, length_cut_max: int, bins: Union[int, list] = 200, use_scipy_find_peaks: bool = False) ‑> tuple
-
From an event df (1 channel), find the rough position of the SPE from the DCR vs threshold curve and its derivative
Args
df
:pd.DataFrame
- dataframe with the processed data.
length_cut_min
:int
- minimum value of length to consider
length_cut_max
:int
- maximum value of length to consider
bins
:intorlist
, optional- number of bins to make the area histogram or list of bin edges to consider on the histogram. Defaults to 200.
Returns
tuple
- computed arrays: area_hist_x, DCR_values, DCR_der_x_points, DCR_der_y_points, min_area_x
Expand source code
@classmethod def get_1pe_rough(cls, df: pd.DataFrame, length_cut_min: int, length_cut_max: int, bins: Union[int, list] = 200, use_scipy_find_peaks: bool = False) -> tuple: """From an event df (1 channel), find the rough position of the SPE from the DCR vs threshold curve and its derivative Args: df (pd.DataFrame): dataframe with the processed data. length_cut_min (int): minimum value of lenght to consider length_cut_max (int): maximum value of lenght to consider bins (intorlist, optional): number of bins to make the are histogram or list of bin edges to consider on the histogram. Defaults to 200. Returns: tuple: computed arrays: area_hist_x, DCR_values, DCR_der_x_points, DCR_der_y_points, min_area_x """ (area_hist_x, DCR_values) = cls.get_DCR_above_threshold_values( df, length_cut_min, length_cut_max, bins, output='values') # type: ignore grad = np.gradient(DCR_values) grad_spline = itp.UnivariateSpline(area_hist_x, grad) # , s = len(area_hist_x)*3) DCR_der_x_points = np.linspace( area_hist_x[0], area_hist_x[-1], bins) # type: ignore DCR_der_y_points = grad_spline(DCR_der_x_points) if use_scipy_find_peaks: pks, props = find_peaks(-1 * DCR_der_y_points, prominence=20) peaks_area_values = DCR_der_x_points[pks] if len(peaks_area_values) == 0: print('Could not find any peaks') min_area_x = np.nan elif len(peaks_area_values) > 0: if (peaks_area_values[0] > 2000 * 200) or (len(peaks_area_values) == 1): min_area_x = peaks_area_values[0] else: min_area_x = peaks_area_values[1] else: min_idx = np.where(DCR_der_y_points == min(DCR_der_y_points)) min_area_x = DCR_der_x_points[min_idx][0] return (area_hist_x, DCR_values, DCR_der_x_points, DCR_der_y_points, min_area_x) # type:ignore
def get_1pe_value_fit(df: pandas.core.frame.DataFrame, length_cut_min: int = 5, length_cut_max: int = 80, plot: Union[bool, str] = False, use_scipy_find_peaks: bool = False) ‑> tuple
-
Try to fit the SPE peak in the area histogram and return the Gaussian parameters.
Args
df
:pd.DataFrame
- description
length_cut
:int
, optional- cut to impose on the length of the peaks for noise suppression. Defaults to 5.
plot
:boolorstr
, optional- description. Defaults to False.
Returns
tuple
- (A, mu, sigma), cov
Expand source code
@classmethod def get_1pe_value_fit(cls, df: pd.DataFrame, length_cut_min: int = 5, length_cut_max: int = 80, plot: Union[bool, str] = False, use_scipy_find_peaks: bool = False) -> tuple: """Try to fit the SPE peak in the area histogram and return the Gaussian paramenters. Args: df (pd.DataFrame): _description_ length_cut (int, optional): cut to impose on the length of the peaks for noise suppression. Defaults to 5. plot (boolorstr, optional): _description_. Defaults to False. Returns: tuple: (A, mu, sigma), cov """ (area_hist_x, DCR_values, DCR_der_x_points, DCR_der_y_points, min_area_x) = cls.get_1pe_rough( df, length_cut_min, length_cut_max, use_scipy_find_peaks=use_scipy_find_peaks) if plot != False: pylars.plotting.plotanalysis.plot_DCR_curve( plot, area_hist_x, DCR_values, DCR_der_x_points, DCR_der_y_points, min_area_x) area_hist = np.histogram(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max)]['area'], bins=np.linspace(0.5 * min_area_x, 1.5 * min_area_x, 300)) area_hist_x = area_hist[1] area_hist_x = ( area_hist_x + (area_hist_x[1] - area_hist_x[0]) / 2)[:-1] area_hist_y = area_hist[0] (A, mu, sigma), cov = curve_fit(Gaussian, area_hist_x, area_hist_y, p0=(2000, min_area_x, 0.05 * min_area_x)) if plot != False: pylars.plotting.plotanalysis.plot_SPE_fit( df, length_cut_min, length_cut_max, plot, area_hist_x, min_area_x, A, mu, sigma) return (A, mu, sigma), cov
def get_DCR(df: pandas.core.frame.DataFrame, length_cut_min: int, length_cut_max: int, spe_area: float, sensor_area: float, t: float) ‑> tuple
-
Compute the dark count rate (DCR) and crosstalk probability (CTP) of a dataset.
Args
df
:pd.DataFrame
- DataFrame of the dataset (single ch)
length_cut_min
:int
- low length cut to apply
length_cut_max
:int
- high length cut to apply
spe_area
:float
- SPE area
sensor_area
:float
- effective area of the sensor
t
:float
- livetime of the dataset
Returns
tuple
- DCR value, error of DCR value, CTP value, error of CTP value
Expand source code
@staticmethod def get_DCR(df: pd.DataFrame, length_cut_min: int, length_cut_max: int, spe_area: float, sensor_area: float, t: float) -> tuple: """Compute the dark count rate (DCR) and crosstalk probability (CTP) of a dataset. Args: df (pd.DataFrame): DataFrame of the dataset (single ch) length_cut_min (int): low length cut to apply length_cut_max (int): high length cut to apply spe_area (float): SPE area sensor_area (float): effective are of the sensor t (float): livetime of the dataset Returns: tuple: DCR value, error of DCR value, CTP value, error of CTP value """ pe_0_5 = spe_area * 0.5 pe_1_5 = spe_area * 1.5 DC_0_5 = len(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max) & (df['area'] > pe_0_5)]) DC_1_5 = len(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max) & (df['area'] > pe_1_5)]) DC_0_5_error = np.sqrt(DC_0_5) DC_1_5_error = np.sqrt(DC_1_5) DCR = DC_0_5 / sensor_area / t DCR_error = DC_0_5_error / sensor_area / t CTP = DC_1_5 / DC_0_5 CTP_error = np.sqrt((DC_1_5_error / DC_0_5)**2 + (DC_1_5 * DC_0_5_error / (DC_0_5)**2)**2) return DCR, DCR_error, CTP, CTP_error
def get_DCR_above_threshold_values(df: pandas.core.frame.DataFrame, length_cut_min: int = 5, length_cut_max: int = 80, bins: Union[int, list] = 200, output: str = 'values', **kwargs) ‑> Union[tuple, scipy.interpolate.fitpack2.UnivariateSpline, scipy.interpolate.interpolate.interp1d, NoneType]
-
Computes the event rate in a sweep of area thresholds and returns the pair [area thresholds, DCR values]
Args
df
:pd.DataFrame
- a pd.DataFrame with the series "area" and "length".
length_cut_min
:int
, optional- cut to impose on the minimum length of the peaks for noise suppression. Defaults to 5.
length_cut_max
:int
, optional- cut to impose on the maximum length of the peaks for noise suppression. Defaults to 5.
bins
:intorlist
, optional- number of bins to make the area histogram or list of bin edges to consider on the histogram. Defaults to 200.
output
:str
- type of output. Can be 'values', 'spline' or 'interp1d'
Returns
tuple
- pair of np.ndarrays (area thresholds, DCR values)
Expand source code
@classmethod def get_DCR_above_threshold_values(cls, df: pd.DataFrame, length_cut_min: int = 5, length_cut_max: int = 80, bins: Union[int, list] = 200, output: str = 'values', **kwargs) -> Union[ tuple, itp.UnivariateSpline, itp.interp1d, None]: """Computes the event rate in a sweep of area thresholds and returns the pair [area thersholds, DCR values] Args: df (pd.DataFrame): a pd.DataFrame with the series "area" and "length". length_cut_min (int, optional): cut to impose on the minimum length of the peaks for noise suppression. Defaults to 5. length_cut_max (int, optional): cut to impose on the maximum length of the peaks for noise suppression. Defaults to 5. bins (intorlist, optional): number of bins to make the are histogram or list of bin edges to consider on the histogram. Defaults to 200. output (str): type of output. Can be 'values', 'spline' or 'interp1d' Returns: tuple: pair of np.ndarrays (area thersholds, DCR values) """ if output not in ['values', 'spline', 'interp1d']: raise NotImplementedError("Specifiy a valid output. Options are " "'values', 'spline' or 'interp1d'.") area_hist = np.histogram(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max)]['area'], bins=bins) area_hist_x = area_hist[1] area_hist_x = (area_hist_x + (area_hist_x[1] - area_hist_x[0]) / 2)[:-1] area_hist_y = area_hist[0] DCR_values = np.flip(np.cumsum(np.flip(area_hist_y))) if output == 'values': return (area_hist_x, DCR_values) elif output == 'spline': DCR_func = itp.UnivariateSpline(area_hist_x, DCR_values, **kwargs) return DCR_func elif output == 'interp1d': DCR_func = itp.interp1d(area_hist_x, DCR_values) return DCR_func
def get_DCR_amplitude(df: pandas.core.frame.DataFrame, length_cut_min: int, length_cut_max: int, pe_amplitude: float, pe_amplitude_std: float, sensor_area: float, t: float) ‑> tuple
-
Compute the dark count rate (DCR) and crosstalk probability (CTP) of a dataset, amplitude based.
Args
df
:pd.DataFrame
- DataFrame of the dataset (single ch)
length_cut_min
:int
- low length cut to apply
length_cut_max
:int
- high length cut to apply
pe_amplitude
:float
- SPE amplitude
sensor_area
:float
- effective area of the sensor
t
:float
- livetime of the dataset
Returns
tuple
- DCR value, error of DCR value, CTP value, error of CTP value
Expand source code
@staticmethod def get_DCR_amplitude(df: pd.DataFrame, length_cut_min: int, length_cut_max: int, pe_amplitude: float, pe_amplitude_std: float, sensor_area: float, t: float) -> tuple: """Compute the dark count rate (DCR) and crosstalk probability (CTP) of a dataset, amplitude based. Args: df (pd.DataFrame): DataFrame of the dataset (single ch) length_cut_min (int): low length cut to apply length_cut_max (int): high length cut to apply pe_amplitude (float): SPE amplitude sensor_area (float): effective are of the sensor t (float): livetime of the dataset Returns: tuple: DCR value, error of DCR value, CTP value, error of CTP value """ pe_0_5 = pe_amplitude * 0.5 pe_1_5 = pe_amplitude * 1.5 pe_m5sigma = pe_amplitude - 5 * pe_amplitude_std pe_p5sigma = pe_amplitude + 5 * pe_amplitude_std DC_0_5 = len(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max) & (df['amplitude'] < pe_p5sigma)]) DC_1_5 = len(df[(df['length'] > length_cut_min) & (df['length'] < length_cut_max) & (df['area'] < pe_m5sigma)]) DC_0_5_error = np.sqrt(DC_0_5) DC_1_5_error = np.sqrt(DC_1_5) DCR = DC_0_5 / sensor_area / t DCR_error = DC_0_5_error / sensor_area / t CTP = DC_1_5 / DC_0_5 CTP_error = np.sqrt((DC_0_5_error / DC_1_5)**2 + (DC_0_5 * DC_1_5_error / (DC_1_5)**2)**2) return DCR, DCR_error, CTP, CTP_error
def print_DCR_CTP(DCR: float, DCR_error: float, CTP: float, CTP_error: float) ‑> NoneType
-
Print the dark count rate (DCR) and crosstalk probability (CTP) values in a nice formatted and welcoming message.
Args
DCR
:float
- DCR value
DCR_error
:float
- Error on the DCR value
CTP
:float
- CTP value
CTP_error
:float
- Error on the CTP value
Expand source code
@staticmethod def print_DCR_CTP(DCR: float, DCR_error: float, CTP: float, CTP_error: float) -> None: """Print the dark count rate (DCR) and crosstalk probability (CTP) values in a nice formatted and welcoming message. Args: DCR (float): DCR value DCR_error (float): Error on the DCR value CTP (float): CTP value CTP_error (float): Error on the CTP value """ print(f'Your lovely DCR is: ({DCR:.2f} +- {DCR_error:.2f}) Hz/mm^2') print(f'Your lovely CTP is: ({CTP*100:.2f} +- {CTP_error*100:.2f})%')
class DCR_dataset (run: run, temperature: float, module: int, channel: str, processor: run_processor)
-
Object class to hold dark count related instances and methods. Collects all the data and properties of a single MMPC, meaning the pair (module, channel) for all the available voltages at a certain temperature.
Expand source code
class DCR_dataset(DCR_analysis): """Object class to hold dark count related instances and methods. Collects all the data and properties of a single MMPC, meaning the pair (module, channel) for all the available voltages at a certain temperature. """ __version__ = '0.0.2' def __init__(self, run: pylars.utils.input.run, temperature: float, module: int, channel: str, processor: pylars.processing.rawprocessor.run_processor): self.run = run self.temp = temperature self.module = module self.channel = channel self.process = processor self.voltages = self.get_voltages_available() self.plots_flag = False self.livetimes = self.get_livetimes() self.set_standard_cuts() # There are better ways to set these options but this is stil # better then a lot of flags self.use_scipy_find_peaks = False self.amplitude_based = False def set_plots_flag(self, flag: bool) -> None: """Set if computing properties makes plots (True) or not (False). Assumes a ./figures/ directory exists. Args: flag (bool): True for plotting stuff, False for not making nice pictures (plots) to hang on the wall. Yes, I hang plots on my bedroom wall and it looks nice. """ self.plots_flag = flag def define_SiPM_config(self, sensor_area: float = 12 * 12, ) -> None: r"""Define the SiPM data related quantities for the dataset. Args: sensor_area (float, optional): Area of the photosensor (mm\*\*2). Defaults to 12\*12. """ SiPM_config = {'sensor_area': sensor_area, } self.SiPM_config = SiPM_config def get_voltages_available(self) -> np.ndarray: """Checks the loaded run for which voltages are available for the defined temperature. Returns: np.ndarray: array of the available voltages """ voltages = [] for _dataset in self.run.datasets: if (_dataset.temp == self.temp) and (_dataset.kind == 'DCR'): voltages.append(_dataset.vbias) voltages = np.unique(voltages) return voltages def get_livetimes(self) -> dict: """Fetch the livetimes of a dataset. 
Based on the propertied of the DCR_dataset, creates its dataset object and computes the livetime for each vbias by multiplying the number of samples per waveform with the number of waveforms in the dataset and the duration of a sample (given by `self.run.ADC_config['dt']`). Provides the result in dict format where the keys are the vbias of the dataset. Returns: dict: _description_ """ livetimes = {} for v in self.voltages: # Need all this to find the correct path... self.datasets_df = self.run.get_run_df() selection = ((self.datasets_df['kind'] == 'DCR') & (self.datasets_df['vbias'] == v) & (self.datasets_df['temp'] == self.temp) & # to end with just one of the modules. # It's assumed both have the same number of # entries and size of entries (self.datasets_df['module'] == 0)) ds_selected = self.datasets_df[selection] assert len( self.datasets_df[selection]) == 1, "Found more than 1 ds with the same config. Help." ds_temp = pylars.utils.input.dataset(path=str(ds_selected['path'].values[0]), kind='DCR', module=0, temp=self.temp, vbias=v) try: n_waveforms, n_samples = ds_temp.read_sizes() livetimes[v] = n_waveforms * \ n_samples * self.run.ADC_config['dt'] except BaseException: livetimes[v] = np.nan return livetimes def set_standard_cuts(self, cut_area_min: float = 5, cut_area_max: float = 1e6, cut_length_min: int = 4, cut_length_max: int = 70, cut_n_pulses_min: float = 0, cut_n_pulses_max: float = 2) -> None: """Sets the cuts to use in analysis as object variables. Args: cut_area_min (float, optional): Area minimum value. Defaults to 5. cut_area_max (float, optional): Area maximum value. Defaults to 1e6. cut_length_min (float, optional): Lenght minimum value. Defaults to 4. cut_length_max (float, optional): Lenght minimum value. Defaults to 70. cut_n_pulses_min (float, optional): Minimum number of pulses in the waveform. Defaults to 0. cut_n_pulses_max (float, optional): Maximum number of pulses in the waveform. Defaults to 2. 
""" self.cut_area_min = cut_area_min self.cut_area_max = cut_area_max self.cut_length_min = cut_length_min self.cut_length_max = cut_length_max self.cut_n_pulses_min = cut_n_pulses_min self.cut_n_pulses_max = cut_n_pulses_max def load_processed_data(self, force_processing: bool = False) -> None: """For all the voltages of a DCR_dataset (smae temperature) looks for already processed files to load. If force_processing=True and no saved file is found, processes the dataset with standard options (sigma = 5, N_baseline = 50). Args: force_processing (bool, optional): Flag to force processing of raw data in case the processed dataset is not found. Defaults to False. Returns: dict: dictionary with the data df for each voltage as a key. """ self.data = {} for _voltage in tqdm(self.voltages, desc=f'Loading processed data for DCR ' + f'data at {self.temp}K: ', total=len(self.voltages), leave=False): processed_data = pylars.utils.output.processed_dataset( run=self.run, kind='DCR', vbias=_voltage, temp=self.temp, path_processed=('/disk/gfs_atp/xenoscope/SiPMs/char_campaign/' 'processed_data/'), process_hash=self.process.hash) processed_data.load_data(force=force_processing) _df = processed_data.data mask = ((_df['module'] == self.module) & (_df['channel'] == self.channel)) self.data[_voltage] = copy.deepcopy(_df[mask]) def compute_properties_of_dataset( self, use_n_pulse_wf: bool = False, compute_BV_flag: bool = True, amplitude_based: bool = False) -> pd.DataFrame: """Calculate the gain, DCR, CTP and BV for the dataset in a single line! Args: use_1_pulse_wf (bool): compute_BV_flag (bool): Returns: pd.DataFrame: dataframe with all the computed properties with the columns ['T', 'V', 'SPE_area','SPE_area_error', 'Gain', 'Gain_error', 'SPE_res', 'SPE_res_error', 'DCR', 'DCR_error', 'CTP', 'CTP_error'] """ assert isinstance(self.data, dict), ('Woops, no data found! 
Load' ' data into the dataset first') if amplitude_based: raise NotImplementedError voltage_list = self.voltages _results_dataset = pd.DataFrame( columns=['T', 'V', 'SPE_area', 'SPE_area_error', 'Gain', 'Gain_error', 'SPE_res', 'SPE_res_error', 'DCR', 'DCR_error', 'CTP', 'CTP_error']) for _volt in voltage_list: # select voltage df = self.data[_volt] if use_n_pulse_wf == True: df = df[(df['n_pulses'] > self.cut_n_pulses_min) & (df['n_pulses'] <= self.cut_n_pulses_max)] if self.plots_flag == True: plot_name_1pe_fit = (f'{self.temp}K_{_volt}V_mod{self.module}_' f'ch{self.channel}') else: plot_name_1pe_fit = False try: # Get SPE value from Gaussian fit (A, mu, sigma), cov = self.get_1pe_value_fit( df, plot=plot_name_1pe_fit, length_cut_min=self.cut_length_min, length_cut_max=self.cut_length_max, use_scipy_find_peaks=self.use_scipy_find_peaks) A_err, mu_err, sigma_err = np.sqrt(np.diag(cov)) # Calculate DCR and CTP DCR, DCR_error, CTP, CTP_error = self.get_DCR( df=df, length_cut_min=self.cut_length_min, length_cut_max=self.cut_length_max, spe_area=mu, sensor_area=self.SiPM_config['sensor_area'], t=self.livetimes[_volt]) # Calculate gain gain = get_gain(F_amp=self.run.ADC_config['F_amp'], spe_area=mu, ADC_range=self.run.ADC_config['ADC_range'], ADC_impedance=self.run.ADC_config['ADC_impedance'], ADC_res=self.run.ADC_config['ADC_res'], q_e=self.run.ADC_config['q_e']) gain_error = get_gain(F_amp=self.run.ADC_config['F_amp'], spe_area=mu_err, ADC_range=self.run.ADC_config['ADC_range'], ADC_impedance=self.run.ADC_config['ADC_impedance'], ADC_res=self.run.ADC_config['ADC_res'], q_e=self.run.ADC_config['q_e']) SPE_res = np.abs(sigma / mu) * 100 # in % SPE_res_err = np.sqrt( (sigma_err / mu)**2 + (sigma * mu_err / mu**2)**2) * 100 # in % # Merge into rolling dataframe (I know it's slow... 
make a PR, # pls) _results_dataset = pd.concat( (_results_dataset, pd.DataFrame({'T': [self.temp], 'V': [_volt], 'SPE_area': [mu], 'SPE_area_error': [mu_err], 'Gain': [gain], 'Gain_error': [gain_error], 'SPE_res': [SPE_res], 'SPE_res_error': [SPE_res_err], 'DCR': [DCR], 'DCR_error': [DCR_error], 'CTP': [CTP * 100], 'CTP_error': [CTP_error * 100]}) ), ignore_index=True ) except BaseException: print(f'Could not compute properties of module {self.module}, ' f'channel {self.channel}, {self.temp} K, {_volt} V. ' f'Skipping dataset.') _results_dataset = pd.concat( (_results_dataset, pd.DataFrame({'T': [self.temp], 'V': [_volt], 'SPE_area': [np.nan], 'SPE_area_error': [np.nan], 'Gain': [np.nan], 'Gain_error': [np.nan], 'SPE_res': [np.nan], 'SPE_res_error': [np.nan], 'DCR': [np.nan], 'DCR_error': [np.nan], 'CTP': [np.nan], 'CTP_error': [np.nan]}) ), ignore_index=True ) # Compute BV from gain(V) curve and update df if compute_BV_flag == True: if self.plots_flag == True: plot_BV = f'BV_mod{self.module}_ch{self.channel}' else: plot_BV = False _results_dataset, _a, _b, _b_err = compute_BV_df( _results_dataset, plot_BV) return _results_dataset @staticmethod def get_how_many_peaks_per_waveform(df: pd.DataFrame, verbose: bool = False) -> pd.DataFrame: """Finds how many peaks each waveform has. Only looks in wavforms with at least 1 peak Since commit 6f8e0c72b8e3e0bf7c5307941b49bdf57879d554 this method is not needed as n_pulses is stored during processing. Args: df (pd.DataFrame): dataframe with the processed pulse data. verbose (bool): print info during processing. Returns: pd.DataFrame: dataframe with the pairs: wf_number - pulse_count. 
""" t0 = time.time() if verbose: print('Starting counting peaks at:', t0) wf_number_arr = np.array(df['wf_number'].values) waveforms_w_pulses, N_pulses = _get_how_many_peaks_per_waveform( wf_number_arr) pulse_count_df = pd.concat( [pd.Series(waveforms_w_pulses, name='wf_number'), pd.Series(N_pulses, name='pulse_count')], axis=1 ) t1 = time.time() if verbose: print('Finished conting peaks at: ', t1) print('Took time in calc: ', t1 - t0) return pulse_count_df def get_med_amplitude(self, df: pd.DataFrame, cut_mask: np.ndarray) -> tuple: """Calculate the median and standard deviation of the distribution of amplitudes, applying a given mask. Args: df (pd.DataFrame): dataframe with the processed pulse data extra_cut_mask (np.ndarray): cut mask Returns: tuple: _description_ """ med_amplitude = np.median(df[cut_mask]['amplitude']) std_amplitude = np.std(df[cut_mask]['amplitude']) return med_amplitude, std_amplitude def compute_properties_of_dataset_amplitude_based(self, use_n_pulse_wf: bool = True) -> pd.DataFrame: """NOT IMPLEMENTED - Needs review and merge with other compute method Calculate the gain, DCR, CTP and BV for the dataset in a single line! This is an alternative method to the main one, using amplitude cuts instead of area cuts. Returns: pd.DataFrame: dataframe with all the computed properties with the columns ['V','T','path','module','channel','Gain','DCR', 'CTP','DCR_error','CTP_error','BV','OV'] """ raise NotImplementedError # needs review and merging with normal
Ancestors
Subclasses
Static methods
def get_how_many_peaks_per_waveform(df: pandas.core.frame.DataFrame, verbose: bool = False) ‑> pandas.core.frame.DataFrame
-
Finds how many peaks each waveform has. Only looks in waveforms with at least 1 peak
Since commit 6f8e0c72b8e3e0bf7c5307941b49bdf57879d554 this method is not needed as n_pulses is stored during processing.
Args
df
:pd.DataFrame
- dataframe with the processed pulse data.
verbose
:bool
- print info during processing.
Returns
pd.DataFrame
- dataframe with the pairs: wf_number - pulse_count.
Expand source code
@staticmethod
def get_how_many_peaks_per_waveform(df: pd.DataFrame,
                                    verbose: bool = False) -> pd.DataFrame:
    """Find how many peaks each waveform has.

    Only looks at waveforms with at least 1 peak. Since commit
    6f8e0c72b8e3e0bf7c5307941b49bdf57879d554 this method is not needed
    as n_pulses is stored during processing.

    Args:
        df (pd.DataFrame): dataframe with the processed pulse data.
        verbose (bool): print timing info during processing.

    Returns:
        pd.DataFrame: dataframe with the pairs: wf_number - pulse_count.
    """
    t0 = time.time()
    if verbose:
        print('Starting counting peaks at:', t0)
    wf_number_arr = np.array(df['wf_number'].values)
    # Counting itself is delegated to the module-level helper
    # (presumably numba-accelerated; numba is imported at file top).
    waveforms_w_pulses, N_pulses = _get_how_many_peaks_per_waveform(
        wf_number_arr)
    pulse_count_df = pd.concat(
        [pd.Series(waveforms_w_pulses, name='wf_number'),
         pd.Series(N_pulses, name='pulse_count')],
        axis=1
    )
    t1 = time.time()
    if verbose:
        # Fixed typo in log message: 'conting' -> 'counting'.
        print('Finished counting peaks at: ', t1)
        print('Took time in calc: ', t1 - t0)
    return pulse_count_df
Methods
def compute_properties_of_dataset(self, use_n_pulse_wf: bool = False, compute_BV_flag: bool = True, amplitude_based: bool = False) ‑> pandas.core.frame.DataFrame
-
Calculate the gain, DCR, CTP and BV for the dataset in a single line!
Args
use_1_pulse_wf (bool): compute_BV_flag (bool):
Returns
pd.DataFrame
- dataframe with all the computed properties with the columns ['T', 'V', 'SPE_area','SPE_area_error', 'Gain', 'Gain_error', 'SPE_res', 'SPE_res_error', 'DCR', 'DCR_error', 'CTP', 'CTP_error']
Expand source code
def compute_properties_of_dataset(
        self, use_n_pulse_wf: bool = False,
        compute_BV_flag: bool = True,
        amplitude_based: bool = False) -> pd.DataFrame:
    """Calculate the gain, DCR, CTP and BV for the dataset in a
    single line!

    Args:
        use_n_pulse_wf (bool): restrict to waveforms whose pulse count
            is within (cut_n_pulses_min, cut_n_pulses_max].
        compute_BV_flag (bool): fit the gain(V) curve and add the
            breakdown-voltage columns to the result.
        amplitude_based (bool): use the amplitude-based method instead
            of the area-based one (not implemented).

    Returns:
        pd.DataFrame: dataframe with all the computed properties with
            the columns ['T', 'V', 'SPE_area','SPE_area_error', 'Gain',
            'Gain_error', 'SPE_res', 'SPE_res_error', 'DCR',
            'DCR_error', 'CTP', 'CTP_error']
    """
    assert isinstance(self.data, dict), ('Woops, no data found! Load'
                                         ' data into the dataset first')
    if amplitude_based:
        raise NotImplementedError

    voltage_list = self.voltages
    _results_dataset = pd.DataFrame(
        columns=['T', 'V', 'SPE_area', 'SPE_area_error',
                 'Gain', 'Gain_error', 'SPE_res', 'SPE_res_error',
                 'DCR', 'DCR_error', 'CTP', 'CTP_error'])

    for _volt in voltage_list:
        # select voltage
        df = self.data[_volt]
        if use_n_pulse_wf:
            df = df[(df['n_pulses'] > self.cut_n_pulses_min) &
                    (df['n_pulses'] <= self.cut_n_pulses_max)]
        if self.plots_flag:
            plot_name_1pe_fit = (f'{self.temp}K_{_volt}V_mod{self.module}_'
                                 f'ch{self.channel}')
        else:
            plot_name_1pe_fit = False
        try:
            # Get SPE value from Gaussian fit
            (A, mu, sigma), cov = self.get_1pe_value_fit(
                df,
                plot=plot_name_1pe_fit,
                length_cut_min=self.cut_length_min,
                length_cut_max=self.cut_length_max,
                use_scipy_find_peaks=self.use_scipy_find_peaks)
            A_err, mu_err, sigma_err = np.sqrt(np.diag(cov))

            # Calculate DCR and CTP
            DCR, DCR_error, CTP, CTP_error = self.get_DCR(
                df=df,
                length_cut_min=self.cut_length_min,
                length_cut_max=self.cut_length_max,
                spe_area=mu,
                sensor_area=self.SiPM_config['sensor_area'],
                t=self.livetimes[_volt])

            # Calculate gain; the "gain_error" propagates the SPE-area
            # fit error through the same linear conversion.
            gain = get_gain(
                F_amp=self.run.ADC_config['F_amp'],
                spe_area=mu,
                ADC_range=self.run.ADC_config['ADC_range'],
                ADC_impedance=self.run.ADC_config['ADC_impedance'],
                ADC_res=self.run.ADC_config['ADC_res'],
                q_e=self.run.ADC_config['q_e'])
            gain_error = get_gain(
                F_amp=self.run.ADC_config['F_amp'],
                spe_area=mu_err,
                ADC_range=self.run.ADC_config['ADC_range'],
                ADC_impedance=self.run.ADC_config['ADC_impedance'],
                ADC_res=self.run.ADC_config['ADC_res'],
                q_e=self.run.ADC_config['q_e'])

            SPE_res = np.abs(sigma / mu) * 100  # in %
            SPE_res_err = np.sqrt(
                (sigma_err / mu)**2 +
                (sigma * mu_err / mu**2)**2) * 100  # in %

            # Merge into rolling dataframe (I know it's slow... make a
            # PR, pls)
            _results_dataset = pd.concat(
                (_results_dataset,
                 pd.DataFrame({'T': [self.temp],
                               'V': [_volt],
                               'SPE_area': [mu],
                               'SPE_area_error': [mu_err],
                               'Gain': [gain],
                               'Gain_error': [gain_error],
                               'SPE_res': [SPE_res],
                               'SPE_res_error': [SPE_res_err],
                               'DCR': [DCR],
                               'DCR_error': [DCR_error],
                               'CTP': [CTP * 100],
                               'CTP_error': [CTP_error * 100]})
                 ), ignore_index=True
            )
        # Was `except BaseException`, which also swallowed
        # KeyboardInterrupt/SystemExit; Exception is the broadest
        # catch this best-effort fallback should use.
        except Exception:
            print(f'Could not compute properties of module {self.module}, '
                  f'channel {self.channel}, {self.temp} K, {_volt} V. '
                  f'Skipping dataset.')
            _results_dataset = pd.concat(
                (_results_dataset,
                 pd.DataFrame({'T': [self.temp],
                               'V': [_volt],
                               'SPE_area': [np.nan],
                               'SPE_area_error': [np.nan],
                               'Gain': [np.nan],
                               'Gain_error': [np.nan],
                               'SPE_res': [np.nan],
                               'SPE_res_error': [np.nan],
                               'DCR': [np.nan],
                               'DCR_error': [np.nan],
                               'CTP': [np.nan],
                               'CTP_error': [np.nan]})
                 ), ignore_index=True
            )

    # Compute BV from gain(V) curve and update df
    if compute_BV_flag:
        if self.plots_flag:
            plot_BV = f'BV_mod{self.module}_ch{self.channel}'
        else:
            plot_BV = False
        _results_dataset, _a, _b, _b_err = compute_BV_df(
            _results_dataset, plot_BV)

    return _results_dataset
def compute_properties_of_dataset_amplitude_based(self, use_n_pulse_wf: bool = True) ‑> pandas.core.frame.DataFrame
-
NOT IMPLEMENTED - Needs review and merge with other compute method
Calculate the gain, DCR, CTP and BV for the dataset in a single line! This is an alternative method to the main one, using amplitude cuts instead of area cuts.
Returns
pd.DataFrame
- dataframe with all the computed properties with the columns ['V','T','path','module','channel','Gain','DCR', 'CTP','DCR_error','CTP_error','BV','OV']
Expand source code
def compute_properties_of_dataset_amplitude_based(self,
                                                  use_n_pulse_wf: bool = True) -> pd.DataFrame:
    """NOT IMPLEMENTED - Needs review and merge with other compute method.

    Alternative to `compute_properties_of_dataset` that would apply
    amplitude cuts instead of area cuts when computing gain, DCR, CTP
    and BV for the dataset.

    Returns:
        pd.DataFrame: dataframe with all the computed properties with
            the columns ['V','T','path','module','channel','Gain','DCR',
            'CTP','DCR_error','CTP_error','BV','OV']
    """
    # Placeholder until the amplitude-based flow is reviewed and merged
    # with the area-based implementation.
    raise NotImplementedError
def define_SiPM_config(self, sensor_area: float = 144) ‑> NoneType
-
Define the SiPM data related quantities for the dataset.
Args
sensor_area
:float
, optional- Area of the photosensor (mm**2). Defaults to 12*12.
Expand source code
def define_SiPM_config(self, sensor_area: float = 12 * 12, ) -> None:
    r"""Store the SiPM hardware configuration on the dataset object.

    Args:
        sensor_area (float, optional): Area of the photosensor (mm\*\*2).
            Defaults to 12\*12.
    """
    self.SiPM_config = {'sensor_area': sensor_area}
def get_livetimes(self) ‑> dict
-
Fetch the livetimes of a dataset.
Based on the properties of the DCR_dataset, creates its dataset object and computes the livetime for each vbias by multiplying the number of samples per waveform with the number of waveforms in the dataset and the duration of a sample (given by
self.run.ADC_config['dt']
). Provides the result in dict format where the keys are the vbias of the dataset.Returns
dict
- description
Expand source code
def get_livetimes(self) -> dict:
    """Fetch the livetimes of a dataset.

    Based on the properties of the DCR_dataset, creates its dataset
    object and computes the livetime for each vbias by multiplying the
    number of samples per waveform with the number of waveforms in the
    dataset and the duration of a sample (given by
    `self.run.ADC_config['dt']`). Provides the result in dict format
    where the keys are the vbias of the dataset.

    Returns:
        dict: mapping vbias -> livetime (np.nan if the raw file sizes
            could not be read).
    """
    livetimes = {}
    for v in self.voltages:
        # Need all this to find the correct path...
        self.datasets_df = self.run.get_run_df()
        selection = ((self.datasets_df['kind'] == 'DCR') &
                     (self.datasets_df['vbias'] == v) &
                     (self.datasets_df['temp'] == self.temp) &
                     # to end with just one of the modules.
                     # It's assumed both have the same number of
                     # entries and size of entries
                     (self.datasets_df['module'] == 0))
        ds_selected = self.datasets_df[selection]
        # Exactly one dataset must match this (kind, vbias, temp,
        # module) combination; otherwise the path is ambiguous.
        assert len(
            self.datasets_df[selection]) == 1, "Found more than 1 ds with the same config. Help."
        ds_temp = pylars.utils.input.dataset(path=str(ds_selected['path'].values[0]),
                                             kind='DCR',
                                             module=0,
                                             temp=self.temp,
                                             vbias=v)
        try:
            # livetime = n_waveforms * n_samples * dt
            n_waveforms, n_samples = ds_temp.read_sizes()
            livetimes[v] = n_waveforms * \
                n_samples * self.run.ADC_config['dt']
        except BaseException:
            # Best-effort: unreadable raw files yield NaN livetime.
            livetimes[v] = np.nan
    return livetimes
def get_med_amplitude(self, df: pandas.core.frame.DataFrame, cut_mask: numpy.ndarray) ‑> tuple
-
Calculate the median and standard deviation of the distribution of amplitudes, applying a given mask.
Args
df
:pd.DataFrame
- dataframe with the processed pulse data
extra_cut_mask
:np.ndarray
- cut mask
Returns
tuple
- description
Expand source code
def get_med_amplitude(self, df: pd.DataFrame,
                      cut_mask: np.ndarray) -> tuple:
    """Compute median and standard deviation of the amplitude
    distribution after applying a selection mask.

    Args:
        df (pd.DataFrame): dataframe with the processed pulse data.
        cut_mask (np.ndarray): boolean cut mask to apply to `df`.

    Returns:
        tuple: (median amplitude, std of amplitude) of the selection.
    """
    selected_amplitudes = df[cut_mask]['amplitude']
    return np.median(selected_amplitudes), np.std(selected_amplitudes)
def get_voltages_available(self) ‑> numpy.ndarray
-
Checks the loaded run for which voltages are available for the defined temperature.
Returns
np.ndarray
- array of the available voltages
Expand source code
def get_voltages_available(self) -> np.ndarray:
    """Check the loaded run for which voltages are available for the
    defined temperature.

    Returns:
        np.ndarray: sorted array of the unique available voltages.
    """
    matching_vbias = [_ds.vbias for _ds in self.run.datasets
                      if _ds.temp == self.temp and _ds.kind == 'DCR']
    return np.unique(matching_vbias)
def load_processed_data(self, force_processing: bool = False) ‑> NoneType
-
For all the voltages of a DCR_dataset (same temperature) looks for already processed files to load. If force_processing=True and no saved file is found, processes the dataset with standard options (sigma = 5, N_baseline = 50).
Args
force_processing
:bool
, optional- Flag to force processing of raw data in case the processed dataset is not found. Defaults to False.
Returns
dict
- dictionary with the data df for each voltage as a key.
Expand source code
def load_processed_data(self, force_processing: bool = False) -> None:
    """For all the voltages of a DCR_dataset (same temperature) looks
    for already processed files to load. If force_processing=True and no
    saved file is found, processes the dataset with standard options
    (sigma = 5, N_baseline = 50).

    Args:
        force_processing (bool, optional): Flag to force processing of
            raw data in case the processed dataset is not found.
            Defaults to False.

    Returns:
        None: fills `self.data`, a dict keyed by voltage with the
            per-channel dataframe for that voltage.
    """
    self.data = {}
    for _voltage in tqdm(self.voltages,
                         desc=f'Loading processed data for DCR ' +
                         f'data at {self.temp}K: ',
                         total=len(self.voltages),
                         leave=False):
        # Processed data is cached on disk, keyed by the processing
        # hash so stale caches are not reused.
        processed_data = pylars.utils.output.processed_dataset(
            run=self.run,
            kind='DCR',
            vbias=_voltage,
            temp=self.temp,
            path_processed=('/disk/gfs_atp/xenoscope/SiPMs/char_campaign/'
                            'processed_data/'),
            process_hash=self.process.hash)
        processed_data.load_data(force=force_processing)
        _df = processed_data.data
        # Keep only this dataset's module/channel.
        mask = ((_df['module'] == self.module) &
                (_df['channel'] == self.channel))
        # deepcopy so later cuts on self.data never mutate the cache.
        self.data[_voltage] = copy.deepcopy(_df[mask])
def set_plots_flag(self, flag: bool) ‑> NoneType
-
Set if computing properties makes plots (True) or not (False). Assumes a ./figures/ directory exists.
Args
flag
:bool
- True for plotting stuff, False for not making nice pictures (plots) to hang on the wall. Yes, I hang plots on my bedroom wall and it looks nice.
Expand source code
def set_plots_flag(self, flag: bool) -> None:
    """Enable (True) or disable (False) plot generation while computing
    properties. Assumes a ./figures/ directory exists when enabled.

    Args:
        flag (bool): True to produce plots, False to skip them.
    """
    self.plots_flag = flag
def set_standard_cuts(self, cut_area_min: float = 5, cut_area_max: float = 1000000.0, cut_length_min: int = 4, cut_length_max: int = 70, cut_n_pulses_min: float = 0, cut_n_pulses_max: float = 2) ‑> NoneType
-
Sets the cuts to use in analysis as object variables.
Args
cut_area_min
:float
, optional- Area minimum value. Defaults to 5.
cut_area_max
:float
, optional- Area maximum value. Defaults to 1e6.
cut_length_min
:float
, optional- Length minimum value. Defaults to 4.
cut_length_max
:float
, optional- Length maximum value. Defaults to 70.
cut_n_pulses_min
:float
, optional- Minimum number of pulses in the waveform. Defaults to 0.
cut_n_pulses_max
:float
, optional- Maximum number of pulses in the waveform. Defaults to 2.
Expand source code
def set_standard_cuts(self, cut_area_min: float = 5,
                      cut_area_max: float = 1e6,
                      cut_length_min: int = 4,
                      cut_length_max: int = 70,
                      cut_n_pulses_min: float = 0,
                      cut_n_pulses_max: float = 2) -> None:
    """Store the standard analysis cuts as object attributes.

    Args:
        cut_area_min (float, optional): Area minimum value. Defaults to 5.
        cut_area_max (float, optional): Area maximum value. Defaults
            to 1e6.
        cut_length_min (int, optional): Length minimum value. Defaults
            to 4.
        cut_length_max (int, optional): Length maximum value. Defaults
            to 70.
        cut_n_pulses_min (float, optional): Minimum number of pulses in
            the waveform. Defaults to 0.
        cut_n_pulses_max (float, optional): Maximum number of pulses in
            the waveform. Defaults to 2.
    """
    # Area cuts
    self.cut_area_min = cut_area_min
    self.cut_area_max = cut_area_max
    # Length cuts
    self.cut_length_min = cut_length_min
    self.cut_length_max = cut_length_max
    # Pulse-count cuts
    self.cut_n_pulses_min = cut_n_pulses_min
    self.cut_n_pulses_max = cut_n_pulses_max
Inherited members
class DCR_run (run: run, processor: run_processor, use_n_pulse: bool = True)
-
Collection of all the DCR_datasets results for a run, ie, for all the channels and modules, for every available temperatures and voltages.
The results of every dataset (channel, V, T) is saved on the instance DCR_run.results_df .
Expand source code
class DCR_run():
    """Collection of all the DCR_datasets results for a run, i.e. for
    all the channels and modules, for every available temperature and
    voltage.

    The results of every dataset (channel, V, T) are saved on the
    instance attribute `DCR_run.results_df`.
    """

    __version__ = '0.0.1'

    def __init__(self, run: pylars.utils.input.run,
                 processor: pylars.processing.rawprocessor.run_processor,
                 use_n_pulse: bool = True):
        self.run = run
        self.process = processor
        self.use_n_pulse = use_n_pulse
        self.datasets = self.process.datasets_df
        self.temperatures = self.get_run_temperatures()
        self.plots_flag = False
        # Results are cached next to (not inside) the raw-data tree;
        # [:-9] strips the trailing segment of main_data_path —
        # presumably 'raw_data/'; TODO confirm against run definition.
        self.analysis_path = (f'{self.run.main_data_path[:-9]}analysis_data'
                              f'/run{self.run.run_number}/')

    def set_plots_flag(self, flag: bool) -> None:
        """Set if computing properties makes plots (True) or not
        (False). Assumes a ./figures/ directory exists.

        Args:
            flag (bool): True for plotting stuff, False for not making
                nice pictures (plots) to hang on the wall. Yes, I hang
                plots on my bedroom wall and it looks nice.
        """
        self.plots_flag = flag

    def get_run_temperatures(self) -> np.ndarray:
        """Get all the temperatures available in the DCR run.

        Returns:
            np.ndarray: array with all the available temperatures.
        """
        temp_list = np.unique(self.datasets['temp'])
        return temp_list

    def initialize_results_df(self) -> None:
        """Initialize a clean (empty) results_df instance in the object.
        """
        results_df = pd.DataFrame(
            columns=['T', 'V', 'SPE_area', 'SPE_area_error',
                     'Gain', 'Gain_error', 'SPE_res', 'SPE_res_error',
                     'DCR', 'DCR_error', 'CTP', 'CTP_error'])
        self.results_df = results_df

    def define_run_SiPM_config(self, sensor_area: float = 12 * 12,
                               ) -> None:
        r"""Define the SiPM data related quantities for the run.

        Args:
            sensor_area (float, optional): Area of the photosensor
                (mm\*\*2). Defaults to 12\*12.
        """
        SiPM_config = {'sensor_area': sensor_area, }
        self.SiPM_config = SiPM_config

    def set_standard_cuts_run(self, cut_area_min: float = 5,
                              cut_area_max: float = 1e6,
                              cut_length_min: int = 4,
                              cut_length_max: int = 70,
                              cut_n_pulses_min: float = 0,
                              cut_n_pulses_max: float = 2) -> None:
        """Sets the cuts to use in analysis as (run) object variables.

        Args:
            cut_area_min (float, optional): Area minimum value.
                Defaults to 5.
            cut_area_max (float, optional): Area maximum value.
                Defaults to 1e6.
            cut_length_min (int, optional): Length minimum value.
                Defaults to 4.
            cut_length_max (int, optional): Length maximum value.
                Defaults to 70.
            cut_n_pulses_min (float, optional): Minimum number of pulses
                in the waveform. Defaults to 0.
            cut_n_pulses_max (float, optional): Maximum number of pulses
                in the waveform. Defaults to 2.
        """
        self.cut_area_min = cut_area_min
        self.cut_area_max = cut_area_max
        self.cut_length_min = cut_length_min
        self.cut_length_max = cut_length_max
        self.cut_n_pulses_min = cut_n_pulses_min
        self.cut_n_pulses_max = cut_n_pulses_max

    def load_dataset(self, temp: float, module: int,
                     channel: str) -> DCR_dataset:
        """Create a DCR_dataset object for a (T, mod, ch) configuration
        and load the corresponding data into it.

        ! This assumes processed data is available for all the raw
        files of the DCR run datasets !

        Args:
            temp (float): temperature to consider
            module (int): module to load
            channel (str): channel in the module to select

        Returns:
            DCR_dataset: dataset object
        """
        particular_DCR_dataset = DCR_dataset(run=self.run,
                                             temperature=temp,
                                             module=module,
                                             channel=channel,
                                             processor=self.process,
                                             )
        particular_DCR_dataset.load_processed_data()
        return particular_DCR_dataset

    def compute_properties_of_ds(self, temp: float,
                                 module: int,
                                 channel: str,
                                 amplitude_based: bool = False
                                 ) -> pd.DataFrame:
        """Loads and computes the properties of a single dataset
        (temp, module, channel) by creating a DCR_dataset object and
        calling its methods.

        Args:
            temp (float): temperature
            module (int): module
            channel (str): channel
            amplitude_based (bool): if the computation method is based
                on amplitude instead of area

        Returns:
            pd.DataFrame: dataframe with the dataset results plus
                'module' and 'channel' columns.
        """
        assert isinstance(self.run.ADC_config, dict), 'No ADC_config found!'
        ds = self.load_dataset(temp, module, channel)
        ds.set_plots_flag(self.plots_flag)
        # Propagate the run-level configuration and cuts down to the
        # freshly created dataset object.
        ds.SiPM_config = self.SiPM_config
        ds.cut_area_min = self.cut_area_min
        ds.cut_area_max = self.cut_area_max
        ds.cut_length_min = self.cut_length_min
        ds.cut_length_max = self.cut_length_max
        ds.cut_n_pulses_min = self.cut_n_pulses_min
        ds.cut_n_pulses_max = self.cut_n_pulses_max
        if amplitude_based:
            ds_results = ds.compute_properties_of_dataset_amplitude_based(
                use_n_pulse_wf=self.use_n_pulse,
            )
        else:
            ds_results = ds.compute_properties_of_dataset(
                compute_BV_flag=False,
                use_n_pulse_wf=self.use_n_pulse)
        # Add module and channel columns
        module_Series = pd.Series([module] * len(ds_results),
                                  name='module')
        channel_Series = pd.Series([channel] * len(ds_results),
                                   name='channel')
        ds_results = pd.concat([ds_results, module_Series, channel_Series],
                               axis=1)
        return ds_results

    def compute_properties_of_run(self,
                                  amplitude_based: bool = False) -> None:
        """Loads and computes the properties of ALL the datasets,
        accumulating everything into `self.results_df`.

        Args:
            amplitude_based (bool): Turn True to compute SPE based on
                amplitude.
        """
        self.initialize_results_df()
        all_channels = pylars.utils.common.get_channel_list(self.process)
        for temperature in self.temperatures:
            for (module, channel) in tqdm(all_channels,
                                          (f'Computing properties for '
                                           f'T={temperature}: ')):
                _ds_results = self.compute_properties_of_ds(
                    temp=temperature, module=module, channel=channel,
                    amplitude_based=amplitude_based)
                self.results_df = pd.concat(
                    [self.results_df, _ds_results],  # type: ignore
                    ignore_index=True)

    def save_results(self, custom_name: str) -> None:
        """Save dataframe of results to a hdf5 file. Saved files go to
        self.analysis_path.

        Args:
            custom_name (str): name to give the file (without
                extension); prefixed with 'DCR_results_'.
        """
        assert isinstance(
            self.results_df, pd.DataFrame), ("Trying to save results that do "
                                             "not exist in the object, c'mon"
                                             ", you know better.")
        assert len(self.results_df) > 0, ("Results df is empty, please compute"
                                          "something before trying to save, "
                                          "otherwire it's just a waste of "
                                          "disk space")
        name = f'DCR_results_{custom_name}'
        self.results_df.to_hdf(self.analysis_path + name + '.h5', key='df')
        print('Saved results to ')

    def load_results(self, name: str) -> None:
        """Load dataframe of results from a hdf5 file. Looks for files
        in the standard analysis cache directory.

        Args:
            name (str): name of the file to load (without extension)
        """
        _df = pd.read_hdf(self.analysis_path + name + '.h5')
        self.results_df = _df
Methods
def compute_properties_of_ds(self, temp: float, module: int, channel: str, amplitude_based: bool = False) ‑> pandas.core.frame.DataFrame
-
Loads and computes the properties of a single dataset (temp, module, channel) by creating a DCR_dataset object and calling its methods.
Args
temp
:float
- temperature
module
:int
- module
channel
:str
- channel
amplitude_based
:bool
- if the computation method is based on amplitude instead of area
Returns
pd.DataFrame
- dataframe
Expand source code
def compute_properties_of_ds(self, temp: float,
                             module: int,
                             channel: str,
                             amplitude_based: bool = False
                             ) -> pd.DataFrame:
    """Loads and computes the properties of a single dataset
    (temp, module, channel) by creating a DCR_dataset object and
    calling its methods.

    Args:
        temp (float): temperature
        module (int): module
        channel (str): channel
        amplitude_based (bool): if the computation method is based on
            amplitude instead of area

    Returns:
        pd.DataFrame: dataset results plus 'module'/'channel' columns.
    """
    assert isinstance(self.run.ADC_config, dict), 'No ADC_config found!'
    ds = self.load_dataset(temp, module, channel)
    ds.set_plots_flag(self.plots_flag)
    # Propagate run-level configuration and cuts to the dataset object.
    ds.SiPM_config = self.SiPM_config
    ds.cut_area_min = self.cut_area_min
    ds.cut_area_max = self.cut_area_max
    ds.cut_length_min = self.cut_length_min
    ds.cut_length_max = self.cut_length_max
    ds.cut_n_pulses_min = self.cut_n_pulses_min
    ds.cut_n_pulses_max = self.cut_n_pulses_max
    if amplitude_based:
        ds_results = ds.compute_properties_of_dataset_amplitude_based(
            use_n_pulse_wf=self.use_n_pulse,
        )
    else:
        ds_results = ds.compute_properties_of_dataset(
            compute_BV_flag=False,
            use_n_pulse_wf=self.use_n_pulse)
    # Add module and channel columns
    module_Series = pd.Series([module] * len(ds_results), name='module')
    channel_Series = pd.Series([channel] * len(ds_results),
                               name='channel')
    ds_results = pd.concat([ds_results, module_Series, channel_Series],
                           axis=1)
    return ds_results
def compute_properties_of_run(self, amplitude_based: bool = False) ‑> NoneType
-
Loads and computes the properties of ALL the datasets.
Args
amplitude_based
:bool
- Turn True to compute SPE based on amplitude.
Returns
pd.DataFrame
- The results.
Expand source code
def compute_properties_of_run(self, amplitude_based: bool = False) -> None:
    """Loads and computes the properties of ALL the datasets,
    accumulating everything into `self.results_df`.

    Args:
        amplitude_based (bool): Turn True to compute SPE based on
            amplitude.
    """
    self.initialize_results_df()
    all_channels = pylars.utils.common.get_channel_list(self.process)
    for temperature in self.temperatures:
        for (module, channel) in tqdm(all_channels,
                                      (f'Computing properties for '
                                       f'T={temperature}: ')):
            _ds_results = self.compute_properties_of_ds(
                temp=temperature, module=module, channel=channel,
                amplitude_based=amplitude_based)
            # Rolling concat: each dataset's rows are appended to the
            # run-level results dataframe.
            self.results_df = pd.concat(
                [self.results_df, _ds_results],  # type: ignore
                ignore_index=True)
def define_run_SiPM_config(self, sensor_area: float = 144) ‑> NoneType
-
Define the SiPM data related quantities for the dataset.
Args
livetime
:float
- livetime of the measurement for DCR porpuses.
sensor_area
:float
, optional- Area of the photosensor (mm**2). Defaults to 12*12.
Expand source code
def define_run_SiPM_config(self, sensor_area: float = 12 * 12,
                           ) -> None:
    r"""Define the SiPM data related quantities for the run.

    Args:
        sensor_area (float, optional): Area of the photosensor (mm\*\*2).
            Defaults to 12\*12.
    """
    # Docstring previously documented a nonexistent `livetime`
    # parameter; removed.
    SiPM_config = {'sensor_area': sensor_area, }
    self.SiPM_config = SiPM_config
def get_run_temperatures(self) ‑> numpy.ndarray
-
Get all the temperatures available in the DCR run.
Returns
np.ndarray
- array with all the available temperatures.
Expand source code
def get_run_temperatures(self) -> np.ndarray:
    """Return the unique temperatures present in the run's dataset
    table.

    Returns:
        np.ndarray: array with all the available temperatures.
    """
    return np.unique(self.datasets['temp'])
def initialize_results_df(self) ‑> NoneType
-
Initialize a clean results_df instance in the object.
Expand source code
def initialize_results_df(self) -> None:
    """Reset `self.results_df` to an empty dataframe carrying the
    standard result columns.
    """
    result_columns = ['T', 'V', 'SPE_area', 'SPE_area_error',
                      'Gain', 'Gain_error', 'SPE_res', 'SPE_res_error',
                      'DCR', 'DCR_error', 'CTP', 'CTP_error']
    self.results_df = pd.DataFrame(columns=result_columns)
def load_dataset(self, temp: float, module: int, channel: str) ‑> DCR_dataset
-
Create a DCR_dataset object for a (T, mod, ch) configuration and load the corresponding data into it.
! This assumes processed data is available for all the raw files of the DCR run datasets !
Args
temp
:float
- temperature to consider
module
:int
- module to load
channel
:str
- channel in the module to select
Returns
DCR_dataset
- dataset object
Expand source code
def load_dataset(self, temp: float, module: int,
                 channel: str) -> DCR_dataset:
    """Create a DCR_dataset object for a (T, mod, ch) configuration and
    load the corresponding data into it.

    ! This assumes processed data is available for all the raw files of
    the DCR run datasets !

    Args:
        temp (float): temperature to consider
        module (int): module to load
        channel (str): channel in the module to select

    Returns:
        DCR_dataset: dataset object with processed data loaded.
    """
    particular_DCR_dataset = DCR_dataset(run=self.run,
                                         temperature=temp,
                                         module=module,
                                         channel=channel,
                                         processor=self.process,
                                         )
    # Eagerly load processed data so callers get a ready-to-use object.
    particular_DCR_dataset.load_processed_data()
    return particular_DCR_dataset
def load_results(self, name: str) ‑> NoneType
-
Load dataframe of results from a hdf5 file. Looks for files in the standard analysis cache directory.
Args
name
:str
- name of the file to load (without extension)
Expand source code
def load_results(self, name: str) -> None:
    """Load a previously saved results dataframe from a hdf5 file in
    the standard analysis cache directory (`self.analysis_path`).

    Args:
        name (str): name of the file to load (without extension)
    """
    self.results_df = pd.read_hdf(self.analysis_path + name + '.h5')
def save_results(self, custom_name: str) ‑> NoneType
-
Save dataframe of results to a hdf5 file. Saved files go to self.analysis_path.
Args
name
:str
- name to give the file (without extension).
Expand source code
def save_results(self, custom_name: str) -> None:
    """Save dataframe of results to a hdf5 file. Saved files go to
    self.analysis_path.

    Args:
        custom_name (str): name to give the file (without extension);
            the saved file is prefixed with 'DCR_results_'.

    Raises:
        AssertionError: if `self.results_df` is missing or empty.
    """
    assert isinstance(
        self.results_df, pd.DataFrame), ("Trying to save results that do "
                                         "not exist in the object, c'mon"
                                         ", you know better.")
    # Fixed assert message: missing space produced "computesomething",
    # and "otherwire" -> "otherwise".
    assert len(self.results_df) > 0, ("Results df is empty, please compute "
                                      "something before trying to save, "
                                      "otherwise it's just a waste of "
                                      "disk space")
    name = f'DCR_results_{custom_name}'
    file_path = self.analysis_path + name + '.h5'
    self.results_df.to_hdf(file_path, key='df')
    # Previously printed the truncated message 'Saved results to ';
    # now reports the actual destination.
    print(f'Saved results to {file_path}')
def set_plots_flag(self, flag: bool) ‑> NoneType
-
Set if computing properties makes plots (True) or not (False). Assumes a ./figures/ directory exists.
Args
flag
:bool
- True for plotting stuff, False for not making nice pictures (plots) to hang on the wall. Yes, I hang plots on my bedroom wall and it looks nice.
Expand source code
def set_plots_flag(self, flag: bool) -> None:
    """Enable (True) or disable (False) plot generation while computing
    properties. Assumes a ./figures/ directory exists when enabled.

    Args:
        flag (bool): True to produce plots, False to skip them.
    """
    self.plots_flag = flag
def set_standard_cuts_run(self, cut_area_min: float = 5, cut_area_max: float = 1000000.0, cut_length_min: int = 4, cut_length_max: int = 70, cut_n_pulses_min: float = 0, cut_n_pulses_max: float = 2) ‑> NoneType
-
Sets the cuts to use in analysis as (run) object variables.
Args
cut_area_min
:float
, optional- Area minimum value. Defaults to 5.
cut_area_max
:float
, optional- Area maximum value. Defaults to 1e6.
cut_length_min
:float
, optional- Lenght minimum value. Defaults to 4.
cut_length_max
:float
, optional- Lenght minimum value. Defaults to 70.
cut_n_pulses_min
:float
, optional- Minimum number of pulses in the waveform. Defaults to 0.
cut_n_pulses_max
:float
, optional- Maximum number of pulses in the waveform. Defaults to 2.
Expand source code
def set_standard_cuts_run(self, cut_area_min: float = 5,
                          cut_area_max: float = 1e6,
                          cut_length_min: int = 4,
                          cut_length_max: int = 70,
                          cut_n_pulses_min: float = 0,
                          cut_n_pulses_max: float = 2) -> None:
    """Store the standard analysis cuts as (run) object attributes.

    Args:
        cut_area_min (float, optional): Area minimum value. Defaults to 5.
        cut_area_max (float, optional): Area maximum value. Defaults
            to 1e6.
        cut_length_min (int, optional): Length minimum value. Defaults
            to 4.
        cut_length_max (int, optional): Length maximum value. Defaults
            to 70.
        cut_n_pulses_min (float, optional): Minimum number of pulses in
            the waveform. Defaults to 0.
        cut_n_pulses_max (float, optional): Maximum number of pulses in
            the waveform. Defaults to 2.
    """
    # Area cuts
    self.cut_area_min = cut_area_min
    self.cut_area_max = cut_area_max
    # Length cuts
    self.cut_length_min = cut_length_min
    self.cut_length_max = cut_length_max
    # Pulse-count cuts
    self.cut_n_pulses_min = cut_n_pulses_min
    self.cut_n_pulses_max = cut_n_pulses_max