Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 20 additions & 70 deletions bzt/modules/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,13 +580,6 @@ def __init__(self):
self.track_percentiles = [0.0, 50.0, 90.0, 95.0, 99.0, 99.9, 100.0]
self.listeners = []

self.buffer_scale_idx = None # string value of the most interesting percentile (e.q. '90.0')
self.buffer_multiplier = 2 # how many sequential samples we want to get before aggregation (approximately)

self.buffer_len = 2 # how many data points we want to collect before aggregation
self.min_buffer_len = 2 # small buffer is more responsive but tends to loose data
self.max_buffer_len = float('inf') # buffer_len value can be changed during runtime.

self.histogram_max = 1.0
self.known_errors = fuzzyset.FuzzySet(use_levenshtein=True)
self.max_error_count = 100
Expand Down Expand Up @@ -691,7 +684,7 @@ def __init__(self, perc_levels=None):
self.ignored_labels = []
self.log = logging.getLogger(self.__class__.__name__)
self.buffer = {}
self.min_timestamp = 0 # last aggregated timestamp, older data is obsolete and must be fixed
self.min_buffer_len = 1
if perc_levels is not None:
self.track_percentiles = perc_levels

Expand Down Expand Up @@ -731,10 +724,6 @@ def __process_readers(self, final_pass=False):
if any([label.startswith(ignore) for ignore in self.ignored_labels]):
continue

if t_stamp < self.min_timestamp:
self.log.debug("Putting sample %s into %s", t_stamp, self.min_timestamp)
t_stamp = self.min_timestamp

if r_time < 0:
self.log.warning("Negative response time reported by tool, resetting it to zero")
r_time = 0
Expand Down Expand Up @@ -809,32 +798,15 @@ def _calculate_datapoints(self, final_pass=False):
:type final_pass: bool
:rtype: DataPoint
"""
if final_pass or len(self.buffer) < self.buffer_len * 10: # safety valve to preserve RAM
self.__process_readers(final_pass)
else:
self.log.debug("Skipped reading new data, we have enough in the buffer")

self.__process_readers(final_pass)
self.log.debug("Buffer len: %s; Known errors count: %s", len(self.buffer), len(self.known_errors))

if not self.buffer:
return

if self.cumulative and self.track_percentiles and self.buffer_scale_idx is not None:
old_len = self.buffer_len

# choose average timing of the most interesting percentile
chosen_timing = self.cumulative[''][KPISet.PERCENTILES][self.buffer_scale_idx]

# and calculate new buffer_len based on current speed of data getting
self.buffer_len = round(chosen_timing * self.buffer_multiplier)
self.buffer_len = max(self.min_buffer_len, self.buffer_len)
self.buffer_len = min(self.max_buffer_len, self.buffer_len)
if self.buffer_len != old_len:
self.log.info("Changed data analysis delay to %ds", self.buffer_len)

timestamps = sorted(self.buffer.keys())
while final_pass or (timestamps[-1] >= (timestamps[0] + self.buffer_len)):
while final_pass or (timestamps[-1] >= (timestamps[0] + self.min_buffer_len)):
timestamp = timestamps.pop(0)
self.min_timestamp = timestamp + 1
self.log.debug("Aggregating: %s, %s in buffer", timestamp, len(self.buffer))
samples = self.buffer.pop(timestamp)
datapoint = self.__get_new_datapoint(timestamp)
Expand Down Expand Up @@ -878,9 +850,12 @@ def __init__(self):
self.ignored_labels = ["ignore"]
self.underlings = []
self.buffer = {}

self.min_buffer_len = 2
self.histogram_max = 5.0
self._sticky_concurrencies = {}
self.first_timestamp = None # first aggregated data timestamp, just for exclude_ramp_up feature
self.min_timestamp = None # last aggregated timestamp
self.first_timestamp = None # first data timestamp, just for exclude_ramp_up feature

def datapoints(self, final_pass=False):
"""
Expand All @@ -893,6 +868,7 @@ def datapoints(self, final_pass=False):

# exclude ramp-up block
timestamp = datapoint[DataPoint.TIMESTAMP]
self.min_timestamp = timestamp + 1
if not self.first_timestamp:
self.first_timestamp = timestamp

Expand Down Expand Up @@ -960,30 +936,6 @@ def prepare(self):

self.ignored_labels = self.settings.get("ignore-labels", self.ignored_labels)
self.generalize_labels = self.settings.get("generalize-labels", self.generalize_labels)

self.min_buffer_len = dehumanize_time(self.settings.get("min-buffer-len", self.min_buffer_len))

max_buffer_len = self.settings.get("max-buffer-len", self.max_buffer_len)
try:
self.max_buffer_len = dehumanize_time(max_buffer_len)
except TaurusInternalException as exc:
self.log.debug("Exception in dehumanize_time(%s): %s", max_buffer_len, exc)
raise TaurusConfigError("Wrong 'max-buffer-len' value: %s" % max_buffer_len)

self.buffer_multiplier = self.settings.get("buffer-multiplier", self.buffer_multiplier)

count = len(self.track_percentiles)
if count == 1:
self.buffer_scale_idx = str(float(self.track_percentiles[0]))
if count > 1:
percentile = self.settings.get("buffer-scale-choice", 0.5)
percentiles = [i / (count - 1.0) for i in range(count)]
distances = [abs(percentile - percentiles[i]) for i in range(count)]
index_position = distances.index(min(distances))
self.buffer_scale_idx = str(float(self.track_percentiles[index_position]))

debug_str = 'Buffer scaling setup: percentile %s from %s selected'
self.log.debug(debug_str, self.buffer_scale_idx, self.track_percentiles)
self.histogram_max = dehumanize_time(self.settings.get("histogram-initial", self.histogram_max))
self.max_error_count = self.settings.get("max-error-variety", self.max_error_count)

Expand All @@ -1002,10 +954,7 @@ def add_underling(self, underling):
"""
underling.track_percentiles = self.track_percentiles
underling.ignored_labels = self.ignored_labels
underling.min_buffer_len = self.min_buffer_len
underling.max_buffer_len = self.max_buffer_len
underling.buffer_multiplier = self.buffer_multiplier
underling.buffer_scale_idx = self.buffer_scale_idx
underling.buffer_len = 1
underling.histogram_max = self.histogram_max

underling.max_error_count = self.max_error_count
Expand Down Expand Up @@ -1050,14 +999,15 @@ def _process_underlings(self, final_pass):
break

def _put_into_buffer(self, point):
tstamp = point[DataPoint.TIMESTAMP]
if self.buffer:
mints = min(self.buffer.keys())
if tstamp < mints:
self.log.debug("Putting datapoint %s into %s", tstamp, mints)
point[DataPoint.TIMESTAMP] = mints
tstamp = mints
self.buffer.setdefault(tstamp, []).append(point)
timestamp = point[DataPoint.TIMESTAMP]
if self.min_timestamp:
if timestamp < self.min_timestamp:
self.log.debug("Putting datapoint %s into %s", timestamp, self.min_timestamp)
point[DataPoint.TIMESTAMP] = self.min_timestamp
timestamp = self.min_timestamp
else:
self.min_timestamp = timestamp
self.buffer.setdefault(timestamp, []).append(point)

def _is_ramp_up(self, ts):
ramp_ups = [0]
Expand All @@ -1081,7 +1031,7 @@ def _calculate_datapoints(self, final_pass=False):
return

timestamps = sorted(self.buffer.keys())
while timestamps and (final_pass or (timestamps[-1] >= timestamps[0] + self.buffer_len)):
while timestamps and (final_pass or (timestamps[-1] >= timestamps[0] + self.min_buffer_len)):
tstamp = timestamps.pop(0)
self.log.debug("Merging into %s", tstamp)
points_to_consolidate = self.buffer.pop(tstamp)
Expand Down