Module pylars.utils.input
Expand source code
from glob import glob
from typing import Tuple, Union
import numpy as np
import pandas as pd
import uproot
from pylars.utils.common import load_ADC_config, get_summary_info
from pylars.utils.gsheets_db import xenoscope_db
class raw_data():
'''
General raw data class to define paths to raw and processed data,
acquisition conditions, ...
'''
def __init__(self, raw_path: str, V: float, T: float, module: int):
self.raw_path = raw_path
self.tree = 't1'
self.load_root()
self.get_available_channels()
self.get_n_samples()
self.get_n_waveforms()
self.set_general_conditions()
self.set_specific_conditions(V, T, module)
def set_general_conditions(self):
'''
Define the conditions of the data taking and of the setup
to be propagated forward. TO DO: fetch and save in a DB
'''
self.ADC_range = 2.25
self.ADC_impedance = 50
self.F_amp = 20 * 10
self.ADC_res = 2**14
self.q_e = 1.602176634e-19
self.dt = 10e-9
self.charge_factor = self.ADC_range / self.ADC_impedance / \
self.F_amp / self.ADC_res * self.dt / self.q_e
def set_specific_conditions(self, V: float, T: float, module: int):
"""Sets the run specific conditions the data was taken
Args:
V (float): Bias voltage applied
T (float): Temperature
"""
self.bias_voltage = V
self.temperature = T
self.module = module
def load_root(self):
"""Open the ROOT file and put in memory.
"""
try:
raw_file = uproot.open(self.raw_path)
self.raw_file = raw_file
except BaseException:
print(f'No root file found for {self.raw_path}')
def get_available_channels(self):
'''
Scans the loaded raw file for branches in tree the tree '''
keys = self.raw_file[self.tree].keys()
if keys[-1] == 'Time':
keys.pop(-1)
self.channels = keys
def get_channel_data(self, ch: str) -> np.ndarray:
'''
Return the raw data array of a given channel.
'''
data = self.raw_file[self.tree][ch].array() # type: ignore
return np.array(data)
def get_n_waveforms(self) -> int:
"""Get the number of waveforms in the root file without reading
the whole array.
Returns:
int: The number of wfs in the file
"""
first_channel = self.channels[0]
n_waveforms = self.raw_file[self.tree][first_channel].num_entries # type: ignore
self.n_waveforms = n_waveforms
return n_waveforms
def get_n_samples(self) -> int:
"""Get the number of samples in wach waveform in the root file
without reading the whole array.
Returns:
int: The number of samples of the wfs in the file
"""
first_channel = self.channels[0]
n_samples = self.raw_file[self.tree][first_channel].interpretation.inner_shape[0] # type: ignore
self.n_samples = n_samples
return n_samples
class run():
"""A run is made of a collection of datasets taken at a given
setup. Usually, opening and closing the setup defines a run.
The datasets can be at different tmeperature and bias voltage
conditions but the layout of the array stays the same."""
def __init__(self, run_number: int, main_data_path: str, F_amp: float,
ADC_model: str = 'v1724',
signal_negative_polarity: bool = True):
self.run_number = run_number
self.main_data_path = main_data_path
self.main_run_path = self.get_run_path()
self.root_files = self.get_all_files_of_run()
self.datasets = self.fetch_datasets()
self.define_ADC_config(F_amp=F_amp, model=ADC_model)
self.signal_negative_polarity = signal_negative_polarity
def __repr__(self) -> str:
repr = f'Run {self.run_number}'
return repr
def get_run_path(self) -> str:
"""Creates string with the run raw data directory.
Returns:
str: path to run raw data.
"""
if self.run_number < 6:
main_run_path = self.main_data_path + \
f'run{self.run_number}/'
else:
main_run_path = self.main_data_path + \
f'run{self.run_number}/data/'
return main_run_path
def read_layout(self):
"""Fetch the SiPM layout from a file.
layout: dict(mod0 = dict(ch# = dict('tile': str
'mppc:[###,###,...]
)
ch# = dict('tile': str
'mppc:[###,###,...]
)
),
mod1 = dict(ch# = dict('tile': str
'mppc:[###,###,...]
)
ch# = dict('tile': str
'mppc:[###,###,...]
)
),
)
fetch info in the form:
which tile: layout[<module>][<channel>]['tile'] -> str
which mppc(s): layout[<module>][<channel>]['mppc'] -> list of ints
"""
raise NotImplementedError
def define_ADC_config(self, F_amp: float, model: str = 'v1724') -> None:
"""Load the ADC related quantities for the run.
Args:
model (str): model of the digitizer
F_amp (float): signal amplification from the sensor (pre-amp *
external amplification on rack).
"""
self.ADC_config = load_ADC_config(model, F_amp)
def get_all_files_of_run(self) -> list:
"""Look for all the raw files stored for a given run.
Returns:
list: list of all ROOT files in the run.
"""
all_root_files = glob(
self.main_run_path + '**/*.root', recursive=True)
return all_root_files
def fetch_datasets(self) -> list:
"""Get all the datasets of a given run.
This method needs to be adapted to the specific data storage system.
Returns:
list: list of all the datasets of a given run. Elements
are type dataset.
"""
all_root_files = self.root_files
datasets = []
if self.run_number == 9:
self.root_files = []
temps = [190, 195, 200, 205, 210]
for t in temps:
root_files = glob(
f'{self.main_data_path}{str(t)}/breakdown-v/**/*.root',
recursive=True)
for path in root_files:
f = path.split('/')[-1].split('_')
if f[0] == 'test':
continue
v = float(f'{f[0]}.{f[1][:-1]}')
datasets.append(dataset(path, 'BV', 0, t, v))
# LED OFF
for t in temps:
root_files = glob(
f'{self.main_data_path}{str(t)}/dcr/**/*.root',
recursive=True)
for path in root_files:
f = path.split('/')[-1].split('_')
if f[0] == 'test':
continue
v = float(f'{f[0]}.{f[1][:-1]}')
datasets.append(dataset(path, 'DCR', 0, t, v))
if self.run_number >= 6:
for file in all_root_files:
try:
split_file_path = file.split('/')
_module = int(split_file_path[-1][-8])
_temp = float(split_file_path[-1][-27:-24])
_vbias = float(
split_file_path[-1][-22:-17].replace('_', '.'))
if split_file_path[-1][0] == 'B':
_kind = 'BV'
elif split_file_path[-1][0] == 'D':
_kind = 'DCR'
elif split_file_path[-1][0] == 'f':
_kind = 'fplt'
else:
print('Ignoring file: ', file)
continue
datasets.append(
dataset(file, _kind, _module, _temp, _vbias))
except BaseException:
print('Ignoring file: ', file)
elif self.run_number == 1:
for file in all_root_files:
file_split = file.split('/')
f_split = file_split[-1].split('_')
if f_split[0] == 'test':
print('Ignoring test dataset: ', file)
continue
if file_split[8] == 'breakdown-v':
_kind = 'BV'
_vbias = float(f_split[1] + '.' + f_split[2][:-1])
elif file_split[8] == 'dcr':
_kind = 'DCR'
if len(f_split) == 5:
_vbias = float(f_split[1][:-1])
else:
_vbias = float(
f_split[1] + '.' + f_split[2][:-1])
else:
print('Ignoring file due to unknown kind: ', file)
continue
_temp = float(f_split[0][:-1])
_module = int(f_split[-2])
datasets.append(dataset(file, _kind, _module, _temp, _vbias))
elif self.run_number in (2, 3):
for file in all_root_files:
file_split = file.split('/')
f_split = file_split[-1].split('_')
if f_split[0] == 'test':
print('Ignoring test dataset: ', file)
continue
if file_split[8] == 'breakdown-v':
_kind = 'BV'
_vbias = float(f_split[1] + '.' + f_split[2][:-1])
elif file_split[8] == 'dcr':
_kind = 'DCR'
_vbias = float(f_split[1][:-1])
else:
print('Ignoring file due to unknown kind: ', file)
continue
_temp = float(f_split[0][:-1])
_module = int(f_split[-2])
datasets.append(dataset(file, _kind, _module, _temp, _vbias))
elif self.run_number == 4:
for file in all_root_files:
file_split = file.split('/')
f_split = file_split[-1].split('_')
if f_split[0] == 'test':
print('Ignoring test dataset: ', file)
continue
if file_split[8] == 'breakdown-v':
_kind = 'BV'
elif file_split[8] == 'dcr':
_kind = 'DCR'
else:
print('Ignoring file due to unknown kind: ', file)
continue
_vbias = float(f_split[1][:-1])
_temp = float(f_split[0][:-1])
_module = int(f_split[-2])
datasets.append(dataset(file, _kind, _module, _temp, _vbias))
elif self.run_number == 5:
for file in all_root_files:
file_split = file.split('/')
f_split = file_split[-1].split('_')
if f_split[0] == 'test':
print('Ignoring test dataset: ', file)
continue
if file_split[8] == 'breakdown-v':
_kind = 'BV'
elif file_split[8] == 'dcr':
_kind = 'DCR'
else:
print('Ignoring file due to unknown kind: ', file)
continue
_temp = float(f_split[0][:-1])
_vbias = float(f_split[1] + '.' + f_split[2][:-1])
_module = int(f_split[-2])
datasets.append(dataset(file, _kind, _module, _temp, _vbias))
else:
raise NotImplementedError("Run not implemented yet.")
return datasets
def get_run_df(self) -> pd.DataFrame:
"""Get a frienly pandas dataframe with all the datasets available,
their kind, V, T, module and path.
Returns:
pd.DataFrame: all the available datasets in the run.
"""
dataset_list = self.datasets
dicts_list = [ds.dict for ds in dataset_list]
dataset_df = pd.DataFrame(dicts_list)
dataset_df = dataset_df.sort_values(
['kind', 'temp', 'vbias', 'module'],
ignore_index=True)
return dataset_df
class dataset():
"""A dataset is an object with the individual setup of each
data taking process, ie, each time the DAQ starts acquiring
at a certain (T,V).
"""
def __init__(self, path: str, kind: str,
module: int, temp: float, vbias: float):
self.path = path
self.kind = kind
self.module = module
self.temp = temp
self.vbias = vbias
# self.read_sizes()
self.dict = dict(kind=self.kind,
module=self.module,
temp=self.temp,
vbias=self.vbias,
path=self.path,
)
def __repr__(self) -> str:
repr = f'{self.kind}_{self.temp}_{self.vbias}'
return repr
def read_sizes(self) -> Tuple[int, int]:
"""Gets the number of waveforms and number of samples per waveform
of a given dataset.
Brieafly creates a raw_data object of the dataset to access the ROOT
file and read the number of entries as number of waveforms and size of
entries as number of samples in each waveform.
Returns:
Tuple[int, int]: (number of waveforms, number of samples)
"""
raw = raw_data(raw_path=self.path,
V=self.vbias,
T=self.temp,
module=self.module)
raw.load_root()
n_samples = raw.get_n_samples()
n_waveforms = raw.get_n_waveforms()
return n_waveforms, n_samples
def print_config(self):
config_print = f'''
---Dataset info---
path: {self.path}
kind: {self.kind}
module: {self.module}
temperature: {self.temp}
bias voltage: {self.vbias}
--- ---
'''
return config_print
class db_dataset():
"""A dataset is an object with the individual setup of each
data taking process, ie, each time the DAQ starts acquiring
at with a certain config. The db_dataset can point to different
raw_files, one for each module used.
"""
def __init__(self, run_number: str, db: Union[None, xenoscope_db]):
if db is None:
self.db = xenoscope_db()
self.run_number = run_number
self.run_dict = self.db.get_run_dict(self.run_number)
self.path = self.run_dict['Path to remote raw data']
self.run_type = self.run_dict['Run type']
self.vbias = self.run_dict['SiPM bias voltage']
# self.read_sizes()
self.get_files()
def get_files(self):
files = glob(self.path + '**/*.root', recursive=True)
self.files = files
self.n_modules = len(files)
# Assuming each file is a different module, i.e., the full dataset is
# written to only 1 .root file (per module)
def read_sizes(self) -> Tuple[int, int]:
"""Gets the number of waveforms and number of samples per waveform
of a given dataset.
Brieafly creates a raw_data object of the dataset to access the ROOT
file and read the number of entries as number of waveforms and size of
entries as number of samples in each waveform.
Returns:
Tuple[int, int]: (number of waveforms, number of samples)
"""
raw = raw_data(raw_path=self.path,
V=0, # mockup
T=0, # mockup
module=0) # mockup
raw.load_root()
n_samples = raw.get_n_samples()
n_waveforms = raw.get_n_waveforms()
return n_waveforms, n_samples
def print_config(self):
config_print = f'''
---Dataset info---
path: {self.path}
run_type: {self.run_type}
number of modules found: {self.n_modules}
bias voltage: {self.vbias}
number of events: {self.run_dict['Total number of events']}
--- ---
'''
return config_print
class xenoscope_run():
"""Main class of a Xenoscope run. It contains the necessary functions to
find, load and process the data of muon and chntrg data taking.
Some DB would be much better, please make a PR.
"""
def __init__(self, run_number: int, main_data_path: str, F_amp: float,
ADC_model: str = 'v1724',
signal_negative_polarity: bool = True):
self.run_number = run_number
self.main_data_path = main_data_path
self.root_files = self.get_all_files_of_run()
self.files_df, self.files_fail = self.get_all_datasets()
self.define_ADC_config(F_amp=F_amp, model=ADC_model)
self.signal_negative_polarity = signal_negative_polarity
self.load_labels()
def __repr__(self) -> str:
repr = f'Run {self.run_number}'
return repr
def get_all_files_of_run(self) -> list:
"""Look for all the raw files stored for a given run.
Returns:
list: list of all ROOT files in the run.
"""
all_root_files = glob(
self.main_data_path + '/**/*.root', recursive=True)
return all_root_files
def get_run_info(self, file):
file_name = file.split('/')[-1]
specific_name = file_name[:-16]
run_type = specific_name.split('_')[0]
summary_file = f'{self.main_data_path}/{specific_name}/Summary_{specific_name}.txt'
end, duration, n_events = get_summary_info(summary_file)
start = end - duration
module = file_name[-8]
info_dict = {'start': start, 'end': end,
'duration': duration,
'run_type': run_type,
'n_events': int(n_events),
'module': int(module), 'path': file}
return info_dict
def get_all_datasets(self) -> Union[pd.DataFrame, Tuple]:
files_df = pd.DataFrame(columns=['start', 'end', 'duration',
'run_type', 'n_events',
'module', 'path'])
files_fail = []
for file in self.root_files:
try:
files_df = files_df.append(self.get_run_info(file),
ignore_index=True) # type: ignore
except FileNotFoundError:
files_fail.append(file)
files_df.sort_values(by=['start', 'module'], inplace=True,
ignore_index=True)
return files_df, files_fail
def define_ADC_config(self, F_amp: float, model: str = 'v1724') -> None:
"""Load the ADC related quantities for the run.
Args:
model (str): model of the digitizer
F_amp (float): signal amplification from the sensor (pre-amp *
external amplification on rack).
"""
self.ADC_config = load_ADC_config(model, F_amp)
def load_labels(self) -> None:
"""Define the labeling of all the channels and corresponding tiles.
"""
labels_complete = {'mod0': {'wf1': 'wf1 | Tile H',
'wf2': 'wf2 | Tile J',
'wf3': 'wf3 | Tile K',
'wf4': 'wf4 | Tile L',
'wf5': 'wf5 | Tile M',
'wf6': 'wf6 | Muon detector 1',
'wf7': 'wf7 | Muon detector 2'},
'mod1': {'wf1': 'wf1 | Tile A',
'wf2': ' wf2 | Tile B',
'wf3': 'wf3 | Tile C',
'wf4': 'wf4 | Tile D',
'wf5': 'wf5 | Tile E',
'wf6': 'wf6 | Tile F',
'wf7': 'wf7 | Tile G'}
}
labels_tiles = {'mod0': {'wf1': 'H', 'wf2': 'J', 'wf3': 'K',
'wf4': 'L', 'wf5': 'M',
'wf6': 'Muon detector 1',
'wf7': 'Muon detector 2'},
'mod1': {'wf1': 'A', 'wf2': 'B', 'wf3': 'C',
'wf4': 'D', 'wf5': 'E', 'wf6': 'F',
'wf7': 'G'}
}
self.labels_complete = labels_complete
self.labels_tiles = labels_tiles
Classes
class dataset (path: str, kind: str, module: int, temp: float, vbias: float)
-
A dataset is an object with the individual setup of each data taking process, ie, each time the DAQ starts acquiring at a certain (T,V).
Expand source code
class dataset(): """A dataset is an object with the individual setup of each data taking process, ie, each time the DAQ starts acquiring at a certain (T,V). """ def __init__(self, path: str, kind: str, module: int, temp: float, vbias: float): self.path = path self.kind = kind self.module = module self.temp = temp self.vbias = vbias # self.read_sizes() self.dict = dict(kind=self.kind, module=self.module, temp=self.temp, vbias=self.vbias, path=self.path, ) def __repr__(self) -> str: repr = f'{self.kind}_{self.temp}_{self.vbias}' return repr def read_sizes(self) -> Tuple[int, int]: """Gets the number of waveforms and number of samples per waveform of a given dataset. Brieafly creates a raw_data object of the dataset to access the ROOT file and read the number of entries as number of waveforms and size of entries as number of samples in each waveform. Returns: Tuple[int, int]: (number of waveforms, number of samples) """ raw = raw_data(raw_path=self.path, V=self.vbias, T=self.temp, module=self.module) raw.load_root() n_samples = raw.get_n_samples() n_waveforms = raw.get_n_waveforms() return n_waveforms, n_samples def print_config(self): config_print = f''' ---Dataset info--- path: {self.path} kind: {self.kind} module: {self.module} temperature: {self.temp} bias voltage: {self.vbias} --- --- ''' return config_print
Methods
def print_config(self)
-
Expand source code
def print_config(self): config_print = f''' ---Dataset info--- path: {self.path} kind: {self.kind} module: {self.module} temperature: {self.temp} bias voltage: {self.vbias} --- --- ''' return config_print
def read_sizes(self) ‑> Tuple[int, int]
-
Gets the number of waveforms and number of samples per waveform of a given dataset.
Brieafly creates a raw_data object of the dataset to access the ROOT file and read the number of entries as number of waveforms and size of entries as number of samples in each waveform.
Returns
Tuple[int, int]
- (number of waveforms, number of samples)
Expand source code
def read_sizes(self) -> Tuple[int, int]: """Gets the number of waveforms and number of samples per waveform of a given dataset. Brieafly creates a raw_data object of the dataset to access the ROOT file and read the number of entries as number of waveforms and size of entries as number of samples in each waveform. Returns: Tuple[int, int]: (number of waveforms, number of samples) """ raw = raw_data(raw_path=self.path, V=self.vbias, T=self.temp, module=self.module) raw.load_root() n_samples = raw.get_n_samples() n_waveforms = raw.get_n_waveforms() return n_waveforms, n_samples
class db_dataset (run_number: str, db: Union[NoneType, xenoscope_db])
-
A dataset is an object with the individual setup of each data taking process, ie, each time the DAQ starts acquiring at with a certain config. The db_dataset can point to different raw_files, one for each module used.
Expand source code
class db_dataset(): """A dataset is an object with the individual setup of each data taking process, ie, each time the DAQ starts acquiring at with a certain config. The db_dataset can point to different raw_files, one for each module used. """ def __init__(self, run_number: str, db: Union[None, xenoscope_db]): if db is None: self.db = xenoscope_db() self.run_number = run_number self.run_dict = self.db.get_run_dict(self.run_number) self.path = self.run_dict['Path to remote raw data'] self.run_type = self.run_dict['Run type'] self.vbias = self.run_dict['SiPM bias voltage'] # self.read_sizes() self.get_files() def get_files(self): files = glob(self.path + '**/*.root', recursive=True) self.files = files self.n_modules = len(files) # Assuming each file is a different module, i.e., the full dataset is # written to only 1 .root file (per module) def read_sizes(self) -> Tuple[int, int]: """Gets the number of waveforms and number of samples per waveform of a given dataset. Brieafly creates a raw_data object of the dataset to access the ROOT file and read the number of entries as number of waveforms and size of entries as number of samples in each waveform. Returns: Tuple[int, int]: (number of waveforms, number of samples) """ raw = raw_data(raw_path=self.path, V=0, # mockup T=0, # mockup module=0) # mockup raw.load_root() n_samples = raw.get_n_samples() n_waveforms = raw.get_n_waveforms() return n_waveforms, n_samples def print_config(self): config_print = f''' ---Dataset info--- path: {self.path} run_type: {self.run_type} number of modules found: {self.n_modules} bias voltage: {self.vbias} number of events: {self.run_dict['Total number of events']} --- --- ''' return config_print
Methods
def get_files(self)
-
Expand source code
def get_files(self): files = glob(self.path + '**/*.root', recursive=True) self.files = files self.n_modules = len(files) # Assuming each file is a different module, i.e., the full dataset is # written to only 1 .root file (per module)
def print_config(self)
-
Expand source code
def print_config(self): config_print = f''' ---Dataset info--- path: {self.path} run_type: {self.run_type} number of modules found: {self.n_modules} bias voltage: {self.vbias} number of events: {self.run_dict['Total number of events']} --- --- ''' return config_print
def read_sizes(self) ‑> Tuple[int, int]
-
Gets the number of waveforms and number of samples per waveform of a given dataset.
Brieafly creates a raw_data object of the dataset to access the ROOT file and read the number of entries as number of waveforms and size of entries as number of samples in each waveform.
Returns
Tuple[int, int]
- (number of waveforms, number of samples)
Expand source code
def read_sizes(self) -> Tuple[int, int]: """Gets the number of waveforms and number of samples per waveform of a given dataset. Brieafly creates a raw_data object of the dataset to access the ROOT file and read the number of entries as number of waveforms and size of entries as number of samples in each waveform. Returns: Tuple[int, int]: (number of waveforms, number of samples) """ raw = raw_data(raw_path=self.path, V=0, # mockup T=0, # mockup module=0) # mockup raw.load_root() n_samples = raw.get_n_samples() n_waveforms = raw.get_n_waveforms() return n_waveforms, n_samples
class raw_data (raw_path: str, V: float, T: float, module: int)
-
General raw data class to define paths to raw and processed data, acquisition conditions, …
Expand source code
class raw_data(): ''' General raw data class to define paths to raw and processed data, acquisition conditions, ... ''' def __init__(self, raw_path: str, V: float, T: float, module: int): self.raw_path = raw_path self.tree = 't1' self.load_root() self.get_available_channels() self.get_n_samples() self.get_n_waveforms() self.set_general_conditions() self.set_specific_conditions(V, T, module) def set_general_conditions(self): ''' Define the conditions of the data taking and of the setup to be propagated forward. TO DO: fetch and save in a DB ''' self.ADC_range = 2.25 self.ADC_impedance = 50 self.F_amp = 20 * 10 self.ADC_res = 2**14 self.q_e = 1.602176634e-19 self.dt = 10e-9 self.charge_factor = self.ADC_range / self.ADC_impedance / \ self.F_amp / self.ADC_res * self.dt / self.q_e def set_specific_conditions(self, V: float, T: float, module: int): """Sets the run specific conditions the data was taken Args: V (float): Bias voltage applied T (float): Temperature """ self.bias_voltage = V self.temperature = T self.module = module def load_root(self): """Open the ROOT file and put in memory. """ try: raw_file = uproot.open(self.raw_path) self.raw_file = raw_file except BaseException: print(f'No root file found for {self.raw_path}') def get_available_channels(self): ''' Scans the loaded raw file for branches in tree the tree ''' keys = self.raw_file[self.tree].keys() if keys[-1] == 'Time': keys.pop(-1) self.channels = keys def get_channel_data(self, ch: str) -> np.ndarray: ''' Return the raw data array of a given channel. ''' data = self.raw_file[self.tree][ch].array() # type: ignore return np.array(data) def get_n_waveforms(self) -> int: """Get the number of waveforms in the root file without reading the whole array. Returns: int: The number of wfs in the file """ first_channel = self.channels[0] n_waveforms = self.raw_file[self.tree][first_channel].num_entries # type: ignore self.n_waveforms = n_waveforms return n_waveforms def get_n_samples(self) -> int: """Get the number of samples in wach waveform in the root file without reading the whole array. Returns: int: The number of samples of the wfs in the file """ first_channel = self.channels[0] n_samples = self.raw_file[self.tree][first_channel].interpretation.inner_shape[0] # type: ignore self.n_samples = n_samples return n_samples
Methods
def get_available_channels(self)
-
Scans the loaded raw file for branches in tree the tree
Expand source code
def get_available_channels(self): ''' Scans the loaded raw file for branches in tree the tree ''' keys = self.raw_file[self.tree].keys() if keys[-1] == 'Time': keys.pop(-1) self.channels = keys
def get_channel_data(self, ch: str) ‑> numpy.ndarray
-
Return the raw data array of a given channel.
Expand source code
def get_channel_data(self, ch: str) -> np.ndarray: ''' Return the raw data array of a given channel. ''' data = self.raw_file[self.tree][ch].array() # type: ignore return np.array(data)
def get_n_samples(self) ‑> int
-
Get the number of samples in wach waveform in the root file without reading the whole array.
Returns
int
- The number of samples of the wfs in the file
Expand source code
def get_n_samples(self) -> int: """Get the number of samples in wach waveform in the root file without reading the whole array. Returns: int: The number of samples of the wfs in the file """ first_channel = self.channels[0] n_samples = self.raw_file[self.tree][first_channel].interpretation.inner_shape[0] # type: ignore self.n_samples = n_samples return n_samples
def get_n_waveforms(self) ‑> int
-
Get the number of waveforms in the root file without reading the whole array.
Returns
int
- The number of wfs in the file
Expand source code
def get_n_waveforms(self) -> int: """Get the number of waveforms in the root file without reading the whole array. Returns: int: The number of wfs in the file """ first_channel = self.channels[0] n_waveforms = self.raw_file[self.tree][first_channel].num_entries # type: ignore self.n_waveforms = n_waveforms return n_waveforms
def load_root(self)
-
Open the ROOT file and put in memory.
Expand source code
def load_root(self): """Open the ROOT file and put in memory. """ try: raw_file = uproot.open(self.raw_path) self.raw_file = raw_file except BaseException: print(f'No root file found for {self.raw_path}')
def set_general_conditions(self)
-
Define the conditions of the data taking and of the setup to be propagated forward. TO DO: fetch and save in a DB
Expand source code
def set_general_conditions(self): ''' Define the conditions of the data taking and of the setup to be propagated forward. TO DO: fetch and save in a DB ''' self.ADC_range = 2.25 self.ADC_impedance = 50 self.F_amp = 20 * 10 self.ADC_res = 2**14 self.q_e = 1.602176634e-19 self.dt = 10e-9 self.charge_factor = self.ADC_range / self.ADC_impedance / \ self.F_amp / self.ADC_res * self.dt / self.q_e
def set_specific_conditions(self, V: float, T: float, module: int)
-
Sets the run specific conditions the data was taken
Args
V
:float
- Bias voltage applied
T
:float
- Temperature
Expand source code
def set_specific_conditions(self, V: float, T: float, module: int): """Sets the run specific conditions the data was taken Args: V (float): Bias voltage applied T (float): Temperature """ self.bias_voltage = V self.temperature = T self.module = module
class run (run_number: int, main_data_path: str, F_amp: float, ADC_model: str = 'v1724', signal_negative_polarity: bool = True)
-
A run is made of a collection of datasets taken at a given setup. Usually, opening and closing the setup defines a run. The datasets can be at different tmeperature and bias voltage conditions but the layout of the array stays the same.
Expand source code
class run(): """A run is made of a collection of datasets taken at a given setup. Usually, opening and closing the setup defines a run. The datasets can be at different tmeperature and bias voltage conditions but the layout of the array stays the same.""" def __init__(self, run_number: int, main_data_path: str, F_amp: float, ADC_model: str = 'v1724', signal_negative_polarity: bool = True): self.run_number = run_number self.main_data_path = main_data_path self.main_run_path = self.get_run_path() self.root_files = self.get_all_files_of_run() self.datasets = self.fetch_datasets() self.define_ADC_config(F_amp=F_amp, model=ADC_model) self.signal_negative_polarity = signal_negative_polarity def __repr__(self) -> str: repr = f'Run {self.run_number}' return repr def get_run_path(self) -> str: """Creates string with the run raw data directory. Returns: str: path to run raw data. """ if self.run_number < 6: main_run_path = self.main_data_path + \ f'run{self.run_number}/' else: main_run_path = self.main_data_path + \ f'run{self.run_number}/data/' return main_run_path def read_layout(self): """Fetch the SiPM layout from a file. layout: dict(mod0 = dict(ch# = dict('tile': str 'mppc:[###,###,...] ) ch# = dict('tile': str 'mppc:[###,###,...] ) ), mod1 = dict(ch# = dict('tile': str 'mppc:[###,###,...] ) ch# = dict('tile': str 'mppc:[###,###,...] ) ), ) fetch info in the form: which tile: layout[<module>][<channel>]['tile'] -> str which mppc(s): layout[<module>][<channel>]['mppc'] -> list of ints """ raise NotImplementedError def define_ADC_config(self, F_amp: float, model: str = 'v1724') -> None: """Load the ADC related quantities for the run. Args: model (str): model of the digitizer F_amp (float): signal amplification from the sensor (pre-amp * external amplification on rack). """ self.ADC_config = load_ADC_config(model, F_amp) def get_all_files_of_run(self) -> list: """Look for all the raw files stored for a given run. Returns: list: list of all ROOT files in the run. """ all_root_files = glob( self.main_run_path + '**/*.root', recursive=True) return all_root_files def fetch_datasets(self) -> list: """Get all the datasets of a given run. This method needs to be adapted to the specific data storage system. Returns: list: list of all the datasets of a given run. Elements are type dataset. """ all_root_files = self.root_files datasets = [] if self.run_number == 9: self.root_files = [] temps = [190, 195, 200, 205, 210] for t in temps: root_files = glob( f'{self.main_data_path}{str(t)}/breakdown-v/**/*.root', recursive=True) for path in root_files: f = path.split('/')[-1].split('_') if f[0] == 'test': continue v = float(f'{f[0]}.{f[1][:-1]}') datasets.append(dataset(path, 'BV', 0, t, v)) # LED OFF for t in temps: root_files = glob( f'{self.main_data_path}{str(t)}/dcr/**/*.root', recursive=True) for path in root_files: f = path.split('/')[-1].split('_') if f[0] == 'test': continue v = float(f'{f[0]}.{f[1][:-1]}') datasets.append(dataset(path, 'DCR', 0, t, v)) if self.run_number >= 6: for file in all_root_files: try: split_file_path = file.split('/') _module = int(split_file_path[-1][-8]) _temp = float(split_file_path[-1][-27:-24]) _vbias = float( split_file_path[-1][-22:-17].replace('_', '.')) if split_file_path[-1][0] == 'B': _kind = 'BV' elif split_file_path[-1][0] == 'D': _kind = 'DCR' elif split_file_path[-1][0] == 'f': _kind = 'fplt' else: print('Ignoring file: ', file) continue datasets.append( dataset(file, _kind, _module, _temp, _vbias)) except BaseException: print('Ignoring file: ', file) elif self.run_number == 1: for file in all_root_files: file_split = file.split('/') f_split = file_split[-1].split('_') if f_split[0] == 'test': print('Ignoring test dataset: ', file) continue if file_split[8] == 'breakdown-v': _kind = 'BV' _vbias = float(f_split[1] + '.' + f_split[2][:-1]) elif file_split[8] == 'dcr': _kind = 'DCR' if len(f_split) == 5: _vbias = float(f_split[1][:-1]) else: _vbias = float( f_split[1] + '.' + f_split[2][:-1]) else: print('Ignoring file due to unknown kind: ', file) continue _temp = float(f_split[0][:-1]) _module = int(f_split[-2]) datasets.append(dataset(file, _kind, _module, _temp, _vbias)) elif self.run_number in (2, 3): for file in all_root_files: file_split = file.split('/') f_split = file_split[-1].split('_') if f_split[0] == 'test': print('Ignoring test dataset: ', file) continue if file_split[8] == 'breakdown-v': _kind = 'BV' _vbias = float(f_split[1] + '.' + f_split[2][:-1]) elif file_split[8] == 'dcr': _kind = 'DCR' _vbias = float(f_split[1][:-1]) else: print('Ignoring file due to unknown kind: ', file) continue _temp = float(f_split[0][:-1]) _module = int(f_split[-2]) datasets.append(dataset(file, _kind, _module, _temp, _vbias)) elif self.run_number == 4: for file in all_root_files: file_split = file.split('/') f_split = file_split[-1].split('_') if f_split[0] == 'test': print('Ignoring test dataset: ', file) continue if file_split[8] == 'breakdown-v': _kind = 'BV' elif file_split[8] == 'dcr': _kind = 'DCR' else: print('Ignoring file due to unknown kind: ', file) continue _vbias = float(f_split[1][:-1]) _temp = float(f_split[0][:-1]) _module = int(f_split[-2]) datasets.append(dataset(file, _kind, _module, _temp, _vbias)) elif self.run_number == 5: for file in all_root_files: file_split = file.split('/') f_split = file_split[-1].split('_') if f_split[0] == 'test': print('Ignoring test dataset: ', file) continue if file_split[8] == 'breakdown-v': _kind = 'BV' elif file_split[8] == 'dcr': _kind = 'DCR' else: print('Ignoring file due to unknown kind: ', file) continue _temp = float(f_split[0][:-1]) _vbias = float(f_split[1] + '.' + f_split[2][:-1]) _module = int(f_split[-2]) datasets.append(dataset(file, _kind, _module, _temp, _vbias)) else: raise NotImplementedError("Run not implemented yet.") return datasets def get_run_df(self) -> pd.DataFrame: """Get a frienly pandas dataframe with all the datasets available, their kind, V, T, module and path. Returns: pd.DataFrame: all the available datasets in the run. """ dataset_list = self.datasets dicts_list = [ds.dict for ds in dataset_list] dataset_df = pd.DataFrame(dicts_list) dataset_df = dataset_df.sort_values( ['kind', 'temp', 'vbias', 'module'], ignore_index=True) return dataset_df
Methods
def define_ADC_config(self, F_amp: float, model: str = 'v1724') ‑> NoneType
-
Load the ADC related quantities for the run.
Args: model (str): model of the digitizer F_amp (float): signal amplification from the sensor (pre-amp * external amplification on rack).
Expand source code
def define_ADC_config(self, F_amp: float, model: str = 'v1724') -> None: """Load the ADC related quantities for the run. Args: model (str): model of the digitizer F_amp (float): signal amplification from the sensor (pre-amp * external amplification on rack). """ self.ADC_config = load_ADC_config(model, F_amp)
def fetch_datasets(self) ‑> list
-
Get all the datasets of a given run.
This method needs to be adapted to the specific data storage system.
Returns
list
- list of all the datasets of a given run. Elements are type dataset.
Expand source code
def fetch_datasets(self) -> list: """Get all the datasets of a given run. This method needs to be adapted to the specific data storage system. Returns: list: list of all the datasets of a given run. Elements are type dataset. """ all_root_files = self.root_files datasets = [] if self.run_number == 9: self.root_files = [] temps = [190, 195, 200, 205, 210] for t in temps: root_files = glob( f'{self.main_data_path}{str(t)}/breakdown-v/**/*.root', recursive=True) for path in root_files: f = path.split('/')[-1].split('_') if f[0] == 'test': continue v = float(f'{f[0]}.{f[1][:-1]}') datasets.append(dataset(path, 'BV', 0, t, v)) # LED OFF for t in temps: root_files = glob( f'{self.main_data_path}{str(t)}/dcr/**/*.root', recursive=True) for path in root_files: f = path.split('/')[-1].split('_') if f[0] == 'test': continue v = float(f'{f[0]}.{f[1][:-1]}') datasets.append(dataset(path, 'DCR', 0, t, v)) if self.run_number >= 6: for file in all_root_files: try: split_file_path = file.split('/') _module = int(split_file_path[-1][-8]) _temp = float(split_file_path[-1][-27:-24]) _vbias = float( split_file_path[-1][-22:-17].replace('_', '.')) if split_file_path[-1][0] == 'B': _kind = 'BV' elif split_file_path[-1][0] == 'D': _kind = 'DCR' elif split_file_path[-1][0] == 'f': _kind = 'fplt' else: print('Ignoring file: ', file) continue datasets.append( dataset(file, _kind, _module, _temp, _vbias)) except BaseException: print('Ignoring file: ', file) elif self.run_number == 1: for file in all_root_files: file_split = file.split('/') f_split = file_split[-1].split('_') if f_split[0] == 'test': print('Ignoring test dataset: ', file) continue if file_split[8] == 'breakdown-v': _kind = 'BV' _vbias = float(f_split[1] + '.' + f_split[2][:-1]) elif file_split[8] == 'dcr': _kind = 'DCR' if len(f_split) == 5: _vbias = float(f_split[1][:-1]) else: _vbias = float( f_split[1] + '.' + f_split[2][:-1]) else: print('Ignoring file due to unknown kind: ', file) continue _temp = float(f_split[0][:-1]) _module = int(f_split[-2]) datasets.append(dataset(file, _kind, _module, _temp, _vbias)) elif self.run_number in (2, 3): for file in all_root_files: file_split = file.split('/') f_split = file_split[-1].split('_') if f_split[0] == 'test': print('Ignoring test dataset: ', file) continue if file_split[8] == 'breakdown-v': _kind = 'BV' _vbias = float(f_split[1] + '.' + f_split[2][:-1]) elif file_split[8] == 'dcr': _kind = 'DCR' _vbias = float(f_split[1][:-1]) else: print('Ignoring file due to unknown kind: ', file) continue _temp = float(f_split[0][:-1]) _module = int(f_split[-2]) datasets.append(dataset(file, _kind, _module, _temp, _vbias)) elif self.run_number == 4: for file in all_root_files: file_split = file.split('/') f_split = file_split[-1].split('_') if f_split[0] == 'test': print('Ignoring test dataset: ', file) continue if file_split[8] == 'breakdown-v': _kind = 'BV' elif file_split[8] == 'dcr': _kind = 'DCR' else: print('Ignoring file due to unknown kind: ', file) continue _vbias = float(f_split[1][:-1]) _temp = float(f_split[0][:-1]) _module = int(f_split[-2]) datasets.append(dataset(file, _kind, _module, _temp, _vbias)) elif self.run_number == 5: for file in all_root_files: file_split = file.split('/') f_split = file_split[-1].split('_') if f_split[0] == 'test': print('Ignoring test dataset: ', file) continue if file_split[8] == 'breakdown-v': _kind = 'BV' elif file_split[8] == 'dcr': _kind = 'DCR' else: print('Ignoring file due to unknown kind: ', file) continue _temp = float(f_split[0][:-1]) _vbias = float(f_split[1] + '.' + f_split[2][:-1]) _module = int(f_split[-2]) datasets.append(dataset(file, _kind, _module, _temp, _vbias)) else: raise NotImplementedError("Run not implemented yet.") return datasets
def get_all_files_of_run(self) ‑> list
-
Look for all the raw files stored for a given run.
Returns
list
- list of all ROOT files in the run.
Expand source code
def get_all_files_of_run(self) -> list: """Look for all the raw files stored for a given run. Returns: list: list of all ROOT files in the run. """ all_root_files = glob( self.main_run_path + '**/*.root', recursive=True) return all_root_files
def get_run_df(self) ‑> pandas.core.frame.DataFrame
-
Get a frienly pandas dataframe with all the datasets available, their kind, V, T, module and path.
Returns
pd.DataFrame
- all the available datasets in the run.
Expand source code
def get_run_df(self) -> pd.DataFrame: """Get a frienly pandas dataframe with all the datasets available, their kind, V, T, module and path. Returns: pd.DataFrame: all the available datasets in the run. """ dataset_list = self.datasets dicts_list = [ds.dict for ds in dataset_list] dataset_df = pd.DataFrame(dicts_list) dataset_df = dataset_df.sort_values( ['kind', 'temp', 'vbias', 'module'], ignore_index=True) return dataset_df
def get_run_path(self) ‑> str
-
Creates string with the run raw data directory.
Returns
str
- path to run raw data.
Expand source code
def get_run_path(self) -> str: """Creates string with the run raw data directory. Returns: str: path to run raw data. """ if self.run_number < 6: main_run_path = self.main_data_path + \ f'run{self.run_number}/' else: main_run_path = self.main_data_path + \ f'run{self.run_number}/data/' return main_run_path
def read_layout(self)
-
Fetch the SiPM layout from a file. layout: dict(mod0 = dict(ch# = dict('tile': str 'mppc:[###,###,…] ) ch# = dict('tile': str 'mppc:[###,###,…] ) ), mod1 = dict(ch# = dict('tile': str 'mppc:[###,###,…] ) ch# = dict('tile': str 'mppc:[###,###,…] ) ), )
fetch info in the form: which tile: layout[
][ ]['tile'] -> str which mppc(s): layout[ ][ ]['mppc'] -> list of ints Expand source code
def read_layout(self): """Fetch the SiPM layout from a file. layout: dict(mod0 = dict(ch# = dict('tile': str 'mppc:[###,###,...] ) ch# = dict('tile': str 'mppc:[###,###,...] ) ), mod1 = dict(ch# = dict('tile': str 'mppc:[###,###,...] ) ch# = dict('tile': str 'mppc:[###,###,...] ) ), ) fetch info in the form: which tile: layout[<module>][<channel>]['tile'] -> str which mppc(s): layout[<module>][<channel>]['mppc'] -> list of ints """ raise NotImplementedError
class xenoscope_run (run_number: int, main_data_path: str, F_amp: float, ADC_model: str = 'v1724', signal_negative_polarity: bool = True)
-
Main class of a Xenoscope run. It contains the necessary functions to find, load and process the data of muon and chntrg data taking. Some DB would be much better, please make a PR.
Expand source code
class xenoscope_run(): """Main class of a Xenoscope run. It contains the necessary functions to find, load and process the data of muon and chntrg data taking. Some DB would be much better, please make a PR. """ def __init__(self, run_number: int, main_data_path: str, F_amp: float, ADC_model: str = 'v1724', signal_negative_polarity: bool = True): self.run_number = run_number self.main_data_path = main_data_path self.root_files = self.get_all_files_of_run() self.files_df, self.files_fail = self.get_all_datasets() self.define_ADC_config(F_amp=F_amp, model=ADC_model) self.signal_negative_polarity = signal_negative_polarity self.load_labels() def __repr__(self) -> str: repr = f'Run {self.run_number}' return repr def get_all_files_of_run(self) -> list: """Look for all the raw files stored for a given run. Returns: list: list of all ROOT files in the run. """ all_root_files = glob( self.main_data_path + '/**/*.root', recursive=True) return all_root_files def get_run_info(self, file): file_name = file.split('/')[-1] specific_name = file_name[:-16] run_type = specific_name.split('_')[0] summary_file = f'{self.main_data_path}/{specific_name}/Summary_{specific_name}.txt' end, duration, n_events = get_summary_info(summary_file) start = end - duration module = file_name[-8] info_dict = {'start': start, 'end': end, 'duration': duration, 'run_type': run_type, 'n_events': int(n_events), 'module': int(module), 'path': file} return info_dict def get_all_datasets(self) -> Union[pd.DataFrame, Tuple]: files_df = pd.DataFrame(columns=['start', 'end', 'duration', 'run_type', 'n_events', 'module', 'path']) files_fail = [] for file in self.root_files: try: files_df = files_df.append(self.get_run_info(file), ignore_index=True) # type: ignore except FileNotFoundError: files_fail.append(file) files_df.sort_values(by=['start', 'module'], inplace=True, ignore_index=True) return files_df, files_fail def define_ADC_config(self, F_amp: float, model: str = 'v1724') -> None: """Load the ADC related quantities for the run. Args: model (str): model of the digitizer F_amp (float): signal amplification from the sensor (pre-amp * external amplification on rack). """ self.ADC_config = load_ADC_config(model, F_amp) def load_labels(self) -> None: """Define the labeling of all the channels and corresponding tiles. """ labels_complete = {'mod0': {'wf1': 'wf1 | Tile H', 'wf2': 'wf2 | Tile J', 'wf3': 'wf3 | Tile K', 'wf4': 'wf4 | Tile L', 'wf5': 'wf5 | Tile M', 'wf6': 'wf6 | Muon detector 1', 'wf7': 'wf7 | Muon detector 2'}, 'mod1': {'wf1': 'wf1 | Tile A', 'wf2': ' wf2 | Tile B', 'wf3': 'wf3 | Tile C', 'wf4': 'wf4 | Tile D', 'wf5': 'wf5 | Tile E', 'wf6': 'wf6 | Tile F', 'wf7': 'wf7 | Tile G'} } labels_tiles = {'mod0': {'wf1': 'H', 'wf2': 'J', 'wf3': 'K', 'wf4': 'L', 'wf5': 'M', 'wf6': 'Muon detector 1', 'wf7': 'Muon detector 2'}, 'mod1': {'wf1': 'A', 'wf2': 'B', 'wf3': 'C', 'wf4': 'D', 'wf5': 'E', 'wf6': 'F', 'wf7': 'G'} } self.labels_complete = labels_complete self.labels_tiles = labels_tiles
Methods
def define_ADC_config(self, F_amp: float, model: str = 'v1724') ‑> NoneType
-
Load the ADC related quantities for the run.
Args: model (str): model of the digitizer F_amp (float): signal amplification from the sensor (pre-amp * external amplification on rack).
Expand source code
def define_ADC_config(self, F_amp: float, model: str = 'v1724') -> None: """Load the ADC related quantities for the run. Args: model (str): model of the digitizer F_amp (float): signal amplification from the sensor (pre-amp * external amplification on rack). """ self.ADC_config = load_ADC_config(model, F_amp)
def get_all_datasets(self) ‑> Union[pandas.core.frame.DataFrame, Tuple]
-
Expand source code
def get_all_datasets(self) -> Union[pd.DataFrame, Tuple]: files_df = pd.DataFrame(columns=['start', 'end', 'duration', 'run_type', 'n_events', 'module', 'path']) files_fail = [] for file in self.root_files: try: files_df = files_df.append(self.get_run_info(file), ignore_index=True) # type: ignore except FileNotFoundError: files_fail.append(file) files_df.sort_values(by=['start', 'module'], inplace=True, ignore_index=True) return files_df, files_fail
def get_all_files_of_run(self) ‑> list
-
Look for all the raw files stored for a given run.
Returns
list
- list of all ROOT files in the run.
Expand source code
def get_all_files_of_run(self) -> list: """Look for all the raw files stored for a given run. Returns: list: list of all ROOT files in the run. """ all_root_files = glob( self.main_data_path + '/**/*.root', recursive=True) return all_root_files
def get_run_info(self, file)
-
Expand source code
def get_run_info(self, file): file_name = file.split('/')[-1] specific_name = file_name[:-16] run_type = specific_name.split('_')[0] summary_file = f'{self.main_data_path}/{specific_name}/Summary_{specific_name}.txt' end, duration, n_events = get_summary_info(summary_file) start = end - duration module = file_name[-8] info_dict = {'start': start, 'end': end, 'duration': duration, 'run_type': run_type, 'n_events': int(n_events), 'module': int(module), 'path': file} return info_dict
def load_labels(self) ‑> NoneType
-
Define the labeling of all the channels and corresponding tiles.
Expand source code
def load_labels(self) -> None: """Define the labeling of all the channels and corresponding tiles. """ labels_complete = {'mod0': {'wf1': 'wf1 | Tile H', 'wf2': 'wf2 | Tile J', 'wf3': 'wf3 | Tile K', 'wf4': 'wf4 | Tile L', 'wf5': 'wf5 | Tile M', 'wf6': 'wf6 | Muon detector 1', 'wf7': 'wf7 | Muon detector 2'}, 'mod1': {'wf1': 'wf1 | Tile A', 'wf2': ' wf2 | Tile B', 'wf3': 'wf3 | Tile C', 'wf4': 'wf4 | Tile D', 'wf5': 'wf5 | Tile E', 'wf6': 'wf6 | Tile F', 'wf7': 'wf7 | Tile G'} } labels_tiles = {'mod0': {'wf1': 'H', 'wf2': 'J', 'wf3': 'K', 'wf4': 'L', 'wf5': 'M', 'wf6': 'Muon detector 1', 'wf7': 'Muon detector 2'}, 'mod1': {'wf1': 'A', 'wf2': 'B', 'wf3': 'C', 'wf4': 'D', 'wf5': 'E', 'wf6': 'F', 'wf7': 'G'} } self.labels_complete = labels_complete self.labels_tiles = labels_tiles