diff options
| author | kj_sh604 | 2026-02-11 11:57:38 -0500 |
|---|---|---|
| committer | kj_sh604 | 2026-02-11 11:57:38 -0500 |
| commit | d060ff5ca58e87e6dd1c6d93b97adbfd45f57f9b (patch) | |
| tree | aded1017253e8aa5b5c2996caf30630f8ed89c1f /src | |
| parent | aa845e20842ecf93f7e69b03097dbc6508a70fc8 (diff) | |
Diffstat (limited to 'src')
| -rw-r--r-- | src/.gitignore | 6 | ||||
| -rw-r--r-- | src/config.py | 23 | ||||
| -rw-r--r-- | src/loadtest.py | 504 | ||||
| -rw-r--r-- | src/requirements.txt | 1 | ||||
| -rw-r--r-- | src/scenarios/__init__.py | 1 | ||||
| -rw-r--r-- | src/scenarios/base.py | 82 | ||||
| -rw-r--r-- | src/scenarios/example.py | 72 |
7 files changed, 689 insertions, 0 deletions
diff --git a/src/.gitignore b/src/.gitignore new file mode 100644 index 0000000..6f29642 --- /dev/null +++ b/src/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +*.pyc +.venv/ +venv/ +logs/* +results/*
\ No newline at end of file diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..eadda3f --- /dev/null +++ b/src/config.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 + +# yup, super simple config file +BASE_URL = "https://localhost:8080" +ACTION_TIMEOUT = 10000 # milliseconds +BROWSER_TYPE = "firefox" # chromium, firefox, webkit +CONCURRENT_USERS = 3 +HEADLESS = True # headless mode = no browser UI +ITERATIONS_PER_USER = 3 +LOG_FILE = "logs/logs.log" +LOG_LEVEL = "INFO" # DEBUG, INFO, WARNING, ERROR +LOG_TO_FILE = True +NAVIGATION_TIMEOUT = 30000 # milliseconds +RESULTS_FILE = "results/results.json" +THINK_TIME = 1.0 # seconds to sleep between iterations +VIEWPORT_HEIGHT = 720 # viewport height for each virtual user +VIEWPORT_WIDTH = 1280 # viewport width for each virtual user + +# scenario configuration +# - use default example.py = "None" +# - single scenario = "path/to/scenario.py" +# - multiple scenarios = ["path/to/scenario.py", "path/to/scenario0.py"] +SCENARIOS = None
\ No newline at end of file diff --git a/src/loadtest.py b/src/loadtest.py new file mode 100644 index 0000000..73807b5 --- /dev/null +++ b/src/loadtest.py @@ -0,0 +1,504 @@ +#!/usr/bin/env python3 + +# this is the entry point. run it with: +# python3 loadtest.py +# +# or with a custom scenario: +# python3 loadtest.py --scenario scenarios/example.py +# +# all configuration lives in config.py — go change stuff +# there before running this. + +import argparse +import importlib.util +import inspect +import json +import logging +import os +import statistics +import sys +import time +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timezone + +from playwright.sync_api import sync_playwright + +import config +from scenarios.base import BaseScenario + +# logging setup +def setup_logging(): + """configure logging based on config.py settings.""" + log_level = getattr(logging, config.LOG_LEVEL.upper(), logging.DEBUG) + + fmt = ( + "%(asctime)s | %(levelname)-8s | %(name)-30s | %(message)s" + ) + datefmt = "%Y-%m-%d %H:%M:%S" + + handlers = [logging.StreamHandler(sys.stdout)] + + if config.LOG_TO_FILE: + log_dir = os.path.dirname(config.LOG_FILE) + if log_dir: + os.makedirs(log_dir, exist_ok=True) + file_handler = logging.FileHandler(config.LOG_FILE, mode="w") + handlers.append(file_handler) + + logging.basicConfig( + level=log_level, + format=fmt, + datefmt=datefmt, + handlers=handlers, + ) + + # quiet down some noisy loggers + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("asyncio").setLevel(logging.WARNING) + + +logger = logging.getLogger("loadtest") + + +# scenario loader +def load_scenario(scenario_path): + """ + dynamically load a scenario from a .py file path. + finds the first class that subclasses BaseScenario. + """ + logger.info(f"loading scenario from: {scenario_path}") + + if not os.path.isfile(scenario_path): + logger.error(f"scenario file not found: {scenario_path}") + sys.exit(1) + + spec = importlib.util.spec_from_file_location("scenario_module", scenario_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # find the first BaseScenario subclass in the module + scenario_class = None + for _name, obj in inspect.getmembers(module, inspect.isclass): + if issubclass(obj, BaseScenario) and obj is not BaseScenario: + scenario_class = obj + break + + if scenario_class is None: + logger.error( + f"no BaseScenario subclass found in {scenario_path}. " + "make sure your scenario class inherits from scenarios.base.BaseScenario." + ) + sys.exit(1) + + logger.info(f"loaded scenario: {scenario_class.name} ({scenario_class.__name__})") + return scenario_class + + +# single virtual user runner +def run_virtual_user(user_id, scenario_class, results_list): + """ + runs a single virtual user through the scenario. + each virtual user gets its own browser context. + """ + user_logger = logging.getLogger(f"loadtest.user-{user_id:03d}") + user_logger.info("virtual user starting up...") + + user_results = { + "user_id": user_id, + "iterations": [], + "errors": [], + } + + try: + with sync_playwright() as p: + # launch browser + browser_launcher = getattr(p, config.BROWSER_TYPE) + browser = browser_launcher.launch(headless=config.HEADLESS) + + user_logger.info( + f"browser launched: {config.BROWSER_TYPE} " + f"(headless={config.HEADLESS})" + ) + + context = browser.new_context( + base_url=config.BASE_URL, + viewport={ + "width": config.VIEWPORT_WIDTH, + "height": config.VIEWPORT_HEIGHT, + }, + ) + context.set_default_navigation_timeout(config.NAVIGATION_TIMEOUT) + context.set_default_timeout(config.ACTION_TIMEOUT) + + page = context.new_page() + + # instantiate the scenario + scenario = scenario_class() + + # wire up request/response hooks + page.on("response", scenario.on_response) + page.on("request", scenario.on_request) + + user_logger.info( + f"starting {config.ITERATIONS_PER_USER} iteration(s)..." + ) + + for iteration in range(1, config.ITERATIONS_PER_USER + 1): + iter_logger = logging.getLogger( + f"loadtest.user-{user_id:03d}.iter-{iteration}" + ) + iter_logger.info(f"--- iteration {iteration} start ---") + + iter_result = { + "iteration": iteration, + "start_time": None, + "end_time": None, + "duration_ms": None, + "success": False, + "error": None, + } + + start = time.perf_counter() + iter_result["start_time"] = datetime.now(timezone.utc).isoformat() + + try: + scenario.setup(page) + scenario.run(page) + scenario.teardown(page) + iter_result["success"] = True + iter_logger.info("iteration completed successfully.") + except Exception as e: + iter_result["error"] = str(e) + user_results["errors"].append( + { + "iteration": iteration, + "error": str(e), + "traceback": traceback.format_exc(), + } + ) + iter_logger.error(f"iteration failed: {e}") + iter_logger.debug(traceback.format_exc()) + finally: + end = time.perf_counter() + duration_ms = round((end - start) * 1000, 2) + iter_result["end_time"] = datetime.now(timezone.utc).isoformat() + iter_result["duration_ms"] = duration_ms + iter_logger.info(f"duration: {duration_ms}ms") + user_results["iterations"].append(iter_result) + + # think time between iterations + if iteration < config.ITERATIONS_PER_USER and config.THINK_TIME > 0: + iter_logger.debug( + f"think time: sleeping {config.THINK_TIME}s..." + ) + time.sleep(config.THINK_TIME) + + # cleanup + page.close() + context.close() + browser.close() + user_logger.info("virtual user finished, browser closed.") + + except Exception as e: + user_logger.error(f"virtual user crashed: {e}") + user_logger.debug(traceback.format_exc()) + user_results["errors"].append( + { + "iteration": 0, + "error": str(e), + "traceback": traceback.format_exc(), + } + ) + + results_list.append(user_results) + return user_results + + +# results summary generaton +def summarize_results(all_results): + """crunch the numbers and print a summary.""" + total_iterations = 0 + successful = 0 + failed = 0 + durations = [] + + for user in all_results: + for iteration in user["iterations"]: + total_iterations += 1 + if iteration["success"]: + successful += 1 + durations.append(iteration["duration_ms"]) + else: + failed += 1 + + summary = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "config": { + "base_url": config.BASE_URL, + "concurrent_users": config.CONCURRENT_USERS, + "iterations_per_user": config.ITERATIONS_PER_USER, + "think_time": config.THINK_TIME, + "browser_type": config.BROWSER_TYPE, + "headless": config.HEADLESS, + }, + "totals": { + "total_iterations": total_iterations, + "successful": successful, + "failed": failed, + "success_rate": ( + round(successful / total_iterations * 100, 2) + if total_iterations > 0 + else 0 + ), + }, + "timing_ms": {}, + "per_user": all_results, + } + + if durations: + summary["timing_ms"] = { + "min": round(min(durations), 2), + "max": round(max(durations), 2), + "mean": round(statistics.mean(durations), 2), + "median": round(statistics.median(durations), 2), + "stdev": ( + round(statistics.stdev(durations), 2) + if len(durations) > 1 + else 0 + ), + "p90": round( + sorted(durations)[int(len(durations) * 0.9) - 1], 2 + ) if len(durations) >= 2 else round(durations[0], 2), + "p95": round( + sorted(durations)[int(len(durations) * 0.95) - 1], 2 + ) if len(durations) >= 2 else round(durations[0], 2), + } + + return summary + + +# stdout printing +def print_summary(summary): + """pretty print the results to stdout.""" + logger.info("") + logger.info("=" * 60) + logger.info(" LOAD TEST RESULTS") + logger.info("=" * 60) + logger.info(f" target: {summary['config']['base_url']}") + logger.info(f" browser: {summary['config']['browser_type']}") + logger.info(f" concurrent users: {summary['config']['concurrent_users']}") + logger.info(f" iterations/user: {summary['config']['iterations_per_user']}") + logger.info(f" think time: {summary['config']['think_time']}s") + logger.info("-" * 60) + logger.info(f" total iterations: {summary['totals']['total_iterations']}") + logger.info(f" successful: {summary['totals']['successful']}") + logger.info(f" failed: {summary['totals']['failed']}") + logger.info(f" success rate: {summary['totals']['success_rate']}%") + logger.info("-" * 60) + + if summary["timing_ms"]: + t = summary["timing_ms"] + logger.info(f" min: {t['min']}ms") + logger.info(f" max: {t['max']}ms") + logger.info(f" mean: {t['mean']}ms") + logger.info(f" median: {t['median']}ms") + logger.info(f" stdev: {t['stdev']}ms") + logger.info(f" p90: {t['p90']}ms") + logger.info(f" p95: {t['p95']}ms") + + logger.info("=" * 60) + + # log any errors + total_errors = sum(len(u["errors"]) for u in summary["per_user"]) + if total_errors > 0: + logger.warning(f" total errors: {total_errors}") + for user in summary["per_user"]: + for err in user["errors"]: + logger.warning( + f" user {user['user_id']} / iter {err['iteration']}: " + f"{err['error']}" + ) + logger.info("=" * 60) + + + +def main(): + parser = argparse.ArgumentParser( + description="pw-loadtest: playwright-based load testing framework", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "examples:\n" + " python3 loadtest.py\n" + " python3 loadtest.py --scenario scenarios/example.py\n" + " python3 loadtest.py --url http://myapp:3000 --users 10 --iterations 5\n" + ), + ) + parser.add_argument( + "--scenario", + default=None, + help="path to the scenario .py file (overrides config.py)", + ) + parser.add_argument( + "--url", + default=None, + help="override BASE_URL from config.py", + ) + parser.add_argument( + "--users", + type=int, + default=None, + help="override CONCURRENT_USERS from config.py", + ) + parser.add_argument( + "--iterations", + type=int, + default=None, + help="override ITERATIONS_PER_USER from config.py", + ) + parser.add_argument( + "--headed", + action="store_true", + default=False, + help="run browsers visually (overrides HEADLESS=True in config)", + ) + + args = parser.parse_args() + + # apply CLI overrides + if args.url: + config.BASE_URL = args.url + if args.users: + config.CONCURRENT_USERS = args.users + if args.iterations: + config.ITERATIONS_PER_USER = args.iterations + if args.headed: + config.HEADLESS = False + + setup_logging() + + # determine which scenario(s) to run + # priority: CLI arg > config.SCENARIOS > default + scenario_paths = [] + + if args.scenario: + scenario_paths = [args.scenario] + elif config.SCENARIOS is not None: + if isinstance(config.SCENARIOS, str): + scenario_paths = [config.SCENARIOS] + elif isinstance(config.SCENARIOS, list): + scenario_paths = config.SCENARIOS + else: + logger.error( + "config.SCENARIOS must be None, a string, or a list of strings" + ) + sys.exit(1) + else: + # default + scenario_paths = ["scenarios/example.py"] + + logger.info("=" * 60) + logger.info(" pw-loadtest starting up...") + logger.info("=" * 60) + logger.info(f" target URL: {config.BASE_URL}") + logger.info(f" concurrent users: {config.CONCURRENT_USERS}") + logger.info(f" iterations/user: {config.ITERATIONS_PER_USER}") + logger.info(f" think time: {config.THINK_TIME}s") + logger.info(f" browser: {config.BROWSER_TYPE}") + logger.info(f" headless: {config.HEADLESS}") + logger.info(f" log level: {config.LOG_LEVEL}") + logger.info(f" scenario(s): {len(scenario_paths)}") + logger.info("=" * 60) + + # run each scenario + all_scenario_results = [] + overall_start = time.perf_counter() + + for scenario_path in scenario_paths: + logger.info("") + logger.info("=" * 60) + logger.info(f" running scenario: {scenario_path}") + logger.info("=" * 60) + + # load the scenario + scenario_class = load_scenario(scenario_path) + + # run virtual users concurrently + scenario_results = [] + + logger.info( + f"launching {config.CONCURRENT_USERS} virtual user(s) concurrently..." + ) + + with ThreadPoolExecutor(max_workers=config.CONCURRENT_USERS) as executor: + futures = { + executor.submit( + run_virtual_user, user_id, scenario_class, scenario_results + ): user_id + for user_id in range(1, config.CONCURRENT_USERS + 1) + } + + for future in as_completed(futures): + user_id = futures[future] + try: + future.result() + except Exception as e: + logger.error(f"user {user_id} thread failed: {e}") + + # store results for this scenario + all_scenario_results.append({ + "scenario_path": scenario_path, + "scenario_name": scenario_class.name, + "results": scenario_results, + }) + + overall_end = time.perf_counter() + overall_duration = round((overall_end - overall_start) * 1000, 2) + logger.info("") + logger.info(f"all scenarios finished. total wall time: {overall_duration}ms") + + # summarize and print results for each scenario + logger.info("") + for scenario_data in all_scenario_results: + logger.info("=" * 60) + logger.info(f" scenario: {scenario_data['scenario_name']}") + logger.info(f" path: {scenario_data['scenario_path']}") + logger.info("=" * 60) + summary = summarize_results(scenario_data["results"]) + print_summary(summary) + + # combine all results for final output + all_results = [] + for scenario_data in all_scenario_results: + all_results.extend(scenario_data["results"]) + + final_summary = summarize_results(all_results) + final_summary["wall_time_ms"] = overall_duration + final_summary["scenarios"] = [ + { + "path": s["scenario_path"], + "name": s["scenario_name"], + } + for s in all_scenario_results + ] + + # write results to file + results_path = config.RESULTS_FILE + results_dir = os.path.dirname(results_path) + if results_dir: + os.makedirs(results_dir, exist_ok=True) + with open(results_path, "w") as f: + json.dump(final_summary, f, indent=2, default=str) + logger.info("") + logger.info(f"detailed results written to: {results_path}") + + # exit with non-zero if any failures + if final_summary["totals"]["failed"] > 0: + logger.warning("some iterations failed — exiting with code 1") + sys.exit(1) + + logger.info("all done. 🤙") + + +if __name__ == "__main__": + main()
\ No newline at end of file diff --git a/src/requirements.txt b/src/requirements.txt new file mode 100644 index 0000000..4777061 --- /dev/null +++ b/src/requirements.txt @@ -0,0 +1 @@ +playwright>=1.40.0 diff --git a/src/scenarios/__init__.py b/src/scenarios/__init__.py new file mode 100644 index 0000000..9f5de1c --- /dev/null +++ b/src/scenarios/__init__.py @@ -0,0 +1 @@ +# scenarios/__init__.py diff --git a/src/scenarios/base.py b/src/scenarios/base.py new file mode 100644 index 0000000..f15a82f --- /dev/null +++ b/src/scenarios/base.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 + +# scenarios/base.py - base scenario class + +# inherit from BaseScenario and override the `run()` method +# to define your own load test behavior. that's it. + +import logging +import time + +logger = logging.getLogger("loadtest.scenario") + + +class BaseScenario: + """ + base class for all load test scenarios. + + subclass this and implement `run(page)` to define what + each virtual user does during the load test. the `page` + argument is a playwright Page object — you can navigate, + click, scrape, fill forms, whatever you want. + + optionally override `setup(page)` and `teardown(page)` + for per-iteration init and cleanup. + """ + + name = "base" + + def setup(self, page): + """ + called BEFORE each iteration of `run()`. + override this if you need to do login, seed data, etc. + """ + pass + + def run(self, page): + """ + the main scenario logic. this is what each virtual + user will execute on every iteration. + + override this method in your subclass. + + Args: + page: a playwright Page object. + """ + raise NotImplementedError( + "you need to implement the run() method in your scenario. " + "check out scenarios/example.py for reference." + ) + + def teardown(self, page): + """ + called AFTER each iteration of `run()`. + override this for cleanup, logging out, etc. + """ + pass + + def on_response(self, response): + """ + optional hook — called on every HTTP response the page + receives during the scenario. useful for logging API + calls, checking status codes, etc. + + override if you want response-level visibility. + + Args: + response: a playwright Response object. + """ + pass + + def on_request(self, request): + """ + optional hook — called on every HTTP request the page + makes during the scenario. useful for logging outgoing + requests. + + override if you want request-level visibility. + + Args: + request: a playwright Request object. + """ + pass diff --git a/src/scenarios/example.py b/src/scenarios/example.py new file mode 100644 index 0000000..fd555ff --- /dev/null +++ b/src/scenarios/example.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 + +# this is an example that shows how to write your own +# scenario. copy this file, rename it, and modify the run() +# method to do whatever you want. + +# the run() method receives a playwright Page object. +# you can navigate, click, scrape text, fill forms, +# take screenshots, assert content — the full playwright +# API is at your disposal. + +import logging + +from scenarios.base import BaseScenario + +logger = logging.getLogger("loadtest.scenario.example") + + +class ExampleScenario(BaseScenario): + """ + example scenario that demonstrates: + - navigating to the target URL + - scraping page content + - clicking links / navigating around + - filling out a form + - grabbing data from the page + + modify this to fit your actual load test needs. + """ + + name = "example" + + def setup(self, page): + logger.debug("[setup] preparing for iteration...") + + def run(self, page): + """ + this is where the action happens. each virtual user + will execute this method once per iteration. + """ + + logger.info("[run] navigating to base URL...") + page.goto("/") + page.wait_for_load_state("networkidle") + + title = page.title() + logger.info(f"[run] page title: {title}") + + # python playwright code goes here - click links, fill forms, scrape content, etc. + + logger.info("[run] scenario iteration complete.") + + def teardown(self, page): + logger.debug("[teardown] cleaning up after iteration...") + + def on_response(self, response): + """ + log every HTTP response for visibility. + you can filter by URL pattern, status code, etc. + """ + status = response.status + url = response.url + if status >= 400: + logger.warning(f"[response] {status} <- {url}") + else: + logger.debug(f"[response] {status} <- {url}") + + def on_request(self, request): + """ + log outgoing requests. + """ + logger.debug(f"[request] {request.method} -> {request.url}")
\ No newline at end of file |
