Source code for nanotools.cmd

# -*- coding: utf-8 -*-
"""Command module."""

from __future__ import annotations

import os
import shutil
from pathlib import Path, PosixPath
import subprocess

import attr
from nanotools.base import Base


[docs] @attr.s class Cmd(Base): """``Cmd`` class. Attributes: mpi (string): MPI-launcher command. stdout (string): Standard output. If specified, the standard output is redirected to the file specified by ``stdout`` instead of to screen. path (string): Path to the nanodcal+ and rescu+ binaries. """ mpi: str = attr.ib( default="", validator=attr.validators.instance_of(str), ) path: list[str] = attr.ib( default=[ Path(__file__).parent.parent / "_lib/", Path(__file__).parent.parent / "build/src/rescu_calculators", Path(__file__).parent.parent / "build/src/dcal_calculators", ] ) cmd: str = attr.ib(default=None) stdout = attr.ib(default=None) rescuplus_license: Path | None = attr.ib(default=None) nanodcalplus_license: Path | None = attr.ib(default=None) rescuplus_path: Path | None = attr.ib(default=None) nanodcalplus_path: Path | None = attr.ib(default=None) preffered_software: str = attr.ib(default="nanodcalplus") def _find_licenses(self): # check environmetn variables if "RESCUPLUS_LICENSE_PATH" in os.environ: self.rescuplus_license = os.environ["RESCUPLUS_LICENSE_PATH"] if "NANODCALPLUS_LICENSE_PATH" in os.environ: self.nanodcalplus_license = os.environ["NANODCALPLUS_LICENSE_PATH"] # check default paths rs_def_path = Path().home() / ".nanoacademic/RESCUPLUS/license.lic" if self.rescuplus_license is None and rs_def_path.exists(): self.rescuplus_license = rs_def_path nd_def_path = Path().home() / ".nanoacademic/NANODCALPLUS/license.lic" if self.nanodcalplus_license is None and nd_def_path.exists(): self.nanodcalplus_license = nd_def_path def _find_binaries(self): for codename in ("rescuplus", "nanodcalplus"): binname = f"{codename}" # search paths for p in self.path: cmd = Path(p) / f"{binname}" if cmd.exists(): setattr(self, f"{codename}_path", cmd) # system path fpath = shutil.which(f"{binname}") if fpath is not None: setattr(self, f"{codename}_path", Path(fpath)) def set_path(self, path): if isinstance(path, PosixPath): tmp = str(path).split(":") elif isinstance(path, str): tmp = path.split(":") else: if not isinstance(path, list): raise Exception( f"Invalid object of type {path.__class__.__name__}, must be a str or PosixPath." ) tmp = path tmp = [Path(p) for p in tmp] for p in tmp: if not p.exists(): raise Exception(f"Failed to set path, directory {p} does not exist.") self.path = tmp
[docs] def get_cmd(self, cmdname): """Generates NanoDCAL+ or RESCU+ command for a binary of type ``cmdname``. For instance, if ``cmdname = scf``, then it searches for ``nanodcalplus`` and ``rescuplus`` in the path. Then it wraps it with an mpi-launcher command or script, which gives ``mpiexec -n 4 rescuplus``. """ # search, in order, RESCU+ or NanoDCAL+ binary to update path codenames = { "bsu": ["rescuplus"], "2prb": ["nanodcalplus"], "trsm": ["nanodcalplus"], "default": ["nanodcalplus", "rescuplus"], } self._find_binaries() self._find_licenses() for codename in codenames.get(cmdname, codenames["default"]): binname = f"{codename}" cmd = getattr(self, f"{codename}_path", None) if cmd is None: continue # check if license is available if getattr(self, f"{codename}_license", None) is None: continue # check if binary is executable if not os.access(cmd, os.X_OK): raise Exception(f"{binname} found but not executable.") self.cmd = codename # check if there is a preffered software if self.preffered_software == codename: break if cmd is None: if not self.rescuplus_license and not self.nanodcalplus_license: raise Exception( "Failed to find a NanoDCAL+ or RESCU+ license. Please see the installation documentation to set the path or place in default location." ) if not self.rescuplus_path and not self.nanodcalplus_path: raise Exception( "Failed to find NanoDCAL+ or RESCU+ binary. Please set the path." ) # create launch command and return binname if self.stdout is None: def function(file): return subprocess.run(f"{self.mpi} {cmd} -i {file}".split()) else: f = open(self.stdout, "a") def function(file): return subprocess.run( f"{self.mpi} {cmd} -i {file}".split(), stdout=f, stderr=subprocess.STDOUT, ) return function, binname