diff --git a/gcsfs/tests/perf/microbenchmarks/conftest.py b/gcsfs/tests/perf/microbenchmarks/conftest.py index 514e19c58..c981fce75 100644 --- a/gcsfs/tests/perf/microbenchmarks/conftest.py +++ b/gcsfs/tests/perf/microbenchmarks/conftest.py @@ -324,7 +324,18 @@ def publish_benchmark_extra_info( """ benchmark.extra_info["files"] = params.files benchmark.extra_info["file_size"] = getattr(params, "file_size_bytes", "N/A") - benchmark.extra_info["chunk_size"] = getattr(params, "chunk_size_bytes", "N/A") + + c_size = getattr(params, "chunk_size_bytes", 0) + benchmark.extra_info["chunk_size"] = c_size if c_size > 0 else "N/A" + + min_c = getattr(params, "min_chunk_size_bytes", 0) + max_c = getattr(params, "max_chunk_size_bytes", 0) + benchmark.extra_info["min_chunk_size"] = min_c if min_c > 0 else "N/A" + benchmark.extra_info["max_chunk_size"] = max_c if max_c > 0 else "N/A" + + prob = getattr(params, "seq_probability", None) + benchmark.extra_info["seq_probability"] = prob if prob is not None else "N/A" + benchmark.extra_info["block_size"] = getattr(params, "block_size_bytes", "N/A") benchmark.extra_info["pattern"] = getattr(params, "pattern", "N/A") benchmark.extra_info["runtime"] = getattr(params, "runtime", "N/A") diff --git a/gcsfs/tests/perf/microbenchmarks/read/configs.py b/gcsfs/tests/perf/microbenchmarks/read/configs.py index 2352f4dd1..4bcdd17be 100644 --- a/gcsfs/tests/perf/microbenchmarks/read/configs.py +++ b/gcsfs/tests/perf/microbenchmarks/read/configs.py @@ -15,11 +15,21 @@ def build_cases(self, scenario, common_config): file_sizes_mb = common_config.get("file_sizes_mb", [128]) chunk_sizes_mb = common_config.get("chunk_sizes_mb", [16]) block_sizes_mb = scenario.get("block_sizes_mb", [5]) - pattern = scenario.get("pattern", "seq") runtime = common_config.get("runtime", 30) scenario_files = scenario.get("files") + if pattern == "mixed": + seq_probabilities = scenario.get("seq_probabilities", [0.5]) + min_chunk_sizes_mb = scenario.get("min_chunk_sizes_mb", [1]) + max_chunk_sizes_mb = scenario.get("max_chunk_sizes_mb", [16]) + chunk_sizes_mb = [None] # Hide base chunk size + else: + seq_probabilities = [None] + min_chunk_sizes_mb = [None] + max_chunk_sizes_mb = [None] + chunk_sizes_mb = common_config.get("chunk_sizes_mb", [16]) + cases = [] param_combinations = itertools.product( procs_list, @@ -28,6 +38,9 @@ def build_cases(self, scenario, common_config): chunk_sizes_mb, block_sizes_mb, bucket_types, + seq_probabilities, + min_chunk_sizes_mb, + max_chunk_sizes_mb, ) for ( @@ -37,15 +50,20 @@ def build_cases(self, scenario, common_config): chunk_size_mb, block_size_mb, bucket_type, + seq_prob, + min_chunk_mb, + max_chunk_mb, ) in param_combinations: bucket_name = self.get_bucket_name(bucket_type) if not bucket_name: continue - name = ( - f"{scenario['name']}_{procs}procs_{threads}threads_" - f"{size_mb}MB_file_{chunk_size_mb}MB_chunk_{block_size_mb}MB_block_{bucket_type}" - ) + name = f"{scenario['name']}_{procs}procs_{threads}threads_{size_mb}MB_file_" + if pattern == "mixed": + name += f"mixed_{seq_prob}seq_{min_chunk_mb}to{max_chunk_mb}MB_chunk_" + else: + name += f"{chunk_size_mb}MB_chunk_" + name += f"{block_size_mb}MB_block_{bucket_type}" if scenario_files is not None: files_count = scenario_files @@ -61,10 +79,13 @@ def build_cases(self, scenario, common_config): processes=procs, files=files_count, rounds=rounds, - chunk_size_bytes=int(chunk_size_mb * MB), + chunk_size_bytes=int(chunk_size_mb * MB) if chunk_size_mb else 0, block_size_bytes=int(block_size_mb * MB), file_size_bytes=int(size_mb * MB), runtime=runtime, + min_chunk_size_bytes=int(min_chunk_mb * MB) if min_chunk_mb else 0, + max_chunk_size_bytes=int(max_chunk_mb * MB) if max_chunk_mb else 0, + seq_probability=seq_prob, ) cases.append(params) return cases diff --git a/gcsfs/tests/perf/microbenchmarks/read/configs.yaml b/gcsfs/tests/perf/microbenchmarks/read/configs.yaml index 215f51d82..af4f7e709 100644 --- a/gcsfs/tests/perf/microbenchmarks/read/configs.yaml +++ b/gcsfs/tests/perf/microbenchmarks/read/configs.yaml @@ -28,3 +28,9 @@ scenarios: pattern: "rand" processes: [16, 48] block_sizes_mb: [0.0625, 16] + + - name: "read_mixed_fixed_duration" + pattern: "mixed" + min_chunk_sizes_mb: [0.0625] + max_chunk_sizes_mb: [1024] + seq_probabilities: [0.3, 0.5, 0.7] # 30% sequential, 50% sequential, 70% sequential diff --git a/gcsfs/tests/perf/microbenchmarks/read/parameters.py b/gcsfs/tests/perf/microbenchmarks/read/parameters.py index 8c6a3151b..b5ffffeb0 100644 --- a/gcsfs/tests/perf/microbenchmarks/read/parameters.py +++ b/gcsfs/tests/perf/microbenchmarks/read/parameters.py @@ -17,3 +17,12 @@ class ReadBenchmarkParameters(IOBenchmarkParameters): # Time in seconds the test should run. runtime: int + + # Min I/O chunk size + min_chunk_size_bytes: int + + # Max I/O chunk size + max_chunk_size_bytes: int + + # The sequential probability, the seek probability would be 1 - seq_probability + seq_probability: float diff --git a/gcsfs/tests/perf/microbenchmarks/read/test_read.py b/gcsfs/tests/perf/microbenchmarks/read/test_read.py index 4b472b9fb..53b0f59da 100644 --- a/gcsfs/tests/perf/microbenchmarks/read/test_read.py +++ b/gcsfs/tests/perf/microbenchmarks/read/test_read.py @@ -48,6 +48,38 @@ def _read_op_rand(gcs, file_paths, chunk_size, offsets, runtime): return total_bytes_read +def _read_op_mixed( + gcs, + file_paths, + min_chunk_size, + max_chunk_size, + seq_probability, + file_size_bytes, + runtime, +): + """Read files with a mix of sequential reads and random seeks for a fixed duration.""" + total_bytes_read = 0 + start_time = time.perf_counter() + files_it = itertools.cycle(file_paths) + + while time.perf_counter() - start_time < runtime: + path = next(files_it) + with gcs.open(path, "rb") as f: + while time.perf_counter() - start_time < runtime: + chunk_size = random.randint(min_chunk_size, max_chunk_size) + if random.random() >= seq_probability: + max_offset = max(0, file_size_bytes - chunk_size) + f.seek(random.randint(0, max_offset)) + + data = f.read(chunk_size) + + if not data: + break + total_bytes_read += len(data) + + return total_bytes_read + + def _random_read_worker(gcs, file_paths, chunk_size, offsets, runtime): """A worker that reads files from random offsets.""" local_offsets = list(offsets) @@ -76,6 +108,17 @@ def test_read_single_threaded(benchmark, gcsfs_benchmark_read, monitor): op = _random_read_worker offsets = list(range(0, params.file_size_bytes, params.chunk_size_bytes)) op_args = (gcs, file_paths, params.chunk_size_bytes, offsets, params.runtime) + elif params.pattern == "mixed": + op = _read_op_mixed + op_args = ( + gcs, + file_paths, + params.min_chunk_size_bytes, + params.max_chunk_size_bytes, + params.seq_probability, + params.file_size_bytes, + params.runtime, + ) run_single_threaded_fixed_duration( benchmark, monitor, params, op, op_args, BENCHMARK_GROUP @@ -92,6 +135,9 @@ def _process_worker_fixed_duration( process_data_shared, index, runtime, + min_chunk_size=None, + max_chunk_size=None, + seq_probability=None, ): """A worker function for each process to read files for a fixed duration.""" with ThreadPoolExecutor(max_workers=threads) as executor: @@ -109,6 +155,20 @@ def _process_worker_fixed_duration( ) for _ in range(threads) ] + elif pattern == "mixed": + futures = [ + executor.submit( + _read_op_mixed, + gcs, + file_paths, + min_chunk_size, + max_chunk_size, + seq_probability, + file_size_bytes, + runtime, + ) + for _ in range(threads) + ] results = [f.result() for f in futures] total_bytes = sum(results) @@ -140,6 +200,9 @@ def args_builder(gcs_instance, i, shared_arr): shared_arr, i, params.runtime, + params.min_chunk_size_bytes, + params.max_chunk_size_bytes, + params.seq_probability, ) run_multi_process( diff --git a/gcsfs/tests/perf/microbenchmarks/run.py b/gcsfs/tests/perf/microbenchmarks/run.py index 4d4ab2d1d..2214aad71 100644 --- a/gcsfs/tests/perf/microbenchmarks/run.py +++ b/gcsfs/tests/perf/microbenchmarks/run.py @@ -252,6 +252,7 @@ def _create_table_row(row): row.get("bucket_type", ""), row.get("group", ""), row.get("pattern", ""), + row.get("seq_probability", ""), row.get("files", ""), row.get("folders", ""), row.get("threads", ""), @@ -260,6 +261,8 @@ def _create_table_row(row): row.get("target_type", ""), _format_mb(row.get("file_size", 0)), _format_mb(row.get("chunk_size", 0)), + _format_mb(row.get("min_chunk_size", 0)), + _format_mb(row.get("max_chunk_size", 0)), _format_mb(row.get("block_size", 0)), latency, _format_mb(throughput_val), @@ -289,6 +292,7 @@ def _print_csv_to_shell(report_path): "Bucket Type", "Group", "Pattern", + "Seq Prob", "Files", "Folders", "Threads", @@ -297,6 +301,8 @@ def _print_csv_to_shell(report_path): "Target Type", "File Size (MiB)", "Chunk Size (MiB)", + "Min Chunk (MiB)", + "Max Chunk (MiB)", "Block Size (MiB)", "Mean Latency (s)", "Mean Throughput (MiB/s)", diff --git a/gcsfs/tests/perf/microbenchmarks/test_run.py b/gcsfs/tests/perf/microbenchmarks/test_run.py index 489f3dbff..00f93c738 100644 --- a/gcsfs/tests/perf/microbenchmarks/test_run.py +++ b/gcsfs/tests/perf/microbenchmarks/test_run.py @@ -117,6 +117,7 @@ def test_create_table_row(): row = { "bucket_type": "regional", "group": "read", + "seq_probability": 0.6, "pattern": "seq", "files": 1, "folders": 0, @@ -135,7 +136,8 @@ def test_create_table_row(): } table_row = run._create_table_row(row) assert table_row[0] == "regional" - assert table_row[9] == "1.00" # file size MB + assert table_row[3] == 0.6 + assert table_row[10] == "1.00" # file size MB @mock.patch("gcsfs.tests.perf.microbenchmarks.run.PrettyTable")