# Source code for src.validation.nodes
import time
from os.path import exists
import typer
from rich.console import Console
from typing_extensions import Annotated
# Shared Rich console used for the user-facing output of this CLI.
console = Console()

# Typer CLI application: help is reachable through both -h and --help, and
# help text is rendered with Markdown markup.
app = typer.Typer(
    rich_markup_mode="markdown",
    context_settings=dict(help_option_names=["-h", "--help"]),
)
# [docs]
def check_log(log_path: str, timeout: int = 30, poll_interval: float = 0.25) -> None:
    """Block until ``log_path`` exists on disk.

    Args:
        log_path: Path of the log file to wait for.
        timeout: Maximum number of seconds to wait for the file to appear.
        poll_interval: Seconds to sleep between two existence checks.

    Raises:
        ValueError: If the file still does not exist once ``timeout``
            seconds have elapsed.
    """
    # A monotonic deadline measures real elapsed time, instead of assuming
    # that every loop iteration costs exactly one sleep interval.
    deadline = time.monotonic() + timeout
    while not exists(log_path):
        if time.monotonic() >= deadline:
            raise ValueError(f"Timeout reached: {timeout}")
        time.sleep(poll_interval)
# [docs]
def check_nodes(
    log_path: str,
    requested_nodes: int,
    timeout: int = 300,
    poll_interval: float = 0.25,
) -> None:
    """Block until the log reports at least ``requested_nodes`` started runtimes.

    The file at ``log_path`` is re-read on every poll and the occurrences of
    the text ``"Ray runtime started"`` are counted.

    Args:
        log_path: Path of the log file to scan.
        requested_nodes: Number of marker occurrences to wait for. When it is
            zero or negative the function returns without opening the file.
        timeout: Maximum number of seconds to wait.
        poll_interval: Seconds to sleep between two reads of the log.

    Raises:
        ValueError: If fewer than ``requested_nodes`` markers are present once
            ``timeout`` seconds have elapsed.
        OSError: If the log file cannot be opened.
    """
    marker = "Ray runtime started"
    # A monotonic deadline also accounts for the time spent reading the log,
    # which a counter of sleep intervals would ignore.
    deadline = time.monotonic() + timeout
    active_nodes = 0
    while active_nodes < requested_nodes:
        # errors="replace": stray undecodable bytes in the log must not abort
        # the check; they cannot affect the count of the plain-ASCII marker.
        with open(log_path, errors="replace") as f:
            active_nodes = f.read().count(marker)
        if active_nodes >= requested_nodes:
            # Target reached: return right away instead of sleeping once more.
            break
        if time.monotonic() >= deadline:
            raise ValueError(f"Timeout reached: {timeout}")
        time.sleep(poll_interval)
# [docs]
@app.command(help="**Verify** Ray nodes are up and running.")
def verify_active_nodes(
    log_path: Annotated[str, typer.Argument()],
    requested_nodes: Annotated[int, typer.Argument()],
):
    """Read the log generated by the Slurm scheduler to verify that the runtime started.

    Waits for the log file to exist (``check_log``), then waits until the log
    reports the requested number of started nodes (``check_nodes``), and
    finally prints a success message. Both waits run with their default
    timeouts.

    Args:
        log_path: Slurm log path.
        requested_nodes: Number of requested nodes.

    Raises:
        ValueError: Propagated from ``check_log`` / ``check_nodes`` when
            either wait times out.
    """
    check_log(log_path)
    check_nodes(log_path, requested_nodes)
    # Empty line to visually separate the success message from prior output.
    console.print("")
    console.print(f":party_popper: Successfully started #{requested_nodes} nodes")
if __name__ == "__main__":
    # Run the Typer application only when this file is executed as a script.
    app()