diff --git a/caldb/equalization/sim_xpol3_equalization-3p68_uniform_v001.h5 b/caldb/equalization/sim_xpol3_equalization-3p68_uniform_v001.h5 new file mode 100644 index 0000000..8c66c78 Binary files /dev/null and b/caldb/equalization/sim_xpol3_equalization-3p68_uniform_v001.h5 differ diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 5f63f1b..298e549 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -4,6 +4,34 @@ Release notes ============= +* Package dependencies updated: `joblib` and `iminuit` added. +* New calibration matrix type `equalization` added in `caldb.py` to store the pixel equalization + factors. +* New `CalibrateEqualization` class to calibrate the pixel equalization factors. This is + performed with a likelihood fit using the source spectrum probability density function (PDF) + and the observed energy of the events. Corresponding CLI command `calibgen equalization` added + to generate equalization calibration files from event files. +* The `CalibrateGain` class has been refactored. Now it produces a gain calibration matrix from + an equalization calibration matrix, by using the ADC to eV conversion and the chip sensor + material. The gain matrix is expressed in ADC counts per electron. Corresponding CLI command + `calibgen gain` refactored to reflect this change. +* Some CLI commands refactored to use the equalization calibration file instead of the gain + calibration file (`reconstruct`, `display`, `calibgen eta`). +* New `pdf.py` module containing facilities to calculate the PDF of the source spectrum and store + it in a `.npz` file. +* Added `calibspec` CLI command to create a source spectrum PDF file from a reconstructed event + file. The PDF is saved as a `.npz` file. +* New `adc` method to calculate the total ADC counts of the event added to `ReconEvent`. Now a + reconstructed file contains both the ADC counts and the energy in eV of the events (if the + conversion factor from ADC to eV is available in the equalization calibration file). +* New `energy` method to calculate the total energy of the event added to `Cluster`. The + conversion factor from ADC to eV is passed by `ClusteringNN`. +* Pull requests merged and issues closed: + + - https://github.com/lucabaldini/hexsample/pull/132 + - https://github.com/lucabaldini/hexsample/issues/129 + + Version 0.16.1 (2026-05-12) ~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/pyproject.toml b/pyproject.toml index 57db12f..726a8db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,8 @@ dependencies = [ "xraydb", "aptapy>=0.18.0", "scikit-image", + "iminuit", + "joblib", ] [project.optional-dependencies] diff --git a/scripts/populate_caldb.py b/scripts/populate_caldb.py index a95aa92..ebb5e04 100644 --- a/scripts/populate_caldb.py +++ b/scripts/populate_caldb.py @@ -23,11 +23,14 @@ """ import numpy as np +from xraydb import ionization_potential from hexsample.caldb import CalDB, CalibrationType from hexsample.tasks import SynthesizeCalibrationDefaults, synthesize_calibration_file +DEFAULT_IONIZATION_POTENTIAL = ionization_potential("Si") DEFAULT_GAIN = 1. +DEFAULT_ADC_TO_EV = DEFAULT_IONIZATION_POTENTIAL / DEFAULT_GAIN RMS_VALS = (0, 10) ENC_VALS = np.array((20., 30., 40., 50., 75., 100.)) PEDESTAL_VALS = (0, 1000) @@ -62,6 +65,7 @@ def populate_caldb() -> None: generate_files(CalibrationType.ENC, ENC_VALS, RMS_VALS) generate_files(CalibrationType.NOISE, ENC_VALS * DEFAULT_GAIN, RMS_VALS) generate_files(CalibrationType.GAIN, [DEFAULT_GAIN], RMS_VALS) + generate_files(CalibrationType.EQUALIZATION, [DEFAULT_ADC_TO_EV], [0.]) generate_files(CalibrationType.PEDESTAL, PEDESTAL_VALS, RMS_VALS) diff --git a/src/hexsample/caldb.py b/src/hexsample/caldb.py index 638d1ea..fb509ab 100644 --- a/src/hexsample/caldb.py +++ b/src/hexsample/caldb.py @@ -64,3 +64,9 @@ def open_gain(cls, designator: str) -> CalibrationMatrix: """Open the gain calibration file for the given designation. """ return cls._open(CalibrationType.GAIN, designator) + + @classmethod + def open_equalization(cls, designator: str) -> CalibrationMatrix: + """Open the equalization calibration file for the given designation. + """ + return cls._open(CalibrationType.EQUALIZATION, designator) diff --git a/src/hexsample/calibration.py b/src/hexsample/calibration.py index f14669a..4213157 100644 --- a/src/hexsample/calibration.py +++ b/src/hexsample/calibration.py @@ -20,22 +20,25 @@ """Calibration facilities. """ +import gc import pathlib from enum import Enum from itertools import product -from typing import Tuple +from typing import Optional, Tuple import h5py import numpy as np +import xraydb from aptapy.hist import Histogram3d from aptapy.models import Gaussian -from scipy.sparse import csr_matrix -from scipy.sparse.linalg import lsmr +from iminuit import Minuit +from joblib import Parallel, delayed +from scipy.sparse import csc_matrix, csr_matrix from tqdm import tqdm from .clustering import Cluster from .digi import DigiEventRectangular -from .recon import DEFAULT_IONIZATION_POTENTIAL +from .pdf import SpectrumPDF from .stats import RunningStats @@ -45,9 +48,10 @@ class CalibrationType(str, Enum): """ ENC = "enc" - PEDESTAL = "pedestal" - NOISE = "noise" + EQUALIZATION = "equalization" GAIN = "gain" + NOISE = "noise" + PEDESTAL = "pedestal" @classmethod def values(cls) -> Tuple[str, ...]: @@ -72,6 +76,9 @@ class CalibrationMetadata(str, Enum): VERSION = "version" CALIBRATION_TYPE = "calibration_type" IS_SYNTHETIC = "is_synthetic" + # This is to convert ADC counts to energy and it's stored only in the + # equalization matrix, and is measured in eV/ADC count. + ADC_TO_EV = "adc_to_ev" class CalibrationUnits(str, Enum): @@ -80,16 +87,18 @@ class CalibrationUnits(str, Enum): """ ENC = "Electrons" + EQUALIZATION = "" + GAIN = "ADC counts / electron" NOISE = "ADC counts" PEDESTAL = "ADC counts" - GAIN = "ADC counts / electron" CALIBRATION_UNITS = { CalibrationType.ENC: CalibrationUnits.ENC, + CalibrationType.EQUALIZATION: CalibrationUnits.EQUALIZATION, + CalibrationType.GAIN: CalibrationUnits.GAIN, CalibrationType.NOISE: CalibrationUnits.NOISE, CalibrationType.PEDESTAL: CalibrationUnits.PEDESTAL, - CalibrationType.GAIN: CalibrationUnits.GAIN } @@ -108,9 +117,9 @@ class CalibrationMatrix: The number of rows of the detector readout chip. """ - VALUES = "values" ENTRIES = "entries" ERRORS = "errors" + VALUES = "values" def __init__(self, num_cols: int, num_rows: int) -> None: """Class constructor. @@ -123,6 +132,7 @@ def __init__(self, num_cols: int, num_rows: int) -> None: self._errors = np.full(self._shape, np.nan) # Other useful information for the metadata self.num_events = 0 + self._cached = False self._metadata = { CalibrationMetadata.NUM_COLS: num_cols, CalibrationMetadata.NUM_ROWS: num_rows @@ -149,6 +159,8 @@ def values(self, new_matrix: np.ndarray) -> None: raise ValueError(f"Input matrix has shape {new_matrix.shape}, but expected shape is " f"{self._shape}.") self._values = new_matrix + # Invalidate the cached metadata since the values of the matrix have changed. + self._cached = False @property def entries(self) -> np.ndarray: @@ -166,6 +178,8 @@ def entries(self, new_entries: np.ndarray) -> None: raise ValueError(f"Input entries matrix has shape {new_entries.shape}, but expected " f"shape is {self._shape}.") self._entries = new_entries + # Invalidate the cached metadata since the number of events has changed. + self._cached = False @property def errors(self) -> np.ndarray: @@ -182,11 +196,15 @@ def errors(self, new_error: np.ndarray) -> None: raise ValueError(f"Input error matrix has shape {new_error.shape}, but expected shape " f"is {self._shape}.") self._errors = new_error + # Invalidate the cached metadata since the error values have changed. + self._cached = False @property def metadata(self) -> dict: """Return the metadata of the calibration matrix. """ + if self._cached: + return self._metadata mask = self._entries > 0 # If there are no pixels with events, we can set the average, minimum and maximum number of # events to zero. @@ -202,8 +220,21 @@ def metadata(self) -> dict: self._metadata[CalibrationMetadata.ENTRIES_MIN] = entries_min self._metadata[CalibrationMetadata.ENTRIES_MAX] = entries_max self._metadata[CalibrationMetadata.NUM_CALIBRATED_PIXELS] = int(mask.sum()) + self._cached = True return self._metadata + def update_metadata(self, key: CalibrationMetadata, value) -> None: + """Update the metadata of the calibration matrix with a new key-value pair. + + Arguments + --------- + key : CalibrationMetadata + The key of the metadata to be updated. + value : + The value of the metadata to be updated. + """ + self._metadata[key] = value + def set_value(self, value: float) -> None: """Set a value for all the pixels in the calibration matrix. @@ -213,6 +244,7 @@ def set_value(self, value: float) -> None: The value to be set for all the pixels in the calibration matrix. """ self._values = np.full(self._shape, value) + self._cached = False def fill(self, value: float, max_hits: int = 0) -> None: """Substitute the value of the pixels with less hits than or equal to a certain threshold @@ -228,6 +260,7 @@ def fill(self, value: float, max_hits: int = 0) -> None: The maximum number of hits for a pixel to be considered for replacement. """ self._values = np.where(self._entries <= max_hits, value, self._values) + self._cached = False def mean(self, min_hits: int = 1) -> float: """Return the mean value of the calibration matrix, calculated as the mean of the pixels @@ -296,6 +329,7 @@ def from_hdf5(cls, file_path: str) -> "CalibrationMatrix": attrs = dict(h5file.attrs) # Instantiate the object with the attributes loaded from the header. obj = cls(num_cols=attrs["num_cols"], num_rows=attrs["num_rows"]) + obj.num_events = attrs.get(CalibrationMetadata.NUM_EVENTS.value) for key, val in attrs.items(): obj._metadata[key] = val # Load the matrix and the hits matrices from the HDF5 file. @@ -321,8 +355,9 @@ def __str__(self) -> str: """ if CalibrationMetadata.FILE_NAME in self._metadata: return self._metadata[CalibrationMetadata.FILE_NAME] - return f"CalibrationMatrix(num_cols={self._metadata[CalibrationMetadata.NUM_COLS]}, " \ - f"num_rows={self._metadata[CalibrationMetadata.NUM_ROWS]})" + return f"CalibrationMatrix( " \ + f"num_cols={self._metadata[CalibrationMetadata.NUM_COLS]}, " \ + f"num_rows={self._metadata[CalibrationMetadata.NUM_ROWS]})" class CalibrateBase: @@ -656,32 +691,188 @@ def fit(self) -> Tuple[CalibrationMatrix, CalibrationMatrix]: raise ValueError(f"Invalid algorithm {self._algorithm} for the dark calibration.") -class CalibrateGain(CalibrateBase): +# The following methods are used for the equalization matrix calibration. They cannot be moved +# inside the class because they are used as functions to be executed in parallel, and they +# need to be defined at the top level of the module to be picklable by joblib. - """Calibrate the gain of the detector by analyzing the events in a DigiFile. This class takes a - CalibrationMatrix object as input, and updates the matrix with the data from the file. - At the moment, the dataset used for this task should contain events from a monochromatic source - and with known energy. +def _likelihood_fit(data: csr_matrix, conv_factor: float, pdf: SpectrumPDF, + pdf_derivative: callable) -> Optional[Tuple[np.ndarray, np.ndarray]]: + """Perform the likelihood fit for a region of the detector. + + Returns + ------- + equalization : np.ndarray + The equalization values for the pixels in the region, calculated from the fit. + error : np.ndarray + The error on the equalization values for the pixels in the region, calculated + from the fit. + """ + # Define the negative log-likelihood function for the fit. + def nll(pars): + total_adc = data @ pars + p = pdf(total_adc * conv_factor) + # To avoid negative or zero values in the log, clip p to be larger than 1e-10. + p_clipped = np.clip(p, 1e-10, None) + return -np.sum(np.log(p_clipped)) + # Define the gradient of the log-likelihood function for the fit. + def nll_grad(pars): + total_adc = data @ pars + p = pdf(total_adc * conv_factor) + p_clipped = np.clip(p, 1e-10, None) + dp = pdf_derivative(total_adc * conv_factor) + grad = -data.T @ (dp / p_clipped) * conv_factor + return np.asarray(grad).flatten() + # Define the initial parameters for the fit. + init_pars = np.ones(data.shape[1]) + # Initialize the Minuit minimizer. + m = Minuit(nll, init_pars, grad=nll_grad) + m.limits = [(1e-10, None) for _ in range(len(init_pars))] + m.errordef = Minuit.LIKELIHOOD + m.migrad() + # If the first fit is not successful, try to perform a second fit. We are not passing + # the gradient, because the fit should be more robust (but slower). + if not m.valid: + m = Minuit(nll, init_pars) + m.limits = [(1e-10, None) for _ in range(len(init_pars))] + m.errordef = Minuit.LIKELIHOOD + m.migrad() + # If the fit is successful, return the pixel equalization values and their errors, + # otherwise return None. + if m.valid: + equalization = 1 / np.array(m.values) + error = m.errors / np.array(m.values)**2 + return equalization, error + return None + + +def _cut_data(data: csc_matrix, ncols: int, cols: np.ndarray, rows: np.ndarray, + event_sum: np.ndarray) -> Tuple[csr_matrix, np.ndarray]: + """Cut the data to select only the events that have all the charge contained + in the pixels of the subregion defined by the input column and row coordinates. + This is done to ensure that the fit is performed only on events that are fully + contained in the subregion. Arguments --------- - cal_matrix : CalibrationMatrix - Calibration matrix to be updated with the gain values calculated from the data. - energy : float - The energy of the monochromatic source, in eV. + data : csr_matrix + The sparse matrix containing the pha values for the events in the entire detector. + cols : np.ndarray + The column coordinates of the pixels in the subregion. + rows : np.ndarray + The row coordinates of the pixels in the subregion. + event_sum : np.ndarray + The total ADC count for each event. + + Returns + ------- + data_active_pixels : csr_matrix + The sparse matrix containing the pha values for the events in the subregion, with + only the active pixels. + mask : np.ndarray + The boolean mask indicating which pixels in the subregion are active. + """ + # Calculate the index of the pixels in the flattened array to select the + # correct columns of the sparse matrix. + pixels_idxs = (rows * ncols + cols).astype(int) + # Select all the events but only the columns corresponding to the pixels + # in the subregion defined by the indices. + data_region = data[:, pixels_idxs] + # Re-convert to CSR format for efficient row slicing. + data_region = data_region.tocsr() + # Select only the events in the subregion where all the charge is contained + # in the pixels of the subregion. Add also another cut to select only the + # events with total charge above 0 ADC, just to be sure to remove empty events. + sub_region_event_sum = data_region.sum(axis=1).A1 + mask = (sub_region_event_sum == event_sum) & (sub_region_event_sum > 0) + data_good_events = data_region[mask] + # We want to remove the pixels in the subregion that never got hit, since + # only pixels with signal can be fitted. This operation ensures that the + # number of fit parameters is equal to the number of active pixels. + pixel_sum = data_good_events.sum(axis=0).A1 + active_mask = pixel_sum > 0 + data_active_pixels = data_good_events[:, active_mask] + return data_active_pixels, active_mask + + +def _fit_block( + r_start: int, c_start: int, size: int, nrows: int, ncols: int, data: csc_matrix, + event_sum: np.ndarray, conv_factor: float, pdf: SpectrumPDF, pdf_derivative: callable + ) -> Optional[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]]: + """Method to perform the fit for a block of pixels in the detector. + This method is meant to be used as a function to be executed in parallel for different + blocks of pixels. + + .. note:: This function must be defined outside the class to be picklable and usable + in parallel execution. + + Returns + ------- + gain : np.ndarray + The gain values for the pixels in the block, calculated from the fit. + error : np.ndarray + The error on the gain values for the pixels in the block, calculated from the fit. + rows : np.ndarray + The row coordinates of the active pixels in the block. + cols : np.ndarray + The column coordinates of the active pixels in the block. + num_events_per_pixel : np.ndarray + The number of events for each active pixel in the block. + """ + # Calculate the end row and column indices, taking into account the possibility + # of being at the end of the chip. + r_end = min(r_start + size, nrows) + c_end = min(c_start + size, ncols) + # Create a meshgrid to get all the coordinates of the pixels in the block. + cols, rows = np.meshgrid(np.arange(c_start, c_end), np.arange(r_start, r_end)) + cols = cols.flatten() + rows = rows.flatten() + # Cut the data to select only the events that have signal in the block. + data_fit, active_mask = _cut_data(data, ncols, cols, rows, event_sum) + num_events_per_pixel = np.asarray((data_fit > 0).sum(axis=0)).flatten() + # Check on the data: if there are no good events or no active pixels, + # we cannot perform the fit, so we return None. + if data_fit.nnz == 0 or data_fit.shape[1] == 0: + return None + # Fit the data of the block. + result = _likelihood_fit(data_fit, conv_factor, pdf, pdf_derivative) + # If the fit is not successful, return None and continue with the next block. + if result is None: + return None + # If the fit is successful, return the equalization values and their errors and the + # coordinates of the active pixels. + return result[0], result[1], rows[active_mask], cols[active_mask], num_events_per_pixel + + +class CalibrateEqualization(CalibrateBase): + + """Calculate the equalization matrix for the detector readout. This matrix is used + to convert the PHA values to Pulse Invariant (PI). By the definition, the mean value + of the matrix is 1.0. + + The calibration is peformed by analyzing events in a DigiFile. + + Arguments + --------- + num_cols : int + The number of columns of the detector readout chip. + num_rows : int + The number of rows of the detector readout chip. + pdf : SpectrumPDF + The probability density function of the spectrum of the events in the dataset, which is + used for the likelihood fit to calculate the equalization values for the pixels. """ - def __init__(self, num_cols: int, num_rows: int, energy: float) -> None: + def __init__(self, num_cols: int, num_rows: int, pdf: SpectrumPDF) -> None: """Class constructor. """ super().__init__(num_cols, num_rows) - self._energy = energy - self._event_count = 0 self._pha = [] self._coords = [] self._event_rows = [] + self._pdf = pdf + self._pdf_derivative = pdf.derivative def analyze_cluster(self, cluster: Cluster) -> None: """Analyze the event cluster to update the calibration matrix. @@ -691,57 +882,145 @@ def analyze_cluster(self, cluster: Cluster) -> None: rows = cluster.row # Update the arrays for the least squares fit. self._pha.extend(cluster.pha) + ncols = self.cal_matrix.shape[1] for col, row in zip(cols, rows): # Calculate the index of the pixel in the flattened array - i = row * self.cal_matrix.shape[1] + col - self._coords.append(i) + self._coords.append(row * ncols + col) self._event_rows.append(self._event_count) - # Update the matrix with the number of events for each pixel - self.cal_matrix.entries[row, col] += 1 # Update the event count self._event_count += 1 self.cal_matrix.num_events += 1 - def fit(self) -> CalibrationMatrix: - """Perform the least squares fit to determine the gain of each pixel. + def fit(self, size: int) -> CalibrationMatrix: + """Fit the collected events to determine the gain of each pixel. + + Arguments + --------- + size : int + The length of the square chip region to be fitted simultaneously. The optimal + value for this parameter is a trade-off between the number of active pixels in + the chip and the computational time of the fit, which increases significantly + with the number of pixels. For small active regions, a size of 6 can be a good + choice, while for the full chip calibration, 10 is a good value. + + Returns + ------- + cal_matrix : CalibrationMatrix + Updated calibration matrix with the gain values calculated from the data. """ - if self._event_count == 0: - raise ValueError("No events have been analyzed, cannot perform the fit.") + # Create a sparse matrix where the rows correspond to the events and the columns + # correspond to the pixels. In each row, the non-zero entries correspond to the + # pha values of the pixels that are hit in the event. nrows, ncols = self.cal_matrix.shape - # Create the sparse matrix for the least squares fit. This object allows to store - # and use efficiently the large and sparse matrix that we need for the fit. shape = (self._event_count, nrows * ncols) - a = csr_matrix((self._pha, (self._event_rows, self._coords)), shape=shape) - # Create the vector of the expected number of electrons. - b = np.full(self._event_count, self._energy / DEFAULT_IONIZATION_POTENTIAL) - # Perform the fit - results = lsmr(a, b) - # Get the best-fit weight vector and reshape it to the shape of the calibration matrix. - weight = results[0].reshape((nrows, ncols)) - signal_power = np.array(a.multiply(a).sum(axis=0)).reshape((nrows, ncols)) - # Calculate the number of degrees of freedom and the mean squared error. - dof = self._event_count - (ncols * nrows) - mse = results[3]**2 / max(dof, 1) - # Calculate the uncertainty of the gain best-fit values. - sigma_w = np.sqrt(mse / (signal_power + 1e-15)) - with np.errstate(divide='ignore', invalid='ignore'): - sigma_g_rel = sigma_w / np.abs(weight) - # Mask for the pixels that have a weight value close to zero (no events) - mask = np.abs(weight) > 1e-10 - values = self.cal_matrix.values.copy() - entries = self.cal_matrix.entries - # Set the gain value for the pixels that pass the quality cut. - values[mask] = 1 / weight[mask] - # Set the entries to zero for the pixels that don't pass the quality cut. - entries[~mask] = 0 - # Write back through the setter so updates persist on the shared object. + # Convert the lists to numpy array of int16 to save memory. + data_pha = np.array(self._pha, dtype=np.int16) + data_coords = np.array(self._coords, dtype=np.int32) + data_event_rows = np.array(self._event_rows, dtype=np.int32) + # Empty the lists to free memory. + self._pha = [] + self._coords = [] + self._event_rows = [] + # Create the sparse matrix with the collected data. + data_csr = csr_matrix((data_pha, (data_event_rows, data_coords)), shape=shape) + # Calculate the sum of the ADC counts in each event, which is used for the data + # cuts in the fit, and to calculate the mean ADC count in an event. + event_sum = data_csr.sum(axis=1).A1 + # Calculate the conversion factor from ADC to eV. + adc_to_ev = self._pdf.mean() / event_sum.mean() + # Calculate the starting column and row indices for the blocks. We are using an + # overlap of 2 pixels between the blocks to ensure that pixels on the edges are + # correctly calibrated. + col_starts = np.arange(0, ncols - 2, size - 2) + row_starts = np.arange(0, nrows - 2, size - 2) + block_indices = list(product(row_starts, col_starts)) + # Create the arrays to store the weighted gain values and the sum of the weights + # for each pixel. We will use these arrays to calculate a weighted average of the + # results, since some pixels are fitted multiple times due to the overlap. + weighted_gains = np.zeros((nrows, ncols)) + sum_weights = np.zeros((nrows, ncols)) + entries = np.zeros((nrows, ncols), dtype=int) + # Convert the CSR matrix to CSC format to optimize column slicing during data cuts. + data_csc = data_csr.tocsc() + # Free the memory used by the CSR matrix and temporary arrays. + del data_csr, data_pha, data_coords, data_event_rows + gc.collect() + # Start the parallel processing of the blocks. + args = (size, nrows, ncols, data_csc, event_sum, adc_to_ev, + self._pdf, self._pdf_derivative) + bar_format = "{desc}: {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]" + results = Parallel(n_jobs=-1, backend="loky")( + delayed(_fit_block)( + r_start, c_start, *args) + for r_start, c_start in tqdm(block_indices, total=len(block_indices), + bar_format=bar_format) + ) + # Now we need to aggregate the results from the blocks. + for _result in results: + # If the block is empty or the fit is not successful, move to the next block. + if _result is None: + continue + # Unpack the results from the block fit. + gain, error, rows, cols, num_events_per_pixel = _result + weights = 1 / (error**2 + 1e-10) + # Use the mask of the active pixels to update the weighted gains and the sum + # of the weights matrices for the pixels in the block that have been fitted. + np.add.at(weighted_gains, (rows, cols), gain * weights) + np.add.at(sum_weights, (rows, cols), weights) + np.maximum.at(entries, (rows, cols), num_events_per_pixel) + # Calculate the final values and errors as the weighted average of the fit results. + values = np.divide(weighted_gains, sum_weights, + out=np.full_like(weighted_gains, np.nan), + where=sum_weights > 0) + errors = np.full_like(weighted_gains, np.nan) + np.sqrt(sum_weights, out=errors, where=sum_weights > 0) + np.divide(1, errors, out=errors, where=sum_weights > 0) + # Write the results back to the calibration matrix. self.cal_matrix.values = values - self.cal_matrix.errors = np.where(mask, sigma_g_rel * values, self.cal_matrix.errors) + self.cal_matrix.errors = errors self.cal_matrix.entries = entries + self.cal_matrix.update_metadata(CalibrationMetadata.ADC_TO_EV, adc_to_ev) + return self.cal_matrix + + +class CalibrateGain: + + """Class for calibrating the gain of the readout chip. + + This class provides methods to calculate the gain values based on the equalization + matrix and sensor material properties. + """ + + def __init__(self, equalization_matrix: CalibrationMatrix, material_symbol: str) -> None: + """Class constructor. + """ + num_rows, num_cols = equalization_matrix.shape + self.cal_matrix = CalibrationMatrix(num_cols, num_rows) + self.equalization_matrix = equalization_matrix + self.material_symbol = material_symbol + + def fit(self) -> CalibrationMatrix: + """Calculate the gain values based on the equalization matrix and the sensor material + properties, and update the calibration matrix with the calculated values. + """ + # Get the ionization potential of the sensor material and the conversion factor + # from ADC to eV from the metadata of the equalization matrix. + ionization_potential = xraydb.ionization_potential(self.material_symbol) + adc_to_ev = self.equalization_matrix.metadata[CalibrationMetadata.ADC_TO_EV] + # Calculate the conversion factor from ADC to electrons. + adc_to_electrons = adc_to_ev / ionization_potential + # Calculate the gain values as the equalization values divided by the conversion + # factor and update the errors. + self.cal_matrix.values = self.equalization_matrix.values / adc_to_electrons + self.cal_matrix.errors = self.equalization_matrix.errors / adc_to_electrons + # Update some metadata for the gain matrix. + self.cal_matrix.entries = self.equalization_matrix.entries + self.cal_matrix.num_events = self.equalization_matrix.num_events return self.cal_matrix class CalibrateENC: + """Class for calibrating the equivalent noise charge (ENC) of the readout chip. This class provides methods to calculate the ENC values based on the noise and gain @@ -749,7 +1028,7 @@ class CalibrateENC: """ def __init__(self, noise_matrix: CalibrationMatrix, gain_matrix: CalibrationMatrix) -> None: - """Class constructor + """Class constructor. """ if noise_matrix.shape != gain_matrix.shape: raise ValueError("Noise and gain matrices must have the same shape.") diff --git a/src/hexsample/cli.py b/src/hexsample/cli.py index 956c0ec..871f2cf 100644 --- a/src/hexsample/cli.py +++ b/src/hexsample/cli.py @@ -29,6 +29,7 @@ calibration, hexagon, logging_, + pdf, pipeline, readout, roi, @@ -129,6 +130,13 @@ def __init__(self) -> None: self.add_enc_calibration_options(enc) self.add_logging_level(enc) enc.set_defaults(runner=pipeline.calibrate_enc) + # Pixel equalization calibration + equalization = calibrate_subparsers.add_parser("equalization", + help="calibrate the chip pixel equalization") + self.add_input_file(equalization) + self.add_calibrate_equalization_options(equalization) + self.add_logging_level(equalization) + equalization.set_defaults(runner=pipeline.calibrate_equalization) # Eta function calibration eta = calibrate_subparsers.add_parser("eta", help="calibrate the eta function") self.add_input_file(eta) @@ -137,8 +145,7 @@ def __init__(self) -> None: eta.set_defaults(runner=pipeline.calibrate_eta) # Gain calibration gain = calibrate_subparsers.add_parser("gain", help="calibrate the chip gain") - self.add_input_file(gain) - self.add_calibrate_gain_options(gain) + self.add_gain_calibration_options(gain) self.add_logging_level(gain) gain.set_defaults(runner=pipeline.calibrate_gain) # Noise calibration @@ -153,6 +160,14 @@ def __init__(self) -> None: self.add_logging_level(synthesize) synthesize.set_defaults(runner=pipeline.synthesize_calibration_file) + # Fit a spectrum to generate a PDF for the gain calibration? + calibspec = subparsers.add_parser("calibspec", + help="generate a pdf from a spectrum to use in the gain calibration", + formatter_class=self._FORMATTER_CLASS) + self.add_input_file(calibspec) + self.add_logging_level(calibspec) + calibspec.set_defaults(runner=pipeline.calibspec) + # Inspect a calibration matrix? calibview = subparsers.add_parser("calibview", help="inspect a calibration matrix", @@ -280,6 +295,16 @@ def add_cal_pedestal_file(parser: argparse.ArgumentParser, default: str, "calibration data or name of a calibration file inside the " \ "caldb/pedestal folder.") + @staticmethod + def add_cal_equalization_file(parser: argparse.ArgumentParser, default: str, + required: bool = False) -> None: + """Add an option for the equalization calibration file. + """ + parser.add_argument("--equalization", type=caldb.CalDB.open_equalization, default=default, + required=required, help="path to a file containing the equalization " \ + "calibration data or name of a calibration file inside the " \ + "caldb/equalization folder.") + @staticmethod def add_cal_gain_file(parser: argparse.ArgumentParser, default: str, required: bool = False) -> None: @@ -400,7 +425,7 @@ def add_recon_options(self, parser: argparse.ArgumentParser) -> None: type=str, default="centroid", help="How to reconstruct position") CliArgumentParser.add_cal_noise_file(group, default=None, required=True) CliArgumentParser.add_cal_pedestal_file(group, default=None, required=True) - CliArgumentParser.add_cal_gain_file(group, default=None, required=True) + CliArgumentParser.add_cal_equalization_file(group, default=None, required=True) group.add_argument("--eta_2pix_rad_sigma", default=defaults.eta_2pix_rad_sigma, type=float, help="probit function sigma parameter for two pixel" \ "events eta reconstruction") @@ -435,24 +460,25 @@ def add_calibrate_dark_options(self, parser: argparse.ArgumentParser) -> None: help="number of events to be analyzed in a batch for the dark" \ " calibration") - def add_calibrate_gain_options(self, parser: argparse.ArgumentParser) -> None: + def add_calibrate_equalization_options(self, parser: argparse.ArgumentParser) -> None: """Add an option group for the gain calibration properties. """ - defaults = tasks.CalibrationGainDefaults - parser.add_argument("energy", type=float, help="line energy in eV") + defaults = tasks.CalibrationEqualizationDefaults + parser.add_argument("pdf", type=pdf.SpectrumPDF.from_file, + help="path to the spectrum PDF file") + parser.add_argument("--size", type=int, default=defaults.size, + help="length of the square region of the chip to fit simultaneously") CliArgumentParser.add_cal_noise_file(parser, default=None, required=True) CliArgumentParser.add_cal_pedestal_file(parser, default=None, required=True) CliArgumentParser.add_zero_sup_threshold(parser, default=defaults.zero_sup_threshold) - CliArgumentParser.add_num_events(parser, default=defaults.num_events, - intent="used for the gain calibration") def add_calibrate_eta_options(self, parser: argparse.ArgumentParser) -> None: """Add an option group for the eta function calibration properties. """ CliArgumentParser.add_cal_noise_file(parser, default=None, required=True) CliArgumentParser.add_cal_pedestal_file(parser, default=None, required=True) - CliArgumentParser.add_cal_gain_file(parser, default=None, required=True) + CliArgumentParser.add_cal_equalization_file(parser, default=None, required=True) parser.add_argument("--num_bins", type=int, default=tasks.CalibrationEtaDefaults.num_bins, help="number of bins to be used in the eta function calibration") @@ -470,6 +496,19 @@ def add_enc_calibration_options(self, parser: argparse.ArgumentParser) -> None: help="directory where the generated ENC calibration file" \ " will be saved") + def add_gain_calibration_options(self, parser: argparse.ArgumentParser) -> None: + """Add an option group for the gain calibration properties. + """ + defaults = tasks.CalibrationGainDefaults + CliArgumentParser.add_cal_equalization_file(parser, default=None, required=True) + parser.add_argument("--material_symbol", type=str, + choices=sensor.material_symbols(), + default=defaults.material_symbol, + help="active sensor material") + parser.add_argument("--output_dir", type=str, default=defaults.output_dir, + help="directory where the generated gain calibration file" \ + " will be saved") + def add_synthesize_calibration_file_options(self, parser: argparse.ArgumentParser) -> None: """Add an option group to generate calibration files. """ @@ -501,7 +540,7 @@ def add_display_options(self, parser: argparse.ArgumentParser) -> None: default = tasks.DisplayDefaults CliArgumentParser.add_cal_noise_file(parser, default=default.noise_matrix) CliArgumentParser.add_cal_pedestal_file(parser, default=default.pedestal_matrix) - CliArgumentParser.add_cal_gain_file(parser, default=default.gain_matrix) + CliArgumentParser.add_cal_equalization_file(parser, default=default.equalization_matrix) def add_calibview_options(self, parser: argparse.ArgumentParser) -> None: """Add an option group for the calibration view properties. diff --git a/src/hexsample/clustering.py b/src/hexsample/clustering.py index 9fe2eda..a603282 100644 --- a/src/hexsample/clustering.py +++ b/src/hexsample/clustering.py @@ -43,6 +43,7 @@ class Cluster: col: np.ndarray row: np.ndarray pha: np.ndarray + adc_to_ev: float pos_recon_algorithm: str recon_pars: Optional[dict] = None @@ -62,6 +63,11 @@ def pulse_height(self) -> float: """ return self.pha.sum() + def energy(self) -> float: + """Return the energy of the cluster in eV. + """ + return self.pulse_height() * self.adc_to_ev + def centroid(self) -> Tuple[float, float]: """Return the cluster centroid. """ @@ -261,7 +267,6 @@ def position_suppress(self, pha: np.ndarray, col: np.ndarray, row: np.ndarray idx = np.argsort(-out_pha)[:3] return out_pha[idx], out_col[idx], out_row[idx] - def run(self, event: DigiEventRectangular) -> Cluster: """Workhorse method to be reimplemented by derived classes. """ @@ -293,7 +298,7 @@ class ClusteringNN(ClusteringBase): pos_recon_algorithm: str recon_pars: Optional[dict] = None - def run(self, event) -> Cluster: + def run(self, event) -> Optional[Cluster]: """Overladed method. .. warning:: @@ -301,10 +306,16 @@ def run(self, event) -> Cluster: for speed using proper numpy array for the offset indexes. """ readout = self.readout + # Load the adc_to_ev conversion factor from the readout metadata of the + # equalization matrix. If the data is not present, or the wrong matrix type + # is passed, this will raise a KeyError. + adc_to_ev = readout.gain.metadata["adc_to_ev"] if isinstance(event, DigiEventCircular): # If the readout is circular, we want to take all the neirest neighbors. # Trailing -1 is bc the central px is already considered. self.num_neighbors = 6 #HexagonalReadoutCircular.NUM_PIXELS - 1 + if readout.is_at_border(event.column, event.row): + return None col = [event.column] row = [event.row] adc_channel_order = [readout.adc_channel(event.column, event.row)] @@ -323,6 +334,8 @@ def run(self, event) -> Cluster: # pylint: disable = invalid-name elif isinstance(event, DigiEventRectangular): seed_col, seed_row = event.highest_pixel() + if readout.is_at_border(seed_col, seed_row): + return None col = [seed_col] row = [seed_row] for _col, _row in readout.neighbors(seed_col, seed_row): @@ -346,4 +359,4 @@ def run(self, event) -> Cluster: # Sort the arrays in decreasing order before applying the position suppression. pha, col, row = self.position_suppress(pha[mask], col[mask], row[mask]) x, y = self.readout.pixel_to_world(col, row) - return Cluster(x, y, col, row, pha, self.pos_recon_algorithm, self.recon_pars) + return Cluster(x, y, col, row, pha, adc_to_ev, self.pos_recon_algorithm, self.recon_pars) diff --git a/src/hexsample/fileio.py b/src/hexsample/fileio.py index 677e500..cb2c032 100644 --- a/src/hexsample/fileio.py +++ b/src/hexsample/fileio.py @@ -226,9 +226,10 @@ class ReconDescription(tables.IsDescription): livetime = tables.Int32Col(pos=2) roi_size = tables.Int32Col(pos=3) cluster_size = tables.Int8Col(pos=4) - energy = tables.Float32Col(pos=5) - posx = tables.Float32Col(pos=6) - posy = tables.Float32Col(pos=7) + adc = tables.Int32Col(pos=5) + energy = tables.Float32Col(pos=6) + posx = tables.Float32Col(pos=7) + posy = tables.Float32Col(pos=8) def _fill_recon_row(row: tables.tableextension.Row, event: ReconEvent) -> None: """Helper function to fill an output table row, given a ReconEvent object. @@ -244,6 +245,7 @@ def _fill_recon_row(row: tables.tableextension.Row, event: ReconEvent) -> None: row["livetime"] = event.livetime #row["roi_size"] = event.roi_size row["cluster_size"] = event.cluster.size() + row["adc"] = event.adc() row["energy"] = event.energy() row["posx"], row["posy"] = event.position() row.append() diff --git a/src/hexsample/pdf.py b/src/hexsample/pdf.py new file mode 100644 index 0000000..b40f10e --- /dev/null +++ b/src/hexsample/pdf.py @@ -0,0 +1,147 @@ +# Copyright (C) 2023--2026 the hexsample team. +# +# For the license terms see the file LICENSE, distributed along with this +# software. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Facilities to create and use probability density functions from spectra.""" + +from typing import Tuple + +import numpy as np +from aptapy.plotting import plt +from scipy.interpolate import CubicSpline +from scipy.stats import gaussian_kde + +try: + from numpy import trapezoid as trapezoid +except ImportError: + from numpy import trapz as trapezoid + + +class SpectrumPDF: + + """Create a probability density function from a spectrum. + + This class uses Gaussian kernel density estimation to create a smooth normalized + probability density function (PDF) from a set of values. The PDF is then interpolated + to allow for efficient evaluation. + """ + + def __init__(self) -> None: + """Class constructor + """ + self.pdf = None + + def fit(self, vals: np.ndarray) -> None: + """Fit the data to create the PDF. + + Arguments + --------- + vals : np.ndarray + The values to fit the PDF from. + """ + # Use Gaussian kernel density estimation to create a smooth PDF. + kde = gaussian_kde(vals) + # Create a grid of points for interpolation. We need to ensure that the grid exceeds + # the range of the data to avoid issues with extrapolation. + x_grid = np.linspace(min(vals)*0.9, max(vals)*1.1, 1000) + y_grid = kde(x_grid) + self.pdf = CubicSpline(x_grid, y_grid, extrapolate=True) + + def mean(self) -> float: + """Return the mean of the PDF. + """ + if self.pdf is None: + raise ValueError("The PDF is empty. Fit the PDF with data or load it from file.") + # The location of the PDF can be estimated as the mean of the distribution. + x_grid = np.linspace(min(self.pdf.x), max(self.pdf.x), 1000) + # Clip the PDF to avoid negative values. + y_grid = np.maximum(0, self.pdf(x_grid)) + area = trapezoid(y_grid, x_grid) + mean = trapezoid(x_grid * y_grid, x_grid) / area + return mean + + def to_file(self, file_path: str) -> str: + """Save the PDF to a file as a numpy archive. + + Arguments + --------- + file_path : str + The path to the file where the PDF will be saved. + """ + if self.pdf is None: + raise ValueError("The PDF is empty. Fit the PDF with data before saving it to file.") + np.savez(file_path, x=self.pdf.x, c=self.pdf.c) + return file_path + + @classmethod + def from_file(cls, file_path: str) -> "SpectrumPDF": + """Load the PDF from a numpy archive. + + Arguments + --------- + file_path : str + The path to the file from which the PDF will be loaded. + """ + if not file_path.endswith(".npz"): + raise ValueError("The file must be a numpy archive with .npz extension.") + data = np.load(file_path) + instance = cls() + instance.pdf = CubicSpline.construct_fast(data["c"], data["x"], extrapolate=True) + return instance + + def derivative(self, x: np.ndarray, order: int = 1) -> np.ndarray: + """Evaluate the derivative of the PDF at the given points. + + Arguments + --------- + x : np.ndarray + The points at which to evaluate the derivative. + order : int, optional + The order of the derivative to evaluate (default is 1). + """ + if self.pdf is None: + raise ValueError("The PDF is empty. Fit the PDF with data or load it from file.") + return self.pdf(x, nu=order) + + def plot(self, xlim: Tuple[float, float] = None, **kwargs) -> None: + """Plot the PDF in a given range. + + Arguments + --------- + xlim : tuple[float, float], optional + The range of x values to plot (default is None, which means the fitting range + of the PDF). + """ + plt.figure(kwargs.get("figname", "spectrum_pdf")) + if xlim is None: + xlim = (min(self.pdf.x), max(self.pdf.x)) + x_grid = np.linspace(xlim[0], xlim[1], 1000) + y_grid = self.pdf(x_grid) + plt.plot(x_grid, y_grid, **kwargs) + plt.xlabel(kwargs.get("xlabel", "Energy [eV]")) + plt.ylabel(kwargs.get("ylabel", "PDF")) + plt.xlim(xlim) + plt.grid(visible=True, linestyle="--", alpha=0.5, color="gray") + plt.tight_layout() + + def __call__(self, x: np.ndarray) -> np.ndarray: + """Evaluate the PDF at the given points. + """ + if self.pdf is None: + raise ValueError("The PDF is empty. Fit the PDF with data or load it from file.") + return self.pdf(x) diff --git a/src/hexsample/pipeline.py b/src/hexsample/pipeline.py index f85fb8f..386c635 100644 --- a/src/hexsample/pipeline.py +++ b/src/hexsample/pipeline.py @@ -50,7 +50,7 @@ def reconstruct(**kwargs) -> str: input_file_path = kwargs["input_file"] noise_matrix = kwargs["noise"] pedestal_matrix = kwargs["pedestal"] - gain_matrix = kwargs["gain"] + equalization_matrix = kwargs["equalization"] suffix = kwargs.get("suffix", defaults.suffix) zero_sup_threshold = kwargs.get("zero_sup_threshold", defaults.zero_sup_threshold) num_neighbors = kwargs.get("num_neighbors", defaults.num_neighbors) @@ -64,22 +64,30 @@ def reconstruct(**kwargs) -> str: eta_3pix_theta_sigma = kwargs.get("eta_3pix_theta_sigma", defaults.eta_3pix_theta_sigma) recon_args = (eta_2pix_rad_sigma, eta_2pix_rad_pivot, eta_3pix_rad_offset, eta_3pix_rad_sigma, eta_3pix_rad_pivot, eta_3pix_theta_sigma) - args = input_file_path, noise_matrix, pedestal_matrix, gain_matrix, suffix, \ + args = input_file_path, noise_matrix, pedestal_matrix, equalization_matrix, suffix, \ zero_sup_threshold,num_neighbors, max_neighbors, pos_recon_algorithm, *recon_args return tasks.reconstruct(*args, kwargs) +def calibspec(**kwargs) -> str: + """Create a probability density function from a reconstructed spectrum + to use in the gain calibration. + """ + input_file_path = kwargs["input_file"] + return tasks.calibspec(input_file_path) + + def calibrate_eta(**kwargs) -> None: """Calibrate the eta function using the events from a digi file. """ input_file_path = kwargs["input_file"] - gain_matrix = kwargs["gain"] + equalization_matrix = kwargs["equalization"] noise_matrix = kwargs["noise"] pedestal_matrix = kwargs["pedestal"] num_bins = kwargs.get("num_bins", tasks.CalibrationEtaDefaults.num_bins) zero_sup_threshold = kwargs.get("zero_sup_threshold", tasks.CalibrationEtaDefaults.zero_sup_threshold) - args = input_file_path, noise_matrix, pedestal_matrix, gain_matrix, num_bins, \ + args = input_file_path, noise_matrix, pedestal_matrix, equalization_matrix, num_bins, \ zero_sup_threshold return tasks.calibrate_eta(*args) @@ -112,17 +120,27 @@ def calibrate_enc(**kwargs) -> str: return tasks.calibrate_enc(*args) -def calibrate_gain(**kwargs) -> str: - """Calibrate the gain of the chip. +def calibrate_equalization(**kwargs) -> str: + """Calibrate the equalization of the chip. """ input_file_path = kwargs["input_file"] - energy = kwargs["energy"] - num_events = kwargs.get("num_events", tasks.CalibrationGainDefaults.num_events) + pdf = kwargs["pdf"] noise_matrix = kwargs["noise"] pedestal_matrix = kwargs["pedestal"] + size = kwargs.get("size", tasks.CalibrationEqualizationDefaults.size) zero_sup_threshold = kwargs.get("zero_sup_threshold", - tasks.CalibrationGainDefaults.zero_sup_threshold) - args = input_file_path, energy, noise_matrix, pedestal_matrix, num_events, zero_sup_threshold + tasks.CalibrationEqualizationDefaults.zero_sup_threshold) + args = input_file_path, pdf, noise_matrix, pedestal_matrix, size, zero_sup_threshold + return tasks.calibrate_equalization(*args) + + +def calibrate_gain(**kwargs) -> str: + """Calibrate the gain of the chip. + """ + equalization_matrix = kwargs["equalization"] + material_symbol = kwargs.get("material_symbol", tasks.CalibrationGainDefaults.material_symbol) + output_dir = kwargs.get("output_dir", tasks.CalibrationGainDefaults.output_dir) + args = equalization_matrix, material_symbol, output_dir return tasks.calibrate_gain(*args) @@ -145,10 +163,10 @@ def display(**kwargs) -> None: """Display events from a digi or recon file. """ input_file_path = kwargs["input_file"] - gain_matrix = kwargs["gain"] + equalization_matrix = kwargs["equalization"] noise_matrix = kwargs["noise"] pedestal_matrix = kwargs["pedestal"] - args = input_file_path, noise_matrix, pedestal_matrix, gain_matrix + args = input_file_path, noise_matrix, pedestal_matrix, equalization_matrix return tasks.display(*args) diff --git a/src/hexsample/recon.py b/src/hexsample/recon.py index 8b39d7e..679fa75 100644 --- a/src/hexsample/recon.py +++ b/src/hexsample/recon.py @@ -23,11 +23,8 @@ from dataclasses import dataclass from typing import Tuple -import xraydb - from .clustering import Cluster -DEFAULT_IONIZATION_POTENTIAL = xraydb.ionization_potential("Si") @dataclass class ReconEventBase: @@ -54,15 +51,15 @@ class ReconEventBase: livetime: int cluster: Cluster - def energy(self, ionization_potential: float = DEFAULT_IONIZATION_POTENTIAL) -> float: - """Return the energy of the event in eV. + def adc(self) -> int: + """Return the total ADC count for the event. + """ + return self.cluster.pulse_height() - .. warning:: - This is currently using the ionization energy of Silicon to do the - conversion, assuming a detector gain of 1. We will need to do some - bookkeeping, here, to make this work reliably. + def energy(self) -> float: + """Return the energy of the event in eV. """ - return ionization_potential * self.cluster.pulse_height() + return self.cluster.energy() def position(self) -> Tuple[float, float]: """Return the reconstructed position of the event. @@ -100,15 +97,15 @@ class ReconEvent: #roi_size: int cluster: Cluster - def energy(self, ionization_potential: float = DEFAULT_IONIZATION_POTENTIAL) -> float: - """Return the energy of the event in eV. + def adc(self) -> int: + """Return the total ADC counts for the event. + """ + return self.cluster.pulse_height() - .. warning:: - This is currently using the ionization energy of Silicon to do the - conversion, assuming a detector gain of 1. We will need to do some - bookkeeping, here, to make this work reliably. + def energy(self) -> float: + """Return the energy of the event in eV. """ - return ionization_potential * self.cluster.pulse_height() + return self.cluster.energy() def position(self) -> Tuple[float, float]: """Return the reconstructed position of the event. diff --git a/src/hexsample/tasks.py b/src/hexsample/tasks.py index ccd517b..45987d1 100644 --- a/src/hexsample/tasks.py +++ b/src/hexsample/tasks.py @@ -21,7 +21,6 @@ """ import inspect -import os import pathlib from dataclasses import dataclass from enum import Enum @@ -38,9 +37,11 @@ CALIBRATION_UNITS, CalibrateDark, CalibrateENC, + CalibrateEqualization, CalibrateGain, CalibrateNoise, CalibrationMatrix, + CalibrationMetadata, CalibrationType, ) from .clustering import ClusteringNN @@ -63,6 +64,7 @@ from .hexagon import HexagonalGrid, HexagonalLayout from .logging_ import logger from .mc import PhotonList +from .pdf import SpectrumPDF from .readout import ( AbstractReadout, HexagonalReadoutBase, @@ -72,7 +74,7 @@ ) from .recon import ReconEvent from .sensor import Sensor -from .source import DiskBeam, Line, Source +from .source import Source from .xpol import chip_descriptor # Make room for the output data. @@ -234,7 +236,7 @@ def reconstruct( input_file_path: str, noise_matrix: CalibrationMatrix, pedestal_matrix: CalibrationMatrix, - gain_matrix: CalibrationMatrix, + equalization_matrix: CalibrationMatrix, suffix: str = ReconstructionDefaults.suffix, zero_sup_threshold: int = ReconstructionDefaults.zero_sup_threshold, num_neighbors: int = ReconstructionDefaults.num_neighbors, @@ -267,8 +269,8 @@ def reconstruct( pedestal_matrix : CalibrationMatrix The pedestal matrix to use for the reconstruction. - gain_matrix : CalibrationMatrix - The gain matrix to use for the reconstruction. + equalization_matrix : CalibrationMatrix + The equalization matrix to use for the reconstruction. suffix : str The suffix to append to the output file name. @@ -308,7 +310,7 @@ def reconstruct( input_file, header, readout_mode = open_file(input_file_path) # Creating the readout object. args = HexagonalLayout(header["layout"]), header["num_cols"], header["num_rows"],\ - header["pitch"], noise_matrix, gain_matrix, pedestal_matrix + header["pitch"], noise_matrix, equalization_matrix, pedestal_matrix readout = create_readout(readout_mode, header, *args) # Define the effective number of neighbors to be used for the clustering. If max_neighbors is # specified (i.e. different from -1), it has priority over num_neighbors. It is necessary to @@ -342,7 +344,7 @@ def reconstruct( cluster = clustering.run(event) except IndexError as e: logger.warning(f"Error reconstructing event with trigger ID {event.trigger_id}: {e}") - if cluster.size() in size: + if cluster is not None and (cluster.size() in size): # Need to pass the recon method and other stuff as argument to ReconEvent args = event.trigger_id, event.timestamp(), event.livetime, cluster recon_event = ReconEvent(*args) @@ -358,6 +360,31 @@ def reconstruct( return output_file_path +def calibspec( + input_file_path: str + ) -> str: + """Create a probability density function from a reconstructed spectrum to use in + the gain calibration. + + Arguments + ---------- + input_file_path : str + The path to the input recon file. + """ + name, args = current_call(1) + logger.info(f"Running {__name__}.{name} with arguments {args}...") + input_file = ReconInputFile(input_file_path) + energy = input_file.column("energy") + input_file.close() + pdf = SpectrumPDF() + logger.info("Fitting the PDF to the energy distribution...") + pdf.fit(energy) + output_file_path = input_file_path.replace(".h5", ".npz") + logger.info(f"Saving the PDF to {output_file_path}...") + pdf.to_file(output_file_path) + return output_file_path + + @dataclass(frozen=True) class CalibrationEtaDefaults: @@ -375,7 +402,7 @@ def calibrate_eta( input_file_path: str, noise_matrix: CalibrationMatrix, pedestal_matrix: CalibrationMatrix, - gain_matrix: CalibrationMatrix, + equalization_matrix: CalibrationMatrix, num_bins: int = CalibrationEtaDefaults.num_bins, zero_sup_threshold: int = CalibrationEtaDefaults.zero_sup_threshold ) -> None: @@ -392,8 +419,8 @@ def calibrate_eta( pedestal_matrix : CalibrationMatrix The pedestal calibration matrix to use for the analysis. - gain_matrix : CalibrationMatrix - The gain calibration matrix to use for the analysis. + equalization_matrix : CalibrationMatrix + The equalization calibration matrix to use for the analysis. num_bins : int The number of bins to be used in the calibration. @@ -403,7 +430,7 @@ def calibrate_eta( """ input_file, header, readout_mode = open_file(input_file_path) args = HexagonalLayout(header["layout"]), header["num_cols"], header["num_rows"],\ - header["pitch"], noise_matrix, gain_matrix, pedestal_matrix + header["pitch"], noise_matrix, equalization_matrix, pedestal_matrix readout = create_readout(readout_mode, header, *args) clustering = ClusteringNN(readout, zero_sup_threshold, num_neighbors=6, pos_recon_algorithm="centroid") @@ -416,7 +443,7 @@ def calibrate_eta( except IndexError: continue # Analyze only 2-pixel and 3-pixel events. - if cluster.size() == 2 or cluster.size() == 3: + if cluster is not None and (cluster.size() == 2 or cluster.size() == 3): mc_event = input_file.mc_event(i) size_list.append(cluster.size()) # Calculate the photon position with respect to the most charged pixel @@ -498,7 +525,7 @@ def synthesize_calibration_file( random_seed : int, optional The seed for the random number generator. """ - name, args = current_call() + name, args = current_call(1) logger.info(f"Running {__name__}.{name} with arguments {args}...") # Initialize the random number generator with the given seed rng.initialize(seed=random_seed) @@ -517,6 +544,10 @@ def synthesize_calibration_file( # Generate the calibration matrix with the appropriate size and values calibration_matrix = CalibrationMatrix(num_cols, num_rows) rms = mean * percent_rms / 100 + if calibration_type == CalibrationType.EQUALIZATION: + calibration_matrix.update_metadata(CalibrationMetadata.ADC_TO_EV, mean) + rms /= mean + mean = 1. logger.info(f"Generating {calibration_type.value} calibration matrix with " f"mean {mean:g} and RMS {rms:g}...") calibration_matrix.values = rng.generator.normal(mean, scale=rms, size=(num_rows, num_cols)) @@ -639,7 +670,7 @@ def calibrate_enc( logger.info("Calculating the ENC matrix...") enc_matrix = enc_calibration.fit() noise_file_name = noise_matrix.metadata["file_name"] - enc_file_name = noise_file_name.replace("_matrix_noise", "_matrix_enc.h5") + enc_file_name = noise_file_name.replace("noise", "enc") + ".h5" output_file_path = pathlib.Path(output_dir) / enc_file_name logger.info(f"Saving to {output_file_path}...") enc_matrix.to_hdf5(output_file_path, CalibrationType.ENC, False) @@ -648,26 +679,26 @@ def calibrate_enc( @dataclass(frozen=True) -class CalibrationGainDefaults: - """Default parameters for the gain calibration task. +class CalibrationEqualizationDefaults: + """Default parameters for the pixel equalization calibration task. This is a small helper dataclass to help ensure consistency between the main task definition in this Python module and the command-line interface. """ - num_events: int = 200000 + size: int = 10 zero_sup_threshold: int = 20 -def calibrate_gain( +def calibrate_equalization( input_file_path: str, - energy: float, + pdf: SpectrumPDF, noise_matrix: CalibrationMatrix, pedestal_matrix: CalibrationMatrix, - num_events: int = CalibrationGainDefaults.num_events, - zero_sup_threshold: int = CalibrationGainDefaults.zero_sup_threshold + size: int = CalibrationEqualizationDefaults.size, + zero_sup_threshold: int = CalibrationEqualizationDefaults.zero_sup_threshold ) -> str: - """Calibrate gain of the readout chip using the events from a digi file. + """Calibrate pixel equalization of the readout chip using the events from a digi file. The results are stored as a matrix in a HDF5 file. Arguments @@ -675,33 +706,34 @@ def calibrate_gain( input_file_path : str The path to the input file. - energy : float - The energy of the X-ray photons in eV. This is used to convert the charge collected in - each pixel to the number of electron, which is necessary for the gain calibration. + pdf : SpectrumPDF + The spectrum probability density function to use for the gain calibration. noise_matrix : CalibrationMatrix The calibration noise matrix to use for the gain calibration. pedestal_matrix : CalibrationMatrix The pedestal matrix to use for the gain calibration. - - num_events : int - The number of events to simulate to correct the bias. + + size : int + The length of the square region of the chip to fit simultaneously during the + calibration. zero_sup_threshold : int The zero-suppression threshold to use for the clustering in the gain calibration. """ # Open the input file and extract the readout information. input_file, header, readout_mode = open_file(input_file_path) - # Define the arguments to create the readout object with unit gain, necessary for the - # calibration. + # Define the arguments to create the readout object with uniform pixel equalization, + # necessary for the calibration. unit_gain_map = CalibrationMatrix(header["num_cols"], header["num_rows"]) unit_gain_map.set_value(1.) + unit_gain_map.update_metadata(CalibrationMetadata.ADC_TO_EV, 1.) args = HexagonalLayout(header["layout"]), header["num_cols"], header["num_rows"],\ header["pitch"], noise_matrix, unit_gain_map, pedestal_matrix readout = create_readout(readout_mode, header, *args) - # Initialize the gain matrix and run the calibration. - gain_calibration = CalibrateGain(header["num_cols"], header["num_rows"], energy) + # Initialize the equalization matrix and run the calibration. + equalization_calibration = CalibrateEqualization(header["num_cols"], header["num_rows"], pdf) clustering = ClusteringNN(readout, zero_sup_threshold=zero_sup_threshold, num_neighbors=6, pos_recon_algorithm="centroid") logger.info("Starting the event loop...") @@ -710,65 +742,56 @@ def calibrate_gain( cluster = clustering.run(event) except IndexError: continue - gain_calibration.analyze_cluster(cluster) + if cluster is not None: + equalization_calibration.analyze_cluster(cluster) + logger.info("Calculating the equalization matrix...") + equalization_matrix = equalization_calibration.fit(size) + output_file_path = input_file_path.replace(".h5", "_matrix_equalization.h5") + logger.info(f"Saving equalization matrix to {output_file_path}...") + equalization_matrix.to_hdf5(output_file_path, CalibrationType.EQUALIZATION, False) + input_file.close() + logger.info("Done!") + return output_file_path + + +@dataclass(frozen=True) +class CalibrationGainDefaults: + """Default parameters for the gain calibration task. + + This is a small helper dataclass to help ensure consistency between the main task + definition in this Python module and the command-line interface. + """ + + output_dir: Union[str, pathlib.Path] = HEXSAMPLE_DATA + material_symbol: str = "Si" + + +def calibrate_gain( + equalization_matrix: CalibrationMatrix, + material_symbol: str = CalibrationGainDefaults.material_symbol, + output_dir: Union[str, pathlib.Path] = CalibrationGainDefaults.output_dir + ) -> str: + """Calibrate the gain of the readout chip using the equalization matrix and the + ionization potential of the sensor material. The results are stored as a matrix + in a HDF5 file. + + Arguments + equalization_matrix : CalibrationMatrix + The equalization matrix to use for the gain calibration. + material_symbol : str + The symbol of the sensor material to use for the gain calibration, e.g. "Si" for + silicon. + """ + name, args = current_call(num_backward_steps=1) + logger.info(f"Running {__name__}.{name} with arguments {args}...") + gain_calibration = CalibrateGain(equalization_matrix, material_symbol) logger.info("Calculating the gain matrix...") gain_matrix = gain_calibration.fit() - if not np.any(gain_matrix.entries > 0): - raise RuntimeError("No valid gain values found during the first step of calibration," \ - "cannot proceed further. The possible reason could be a small number of events over " \ - "the analyzed chip region.") - # Create the readout object for the simulation. We are using rectangular readout just because - # it's faster to simulate, and using a uniform gain matrix with the mean value of the first - # calibration to correct the bias in the gain matrix. - # To calculate the mean value, we are excluding the outliers by considering only the values - # between the 1st and the 99th percentile. - gain_sim = CalibrationMatrix(header["num_cols"], header["num_rows"]) - vals = gain_matrix.values - lower_bound, upper_bound = np.nanpercentile(vals, [1, 99]) - gain_sim.set_value(np.mean(vals[(vals > lower_bound) & (vals < upper_bound)])) - simulation_readout = HexagonalReadoutRectangular(HexagonalLayout(header["layout"]), - header["num_cols"], header["num_rows"], header["pitch"], - enc=noise_matrix, gain=gain_sim, pedestal=pedestal_matrix) - output = HEXSAMPLE_DATA / "_tmp_simulation_bias.h5" - # Simulate events with the best-fit gain matrix to correct the bias. - logger.info("Simulating file to correct the bias...") - simulate( - source=Source(Line(energy), DiskBeam(radius=0.15)), - sensor=Sensor(), - readout=simulation_readout, - num_events=num_events, - output_file_path=output) - tmp_input_file = digi_input_file_class("rectangular")(output) - tmp_gain_calibration = CalibrateGain(header["num_cols"], header["num_rows"], energy) - # Re-run the gain calibration on the simulated events to calculate the correction factor. - logger.info("Starting the event loop for the simulated file...") - for _, event in tqdm(enumerate(tmp_input_file)): - try: - cluster = clustering.run(event) - except IndexError: - continue - tmp_gain_calibration.analyze_cluster(cluster) - logger.info("Calculating the gain matrix from the simulation...") - tmp_gain_matrix = tmp_gain_calibration.fit() - # Calculate the correction factor from the simulation. - logger.info("Calculating the correction factor...") - mask = tmp_gain_matrix.entries > 0 - # Calculate the residuals between the MC and calibrated gain matrices from the simulation - # and calculate the mean residual to be used as a correction factor. - residuals = (tmp_gain_matrix.values[mask] - gain_sim.values[mask]) / gain_sim.values[mask] - # Exclude the outliers by considering only the values between the 1st and the 99th percentile. - lower_bound, upper_bound = np.percentile(residuals, [1, 99]) - mean_residual = np.mean(residuals[(residuals > lower_bound) & (residuals < upper_bound)]) - # Apply the correction factor to the gain matrix and save it to a HDF5 file. - mask_gain = gain_matrix.entries > 0 - gain_matrix.values[mask_gain] = gain_matrix.values[mask_gain] / (1 + mean_residual) - output_file_path = input_file_path.replace(".h5", "_matrix_gain.h5") - logger.info(f"Saving corrected gain matrix to {output_file_path}...") + equalization_file_name = equalization_matrix.metadata["file_name"] + gain_file_name = equalization_file_name.replace("equalization", "gain") + ".h5" + output_file_path = pathlib.Path(output_dir) / gain_file_name + logger.info(f"Saving to {output_file_path}...") gain_matrix.to_hdf5(output_file_path, CalibrationType.GAIN, False) - # Close the input files. - tmp_input_file.close() - os.remove(output) - input_file.close() logger.info("Done!") return output_file_path @@ -783,14 +806,14 @@ class DisplayDefaults: noise_matrix: Optional[CalibrationMatrix] = None pedestal_matrix: Optional[CalibrationMatrix] = None - gain_matrix: Optional[CalibrationMatrix] = None + equalization_matrix: Optional[CalibrationMatrix] = None def display( input_file_path: str, noise_matrix: Optional[CalibrationMatrix] = DisplayDefaults.noise_matrix, pedestal_matrix: Optional[CalibrationMatrix] = DisplayDefaults.pedestal_matrix, - gain_matrix: Optional[CalibrationMatrix] = DisplayDefaults.gain_matrix, + equalization_matrix: Optional[CalibrationMatrix] = DisplayDefaults.equalization_matrix, ) -> None: """Display events from a digi file. @@ -805,16 +828,17 @@ def display( pedestal_matrix : CalibrationMatrix, optional The pedestal calibration matrix to use for the display. - gain_matrix : CalibrationMatrix, optional - The gain calibration matrix to use for the display. + equalization_matrix : CalibrationMatrix, optional + The equalization calibration matrix to use for the display. """ # Open the input file and extract the header and the readout information. input_file, header, readout_mode = open_file(input_file_path) - cal_matrices = [noise_matrix, gain_matrix, pedestal_matrix] + cal_matrices = [noise_matrix, equalization_matrix, pedestal_matrix] missing = [matrix for matrix in cal_matrices if matrix is None] # Check if any of the calibration matrices is missing. if 0 < len(missing) < len(cal_matrices): - logger.warning(f"{len(missing)} calibration matrices are missing.") + logger.warning(f"{len(missing)} calibration matrices are missing to perform" \ + " event reconstruction.") # Initialize the correct type of event display based on the input matrices. grid_args = HexagonalLayout(header["layout"]), header["num_cols"], header["num_rows"], \ header["pitch"] @@ -862,7 +886,7 @@ def quicklook(input_file_path: str) -> None: The path to the input recon file. """ # Open the input file - name, args = current_call() + name, args = current_call(1) logger.info(f"Running {__name__}.{name} with arguments {args}...") input_file = ReconInputFile(input_file_path) # Plotting the reconstructed energy and the true energy @@ -949,7 +973,7 @@ def calibview( The upper quantile of the values in the matrix to be included in the statistics. """ # pylint: disable=too-many-statements - name, args = current_call() + name, args = current_call(1) logger.info(f"Running {__name__}.{name} with arguments {args}...") logger.info("Matrix metadata:") # Log the metadata of the matrix. @@ -959,7 +983,7 @@ def calibview( logger.info(f" {key}: {value}") unit = CALIBRATION_UNITS.get(matrix.metadata["calibration_type"]).value # Calculate the quantiles of the values in the matrix to set the limits for the plots. - rel_error_mask = matrix.errors / matrix.values < rel_error + rel_error_mask = abs(matrix.errors / matrix.values) < rel_error hits_mask = matrix.entries >= min_hits mask = rel_error_mask & hits_mask if not np.any(mask): @@ -997,6 +1021,10 @@ def calibview( plt.colorbar(label=mc_unit) # Plot the distribution of the Monte Carlo truth values. mc_edges = np.linspace(np.nanmin(mc_vals), np.nanmax(mc_vals), 100) + # If Monte Carlo distribution is uniform, we need to modify the edges + if mc_edges[0] == mc_edges[-1]: + val = mc_edges[0] + mc_edges = np.linspace(val*0.9, val*1.1, 100) mc_vals_hist = Histogram1d(mc_edges, label="MC Distribution", xlabel=mc_unit).fill(mc_vals) plt.figure("Distribution of Monte Carlo truth values") mc_vals_hist.plot(statistics=True) diff --git a/tests/test_pdf.py b/tests/test_pdf.py new file mode 100644 index 0000000..46e31d2 --- /dev/null +++ b/tests/test_pdf.py @@ -0,0 +1,76 @@ +# Copyright (C) 2023--2026 the hexsample team. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test suite for hexsample.pdf +""" + +import numpy as np +from aptapy.plotting import plt + +from hexsample import rng +from hexsample.pdf import SpectrumPDF + +rng.initialize() + +def test_unimodal_spectrum_pdf(size: int = 10000): + """Test the SpectrumPDF class with a unimodal distribution. + """ + # Create a unimodal distribution. + vals = rng.generator.normal(loc=1.0, scale=0.1, size=size) + # Create the PDF and fit it to the data. + pdf = SpectrumPDF() + pdf.fit(vals) + # Check that the mean of the PDF is close to the mean of the data. + assert np.isclose(pdf.mean(), vals.mean(), atol=0.01) + # Plot the pdf + plt.figure("Unimodal distribution") + xx = np.linspace(vals.min()*0.8, vals.max()*1.2, 1000) + plt.plot(xx, pdf(xx), label="PDF") + plt.hist(vals, bins=50, density=True, alpha=0.5, label="Data") + plt.legend() + + +def test_bimodal_spectrum_pdf(size: int = 100000): + """Test the SpectrumPDF class with a bimodal distribution. + """ + # Extract values from a bimodal distribution. + vals = rng.generator.choice((1., 2.), p=[0.7, 0.3], size=size) + # Add some noise + vals += rng.generator.normal(scale=0.2, size=size) + # Create the PDF and fit it to the data. + pdf = SpectrumPDF() + pdf.fit(vals) + # Check that the mean of the PDF is close to the mean of the data. + assert np.isclose(pdf.mean(), vals.mean(), atol=0.01) + # Plot the pdf + plt.figure("Bimodal distribution") + xx = np.linspace(vals.min()*0.8, vals.max()*1.2, 1000) + plt.plot(xx, pdf(xx), label="PDF") + plt.hist(vals, bins=50, density=True, alpha=0.5, label="Data") + plt.legend() + +def test_derivative(size: int = 100000): + """Test the derivative of the PDF. + """ + # Create a unimodal distribution. + vals = rng.generator.normal(loc=1.0, scale=0.1, size=size) + # Create the PDF and fit it to the data. + pdf = SpectrumPDF() + pdf.fit(vals) + derivative = pdf.derivative + # Check that the derivative of the PDF is close to zero at the mean of the data. + plt.figure("Derivative of a gaussian PDF") + xx = np.linspace(vals.min()*0.8, vals.max()*1.2, 10000) + plt.plot(xx, derivative(xx))