# -*- coding: utf-8 -*-
"""Command module."""
from __future__ import annotations
import os
import shutil
from pathlib import Path, PosixPath
import subprocess
import attr
from nanotools.base import Base
@attr.s
class Cmd(Base):
    """Locate the NanoDCAL+ / RESCU+ binaries and build a launch command.

    Attributes:
        mpi (string):
            MPI-launcher command (e.g. ``mpiexec -n 4``). It is split on
            whitespace and placed in front of the binary when launching.
        path (list):
            Directories searched for the ``nanodcalplus`` and ``rescuplus``
            binaries, in addition to the system ``PATH``.
        cmd (string):
            Name of the code selected by the last successful ``get_cmd`` call.
        stdout (string):
            Standard output. If specified, the standard output is redirected to
            the file specified by ``stdout`` instead of to screen.
        rescuplus_license (Path):
            Location of the RESCU+ license, filled in by ``_find_licenses``.
        nanodcalplus_license (Path):
            Location of the NanoDCAL+ license, filled in by ``_find_licenses``.
        rescuplus_path (Path):
            Location of the ``rescuplus`` binary, filled in by
            ``_find_binaries``.
        nanodcalplus_path (Path):
            Location of the ``nanodcalplus`` binary, filled in by
            ``_find_binaries``.
        preffered_software (string):
            Code to favour when several candidates are usable. The attribute
            name (including its spelling) is part of the public interface.
    """

    mpi: str = attr.ib(
        default="",
        validator=attr.validators.instance_of(str),
    )
    # A factory is used so that every instance gets its own list; a plain
    # list default would be shared (and mutated) across all instances.
    path: list[str | Path] = attr.ib(
        default=attr.Factory(
            lambda: [
                Path(__file__).parent.parent / "_lib/",
                Path(__file__).parent.parent / "build/src/rescu_calculators",
                Path(__file__).parent.parent / "build/src/dcal_calculators",
            ]
        )
    )
    cmd: str | None = attr.ib(default=None)
    stdout = attr.ib(default=None)
    rescuplus_license: Path | None = attr.ib(default=None)
    nanodcalplus_license: Path | None = attr.ib(default=None)
    rescuplus_path: Path | None = attr.ib(default=None)
    nanodcalplus_path: Path | None = attr.ib(default=None)
    preffered_software: str = attr.ib(default="nanodcalplus")

    def _find_licenses(self):
        """Fill in ``rescuplus_license`` and ``nanodcalplus_license``.

        For each code, a non-empty ``<CODE>_LICENSE_PATH`` environment
        variable takes precedence (it overwrites any value already stored).
        If no license is known after that, the default location
        ``~/.nanoacademic/<CODE>/license.lic`` is used when it exists.
        """
        for codename in ("rescuplus", "nanodcalplus"):
            attribute = f"{codename}_license"
            # check environment variables
            env_value = os.environ.get(f"{codename.upper()}_LICENSE_PATH")
            if env_value:
                setattr(self, attribute, Path(env_value))
            # check default paths
            if getattr(self, attribute) is None:
                default_license = (
                    Path.home() / f".nanoacademic/{codename.upper()}/license.lic"
                )
                if default_license.exists():
                    setattr(self, attribute, default_license)

    def _find_binaries(self):
        """Fill in ``rescuplus_path`` and ``nanodcalplus_path``.

        Every directory in ``self.path`` is inspected in order, and each match
        overwrites the previous one, so the last matching directory wins. A
        binary found on the system ``PATH`` overrides any match from
        ``self.path``. Attributes are left untouched when nothing is found.
        """
        for codename in ("rescuplus", "nanodcalplus"):
            attribute = f"{codename}_path"
            # search paths
            for directory in self.path:
                candidate = Path(directory) / codename
                if candidate.exists():
                    setattr(self, attribute, candidate)
            # system path
            system_binary = shutil.which(codename)
            if system_binary is not None:
                setattr(self, attribute, Path(system_binary))

    def set_path(self, path):
        """Replace the list of directories searched for the binaries.

        Args:
            path (str, Path, list or tuple):
                Either a single string / ``Path`` holding one or more
                directories separated by ``os.pathsep`` (``:`` on POSIX), or a
                sequence of directories.

        Raises:
            Exception: If ``path`` has an unsupported type or if one of the
                directories does not exist.
        """
        if isinstance(path, (str, Path)):
            entries = str(path).split(os.pathsep)
        elif isinstance(path, (list, tuple)):
            entries = list(path)
        else:
            raise Exception(
                f"Invalid object of type {path.__class__.__name__}, must be a str, Path or list."
            )
        directories = [Path(entry) for entry in entries]
        for directory in directories:
            if not directory.exists():
                raise Exception(
                    f"Failed to set path, directory {directory} does not exist."
                )
        self.path = directories

    def get_cmd(self, cmdname):
        """Generates NanoDCAL+ or RESCU+ command for a binary of type ``cmdname``.

        For instance, if ``cmdname = scf``, then it searches for ``nanodcalplus``
        and ``rescuplus`` in the path. Then it wraps it with an mpi-launcher
        command or script, which gives ``mpiexec -n 4 rescuplus``.

        A candidate is usable when both its binary and its license were found.
        Among usable candidates, ``preffered_software`` is chosen if present;
        otherwise the last usable candidate in the search order is chosen.

        Args:
            cmdname (string):
                Calculation type. ``bsu`` is only served by ``rescuplus``,
                ``2prb`` and ``trsm`` only by ``nanodcalplus``; anything else
                may be served by either code.

        Returns:
            tuple: ``(function, binname)`` where ``function(file)`` runs the
            selected binary on input ``file`` through ``subprocess.run`` and
            returns its result, and ``binname`` is the name of the selected
            code.

        Raises:
            Exception: If a usable binary is found but is not executable, or
                if no candidate has both a binary and a license.
        """
        # search, in order, RESCU+ or NanoDCAL+ binary to update path
        codenames = {
            "bsu": ["rescuplus"],
            "2prb": ["nanodcalplus"],
            "trsm": ["nanodcalplus"],
            "default": ["nanodcalplus", "rescuplus"],
        }
        self._find_binaries()
        self._find_licenses()
        candidates = codenames.get(cmdname, codenames["default"])

        # Track the selection explicitly instead of reusing the loop
        # variables, which would otherwise hold whatever was iterated last
        # (possibly ``None`` or an unlicensed binary).
        selected_name = None
        selected_cmd = None
        for codename in candidates:
            binary = getattr(self, f"{codename}_path", None)
            if binary is None:
                continue
            # check if license is available
            if getattr(self, f"{codename}_license", None) is None:
                continue
            # check if binary is executable
            if not os.access(binary, os.X_OK):
                raise Exception(f"{codename} found but not executable.")
            selected_name = codename
            selected_cmd = binary
            self.cmd = codename
            # check if there is a preffered software
            if self.preffered_software == codename:
                break

        if selected_cmd is None:
            if not self.rescuplus_license and not self.nanodcalplus_license:
                raise Exception(
                    "Failed to find a NanoDCAL+ or RESCU+ license. Please see the installation documentation to set the path or place in default location."
                )
            if not self.rescuplus_path and not self.nanodcalplus_path:
                raise Exception(
                    "Failed to find NanoDCAL+ or RESCU+ binary. Please set the path."
                )
            # Some binary and some license exist, but no candidate for this
            # command has both (e.g. binary of one code, license of the other).
            raise Exception(
                f"Failed to find both a binary and a license for '{cmdname}' "
                f"(candidates: {', '.join(candidates)})."
            )

        # create launch command and return binname
        stdout_path = self.stdout

        def function(file):
            # Build an argument list so that binary / input paths containing
            # whitespace are passed intact; only the launcher is split.
            argv = [*self.mpi.split(), str(selected_cmd), "-i", str(file)]
            if stdout_path is None:
                return subprocess.run(argv)
            # Open (append) per run so the handle is always closed.
            with open(stdout_path, "a") as stream:
                return subprocess.run(
                    argv,
                    stdout=stream,
                    stderr=subprocess.STDOUT,
                )

        return function, selected_name