Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion gcsfs/tests/perf/microbenchmarks/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,18 @@ def publish_benchmark_extra_info(
"""
benchmark.extra_info["files"] = params.files
benchmark.extra_info["file_size"] = getattr(params, "file_size_bytes", "N/A")
benchmark.extra_info["chunk_size"] = getattr(params, "chunk_size_bytes", "N/A")

c_size = getattr(params, "chunk_size_bytes", 0)
benchmark.extra_info["chunk_size"] = c_size if c_size > 0 else "N/A"

min_c = getattr(params, "min_chunk_size_bytes", 0)
max_c = getattr(params, "max_chunk_size_bytes", 0)
benchmark.extra_info["min_chunk_size"] = min_c if min_c > 0 else "N/A"
benchmark.extra_info["max_chunk_size"] = max_c if max_c > 0 else "N/A"

prob = getattr(params, "seq_probability", None)
benchmark.extra_info["seq_probability"] = prob if prob is not None else "N/A"

benchmark.extra_info["block_size"] = getattr(params, "block_size_bytes", "N/A")
benchmark.extra_info["pattern"] = getattr(params, "pattern", "N/A")
benchmark.extra_info["runtime"] = getattr(params, "runtime", "N/A")
Expand Down
33 changes: 27 additions & 6 deletions gcsfs/tests/perf/microbenchmarks/read/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,21 @@ def build_cases(self, scenario, common_config):
file_sizes_mb = common_config.get("file_sizes_mb", [128])
chunk_sizes_mb = common_config.get("chunk_sizes_mb", [16])
block_sizes_mb = scenario.get("block_sizes_mb", [5])

pattern = scenario.get("pattern", "seq")
runtime = common_config.get("runtime", 30)
scenario_files = scenario.get("files")

if pattern == "mixed":
seq_probabilities = scenario.get("seq_probabilities", [0.5])
min_chunk_sizes_mb = scenario.get("min_chunk_sizes_mb", [1])
max_chunk_sizes_mb = scenario.get("max_chunk_sizes_mb", [16])
chunk_sizes_mb = [None] # Hide base chunk size
else:
seq_probabilities = [None]
min_chunk_sizes_mb = [None]
max_chunk_sizes_mb = [None]
chunk_sizes_mb = common_config.get("chunk_sizes_mb", [16])

cases = []
param_combinations = itertools.product(
procs_list,
Expand All @@ -28,6 +38,9 @@ def build_cases(self, scenario, common_config):
chunk_sizes_mb,
block_sizes_mb,
bucket_types,
seq_probabilities,
min_chunk_sizes_mb,
max_chunk_sizes_mb,
)

for (
Expand All @@ -37,15 +50,20 @@ def build_cases(self, scenario, common_config):
chunk_size_mb,
block_size_mb,
bucket_type,
seq_prob,
min_chunk_mb,
max_chunk_mb,
) in param_combinations:
bucket_name = self.get_bucket_name(bucket_type)
if not bucket_name:
continue

name = (
f"{scenario['name']}_{procs}procs_{threads}threads_"
f"{size_mb}MB_file_{chunk_size_mb}MB_chunk_{block_size_mb}MB_block_{bucket_type}"
)
name = f"{scenario['name']}_{procs}procs_{threads}threads_{size_mb}MB_file_"
if pattern == "mixed":
name += f"mixed_{seq_prob}seq_{min_chunk_mb}to{max_chunk_mb}MB_chunk_"
else:
name += f"{chunk_size_mb}MB_chunk_"
name += f"{block_size_mb}MB_block_{bucket_type}"

if scenario_files is not None:
files_count = scenario_files
Expand All @@ -61,10 +79,13 @@ def build_cases(self, scenario, common_config):
processes=procs,
files=files_count,
rounds=rounds,
chunk_size_bytes=int(chunk_size_mb * MB),
chunk_size_bytes=int(chunk_size_mb * MB) if chunk_size_mb else 0,
block_size_bytes=int(block_size_mb * MB),
file_size_bytes=int(size_mb * MB),
runtime=runtime,
min_chunk_size_bytes=int(min_chunk_mb * MB) if min_chunk_mb else 0,
max_chunk_size_bytes=int(max_chunk_mb * MB) if max_chunk_mb else 0,
seq_probability=seq_prob,
)
cases.append(params)
return cases
Expand Down
6 changes: 6 additions & 0 deletions gcsfs/tests/perf/microbenchmarks/read/configs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,9 @@ scenarios:
pattern: "rand"
processes: [16, 48]
block_sizes_mb: [0.0625, 16]

- name: "read_mixed_fixed_duration"
pattern: "mixed"
min_chunk_sizes_mb: [0.0625]
max_chunk_sizes_mb: [1024]
seq_probabilities: [0.3, 0.5, 0.7] # 30% sequential, 50% sequential, 70% sequential
9 changes: 9 additions & 0 deletions gcsfs/tests/perf/microbenchmarks/read/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,12 @@ class ReadBenchmarkParameters(IOBenchmarkParameters):

# Time in seconds the test should run.
runtime: int

# Min I/O chunk size
min_chunk_size_bytes: int

# Max I/O chunk size
max_chunk_size_bytes: int

# The sequential probability, the seek probability would be 1 - seq_probability
seq_probability: float
63 changes: 63 additions & 0 deletions gcsfs/tests/perf/microbenchmarks/read/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,38 @@ def _read_op_rand(gcs, file_paths, chunk_size, offsets, runtime):
return total_bytes_read


def _read_op_mixed(
gcs,
file_paths,
min_chunk_size,
max_chunk_size,
seq_probability,
file_size_bytes,
runtime,
):
"""Read files with a mix of sequential reads and random seeks for a fixed duration."""
total_bytes_read = 0
start_time = time.perf_counter()
files_it = itertools.cycle(file_paths)

while time.perf_counter() - start_time < runtime:
path = next(files_it)
with gcs.open(path, "rb") as f:
while time.perf_counter() - start_time < runtime:
chunk_size = random.randint(min_chunk_size, max_chunk_size)
if random.random() >= seq_probability:
max_offset = max(0, file_size_bytes - chunk_size)
f.seek(random.randint(0, max_offset))

data = f.read(chunk_size)

if not data:
break
total_bytes_read += len(data)

return total_bytes_read


def _random_read_worker(gcs, file_paths, chunk_size, offsets, runtime):
"""A worker that reads files from random offsets."""
local_offsets = list(offsets)
Expand Down Expand Up @@ -76,6 +108,17 @@ def test_read_single_threaded(benchmark, gcsfs_benchmark_read, monitor):
op = _random_read_worker
offsets = list(range(0, params.file_size_bytes, params.chunk_size_bytes))
op_args = (gcs, file_paths, params.chunk_size_bytes, offsets, params.runtime)
elif params.pattern == "mixed":
op = _read_op_mixed
op_args = (
gcs,
file_paths,
params.min_chunk_size_bytes,
params.max_chunk_size_bytes,
params.seq_probability,
params.file_size_bytes,
params.runtime,
)

run_single_threaded_fixed_duration(
benchmark, monitor, params, op, op_args, BENCHMARK_GROUP
Expand All @@ -92,6 +135,9 @@ def _process_worker_fixed_duration(
process_data_shared,
index,
runtime,
min_chunk_size=None,
max_chunk_size=None,
seq_probability=None,
):
"""A worker function for each process to read files for a fixed duration."""
with ThreadPoolExecutor(max_workers=threads) as executor:
Expand All @@ -109,6 +155,20 @@ def _process_worker_fixed_duration(
)
for _ in range(threads)
]
elif pattern == "mixed":
futures = [
executor.submit(
_read_op_mixed,
gcs,
file_paths,
min_chunk_size,
max_chunk_size,
seq_probability,
file_size_bytes,
runtime,
)
for _ in range(threads)
]

results = [f.result() for f in futures]
total_bytes = sum(results)
Expand Down Expand Up @@ -140,6 +200,9 @@ def args_builder(gcs_instance, i, shared_arr):
shared_arr,
i,
params.runtime,
params.min_chunk_size_bytes,
params.max_chunk_size_bytes,
params.seq_probability,
)

run_multi_process(
Expand Down
6 changes: 6 additions & 0 deletions gcsfs/tests/perf/microbenchmarks/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ def _create_table_row(row):
row.get("bucket_type", ""),
row.get("group", ""),
row.get("pattern", ""),
row.get("seq_probability", ""),
row.get("files", ""),
row.get("folders", ""),
row.get("threads", ""),
Expand All @@ -260,6 +261,8 @@ def _create_table_row(row):
row.get("target_type", ""),
_format_mb(row.get("file_size", 0)),
_format_mb(row.get("chunk_size", 0)),
_format_mb(row.get("min_chunk_size", 0)),
_format_mb(row.get("max_chunk_size", 0)),
_format_mb(row.get("block_size", 0)),
latency,
_format_mb(throughput_val),
Expand Down Expand Up @@ -289,6 +292,7 @@ def _print_csv_to_shell(report_path):
"Bucket Type",
"Group",
"Pattern",
"Seq Prob",
"Files",
"Folders",
"Threads",
Expand All @@ -297,6 +301,8 @@ def _print_csv_to_shell(report_path):
"Target Type",
"File Size (MiB)",
"Chunk Size (MiB)",
"Min Chunk (MiB)",
"Max Chunk (MiB)",
"Block Size (MiB)",
"Mean Latency (s)",
"Mean Throughput (MiB/s)",
Expand Down
4 changes: 3 additions & 1 deletion gcsfs/tests/perf/microbenchmarks/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def test_create_table_row():
row = {
"bucket_type": "regional",
"group": "read",
"seq_probability": 0.6,
"pattern": "seq",
"files": 1,
"folders": 0,
Expand All @@ -135,7 +136,8 @@ def test_create_table_row():
}
table_row = run._create_table_row(row)
assert table_row[0] == "regional"
assert table_row[9] == "1.00" # file size MB
assert table_row[3] == 0.6
assert table_row[10] == "1.00" # file size MB


@mock.patch("gcsfs.tests.perf.microbenchmarks.run.PrettyTable")
Expand Down
Loading