Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion docs/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,21 @@
Release notes
=============

* New algorithm added to calibrate the pixel equalization factors. This algorithm calculates
the gain equalization factors without any assumption on the source spectrum, by analyzing
the energy distributions of one-pixel events and comparing them across the pixels.
* `calibgen equalization` now accepts the argument `algorithm` to choose between two different
calibration algorithms: "relative", without using the source spectrum, and "absolute", that
needs the source spectrum PDF to be passed as an argument.
* Pull requests merged and issues closed:

- https://github.com/lucabaldini/hexsample/pull/134
- https://github.com/lucabaldini/hexsample/issues/133


Version 0.16.2 (2026-05-12)
~~~~~~~~~~~~~~~~~~~~~~~~~~~


* Package dependencies updated: `joblib` and `iminuit` added.
* New calibration matrix type `equalization` added in `caldb.py` to store the pixel equalization
factors.
Expand Down
116 changes: 94 additions & 22 deletions src/hexsample/calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,40 +858,64 @@ class CalibrateEqualization(CalibrateBase):
The number of columns of the detector readout chip.
num_rows : int
The number of rows of the detector readout chip.
algorithm : str
The algorithm to be used for the calculation of the equalization values. The options
are "absolute" and "relative". The "absolute" algorithm performs a likelihood fit to
calculate the gain of each pixel, while the "relative" algorithm calculates the
equalization values by comparing the mean PHA of 1-pixel events.
pdf : SpectrumPDF
The probability density function of the spectrum of the events in the dataset, which is
used for the likelihood fit to calculate the equalization values for the pixels.
"""

def __init__(self, num_cols: int, num_rows: int, pdf: SpectrumPDF) -> None:
def __init__(self, num_cols: int, num_rows: int, algorithm: str = "relative",
pdf: Optional[SpectrumPDF] = None) -> None:
"""Class constructor.
"""
super().__init__(num_cols, num_rows)
self._event_count = 0
self._pha = []
self._coords = []
self._event_rows = []
self._pdf = pdf
self._pdf_derivative = pdf.derivative
self._algorithm = algorithm
# Initialize the data structures for the absolute calibration algorithm.
if algorithm == "absolute":
self._event_count = 0
self._pha = []
self._coords = []
self._event_rows = []
if pdf is None:
raise ValueError("A SpectrumPDF object must be provided for the "
"absolute equalization calibration.")
self._pdf = pdf
self._pdf_derivative = pdf.derivative
# Initialize the data structures for the relative calibration algorithm.
elif algorithm == "relative":
self._stats = RunningStats(shape=self.cal_matrix.shape)
else:
raise ValueError(f"Invalid algorithm {algorithm} for the equalization calibration. "
f"Valid options are 'absolute' and 'relative'.")

def analyze_cluster(self, cluster: Cluster) -> None:
"""Analyze the event cluster to update the calibration matrix.
"""
# Get the coordinates of the cluster pixels
cols = cluster.col
rows = cluster.row
# Update the arrays for the least squares fit.
self._pha.extend(cluster.pha)
ncols = self.cal_matrix.shape[1]
for col, row in zip(cols, rows):
# Calculate the index of the pixel in the flattened array
self._coords.append(row * ncols + col)
self._event_rows.append(self._event_count)
# Update the event count
self._event_count += 1
self.cal_matrix.num_events += 1

def fit(self, size: int) -> CalibrationMatrix:
if self._algorithm == "absolute":
# Get the coordinates of the cluster pixels
cols = cluster.col
rows = cluster.row
# Update the arrays for the least squares fit.
self._pha.extend(cluster.pha)
ncols = self.cal_matrix.shape[1]
for col, row in zip(cols, rows):
# Calculate the index of the pixel in the flattened array
self._coords.append(row * ncols + col)
self._event_rows.append(self._event_count)
# Update the event count
self._event_count += 1
self.cal_matrix.num_events += 1
elif self._algorithm == "relative":
if cluster.size() == 1:
pha = cluster.pha.reshape((1, 1))
self._stats.update(pha, offset=(cluster.row[0], cluster.col[0]))
self.cal_matrix.num_events += 1

def _fit_absolute(self, size: int) -> CalibrationMatrix:
"""Fit the collected events to determine the gain of each pixel.

Arguments
Expand Down Expand Up @@ -982,6 +1006,54 @@ def fit(self, size: int) -> CalibrationMatrix:
self.cal_matrix.update_metadata(CalibrationMetadata.ADC_TO_EV, adc_to_ev)
return self.cal_matrix

def _fit_relative(self) -> CalibrationMatrix:
"""Analyze the collected data to calculate the equalization values
for the pixels and update the calibration matrix with the calculated values.

Returns
-------
cal_matrix : CalibrationMatrix
Updated calibration matrix with the equalization values calculated from the data.
"""
# Get the mean and the standard deviation of the pixel value distribution
# for each pixel.
mu = self._stats.mean()
std = self._stats.std()
# Calculate the mean value of the pixel averages. This value is used to
# normalize the equalization values, imposing that the mean of the distribution
# of the equalization values is 1.0.
# Note that we are using nanmean and masking values above zero to avoid
# numerical issues.
mean = np.nanmean(mu[mu > 0])
Comment thread
augustocattafesta marked this conversation as resolved.
mu /= mean
# Write the equalization values to the calibration matrix.
self.cal_matrix.values = np.where(mu > 0, mu, self.cal_matrix.values)
# Update the entries.
self.cal_matrix.entries = self._stats.counts()
# Calculate the errors on the pixels as the standard deviation of the
# sample mean, divided by the total mean.
mask = self.cal_matrix.entries > 1
denominator = mean * np.sqrt(self.cal_matrix.entries - 1, where=mask,
out=np.full_like(self.cal_matrix.entries, np.nan, dtype=float))
np.divide(std, denominator, out=self.cal_matrix.errors, where=mask)
self.cal_matrix.update_metadata(CalibrationMetadata.ADC_TO_EV, 1.)
return self.cal_matrix

def fit(self, **kwargs) -> CalibrationMatrix:
"""Calculate the equalization calibration matrix.

Returns
-------
equalization_matrix : CalibrationMatrix
Updated calibration matrix with the equalization values calculated from the data.
"""
if self._algorithm == "absolute":
return self._fit_absolute(**kwargs)
if self._algorithm == "relative":
return self._fit_relative()
raise ValueError(f"Invalid algorithm {self._algorithm} for the equalization"
" calibration.")


class CalibrateGain:

Expand Down
12 changes: 7 additions & 5 deletions src/hexsample/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,14 +464,16 @@ def add_calibrate_equalization_options(self, parser: argparse.ArgumentParser) ->
"""Add an option group for the gain calibration properties.
"""
defaults = tasks.CalibrationEqualizationDefaults
parser.add_argument("pdf", type=pdf.SpectrumPDF.from_file,
CliArgumentParser.add_cal_noise_file(parser, default=None, required=True)
CliArgumentParser.add_cal_pedestal_file(parser, default=None, required=True)
parser.add_argument("--algorithm", type=str, choices=["relative", "absolute"],
default=defaults.algorithm,
help="algorithm to be used for the equalization calibration")
parser.add_argument("--pdf", type=pdf.SpectrumPDF.from_file, default=defaults.pdf,
help="path to the spectrum PDF file")
Comment thread
augustocattafesta marked this conversation as resolved.
parser.add_argument("--size", type=int, default=defaults.size,
help="length of the square region of the chip to fit simultaneously")
CliArgumentParser.add_cal_noise_file(parser, default=None, required=True)
CliArgumentParser.add_cal_pedestal_file(parser, default=None, required=True)
CliArgumentParser.add_zero_sup_threshold(parser,
default=defaults.zero_sup_threshold)
CliArgumentParser.add_zero_sup_threshold(parser, default=defaults.zero_sup_threshold)

def add_calibrate_eta_options(self, parser: argparse.ArgumentParser) -> None:
"""Add an option group for the eta function calibration properties.
Expand Down
2 changes: 1 addition & 1 deletion src/hexsample/pdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from scipy.stats import gaussian_kde

try:
from numpy import trapezoid as trapezoid
from numpy import trapezoid
except ImportError:
from numpy import trapz as trapezoid

Expand Down
12 changes: 7 additions & 5 deletions src/hexsample/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,16 @@ def calibrate_enc(**kwargs) -> str:
def calibrate_equalization(**kwargs) -> str:
"""Calibrate the equalization of the chip.
"""
defaults = tasks.CalibrationEqualizationDefaults
input_file_path = kwargs["input_file"]
pdf = kwargs["pdf"]
noise_matrix = kwargs["noise"]
pedestal_matrix = kwargs["pedestal"]
size = kwargs.get("size", tasks.CalibrationEqualizationDefaults.size)
zero_sup_threshold = kwargs.get("zero_sup_threshold",
tasks.CalibrationEqualizationDefaults.zero_sup_threshold)
args = input_file_path, pdf, noise_matrix, pedestal_matrix, size, zero_sup_threshold
algorithm = kwargs.get("algorithm", defaults.algorithm)
pdf = kwargs.get("pdf", defaults.pdf)
size = kwargs.get("size", defaults.size)
zero_sup_threshold = kwargs.get("zero_sup_threshold", defaults.zero_sup_threshold)
args = input_file_path, noise_matrix, pedestal_matrix, algorithm, \
pdf, size, zero_sup_threshold
Comment thread
augustocattafesta marked this conversation as resolved.
return tasks.calibrate_equalization(*args)


Expand Down
52 changes: 34 additions & 18 deletions src/hexsample/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import numpy as np
from aptapy.hist import Histogram1d, Histogram2d
from aptapy.models import Line
from aptapy.plotting import plt, setup_gca
from tqdm import tqdm

Expand Down Expand Up @@ -686,15 +687,18 @@ class CalibrationEqualizationDefaults:
definition in this Python module and the command-line interface.
"""

algorithm: str = "relative"
pdf: Optional[SpectrumPDF] = None
size: int = 10
zero_sup_threshold: int = 20


def calibrate_equalization(
input_file_path: str,
pdf: SpectrumPDF,
noise_matrix: CalibrationMatrix,
pedestal_matrix: CalibrationMatrix,
algorithm: str = CalibrationEqualizationDefaults.algorithm,
pdf: Optional[SpectrumPDF] = CalibrationEqualizationDefaults.pdf,
size: int = CalibrationEqualizationDefaults.size,
zero_sup_threshold: int = CalibrationEqualizationDefaults.zero_sup_threshold
Comment thread
augustocattafesta marked this conversation as resolved.
) -> str:
Expand All @@ -706,36 +710,43 @@ def calibrate_equalization(
input_file_path : str
The path to the input file.

pdf : SpectrumPDF
The spectrum probability density function to use for the gain calibration.

noise_matrix : CalibrationMatrix
The calibration noise matrix to use for the gain calibration.
The calibration noise matrix to use for the pixel equalization calibration.

pedestal_matrix : CalibrationMatrix
The pedestal matrix to use for the gain calibration.

size : int
The length of the square region of the chip to fit simultaneously during the
The pedestal matrix to use for the pixel equalization calibration.

algorithm : str, optional
The algorithm to use for the pixel equalization calibration. Supported values
are "relative" and "absolute".

pdf : SpectrumPDF, optional
The spectrum probability density function to use for the pixel equalization
calibration.

zero_sup_threshold : int
The zero-suppression threshold to use for the clustering in the gain calibration.
size : int, optional
The length of the square region of the chip to fit simultaneously during the
pixel equalization calibration.

zero_sup_threshold : int, optional
The zero-suppression threshold to use for the clustering in the pixel
equalization calibration.
"""
# Open the input file and extract the readout information.
input_file, header, readout_mode = open_file(input_file_path)
num_cols, num_rows = header["num_cols"], header["num_rows"]
# Define the arguments to create the readout object with uniform pixel equalization,
# necessary for the calibration.
unit_gain_map = CalibrationMatrix(header["num_cols"], header["num_rows"])
unit_gain_map = CalibrationMatrix(num_cols, num_rows)
unit_gain_map.set_value(1.)
unit_gain_map.update_metadata(CalibrationMetadata.ADC_TO_EV, 1.)
args = HexagonalLayout(header["layout"]), header["num_cols"], header["num_rows"],\
header["pitch"], noise_matrix, unit_gain_map, pedestal_matrix
args = HexagonalLayout(header["layout"]), num_cols, num_rows, header["pitch"], \
noise_matrix, unit_gain_map, pedestal_matrix
readout = create_readout(readout_mode, header, *args)
# Initialize the equalization matrix and run the calibration.
equalization_calibration = CalibrateEqualization(header["num_cols"], header["num_rows"], pdf)
clustering = ClusteringNN(readout, zero_sup_threshold=zero_sup_threshold, num_neighbors=6,
pos_recon_algorithm="centroid")
equalization_calibration = CalibrateEqualization(num_cols, num_rows, algorithm, pdf)
logger.info("Starting the event loop...")
for _, event in tqdm(enumerate(input_file)):
try:
Expand All @@ -745,7 +756,7 @@ def calibrate_equalization(
if cluster is not None:
equalization_calibration.analyze_cluster(cluster)
logger.info("Calculating the equalization matrix...")
equalization_matrix = equalization_calibration.fit(size)
equalization_matrix = equalization_calibration.fit(size=size)
output_file_path = input_file_path.replace(".h5", "_matrix_equalization.h5")
logger.info(f"Saving equalization matrix to {output_file_path}...")
equalization_matrix.to_hdf5(output_file_path, CalibrationType.EQUALIZATION, False)
Expand Down Expand Up @@ -1030,10 +1041,14 @@ def calibview(
mc_vals_hist.plot(statistics=True)
plt.legend()
# Plot the correlation between the calibrated values and the Monte Carlo truth values.
x = np.linspace(mc_edges[0], mc_edges[-1], 2)
plt.figure("Correlation between calibrated values and Monte Carlo truth values")
plt.scatter(vals, mc_vals[mask.flatten()], alpha=0.1, s=10)
plt.plot(x, x, color="k", linestyle="--")
line = Line()
line.intercept.freeze(0.)
line.fit(vals, mc_vals[mask.flatten()])
label = f"Slope: {line.slope.ufloat()}"
line.plot(label=label, color="black", linestyle="--", )
plt.legend()
plt.xlabel(f"Calibrated values [{unit}]")
plt.ylabel(f"Monte Carlo truth values [{mc_unit}]")
# Plot the residuals distribution.
Expand All @@ -1046,6 +1061,7 @@ def calibview(
plt.legend()
# Plot the pull distribution.
pull = (vals - mc_vals[mask.flatten()]) / matrix.errors.flatten()[mask.flatten()]
pull = pull[~np.isnan(pull) & ~np.isinf(pull)]
pull_edges = np.linspace(np.nanmin(pull), np.nanmax(pull), 100)
pull_hist = Histogram1d(pull_edges, label="Pull", xlabel="Pull").fill(pull)
plt.figure("Pull distribution")
Expand Down