from attr import define, field, validators
from joblib import Parallel, delayed
from nptyping import NDArray
from pathlib import Path
from nanotools import TotalEnergy, Energy
from nanotools.base import Base, Quantity
from nanotools.utils import to_quantity, ureg
from nanotools.jsonio import json_read
import attr
import numpy as np
import os
[docs]
@define
class Displacement(Base):
atom_index: int = field(default=0)
direction_index: int = field(default=0)
h_direction: NDArray = field(default=np.array([0, 0, 1]))
h_magnitude: Quantity = field(
default=0.0001 * ureg.angstrom,
converter=lambda x: to_quantity(x, "angstrom"),
validator=attr.validators.optional(attr.validators.instance_of(Quantity)),
)
sign: int = field(default=1, converter=int, validator=validators.instance_of(int))
energy: Energy = field(default=None)
free_energy: Quantity = field(
default=None,
converter=lambda x: to_quantity(x, "eV"),
validator=attr.validators.optional(attr.validators.instance_of(Quantity)),
)
workdir: str = field(default="/")
def generate_input(self, calc):
self.workdir.mkdir(parents=True, exist_ok=True)
etot = self.get_calc(calc)
path = Path(self.workdir) / "nano_scf_in.json"
etot.system.atoms.write_xyz(Path(self.workdir) / "Atom.xyz")
etot.write(path)
return
def get_calc(self, calc):
etot = calc.copy()
pos = etot.system.atoms.positions
if self.atom_index != -1:
h = self.h_magnitude
pos[self.atom_index, :] += h * self.h_direction * self.sign
etot.system.set_positions(pos)
return etot
def get_h_direction(self):
return self.h_direction / np.linalg.norm(self.h_direction)
def get_output_paths(self):
self.workdir.mkdir(parents=True, exist_ok=True)
output = Path(self.workdir) / "nano_scf_out.json"
outhdf = Path(self.workdir) / "nano_scf_out.h5"
return output, outhdf
def _get_energy(self):
if isinstance(self.energy, Energy):
return self.energy
output, outhdf = self.get_output_paths()
if output.exists() and outhdf.exists():
etot = TotalEnergy.read(output)
self.energy = etot.energy
return self.energy
else:
raise Exception(f"Output file {output.absolute()} not found.")
def get_free_energy(self):
output, outhdf = self.get_output_paths()
if output.exists() and outhdf.exists():
with open(output, "r") as f:
d = json_read(f)
energy = Energy(**d["energy"])
energy.set_units("si")
return energy.efree
# etot = TotalEnergy.read(output)
# return etot.energy.efree
else:
raise Exception(f"Output file {output.absolute()} not found.")
def solve(self, calc):
root = os.getcwd()
output, outhdf = self.get_output_paths()
if output.exists() and outhdf.exists():
etot = TotalEnergy.read(output)
else:
etot = self.get_calc(calc)
if etot.solver.mix.converged:
print(f"Found converged calculation {output.absolute()}")
else:
os.chdir(self.workdir)
etot.solve(output=output)
os.chdir(root)
[docs]
@define
class FDForces(Base):
displacements = field(default=None)
fd_scheme: str = field(default="forward")
forces: NDArray = field(default=None)
h_cartesian: bool = field(default=True)
h_magnitude: Quantity = field(
default=0.0001 * ureg.angstrom,
converter=lambda x: to_quantity(x, "angstrom"),
validator=attr.validators.optional(attr.validators.instance_of(Quantity)),
)
free_energies = field(default=None)
total_energy_calc: TotalEnergy = field(default=None)
workdir: str = field(
default="./", converter=str, validator=validators.instance_of(str)
)
def __attrs_post_init__(self):
natm = self.total_energy_calc.system.get_number_of_atoms()
if self.h_cartesian:
directions = [np.array([1, 0, 0]), np.array([0, 1, 0]), np.array([0, 0, 1])]
else:
raise Exception(f"Invalid h_cartesian value {self.h_cartesian}")
workdir = Path(self.workdir).absolute() / "etot_center"
self.displacements = [
Displacement(
atom_index=-1,
h_direction=directions[0],
h_magnitude=0.0 * ureg.angstrom,
workdir=workdir,
)
]
if self.fd_scheme == "central":
signs = [1, -1]
else:
signs = [1]
for sind, s in enumerate(signs):
for i in range(natm):
for d in range(3):
workdir = (
Path(self.workdir).absolute() / f"etot_atm{i}_dir{d}_sgn_{sind}"
)
self.displacements.append(
Displacement(
atom_index=i,
direction_index=d,
h_direction=directions[d],
h_magnitude=self.h_magnitude,
sign=s,
workdir=workdir,
)
)
self.forces = np.empty((natm, 3)) * ureg.eV / ureg.angstrom
self.free_energies = np.empty(len(self.displacements)) * ureg.eV
def generate_inputs(self):
for d in self.displacements:
d.generate_input(self.total_energy_calc)
def get_forces(self):
ftots = self.get_free_energies()
natm = self.total_energy_calc.get_number_of_atoms()
for j, disp in enumerate(self.displacements):
i = disp.atom_index
if i == -1:
ftot0 = ftots[j]
break
ftotf = np.empty((natm, 3))
ftotb = np.empty((natm, 3))
hs = np.empty((natm, 3))
for j, disp in enumerate(self.displacements):
i = disp.atom_index
if i == -1:
continue
d = disp.direction_index
hs[i, d] = disp.h_magnitude.to(ureg.angstrom).m
s = disp.sign
if s == 1:
ftotf[i, d] = ftots[j].to(ureg.eV).m
else:
ftotb[i, d] = ftots[j].to(ureg.eV).m
ftotf *= ureg.eV
ftotb *= ureg.eV
hs *= ureg.angstrom
if self.fd_scheme == "central":
self.forces = -(ftotf - ftotb) / hs / 2
else:
self.forces = -(ftotf - ftot0) / hs
return self.forces
def _get_forces(self, name="efree"):
ftots = self._get_energies()
allowed = [
"esr",
"ebg",
"ebs",
"edh",
"exc",
"evxc",
"etot",
"efree",
"entropy",
]
if name not in allowed:
raise Exception(f"Invalid force name {name}.")
ftots = [getattr(f, name) for f in ftots]
natm = self.total_energy_calc.get_number_of_atoms()
for j, disp in enumerate(self.displacements):
i = disp.atom_index
if i == -1:
ftot0 = ftots[j]
break
ftotf = np.zeros((natm, 3))
ftotb = np.zeros((natm, 3))
hs = np.zeros((natm, 3))
for j, disp in enumerate(self.displacements):
i = disp.atom_index
if i == -1:
continue
d = disp.direction_index
hs[i, d] = disp.h_magnitude
s = disp.sign
if s == 1:
ftotf[i, d] = ftots[j]
else:
ftotb[i, d] = ftots[j]
if self.fd_scheme == "central":
self.forces = -(ftotf - ftotb) / hs / 2
else:
self.forces = -(ftotf - ftot0) / hs
return self.forces
def _get_energies(self, n_jobs=1):
self.energies = []
Parallel(n_jobs=n_jobs)(
delayed(self.energies.append)(d._get_energy()) for d in self.displacements
)
return self.energies
# def _get_energies(self):
# self.energies = []
# for i, d in enumerate(self.displacements):
# self.energies.append(d._get_energy())
def get_free_energies(self):
for i, d in enumerate(self.displacements):
self.free_energies[i] = d.get_free_energy()
return self.free_energies
def solve(self, n_jobs=1):
Parallel(n_jobs=n_jobs)(
delayed(d.solve)(self.total_energy_calc) for d in self.displacements
)
# for d in self.displacements:
# d.solve(self.total_energy_calc)