'''
Source code for itm.grid.cpo_tools

@author: klingshi
'''
import ual
from ual import edge_types
from helper import *
import inspect
import base
import logging
import os
from type_helper import *
# List of all CPO names to be read if 'all' is supplied as a CPO name.
# The trailing comma is required: ('edge') is just the string 'edge', which
# would be indexed character by character instead of as a one-element tuple.
ALL_CPOS = ('edge',)
class CpoDescriptor():
    '''Helper class describing one or more CPOs.

    It holds the shot/run number(s), CPO name(s), time stamp(s) and
    user/tokamak/version. It provides a sequence interface (len() and
    indexing with a non-negative integer) to get the individual CPOs, one
    for every combination of the stored parameter values.'''

    # TODO: add occurrence...?

    def __init__(self, shot, run=1, cpoNames='all', time=0.0,
                 user=None, tokamak=None, version=None, doOpen=True, useHDF5=False):
        '''Create a CPO Descriptor.

        The parameters can be either scalars or tuples. If tuples are used, the
        CPOs described by all combinations of the parameters are obtained.

        doOpen and useHDF5 are stored unchanged and handed to every Cpo
        object created by indexing this descriptor.'''
        # Make sure every parameter is a sequence
        self._shot = make_sequence(shot)
        self._run = make_sequence(run)
        self._cpoNames = make_sequence(cpoNames)
        self._time = make_sequence(time)

        # Environment parameters
        self._user = make_sequence(user)
        self._tokamak = make_sequence(tokamak)
        self._version = make_sequence(version)

        # Access parameters
        self._doOpen = doOpen
        self._useHDF5 = useHDF5

        # Expand "all" CPO name into list of CPOs using the general grid description.
        # The expansion goes through make_sequence as well, so that it is a
        # sequence of names even if ALL_CPOS happens to be a bare string
        # (assumes make_sequence leaves tuples untouched, as it must for the
        # tuple parameters above).
        if len(self._cpoNames) > 0 and self._cpoNames[0] == 'all':
            self._cpoNames = make_sequence(ALL_CPOS)

        # Figure out the counts for the individual parameters, in the same
        # order in which they are passed to Cpo() in __getitem__
        self._nPar = [
            len(self._shot),
            len(self._run),
            len(self._cpoNames),
            len(self._time),
            len(self._user),
            len(self._tokamak),
            len(self._version),
        ]

    def __str__(self):
        '''Return the parameter sequences joined by slashes, in the order
        shot/run/time/cpoNames/user/tokamak/version.'''
        parts = (self._shot, self._run, self._time, self._cpoNames,
                 self._user, self._tokamak, self._version)
        return '/'.join(str(part) for part in parts)

    def __len__(self):
        '''Return the number of described CPOs, i.e. the product of the
        lengths of all parameter sequences.'''
        total = 1
        for count in self._nPar:
            total = total * count
        return total

    def _local_index(self, ind):
        '''Split the global index ind into one 0-based index per parameter.

        The last parameter varies fastest. Raises IndexError if ind is
        negative or not smaller than len(self).'''
        if (ind < 0) or (ind >= len(self)):
            raise IndexError("CPO index out of range: %r" % (ind,))

        # Number of CPOs spanned by one step of every component of the index
        nDim = len(self._nPar)
        strides = [1] * nDim
        for i in range(nDim - 2, -1, -1):
            strides[i] = strides[i + 1] * self._nPar[i + 1]

        # divmod keeps the arithmetic in integers regardless of whether
        # "/" means floor or true division in the running interpreter
        local = []
        rest = ind
        for stride in strides:
            component, rest = divmod(rest, stride)
            local.append(component)
        return local

    def __getitem__(self, ind):
        '''Returns the CPO object for the given index.

        Raises IndexError for indices outside 0 .. len(self) - 1.'''
        lInd = self._local_index(ind)

        # create CPO object
        return Cpo(self._shot[lInd[0]],
                   self._run[lInd[1]],
                   self._cpoNames[lInd[2]],
                   self._time[lInd[3]],
                   self._user[lInd[4]],
                   self._tokamak[lInd[5]],
                   self._version[lInd[6]],
                   self._doOpen,
                   self._useHDF5)
def get_all_cpos(cpoDescs):
    '''Get a list of all CPOs described by a list of CPO descriptors.

    Every descriptor in cpoDescs is iterated in order and its items are
    appended to one flat list, which is returned.'''
    # Named "cpos" rather than "all" so the builtin all() is not shadowed
    cpos = []
    for desc in cpoDescs:
        cpos.extend(desc)
    return cpos
class Cpo():
    '''Helper class wrapping a CPO data structure to add high-level functionality.'''

    def __init__(self, shot, run, cpoName, time=None,
                 user=None, tokamak=None, version=None, doOpen=True, useHDF5=False):
        '''Creates an object wrapping an CPO with the given parameters. Shot number, run number and
        cpo name (e.g. 'equilibrium') have to be given. For time-dependent CPOs, time has to be given.
        If user, tokamak, version are omitted, the values set in the environment
        (USER, TOKAMAKNAME, DATAVERSION) are used.

        The doOpen parameter controls whether UAL access is done immediately when creating the Cpo object.
        If it is set to False, UAL access is deferred to the first access to the cpo property.

        The useHDF5 property controls whether UAL access is done through HDF5. If set to True, the
        parameters user, tokamak and version have no effect.'''
        self._shot = shot
        self._run = run
        self._cpoName = cpoName
        self._time = time

        # Environment parameters
        self._user = user
        self._tokamak = tokamak
        self._version = version

        # We want to use open_env, so we need proper parameters for it
        if not self._user:
            self._user = os.getenv("USER")
        if not self._tokamak:
            self._tokamak = os.getenv("TOKAMAKNAME")
        if not self._version:
            self._version = os.getenv("DATAVERSION")

        self._useHDF5 = useHDF5

        # UAL database access object and the CPO data read through it;
        # both stay None until the data is actually retrieved
        self._ualDAO = None
        self._cpo = None

        if doOpen:
            self.reload()

    @property
    def shot(self):
        '''The shot number of the CPO.'''
        return self._shot

    @property
    def run(self):
        '''The run number of the CPO.'''
        return self._run

    @property
    def name(self):
        '''The name of the CPO (e.g. 'edge').'''
        return self._cpoName

    @property
    def time(self):
        '''The time value of the CPO.'''
        return self._time

    def __str__(self):
        """Returns an identifier string for the CPO

        Format:
        *shot/run/time/name for time-dependent CPOs
        *shot/run/name for non-time-dependent CPOs.

        The time part is included whenever a time was given (is not None)."""
        name = str(self._shot) + '/' + str(self._run)
        if self._time is not None:
            name += '/' + str(self._time)
        return name + '/' + self._cpoName

    @property
    def cpo(self):
        '''Return the UAL CPO data object associated with this CPO object.

        The data is retrieved from the UAL on first access if that has not
        happened yet.'''
        if self._cpo is None:
            self.reload()
        return self._cpo

    def close(self):
        '''Close the UAL database access object for this CPO.

        Does nothing if no access object is currently open, so it is safe to
        call on a Cpo that was never opened or that was closed already.'''
        if self._ualDAO is not None:
            self._ualDAO.close()
            self._ualDAO = None

    def reload(self):
        '''Reload the CPO data from the UAL.

        Any open UAL access object is closed and the cached data dropped
        before the data is retrieved again.'''
        self.close()
        self._cpo = None
        self._retrieve_from_ual()

    def _retrieve_from_ual(self):
        """Retrieve the CPO described by this object from the UAL and return it.
        Subsequent calls will return the instance created on the first call.

        Raises ValueError if the CPO only offers getSlice and no time was
        given, and TypeError if the object found under the CPO name offers
        neither get nor getSlice."""
        if self._cpo is not None:
            return self._cpo

        logging.debug("Retrieving CPO %s", self)

        ualDAO = ual.itm(self._shot, self._run)
        if self._useHDF5:
            ualDAO.open_hdf5()
        else:
            ualDAO.open_env(self._user, self._tokamak, self._version)

        retrieved = False
        try:
            # Resolve the CPO name by attribute lookup (following dots, if
            # any) instead of eval(), so the name is never run as code
            cpo = ualDAO
            for part in self._cpoName.split('.'):
                cpo = getattr(cpo, part)

            if hasattr(cpo, 'get'):
                # Time-independent CPO
                cpo.get()
            elif hasattr(cpo, 'getSlice'):
                # Time-dependent CPO
                if self._time is None:
                    raise ValueError("Need valid time for getting a time-dependent CPO")
                cpo.getSlice(self._time, ual.ualdef.CLOSEST_SAMPLE)
            else:
                raise TypeError("Found unexpected type of CPO, check CPO name")
            retrieved = True
        finally:
            # Do not leave the access object open if anything above failed;
            # on failure it is not stored on self and could not be closed later
            if not retrieved:
                ualDAO.close()

        self._ualDAO = ualDAO
        self._cpo = cpo
        return self._cpo

    def list_data(self, addPrefix=False):
        '''List all complexgrid_scalar objects in this CPO data structure.

        For the result see list_components.
        If the optional parameter addPrefix=True is given, the CPO identifier as obtained
        by str() is prepended to the object names.'''
        prefix = ''
        if addPrefix:
            prefix = str(self)
        # Go through the cpo property so deferred loading (doOpen=False) is honoured
        return list_components(self.cpo, is_complexgrid_scalar, prefix)

    def list_dataLists(self, addPrefix=False):
        '''List all objects in this CPO data structure that pass the
        is_complexgrid_scalar_list test.

        For the result see list_components.
        If the optional parameter addPrefix=True is given, the CPO identifier as obtained
        by str() is prepended to the object names.'''
        prefix = ''
        if addPrefix:
            prefix = str(self)
        # Go through the cpo property so deferred loading (doOpen=False) is honoured
        return list_components(self.cpo, is_complexgrid_scalar_list, prefix)

    @property
    def grid(self):
        '''Return the grid for this CPO.'''
        # FIXME: assumes grid at top level - make this more general?
        # TODO: multiple grids?
        # Go through the cpo property so deferred loading (doOpen=False) is honoured
        return base.Grid(self.cpo.grid)
# Below here helper routines for traversing CPO data structures
def list_components(obj, object_test, basename=""):
    '''List components of a CPO that test positive for the given test function object_test.

    Returns a dictionary linking object names to the objects for which
    object_test returned true.
    The keys/names have the form /part1/part2/part3, with the slashes indicating the
    hierarchy in the data structure. Elements of sequences get their index
    as an additional name part.
    If the optional parameter basename is given, it is prepended to all names.

    An object that matches is returned as the only entry, under the name
    basename; its members are not searched any further.'''
    # Check if this object matches
    if object_test(obj):
        return {basename: obj}

    comps = {}

    # If not, try to descend into its members
    def member_test(x):
        return (is_ual_type(x) or is_sequence(x)) \
            and not is_module_type(x, "numpy")

    for name, member in inspect.getmembers(obj, member_test):
        path = basename + "/" + name
        if is_sequence(member):
            # Loop over sequences, but only if parent is UAL object
            if is_ual_type(obj):
                try:
                    for i, element in enumerate(member):
                        comps.update(
                            list_components(element, object_test, path + "/" + str(i)))
                except Exception:
                    # Ignore errors caused by misidentifying sequence objects.
                    # This stays best-effort, but KeyboardInterrupt and
                    # SystemExit are no longer swallowed and the skipped
                    # member is visible in the debug log.
                    logging.debug("Skipping %s while listing components",
                                  path, exc_info=True)
        else:
            # Other members: descend
            comps.update(list_components(member, object_test, path))
    return comps