# Source code for src.validation.nodes

import time
from os.path import exists

import typer
from rich.console import Console
from typing_extensions import Annotated


# Typer application CLI
# Shared Rich console used by the command below for its terminal output.
console = Console()
# Typer application object: the help screen is reachable through both
# ``-h`` and ``--help``, and the markup mode for help text is set to Markdown.
app = typer.Typer(
    context_settings={"help_option_names": ["-h", "--help"]},
    rich_markup_mode="markdown",
)


def check_log(log_path: str, timeout: int = 30):
    """Block until the file at ``log_path`` exists.

    The path is polled every 0.25 seconds. The timeout is measured against a
    monotonic clock, so time spent in the filesystem check itself counts
    toward the limit.

    Args:
        log_path: Path of the log file to wait for.
        timeout: Maximum number of seconds to wait for the file to appear.

    Raises:
        ValueError: If the file still does not exist once ``timeout`` seconds
            have elapsed.
    """
    poll_interval = 0.25
    deadline = time.monotonic() + timeout
    while not exists(log_path):
        if time.monotonic() > deadline:
            # ValueError (rather than TimeoutError) is kept so that existing
            # callers catching it keep working.
            raise ValueError(f"Timeout reached: {timeout}")
        time.sleep(poll_interval)
def check_nodes(log_path: str, requested_nodes: int, timeout: int = 300):
    """Block until the log reports at least ``requested_nodes`` started nodes.

    The whole log file is re-read on every poll and the occurrences of the
    marker ``"Ray runtime started"`` are counted. The function returns as soon
    as the count reaches ``requested_nodes``; otherwise it sleeps 0.25 seconds
    and tries again.

    Args:
        log_path: Path of the log file to scan. It must already exist.
        requested_nodes: Number of marker occurrences to wait for. A value
            of zero or less returns immediately without reading the file.
        timeout: Maximum number of seconds to wait, measured against a
            monotonic clock so that file-reading time counts toward the limit.

    Raises:
        ValueError: If fewer than ``requested_nodes`` markers are present once
            ``timeout`` seconds have elapsed.
    """
    poll_interval = 0.25
    marker = "Ray runtime started"
    deadline = time.monotonic() + timeout
    active_nodes = 0
    while active_nodes < requested_nodes:
        if time.monotonic() > deadline:
            # ValueError is kept for compatibility with existing callers.
            raise ValueError(f"Timeout reached: {timeout}")
        # errors="replace": a stray undecodable byte in the log must not abort
        # the wait; the marker is plain ASCII, so counting is unaffected.
        with open(log_path, encoding="utf-8", errors="replace") as f:
            active_nodes = f.read().count(marker)
        # Only sleep when another poll is actually needed.
        if active_nodes < requested_nodes:
            time.sleep(poll_interval)
@app.command(help="**Verify** Ray nodes are up and running.")
def verify_active_nodes(
    log_path: Annotated[str, typer.Argument()],
    requested_nodes: Annotated[int, typer.Argument()],
):
    """Read the log generated by the Slurm scheduler to verify the runtime started.

    First waits for the log file to appear (``check_log``), then waits until
    the log reports the requested number of started nodes (``check_nodes``).
    Both waits use the default timeout of the helper being called; this
    command does not expose a timeout option of its own.

    Args:
        log_path: Slurm log path.
        requested_nodes: Number of requested nodes.

    Raises:
        ValueError: Propagated from ``check_log`` or ``check_nodes`` when
            their timeout is reached.
    """
    check_log(log_path)
    check_nodes(log_path, requested_nodes)
    # Empty line to separate the success message from earlier output.
    console.print("")
    console.print(f":party_popper: Successfully started #{requested_nodes} nodes")
# Launch the Typer CLI only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    app()