# -*- coding: utf-8 -*-
"""
Module description ...
Reference:
- Surename1, Forename1 Initials., Surename2, Forename2 Initials, YEAR. Publication/Book title
Publisher, Number(Volume No), pp.142-161.
"""
import os
import sys
import ldds
import numpy as np
import numpy.ma as ma
import h5py
import pathlib
from scipy.integrate import solve_ivp
from functools import reduce
from scipy.interpolate import RectBivariateSpline, CubicSpline
from scipy.interpolate import interp1d, interp2d
from scipy.optimize import brentq
from ldds.hamiltonians import Hamiltonian_from_potential
[docs]def remaining_coordinate_quadratic(phase_space_axes, H0, Hamiltonian, momentum_sign):
"""
Returns a 1D array of values for the remaining momentum coordinate, assuming the kinetic energy
is a sum-of-squares function of the momenta (0.5*px**2+0.5*py**2+...). The sign of the returned
values is given by `momentum_sing`.
Parameters
----------
phase_space_axes: ndarray,
Array of coordinate values, where the remaining momentum is 0.
H0: float,
Value of energy.
Hamiltonian: function,
Function of (t,y) that returns the value of the Hamiltonian at time t and point y.
momentum_sign: int,
+1 returns positive momentum values, -1 negative.
Returns
-------
points_dims_remaining: ndarray,
Array of remaining momentum values.
"""
energy = H0-Hamiltonian(0, phase_space_axes)
masked_energy = np.copy(energy)
masked_energy[np.argwhere(energy<0)]=np.nan
points_dims_remaining = momentum_sign * np.sqrt(2*masked_energy)
return points_dims_remaining
[docs]def remaining_coordinate_value(u, ind_remaining, remaining_coordinate_bounds, H0, Hamiltonian):
"""
Returns a 1D array of values for the remaining coordinate (not necessarily momentum) using
Brent’s method in the bracketing interval `remaining_coordinate_bounds`.
Parameters
----------
u: ndarray, shape(n,)
Coordinate values of a single point, where the remaining momentum .
remaining_coordinate_bounds: ndarray, shape(2,)
Bracketing interval for root-finding method.
H0: float,
Value of energy.
Hamiltonian: function,
Function of (t,y) that returns the value of the Hamiltonian at time t and point y.
Returns
-------
point_dim_remaining: float,
Value of coordinate or, if root-finding is unsuccessfull, np.nan.
"""
def remaining_energy(guess):
u[ind_remaining] = guess
return H0 - Hamiltonian(0, u)
try:
point_dim_remaining = brentq(remaining_energy, remaining_coordinate_bounds[0], remaining_coordinate_bounds[1])
return point_dim_remaining
except:
return np.nan
[docs]def generate_points(grid_parameters):
"""
Returns a 1D array of all points from a on a uniform grid with dimensions and size defined by list of input parameters.
An additional dimension initiallised with zeros is added for the calculation of Lagrangian Descriptors.
NOTE: For n-DoF systems, currently energy conservation is only used to determine momenta dimensions.
Parameters
----------
grid_parameters : list (1-DoF systems) or dict (n-DoF systems)
if 1-DoF, list should have two 3-tuples of floats
entries are input parameters of limits and size of mesh per axis
if n-DoF, dict should have the following keys
* 'slice_parameters' : ndarray or list, should have two 3-tuples of floats, for a 2D slice
* 'dims_slice' : ndarray or list of 0 and 1, ones indicate slice axes
* 'dims_fixed' : ndarray or list of 0 and 1, ones indicate fixed axes
* 'dims_fixed_values' : ndarray or list of values on fixed axes
* 'energy_level' : float, energy value for energy conservation condition
and either one of
* 'Hamiltonian' : function for the Hamiltonian
* 'potential_energy' : potential energy function, sum-of-squares kinetic energy will be assumed
and either one of
* 'momentum_sign' : int, -1 / 1, for negative/positive momentum for remaining axis
* 'remaining_coordinate_bounds' : ndarray or list of values that bracket the values of the remaining coordinate (not necessarily momentum coordinate)
Returns
-------
mesh : ndarray,
Flattened array of initial conditions.
mask : ndarray,
Masks Nan values in further calculations.
"""
if type(grid_parameters) == dict:
# Unpack extra grid parameters
slice_parameters = grid_parameters['slice_parameters']
dims_slice = np.array(grid_parameters['dims_slice'])
dims_fixed = np.array(grid_parameters['dims_fixed'])
dims_fixed_values = np.array(grid_parameters['dims_fixed_values'])
H0 = grid_parameters['energy_level']
# If Hamiltonian is provided, use Hamiltonian, otherwise define Hamiltonian
# from provided potential and sum-of-squares kinetic energy.
try:
Hamiltonian = grid_parameters['Hamiltonian']
except:
potential_energy = grid_parameters['potential_energy']
Hamiltonian = Hamiltonian_from_potential(potential_energy)
N_dim = len(dims_slice) # Phase space dimensions
# Check N DoF is even.
if N_dim % 2 != 0:
error_mssg = ("Error: Number of phase space dimensions not even. ",
"Check your grid parameters")
print(error_mssg)
sys.exit()
# Determine number of dimensions for energy conservation.
# There must be only one.
dims_remaining = N_dim - np.sum(dims_fixed + dims_slice)
if dims_remaining > 1:
error_mssg = ("Error: More than one remaing dimension. ",
"Cannot uniquelly determine values of remaining coordinates.")
print(error_mssg)
sys.exit()
# Check if remaining dimension falls in configuration space
if np.sum(dims_slice) > 2:
error_mssg = ("Error: 3+ dimensional slices are not available.")
print(error_mssg)
sys.exit()
# Axes of visualisation slice
ax1 = np.linspace(*slice_parameters[0])
ax2 = np.linspace(*slice_parameters[1])
Ax1, Ax2 = np.meshgrid(ax1, ax2)
dims_slice_axes = np.column_stack((Ax1.ravel(),Ax2.ravel()))
# define array phase_space_axes containing phase space points
N_points_slice = len(dims_slice_axes)
phase_space_axes = np.zeros((N_points_slice,N_dim))
# fill 'slice' values of phase_space_axes
ind_slice = np.argwhere(dims_slice==1).squeeze()
phase_space_axes[:,ind_slice] = dims_slice_axes
# fill 'fixed' values of phase_space_axes
ind_fixed = np.argwhere(dims_fixed==1).squeeze()
phase_space_axes[:,ind_fixed] = dims_fixed_values
# determine remaining axis index
ind_remaining = np.argwhere(dims_fixed+dims_slice==0).squeeze()
if not (ind_remaining.shape == ()):
error_mssg = ("Error: More than one remaing dimension. ",
"Cannot uniquelly determine values of remaining coordinates.")
print(error_mssg)
sys.exit()
# If momentum sign is provided, determine remaining momentum values. This assumes
# sum-of-squares kinetic energy.
# Otherwise determine values of remaining coordinate from Hamiltonian using
# Brent’s method in the bracketing interval `remaining_coordinate_bounds`.
try:
momentum_sign = grid_parameters['momentum_sign']
phase_space_axes[:,ind_remaining] = remaining_coordinate_quadratic(
phase_space_axes, H0, Hamiltonian, momentum_sign)
except:
remaining_coordinate_bounds = np.array(grid_parameters['remaining_coordinate_bounds'])
def f_remaining_coordinate_value(u):
return remaining_coordinate_value(u, ind_remaining, remaining_coordinate_bounds, H0, Hamiltonian)
phase_space_axes[:,ind_remaining] = np.array(list( \
map(f_remaining_coordinate_value, phase_space_axes)))
mask = np.isnan(phase_space_axes[:,ind_remaining]) # Mask grid points
phase_space_axes[:,ind_remaining] = np.nan_to_num(phase_space_axes[:,ind_remaining])
# Return array of mesh points for integrator
lagrangian_descriptor_axis = np.zeros((N_points_slice,1))
mesh = np.column_stack((phase_space_axes,lagrangian_descriptor_axis))
return mesh.flatten(), mask
else:
if len(grid_parameters) > 2:
error_mssg = ("ERROR: grid_parameters must be a list for 2D slices for 1DoF systems. ",
"Provide a dictionary for a higher-dimensional systems.")
else:
x_min, x_max, Nx = grid_parameters[0]
y_min, y_max, Ny = grid_parameters[1]
points_x = np.linspace(x_min, x_max, Nx)
points_y = np.linspace(y_min, y_max, Ny)
X, Y = np.meshgrid(points_x, points_y) # Grid in phase space
# 2D grid + a zero column for LDs
mesh = np.transpose([X.ravel(), Y.ravel(), np.zeros(Nx*Ny)])
mask = False
return mesh.flatten(), mask
[docs]def perturb_field(vector_field, perturbation):
"""
Returns the vector field function with a linearly added pertubation
Both input function should input (t, u), with t: float, and u: ndarray
Also, the output of these funcs must be ndarrays of the same shape
Parameters
----------
vector_field: function
unperturbed vector field
perturbation: function
forcing added to the vector field
Returns
-------
perturbed function
"""
return lambda t, u: vector_field(t, u) + perturbation(t, u)
[docs]def check_if_points_escape_box(u, box_boundaries):
"""
Determine if points in phase space u have scaped box with user-defined defined dimensions
Parameters
----------
u : ndarray, shape(n, )
points in phase space to check if outside box boundaries
box_boundaries : list of 2-tuples of floats
box lower and upper limits along X and Y axes
Returns
-------
u_indices : ndarray, shape(n, )
array of True/False bool values if points inside/outside the box
"""
N_dim = u.shape[-1]
points_positions = u.T[:int(N_dim/2)]
if len(points_positions) == len(box_boundaries):
check = lambda x, box_axis_limits: (box_axis_limits[0]<=x)&(x<=box_axis_limits[1])
positions_within_box = [check(points_positions[i], box_boundaries[i]) for i in range(int(N_dim/2))]
u_indices = reduce(lambda x, y: x&y, positions_within_box)
return u_indices
else:
error_mssg = ("ERROR: Number of box axes and configuration space axes do not match. ",
"Check the defintion of your box boundaries.")
sys.exit()
[docs]def lagrangian_descriptor(u, v, p_value = 0.5):
"""
Vector field equation for Lagrangian descriptor.
Parameters
----------
v : ndarray, shape(n,2)
Vector field at given point.
p_value : float, optional
Exponent in Lagrangian descriptor definition.
0 is the acton-based LD,
0 < p_value < 1 is the Lp quasinorm,
1 <= p_value < 2 is the Lp norm LD,
2 is the arclength LD.
The default is 0.5.
Returns
-------
LD : ndarray, shape(n,1)
Vector field for Lagrangian descriptor dimension.
"""
if p_value == 0:
LD = np.abs(u[:,1]*v[:,0])
elif p_value>0:
LD = np.sum(np.abs(v)**p_value, axis=1)
else:
LD = np.zeros(len(u[:,0]))
return LD
[docs]def vector_field_flat(t, points, vector_field, p_value, box_boundaries):
"""
Returns vector field values for integration of flattened input array.
Parameters
----------
t : float
time
points : ndarray, shape(n,3)
vector_field: function
vector field over phase space, must return 2D array of vectors over a 2D array of initial conditions
and given as input to the API function `vector_field_flat` to be turned into a 1D array (column).
p_value : float, optional
Exponent in Lagrangian descriptor definition.
0 is the acton-based LD,
0 < p_value < 1 is the Lp quasinorm,
1 <= p_value < 2 is the Lp norm LD,
2 is the arclength LD.
The default is 0.5.
box_boundaries : list of 2-tuples, optional
box boundaries for escape condition of variable time integration
boundaries are infinite by default.
Returns
-------
1d array
y0 values for integrator
"""
N_mesh_axes = 2*len(box_boundaries)+1
u = points.reshape((-1,N_mesh_axes))
u = u[:,:-1] #remove LD-values axis
# Apply Escape condition
u_inbox = check_if_points_escape_box(u, box_boundaries)
# Define output vector field in combination with escape condition
v = np.zeros(u.shape)
v[u_inbox] = vector_field(t, u[u_inbox])
# Calculate LD vector field
LD_vec = np.zeros(len(u))
LD_vec [u_inbox] = lagrangian_descriptor(u[u_inbox], v[u_inbox], p_value)
# Add LD
v_out=np.column_stack((v, LD_vec))
return v_out.flatten()
[docs]def fit_pes(filename, clip_max = False):
"""
Returns a 1- or 2-dimensional spline function (potential energy surface) fitted to data located in pylds/pes_files/filename.hdf5.
Parameters
----------
filename : string
Name of file containing data:
coords : list of ndarrays
[x] or [x,y] contain coordinates.
pes_data : ndarray, shape(len(x)) or shape(len(y),len(x))
Array of potential energy values.
clip_max : float
Limit for clipping potential values that are not of interest.
The default is False.
Returns
-------
fspline : function
fspline returns the potential at (x0) or (x0,y0).
"""
dirname = "pes_files"
dirpath = os.path.join(pathlib.Path(__file__).parent.absolute(), dirname)
filepath = os.path.join(dirpath, filename+'.hdf5')
hf = h5py.File(filepath,'r')
coords = np.array(hf.get('coords'))
pes_data = np.array(hf.get('pes_data'))
hf.close()
if clip_max:
pes_data=np.clip(pes_data, a_min=-np.inf, a_max=clip_max)
if len(coords) == 1:
spline = CubicSpline(coords, pes_data)
def fspline(positions):
potential = np.array(list(map(spline,positions)))
return potential
elif len(coords) == 2:
x, y = coords
spline = RectBivariateSpline(x,y,pes_data.T)
def spline_wrap(v):
return spline(v[0],v[1]).squeeze()
def fspline(positions):
potential = np.array(list(map(spline_wrap,positions)))
return potential
else:
print('splines in +3D are not implemented yet')
fspline = np.nan
return fspline
[docs]def fit_vector_field(filename):
"""
Returns a 2-dimensional function (vector field) fitted to
data located in pylds/vector_field_files/filename.hdf5.
Parameters
----------
filename : string
Name of file containing data:
sample_time_points: 1d array,
time-points in a sample time-interval.
sample_coords : list of ndarrays,
[x,y] contain coordinates.
vector_field_data : ndarray, len(t),
Array of function/vector field values.
Returns
-------
vector_field_interpolated : function
returns a vector field function to be evaluated at (t, u).
with t a float and u (n,2)-array.
"""
dirname = "vector_field_files"
dirpath = os.path.join(pathlib.Path(__file__).parent.absolute(), dirname)
filepath = os.path.join(dirpath, filename+'.hdf5')
hf = h5py.File(filepath,'r')
#extract data
time = np.array(hf.get('sample_time'))
coords = np.array(hf.get('sample_coords'))
data = np.array(hf.get('vector_field_data'))
hf.close()
#interpolate data in time
v_interp_t = lambda t: interp1d(time, data.T, kind='cubic')(t).T
def vector_field_wrap(v, u):
return v(u[0],u[1])
def vector_field_interpolated(t, u):
#evaluate components of interpolated field at t
x, y = coords
v_t_eval_x = v_interp_t(t).T[0].reshape(len(x), len(y))
v_t_eval_y = v_interp_t(t).T[1].reshape(len(x), len(y))
#interpolate above data in space
v_x = interp2d(x, y, v_t_eval_x, kind='cubic')
v_y = interp2d(x, y, v_t_eval_y, kind='cubic')
#evaluate components of interpolated field at u
v_x_eval = np.array(list(map(lambda a: vector_field_wrap(v_x, a), u)))
v_y_eval = np.array(list(map(lambda a: vector_field_wrap(v_y, a), u)))
return np.column_stack([v_x_eval, v_y_eval])
return vector_field_interpolated
[docs]def EulerMaruyama_solver(t_initial, u_initial, vector_field, time_step, noise_amplitude=[0, 0], noise_type="additive"):
"""
Returns next time and state in the evolution of a stochastic ODE system via the Euler-Maruyama method.
Euler-Maruyama scheme:
t_next = t_initial + dt
u_next = u_initial + v(t_initial, u_initial)*dt + b*dW
with u = (x, y)
NOTE: Depending on the 'noise_type' b will be a random constant (additive) or a vector (multiplicative).
Currently only implemented for 2D vector fields.
Parameters
----------
t_initial : float
initial time-point of all initial points in phase space.
u_initial : array_like, shape(n,)
initial points in phase space to determine their evolution at time t_next.
vector_field: function
vector field over phase space.
time_step : float
noise_amplitude : list of floats
amplitude values multiplying Weiner process.
noise_type : string
options 'additive' (default) or 'multiplicative'.
Returns
-------
t_next : float
next time-point in the evolution of stochastic system.
u_next : array_like, shape(n,)
next states in the evolution of stochastic system.
"""
#solver parameters
dt = time_step
v = vector_field
b = np.array(noise_amplitude)
#define Weinner process
if noise_type == "additive":
N_dims = u_initial.shape[1] # phase space dim
dW = np.sqrt(abs(dt))*np.random.randn(N_dims)*np.ones(u_initial.shape)
elif noise_type == "multiplicative":
dW = np.sqrt(abs(dt))*np.random.randn(*u_initial.shape)
else:
error_mssg = ("ERROR: noise_type uknown. "
"Set as 'additive' or 'multiplicative'")
print(error_mssg)
#Euler-Maruyama iterative solver
t_next = t_initial + dt
u_next = u_initial + v(t_initial, u_initial)*dt + b*dW
return t_next, u_next
[docs]def compute_lagrangian_descriptor(grid_parameters, vector_field, tau, p_value=0.5, box_boundaries=False, rtol=1.0e-4):
"""
Returns the values of the LD function from integrated trajectories from initial conditions in phase space.
Parameters
----------
grid_parameters : list of 3-tuples of floats
input parameters of limits and size of mesh per axis
vector_field: function
vector field over phase space, must return 2D array of vectors over a 2D array of initial conditions
and given as input to the API function `vector_field_flat` to be turned into a 1D array (column).
tau : float
Upper limit of integration.
p_value : float, optional
Exponent in Lagrangian descriptor definition.
0 is the acton-based LD,
0 < p_value < 1 is the Lp quasinorm,
1 <= p_value < 2 is the Lp norm LD,
2 is the arclength LD.
The default is 0.5.
box_boundaries : list of 2-tuples, optional
Box boundaries for escape condition of variable time integration.
Boundaries are infinite by default.
rtol : float,
Relative tolerance of integration step.
Returns
-------
LD : ndarray, shape (Nx, Ny)
Array of computed Lagrangian descriptor values for all initial conditions.
"""
#get visualisation slice parameters and Number of DoF
if type(grid_parameters) == dict:
#n-DoF systems
slice_parameters = np.array(grid_parameters['slice_parameters']) # 2n-D grid
N_dim = len(grid_parameters['dims_slice'])
else:
#1-DoF systems
slice_parameters = np.array(grid_parameters) # 2-D grid
N_dim = len(slice_parameters)
#set boundaries for escape-box condition, if not defined
if not box_boundaries:
box_boundaries = int(N_dim/2)*[[-np.infty, np.infty]] #restricted to configuration space
#solve initial value problem
f = lambda t, y: vector_field_flat(t, y, vector_field, p_value, box_boundaries)
y0, mask = generate_points(grid_parameters)
#mask y0 values
if type(mask) == np.ndarray:
mask_y0 = np.transpose([mask for i in range(N_dim+1)]).flatten()
y0 = ma.masked_array(y0, mask=mask_y0)
solution = solve_ivp(f, [0,tau], y0, t_eval=[tau], rtol=rtol, atol=1.0e-12)
LD_values = solution.y[N_dim::N_dim+1] #values corresponding to LD
LD_values[mask] = np.nan #mask LD values for slice
N_points_slice_axes = slice_parameters[:,-1].astype('int')
LD = np.abs(LD_values).reshape(*N_points_slice_axes) #reshape to 2-D array
if p_value<=1:
return LD
else:
return LD**(1/p_value)
__author__ = 'Broncio Aguilar-Sanjuan, Victor-Jose Garcia-Garrido, Vladimir Krajnak, Shibabrat Naik'
__status__ = 'Development'