Source code for itm.grid.cpo_tools


@author: klingshi
'''
import ual
from ual import edge_types
from helper import *
import inspect 
import base
import logging
import os
from type_helper import *

# List of all CPO names to be read if 'all' is supplied as a CPO name
ALL_CPOS = ('edge') 

class CpoDescriptor():
    '''Helper class describing one or more CPOs
[docs] It holds the shot/run number(s), CPO name(s), time stamp(s), user/tokamak/version. It provides a sequence interface to get individual CPOs.''' # TODO: add occurrence...? def __init__(self, shot, run=1, cpoNames='all', time=0.0, user=None, tokamak=None, version=None, doOpen=True, useHDF5=False):
[docs] '''Create a CPO Descriptor. The parameters can be either scalars or tuples. If tuples are used, the CPOs described by all combinations of the parameters are obtained.''' # Make sure every parameter is a sequence self._shot = make_sequence(shot) self._run = make_sequence(run) self._cpoNames = make_sequence(cpoNames) self._time = make_sequence(time) # Environment parameters self._user = make_sequence(user) self._tokamak = make_sequence(tokamak) self._version = make_sequence(version) # Access parameters self._doOpen = doOpen self._useHDF5 = useHDF5 # Expand "all" CPO name into list of CPOs using the general grid description if self._cpoNames[0] == 'all': self._cpoNames = ALL_CPOS # Figure out the counts for the individual parameters self._nPar = [1,] * 7 self._nPar[0] = len(self._shot) self._nPar[1] = len(self._run) self._nPar[2] = len(self._cpoNames) self._nPar[3] = len(self._time) self._nPar[4] = len(self._user) self._nPar[5] = len(self._tokamak) self._nPar[6] = len(self._version) def __str__(self): return str(self._shot) + '/' \
[docs] + str(self._run) \ + '/' + str(self._time) \ + '/' + str(self._cpoNames) \ + '/' + str(self._user) \ + '/' + str(self._tokamak) \ + '/' + str(self._version) def __len__(self): l = 1
[docs] for i in self._nPar: l = l * i return l def __getitem__(self, ind): '''Returns the CPO object for the given index.'''
[docs] if (ind < 0) or (ind >= len(self)): raise IndexError() # set up object counts for every components of the local index iCount = [1,] * len(self._nPar) for i in xrange(len(self._nPar) - 1): iCount[-i-2] = iCount[-i-1] * self._nPar[-i-1] # From global index ind, compute local index tuple lInd (which is 0-based) lInd = [0,] * len(self._nPar) tInd = ind for i in xrange(len(self._nPar)): lInd[i] = tInd / iCount[i] tInd -= lInd[i] * iCount[i] # create CPO object return Cpo(self._shot[lInd[0]], self._run[lInd[1]], self._cpoNames[lInd[2]], self._time[lInd[3]], self._user[lInd[4]], self._tokamak[lInd[5]], self._version[lInd[6]], self._doOpen, self._useHDF5) def get_all_cpos(cpoDescs): '''Get a list of all CPOs described by a list of CPO descriptors.'''
[docs] all = [] for desc in cpoDescs: cpos = list(desc) all.extend(cpos) return all class Cpo(): '''Helper class wrapping a CPO data structure to add high-level functionality.'''
[docs] def __init__(self, shot, run, cpoName, time=None, user=None, tokamak=None, version=None, doOpen=True, useHDF5=False ):
[docs] '''Creates an object wrapping an CPO with the given parameters. Shot number, run number and cpo name (e.g. 'equilibrium') have to be given. For time-dependent CPOs, time has to be given. If user, tokamak, version are omitted, the values set in the environment are used. The doOpen parameter controls whether UAL access is done immediately when creating the Cpo object. If it is set to False, UAL access is deferred to the first access to the cpo property. The useHDF5 property controls whether UAL access is done through HDF5. If set to True, the parameters user, tokamak and version have no effect.''' self._shot = shot self._run = run self._cpoName = cpoName self._time = time # Environment parameters self._user = user self._tokamak = tokamak self._version = version # We want to use open_env, so we need proper parameters for it if not self._user: self._user=os.getenv("USER") if not self._tokamak: self._tokamak=os.getenv("TOKAMAKNAME") if not self._version: self._version=os.getenv("DATAVERSION") self._useHDF5 = useHDF5 self._ualDAO = None self._cpo = None if doOpen: self.cpo @property def shot(self):
'''The shot number of the CPO.'''
[docs] return self._shot @property def run(self):
'''The run number of the CPO.'''
[docs] return self._run @property def name(self):
'''The name of the CPO (e.g. 'edge').'''
[docs] return self._cpoName @property def time(self):
'''The time value of the CPO.'''
[docs] return self._time def __str__(self): """Returns an identifier string for the CPO
[docs] Format: *shot/run/time/name for time-dependent CPOs *shot/run/name for non-time-dependent CPOs.""" name = str(self._shot) + '/' + str(self._run) if self._time is not None: name += '/' + str(self._time) return name + '/' + self._cpoName @property def cpo(self):
'''Return the UAL CPO data object associated with this CPO object.'''
[docs] if self._cpo is None: self.reload() return self._cpo def close(self): '''Close the UAL database access object for this CPO.'''
[docs] self._ualDAO.close() self._ualDAO = None def reload(self): '''Reload the CPO data from the UAL.'''
[docs] if self._ualDAO is not None: self._ualDAO.close() self._ualDAO = None self._cpo = None self._retrieve_from_ual() def _retrieve_from_ual(self): """Retrieve the CPO described by this object from the UAL and return it.
Subsequent calls will return the instance created on the first call.""" if self._cpo is not None: return self._cpo logging.debug("Retrieving CPO " + str(self)) ualDAO = ual.itm(self._shot, self._run) if self._useHDF5: ualDAO.open_hdf5() else: ualDAO.open_env(self._user, self._tokamak, self._version) cpo = eval('ualDAO.' + self._cpoName) if hasattr(cpo, 'get'): # Time-independent CPO cpo.get() elif hasattr(cpo, 'getSlice'): # Time-dependent CPO if self._time is None: raise ValueError("Need valid time for getting a time-dependent CPO") cpo.getSlice(self._time, ual.ualdef.CLOSEST_SAMPLE) else: raise TypeError("Found unexpected type of CPO, check CPO name") self._ualDAO = ualDAO self._cpo = cpo return self._cpo def list_data(self, addPrefix=False): '''List all complexgrid_scalar objects in this CPO data structure.
[docs] For the result see list_components. If the optional parameter addPrefix=True is given, the CPO identifier as obtained by str() is prepended to the object names.''' prefix = '' if addPrefix: prefix = str(self) return list_components(self._cpo, is_complexgrid_scalar, prefix) def list_dataLists(self, addPrefix=False): prefix = ''
[docs] if addPrefix: prefix = str(self) return list_components(self._cpo, is_complexgrid_scalar_list, prefix) @property def grid(self):
'''Return the grid for this CPO.'''
[docs] # FIXME: assumes grid at top level - make this more general? # TODO: multiple grids? return base.Grid(self._cpo.grid) # Below here helper routines for traversing CPO data structures def list_components(obj, object_test, basename=""): '''List components of a CPO that test positive for the given test function object_test.
[docs] Returns a dictionary linking object names to objects of type complexgrid_scalar. The keys/names have the form /part1/part2/part3, with the slashes indicating the hierarchy in the data structure. If the optional parameter basename is given, is is prepended to all names.''' comps = dict(); # Check if this object matches if (object_test(obj)): comps[basename] = obj #comps.append( (basename, obj) ) return comps # If not, try to descend into its members member_test = lambda x: (is_ual_type(x) or is_sequence(x)) \ and not is_module_type(x, "numpy") members=inspect.getmembers(obj, member_test) for member in members: if is_sequence(member[1]): # Loop over sequences, but only if parent is UAL object if (is_ual_type(obj)): try: for i,element in enumerate(member[1]): subcomps = list_components(element, object_test, basename + "/" + member[0] + "/" + str(i)) comps.update(subcomps) except: # Ignore errors caused by misidentifying sequence objects pass # Other members: descend else: subcomps = list_components(member[1], object_test, basename + "/" + member[0]) comps.update(subcomps) return comps