Source code for RiskQuantLib.Operation.operation

#!/usr/bin/python
#coding = utf-8
import sys
import copy
import traceback
import numpy as np
import pandas as pd
from collections.abc import Iterable
from RiskQuantLib.Operation.vectorization import vectorization

#<import>
#</import>

[docs]class operation(object): """ operation() is the function class that makes all RiskQuantLib list object functional. Any RiskQuantLib list class should inherit from this class to own certain functions, such as groupBy, execFunc, etc. Users can define their own functions here to set new attribute function to RiskQuantLib list class. """
[docs] def setAll(self,List:list): """ Update all elements by the list of new elements. Old elements will be deleted. Parameters ---------- List : list A collection of RiskQuantLib instrument objects. Returns ------- None """ self.all = List
def _tunnelingGetitem(self, attrName): """ Find the attribute of elements in present list which meet requirements and find elements of that attribute, if it is iterable. If this attribute is not iterable, it returns collection of this attribute itself. """ subIndex = None if type(attrName)==type(''): filter = lambda x:True elif type(attrName)==type(()) and len(attrName)==2: if hasattr(attrName[1],"__call__"): filter = attrName[1] else: filter = lambda x: True subIndex = attrName[1] attrName = attrName[0] elif type(attrName)==type(()) and len(attrName)>=3: if hasattr(attrName[1],"__call__"): filter = attrName[1] subIndex = attrName[2] else: filter = attrName[2] subIndex = attrName[1] attrName = attrName[0] else: attrName = attrName[0] filter = lambda x:True if (not type(subIndex)==type(None)) or subIndex: try: tmp = [getattr(i,attrName) for i in self.all if hasattr(i, attrName)] thisLayer = sorted(set(tmp),key=tmp.index) except: thisLayer = [getattr(i, attrName) for i in self.all if hasattr(i, attrName)] if type(subIndex) == slice: try: tmp = [j for i in thisLayer if hasattr(i,'__getitem__') for j in i[subIndex] if isinstance(i[subIndex],Iterable)] nextLayer = sorted(set(tmp),key=tmp.index) except: nextLayer = [j for i in thisLayer if hasattr(i,'__getitem__') for j in i[subIndex] if isinstance(i[subIndex],Iterable)] else: nextLayer = [i[subIndex] if hasattr(i,'__getitem__') else i for i in thisLayer] if len(nextLayer)==0: tmp = operation() tmp.setAll([i for i in thisLayer if filter(i)]) return tmp else: tmp = operation() tmp.setAll([i for i in nextLayer if filter(i)]) return tmp else: tmp = operation() thisLayer = [getattr(i, attrName,np.nan) for i in self.all] tmp.setAll([i for i in thisLayer if filter(i)]) return tmp def __getitem__(self, item): """ This function makes RiqkQuantLib list object selectable. Use [] to call this function. Mixed index is used here. By calling [], you can index either element or attribute, or elements of attribute. Parameters ---------- item : str or list or slice or tuple If a string is given, all elements will be examined, if 'code' attribute equals to the given string, return the first element that meets this requirement. If no element code meets this requirement, try to return a operation object, holding values of attribute whose name equals to the given string. If a slice or int number is given, this function behaves like iloc, returns the item-th element or a collection of elements. If a list is given, return all elements whose code in the given list. If no code in the given list, return a 2-dimension array of attribute values, where attribute name is in the given list. If a tuple is given, it returns a operation object, but the elements of this object depends on how you specify your parameter. For the first element of tuple, it must be a string, specify the attribute you want to choose. For the second element of tuple, you can pass a lambda function or any function into tuple, if you do this, the function will be used to filter your results. For the second element of tuple, you can also pass a slice. If you pass a slice into tuple, RiskQuantLib will assume the attribute value is iterable, and again, get the element of the attribute with this slice. If you want to select some elements of the attribute, and do it for every element of present list, and filter your result. You may pass a slice and a function at the same time. In this case, your tuple has three elements. Returns ------- None """ if type(item) == type(''): try: return [i for i in self.all if i.code == item][0] except Exception as e: return self._tunnelingGetitem(item) elif isinstance(item,tuple): return self._tunnelingGetitem(item) elif isinstance(item,int): return self.all[item] elif isinstance(item,slice): tmp = operation() tmp.setAll(self.all[item]) return tmp else: tmpList = [i for i in self.all if hasattr(i, 'code') and i.code in item] if len(tmpList)!=0: tmp = operation() tmp.setAll(tmpList) return tmp else: propertyArray = [[getattr(i, j, np.nan) for i in self.all] for j in item] return dict(zip(item,propertyArray)) def __iter__(self): i = 0 try: while True: v = self.all[i] yield v i += 1 except IndexError: return def __add__(self, other, useObj = True): """ Add a RiskQuantLib list object to a list or another RiskQuantLib list object. All attributes of the RiskQuantLib list will be neglected, only elements will be added to the merged list. This use shallow copy. After add finish, all elements refer to the original elements. Change to the original will render to changes of the results. If you want to use a deep copy add, call '.add()' rather than use '+'. Parameters ---------- other : list or RiskQuantLib list The other RiskQuantLib list object or a collection of RiskQuantLib instrument object. useObj : bool If true, return a RiskQuantLib list object; if false, return a list. Returns ------- list or RiskQuantLib list """ if useObj: tmpObj = self.new() if type(other) is list: tmpObj.setAll(self.all + other) else: tmpObj.setAll(self.all + other.all) return tmpObj else: if type(other) is list: return self.all + other else: return self.all + other.all def __sub__(self, other, useObj = True): """ Subtract a list or another RiskQuantLib list object from a RiskQuantLib list object. All attributes of the RiskQuantLib list will be neglected, only elements will be subtracted from the first list. This use shallow copy. After sub finish, all elements refer to the original elements. Change to the original will render to changes of the results. If you want to use a deep copy sub, call '.sub()' rather than use '-'. Parameters ---------- other : list or RiskQuantLib list The other RiskQuantLib list object or a collection of RiskQuantLib instrument object. useObj : bool If true, return a RiskQuantLib list object; if false, return a list. Returns ------- list or RiskQuantLib list """ if useObj: tmpObj = copy.deepcopy(self) if type(other) == type([]): tmpObj.setAll([i for i in self.all if i not in other]) else: tmpObj.setAll([i for i in self.all if i not in other.all]) return tmpObj else: if type(other) == type([]): return [i for i in self.all if i not in other] else: return [i for i in self.all if i not in other.all] def __str__(self): """ This returns a list of string, showing the code of each element. """ try: return ','.join(self['code']) except: return '' def __len__(self): """ This function returns the length of RiskQuantLib list. """ return len(self.all) def _sortIterator(self,sortObj): import pandas as pd sortList = [getattr(sortObj,i) if type(getattr(sortObj,i))!=type(pd.Series(dtype=float)) else len(getattr(sortObj,i)) for i in self._sortPropertyList] return tuple(sortList)
[docs] def add(self,other, useObj = True): """ This function uses deep copy. It returns the addition of copies of two RiskQuantLib lists. """ a = self.copy(deep=True) b = other.copy(deep=True) if useObj: return a+b else: return (a+b).all
[docs] def sub(self, other, useObj=True): """ This function uses deep copy. It returns the subtraction of copies of two RiskQuantLib lists. """ a = self.copy(deep=True) b = other.copy(deep=True) if useObj: return a - b else: return (a - b).all
[docs] def uniqueCode(self,inplace = False, keep = 'First'): """ This function returns a RiskQuantLib list object, where the code value is unique. Parameters ---------- inplace : bool If true, operation will done in present list object. If false, operation will done in a copy of present list object. keep : str Only support 'First' or 'Last'. If multiple elements have the same code value. The first or last element will be kept, given your choice. Returns ------- RiskQuantLib list object """ codeList = list(set([i.code for i in self.all])) if keep == 'First': keepIndex = 0 elif keep == 'Last': keepIndex = -1 else: print("uniqueCode Function Can Only Accept 'Keep' Parameter as First or Last") return None if inplace: self.setAll([[j for j in self.all if j.code == i][keepIndex] for i in codeList]) return None else: result = self.new() result.setAll([[j for j in self.all if j.code == i][keepIndex] for i in codeList]) return result
[docs] def uniqueAttr(self,attrNameString:str,inplace = False, keep = 'First'): """ This function returns a RiskQuantLib list object, where the attribute value is unique. Parameters ---------- attrNameString :str The attribute name that you want to get the unique value list from. inplace : bool If true, operation will done in present list object. If false, operation will done in a copy of present list object. keep : str Only support 'First' or 'Last'. If multiple elements have the same attribute value. The first or last element will be kept, given your choice. Returns ------- RiskQuantLib list object """ propertyValueList = list(set([getattr(i,attrNameString,np.nan) for i in self.all if hasattr(i,attrNameString)])) if keep == 'First': keepIndex = 0 elif keep == 'Last': keepIndex = -1 else: print("uniqueAttr Function Can Only Accept 'Keep' Parameter as First or Last") return None if inplace: self.setAll([[j for j in self.all if getattr(j,attrNameString) == i][keepIndex] for i in propertyValueList]) return None else: result = self.new() result.setAll([[j for j in self.all if getattr(j,attrNameString) == i][keepIndex] for i in propertyValueList]) return result
[docs] def apply(self,applyFunction,*args): """ This function will apply the given function to each element of RiskQuantLib list. """ result = [applyFunction(i,*args) for i in self.all] tmp = operation() tmp.setAll(result) return tmp
[docs] def groupBy(self, attrName:str, useObj = True, inplace = True): """ This function use pandas.DataFrame.groupby as engine. Its behavior is totally the same with pandas. Parameters ---------- attrName : str The attribute that you want to group data by. useObj : bool If true, return a RiskQuantLib list object, each element of which is also a RiskQuantLib list object. Each element will be marked by setting the attribute as the common value. inplace : bool If true, operation will be done in present RiskQuantLib list object. """ if type(attrName) == type(''): attrName = [attrName] else: pass groupBy = pd.DataFrame(self[attrName+['index']]) groupBy['obj'] = self.all df = groupBy.groupby(attrName)['obj'] kvList = [([j for j in k] if type(k) is tuple else [k], v.to_list()) for k, v in df] attrValueList = [i[0] for i in kvList] securityList = [i[1] for i in kvList] if useObj: if inplace: objList = [self.__new__(type(self)) for i in attrValueList] [i.__init__() for i in objList] else: objList = [self.copy(deep=True) for i in attrValueList] [i.setAll(j) for i,j in zip(objList,securityList)] [[setattr(j,attrName[q],i[q]) for q in range(len(attrName))] for i,j in zip(attrValueList,objList)] returnObj = self.__new__(type(self)) returnObj.__init__() returnObj.setAll(objList) return returnObj else: return dict(zip(attrValueList,securityList))
[docs] def groupByFunc(self, func, useObj = True, inplace = True): """ This function use pandas.DataFrame.groupby as engine. Its behavior is totally the same with pandas. Parameters ---------- func : function Elements will be divided into groups by the return values of this function. useObj : bool If true, return a RiskQuantLib list object, each element of which is also a RiskQuantLib list object. Each element will be marked by setting the attribute as the common value. inplace : bool If true, operation will be done in present RiskQuantLib list object. """ attrValue = self.apply(func) groupBy = attrValue.toDF() groupBy.columns = [i if type(i)==str else "groupByAttr"+str(i) for i in groupBy.columns] attrName = groupBy.columns.to_list() groupBy['index'] = self['index'] groupBy['obj'] = self.all df = groupBy.groupby(attrName)['obj'] kvList = [([j for j in k] if type(k) is tuple else [k], v.to_list()) for k, v in df] attrValueList = [i[0] for i in kvList] securityList = [i[1] for i in kvList] if useObj: if inplace: objList = [self.__new__(type(self)) for i in attrValueList] [i.__init__() for i in objList] else: objList = [self.copy(deep=True) for i in attrValueList] [i.setAll(j) for i,j in zip(objList,securityList)] [[setattr(j,attrName[q],i[q]) for q in range(len(attrName))] for i,j in zip(attrValueList,objList)] returnObj = self.__new__(type(self)) returnObj.__init__() returnObj.setAll(objList) return returnObj else: return dict(zip(attrValueList,securityList))
[docs] def sum(self,attrName:str,inplace = False): """ This function will sum the value of each element, given the attribute name. If some elements don't have the attribute, an numpy.nan will be used, and the result will skip this value. Parameters ---------- attrName : str The attribute name whose value you want to sum. inplace : bool If true, the 'attributeSum' property will be added to present object. """ if inplace: setattr(self,attrName+'Sum',np.nansum([getattr(i,attrName,np.nan) for i in self.all])) return None else: return np.nansum([getattr(i,attrName,np.nan) for i in self.all])
[docs] def mean(self,attrName:str,inplace=False): """ This function will average the value of each element, given the attribute name. If some elements don't have the attribute, an numpy.nan will be used, and the result will skip this value when calculating mean. Parameters ---------- attrName : str The attribute name whose value you want to average. inplace : bool If true, the 'attributeMean' property will be added to present object. """ if inplace: setattr(self,attrName+'Mean',np.nanmean([getattr(i,attrName,np.nan) for i in self.all])) return None else: return np.nanmean([getattr(i,attrName,np.nan) for i in self.all])
[docs] def std(self,attrName:str,inplace=False): """ This function will calculate the standard deviation of value of each element, given the attribute name. If some elements don't have the attribute, an numpy.nan will be used, and the result will skip this value. Parameters ---------- attrName : str The attribute name whose value you want to calculate standard deviation. inplace : bool If true, the 'attributeStd' property will be added to present object. """ if inplace: setattr(self,attrName+'Std',np.nanstd([getattr(i,attrName,np.nan) for i in self.all])) return None else: return np.nanstd([getattr(i,attrName,np.nan) for i in self.all])
[docs] def execFunc(self,functionName,*args,**kwargs): """ This function will execute instance function for every element in present RiskQuantLib list object, given the function name. If some elements don't have the function, a Null function will be used, and the result will skip the execution for that element. Parameters ---------- functionName : str The function that element has, and you want to call. """ try: result = [getattr(i,functionName,lambda *arg, **kwarg:None)(*args, **kwargs) for i in self.all] tmp = operation() tmp.setAll(result) return tmp except Exception as e: print('Execution Failed As Follows:', file=sys.stderr) print('Instrument Type:', self.__class__.__name__, file=sys.stderr) print('Function Name:', functionName, file=sys.stderr) print('Exception Info:', e, file=sys.stderr) traceback.print_exc()
[docs] def paraFunc(self, functionName, *args, **kwargs): """ This function will execute instance function for every element in present RiskQuantLib list object by parallel, which means elements will be serialized and sent to different processes. If some elements don't have the function which named as you specify, a Null function will be used, and the result will skip the execution for that element. The call will be delayed until you run currentRiskQuantLibList.paraRun(). Parameters ---------- functionName : str The function that element has, and you want to call. Returns ------- operation """ self._paraQueue = [] if not hasattr(self, "_paraQueue") else self._paraQueue self._paraQueue.append((functionName, args, kwargs)) return self
[docs] def paraRun(self, nJobs: int = -1): """ This is the trigger of run paralleled function for every element in this RiskQuantLib list. Before you use this function, you should call someList.paraFunc('functionName') for more than one time to tell RiskQuantLib what function you want to parallel. Returns ------- operation """ self._paraQueue = [] if not hasattr(self, "_paraQueue") else self._paraQueue def _coll(obj): paraTmp = [getattr(obj, functionName, lambda *arg, **kwarg:None)(*args, **kwargs) for functionName,args,kwargs in self._paraQueue] return paraTmp from joblib import Parallel, delayed try: result = Parallel(n_jobs=nJobs)(delayed(_coll)(i) for i in self.all) tmp = operation() tmp.setAll(result) self._paraQueue = [] return tmp except Exception as e: print('Parallel Failed:', e) pass
[docs] def vecApply(self, lambdaFunction=lambda x: None): """ This function is like operation.apply, it will apply the given function to each element of RiskQuantLib list. The passed function should be one-parameter function, whose parameter represents the element of current RiskQuantLib list. The difference is this function will try to vectorize the calculation to speed up. It will convert the attribute to np.array and operate mathematical calculation. Notice: Only Pure Mathematic Function can be vectorized, this is to say, any logic key word of python, like 'in', 'if', 'for', etc, can Not be used in passed function. If must be pure math formula. Luckily, all numpy function can be used. If you want to do some logistic calculation, you should make sure you write function as logic-matrix-like. Returns ------- operation """ allElement = self.all if len(allElement) != 0: dynamicClass = type('tmp', (vectorization,), {'lambdaFuncName': lambdaFunction}) dynamicObj = dynamicClass(allElement) result = super(vectorization, dynamicObj).__getattribute__('lambdaFuncName')() del dynamicObj, dynamicClass return self if result is None else result else: return self
[docs] def vecFunc(self, functionName, *args, **kwargs): """ This function is like operation.execFunc, it will call the given function of each element of RiskQuantLib list. The passed function should be the attribute function of element of current RiskQuantLib List. The difference is this function will try to vectorize the calculation to speed up. It will convert the attribute to np.array and operate mathematical calculation. Notice: Only Pure Mathematic Function can be vectorized, this is to say, any logic key word of python, like 'in', 'if', 'for', etc, can Not be used in passed function. If must be pure math formula. Luckily, all numpy function can be used. If you want to do some logistic calculation, you should make sure you write function as logic-matrix-like. Returns ------- operation """ allElement = self.all if len(allElement) != 0: functionObj = getattr(type(allElement[0]), functionName, lambda *argsSub, **kwargsSub: None) functionObjName = functionName + 'Attribute' dynamicClass = type(functionName, (vectorization,), {functionObjName: functionObj}) dynamicObj = dynamicClass(allElement) result = super(vectorization, dynamicObj).__getattribute__(functionObjName)(*args, **kwargs) del dynamicObj, dynamicClass return self if result is None else result else: return self
[docs] def copy(self,deep = True): """ Get a copy of present RiskQuantLib list object. """ if deep: tmp = copy.deepcopy(self) return tmp else: tmp = copy.copy(self) return tmp
[docs] def sort(self,propertyList:list,reverse = False,inplace=False, useObj = True): """ Sort the present RiskQuantLib list by attributes. You can also sort it by a single attribute. """ if type(propertyList)!=type([]): self._sortPropertyList = [propertyList] else: self._sortPropertyList = propertyList if inplace: sortBy = pd.DataFrame(self[self._sortPropertyList]) sortBy['obj'] = self.all df = sortBy.sort_values(by=self._sortPropertyList, ascending=not reverse) sortedList = df['obj'].to_list() self.setAll(sortedList) return None elif useObj: tmpObj = copy.deepcopy(self) sortBy = pd.DataFrame(tmpObj[self._sortPropertyList]) sortBy['obj'] = tmpObj.all df = sortBy.sort_values(by=self._sortPropertyList, ascending=not reverse) sortedList = df['obj'].to_list() tmpObj.setAll(sortedList) return tmpObj else: sortBy = pd.DataFrame(self[self._sortPropertyList]) sortBy['obj'] = self.all df = sortBy.sort_values(by=self._sortPropertyList, ascending=not reverse) return df['obj'].to_list()
[docs] def fillna(self, propertyList:list, value, inplace=False, useObj = True): """ Fill the nan value or blank string will the given value. If attribute doesn't exist, nothing will be done. You can fill the nan value by single attribute or a list of attributes. """ from RiskQuantLib.Tool.mathTool import isnan if type(propertyList)!=type([]): propertyList = [propertyList] else: pass if inplace: [[getattr(j,'set'+i[0].capitalize()+i[1:])(value) if hasattr(j,'set'+i[0].capitalize()+i[1:]) and (isnan(getattr(j,i)) or getattr(j, i)=='') else None for j in self.all if hasattr(j,i)] for i in propertyList] [[setattr(j,i,value) if hasattr(j, i) and (isnan(getattr(j, i)) or getattr(j, i)=='') else None for j in self.all if hasattr(j, i)] for i in propertyList] elif useObj: tmpObj = self.copy(deep=True) [[getattr(j,'set'+i[0].capitalize()+i[1:])(value) if hasattr(j,'set'+i[0].capitalize()+i[1:]) and (isnan(getattr(j,i)) or getattr(j, i)=='') else None for j in tmpObj.all if hasattr(j,i)] for i in propertyList] [[setattr(j,i,value) if hasattr(j, i) and (isnan(getattr(j, i)) or getattr(j, i)=='') else None for j in tmpObj.all if hasattr(j, i)] for i in propertyList] return tmpObj else: tmpList = copy.deepcopy(self.all) [[getattr(j,'set'+i[0].capitalize()+i[1:])(value) if hasattr(j,'set'+i[0].capitalize()+i[1:]) and (isnan(getattr(j,i)) or getattr(j, i)=='') else None for j in tmpList if hasattr(j,i)] for i in propertyList] [[setattr(j,i,value) if hasattr(j, i) and (isnan(getattr(j, i)) or getattr(j, i)=='') else None for j in tmpList if hasattr(j, i)] for i in propertyList] return tmpList
[docs] def reIndex(self, attrName: str, indexList: list, useObj: bool = True, inplace: bool = False): """ Return a new RiskQuantLib list object whose elements are chosen according to the index, the index of new list element must be in th indexList to be included. If more than one elements meet requirement, only the first one will be kept. """ tmpList = [[j for j in self.all if getattr(j, attrName, np.nan) == i][0] for i in indexList] if inplace: self.setAll(tmpList) return None elif useObj: tmpObj = self.copy(deep=True) tmpObj.setAll(tmpList) return tmpObj else: return tmpList
[docs] def str(self): return self.__str__()
[docs] def rolling(self, windowNumber: int, useObj: bool = True): """ For each element, This function will collect the n elements before that element, create a new RiskQuantLib list object to holding it. The new list will be set as an attribute of 'rolling', thus the present element can reach to the information of its former elements. """ rollingList = [self[max((i-windowNumber+1),0):(i+1)] for i in range(len(self))] if useObj: rollingObj = [self.__new__(type(self)) for i in range(len(self))] [i.__init__() for i in rollingObj] [i.setAll(j) for i,j in zip(rollingObj,rollingList)] [setattr(i,'rolling',j) for i,j in zip(self.all,rollingObj)] [setattr(i, 'rollingWindow', windowNumber) for i in self.all] returnObj = self.__new__(type(self)) returnObj.__init__() returnObj.setAll(rollingObj) return returnObj else: return rollingList
[docs] def new(self): """ Return a totally new RiskQuantLib list object like this. """ tmpObj = self.__new__(type(self)) tmpObj.__init__() return tmpObj
[docs] def filter(self,filterFunction,useObj = True): """ Return a RiskQuantLib list object given the fiter function. This is used to choose some elements which meet your requirements. """ resultList = [i for i in self.all if filterFunction(i)] if useObj: tmpObj = self.new() tmpObj.setAll(resultList) return tmpObj else: return resultList
[docs] def head(self, numberOfElement:int): """ Get the first numberOfElement elements of the list. """ return self[:numberOfElement]
[docs] def tail(self, numberOfElement:int): """ Get the last numberOfElement elements of the list. """ return self[(-1 * numberOfElement):]
[docs] def haveAttr(self, attrName): """ Get the elements which have certain attributes, if passed a list, the elements which have all attributes in the list will be returned, otherwise an empty list will be returned. """ if type(attrName)==str: return self.filter(lambda x:hasattr(x, attrName)) elif type(attrName)==list: return self.filter(lambda x:sum([hasattr(x, i) for i in attrName])==len(attrName)) else: return self.filter(lambda x:False)
[docs] def isIn(self, anotherList): """ Find the elements which are also in the given list. Use these elements to generate a new RiskQuantLib list and return it. anotherList can be any iterable object. """ return self.filter(lambda x: x in set(anotherList))
[docs] def isNotIn(self, anotherList): """ Find the elements which are not in the given list. Use these elements to generate a new RiskQuantLib list and return it. anotherList can be any iterable object. """ return self.filter(lambda x: x not in set(anotherList))
[docs] def union(self, anotherList): """ Find the elements which are in the given list or in the current list. Use these elements to generate a new RiskQuantLib list and return it. anotherList can be any iterable object. """ return self.isNotIn(anotherList) + anotherList
[docs] def match(self, anotherList, targetAttrName: str, matchFunctionOnLeft=lambda x: True, matchFunctionOnRight=lambda y: True): """ For each element in current rqlList, find all elements that meet requirement matchFunctionOnRight(element_in_another_list) == matchFunctionOnLeft(element_in_this_list) from another RiskQuantLib list object. The elements meeting requirement will be set as an attribute of element_in_this_list. This function is very like rqlList.join, the only difference is that this function is vectorized, thus run with faster speed. Of cause, this function has some drawbacks, it can not deal with complicated relation which is described as coupled equation, while rqlList.join can do it. Parameters ---------- anotherList : RiskQuantLib list or list Another list object, holding elements waiting to be selected. targetAttrName : str The attribute name that you want to use to mark collected elements from another list. matchFunctionOnLeft : function This function has and only has one parameter, which stands for element in current list. matchFunctionOnRight : function This function has and only has one parameter, which stands for element in another list. """ anotherMatchArray = np.array([matchFunctionOnRight(another) for another in anotherList]) thisMatchArray = np.array([matchFunctionOnLeft(this) for this in self.all]).reshape(-1, 1) targetObjIDArray = np.apply_along_axis(lambda x: x == anotherMatchArray, 1, thisMatchArray) anotherObjIndexArray = np.array(range(len(anotherList.all))) matchedValueList = [[anotherList.all[idx] for idx in anotherObjIndexArray[targetIDList]] for targetIDList in targetObjIDArray] matchedObjList = [anotherList.new() for _ in range(len(self.all))] [matchedObj.setAll(matchedValue) for matchedObj,matchedValue in zip(matchedObjList, matchedValueList)] [setattr(this, targetAttrName, matchedObj) for this, matchedObj in zip(self.all, matchedObjList)]
[docs] def join(self, anotherList, targetAttrName: str, filterFunction = lambda x,y:True): """ For each element, find all elements that meet requirements from another RiskQuantLib list object. The elements meeting requirement will be set as an attribute of present list. This function is very like pandas.DataFrame.merge. Parameters ---------- anotherList : RiskQuantLib list or list Another list object, holding elements waiting to be selected. targetAttrName : str The attribute name that you want to use to mark collected elements from another list. filterFunction : function This function has two parameter, left and right, and must return a bool value. If true is returned, the element will be added. False means not added. """ valueList = [[another for another in anotherList.all if filterFunction(this,another)] for this in self.all] valueObj = [anotherList.new() for i in self.all] [obj.setAll(value) for obj,value in zip(valueObj,valueList)] [setattr(i,targetAttrName,j) for i,j in zip(self.all,valueObj)]
[docs] def connect(self, anotherList, targetAttrNameOnLeft: str, targetAttrNameOnRight: str, filterFunction = lambda x,y:True, unsymmetrical = False, filterFunctionOnLeft = lambda presentList, anotherList: True, filterFunctionOnRight=lambda presentList,anotherList: True): """ For each element, find all elements that meet requirements from another RiskQuantLib list object. The elements meeting requirement will be set as an attribute of present list. After this is down, for each element of another RiskQuantLib, find all elements that meet requirements from present RiskQuantLib list object. The elements meeting requirement will be set as an attribute of another list. This function actually calls join twice, but it switch the position of two RiskQuantLib list in the second call. Parameters ---------- anotherList : RiskQuantLib list or list Another list object, holding elements waiting to be selected. targetAttrNameOnLeft : str The attribute name of present list that you want to use to mark collected elements from another list. filterFunction : function This function has two parameter, left and right, and must return a bool value. If true is returned, the element will be added to each other. False means not added. unsymmetrical : bool If true, different rules will be used when join two list, where filterFunctionOnLeft will be used when present list join another list, and filterFunctionOnRight will be used when another list join present list. filterFunctionOnLeft : function This function has two parameter, presentList and anotherList, and must return a bool value. If true is returned, the element from another list will be added to present list. False means not added. targetAttrNameOnRight : str The attribute name of another list that you want to use to mark collected elements from present list. filterFunctionOnRight : function This function has two parameter, presentList and anotherList, and must return a bool value. If true is returned, the element from present list will be added to another list. False means not added. Returns ------- None """ if unsymmetrical: def adjustedFilterFunctionOnRight(x,y): return filterFunctionOnRight(y,x) self.join(anotherList,targetAttrNameOnLeft,filterFunctionOnLeft) anotherList.join(self,targetAttrNameOnRight,adjustedFilterFunctionOnRight) else: def adjustedFilterFunctionOnRight(x,y): return filterFunction(y,x) self.join(anotherList,targetAttrNameOnLeft,filterFunction) anotherList.join(self,targetAttrNameOnRight,adjustedFilterFunctionOnRight)
[docs] def scale(self, attrName: str, targetAttrName: str = '', filterFunction = lambda x: True, inplace: bool = False): """ For each element, find the attribute value and calculate the portion of each element to the sum, given attribute name. After calling this function, a 'scaled_attrName' attribute will be added to each element. This function is very like pandas.DataFrame.groupby.agg. Parameters ---------- attrName :str The attribute which you want to scale. targetAttrName : str The attribute name that you want to use to mark the portion of element. filterFunction : function This function has one parameter, element, and must return a bool value. If true is returned, the element will be include when process scale. False means not included. """ if targetAttrName=='': targetAttrName = 'scaled_'+attrName total = self.sum(attrName) objList = [i for i in self.all if filterFunction(i)] scaled = [getattr(i,attrName,np.nan)/total for i in objList] if inplace: [setattr(obj,targetAttrName,scaledValue) for obj,scaledValue in zip(self.all,scaled)] else: tmpObj = self.new() tmpObj.setAll(objList) [setattr(obj, targetAttrName, scaledValue) for obj, scaledValue in zip(tmpObj.all, scaled)] return tmpObj
[docs] def merge(self, anotherList, how : str, on = lambda x,y:True, inplace = False): """ This function will merge two RiskQuantLib list object. This function should only be used when washing data. After merge, any change to source list will or will not influence merge result, depending on python copy mechanism. This won't be a problem if data flows by single direction. However, troubles will come when you wish to change the origin to influence the merge result list, after merge action. In short, do not change the origin list once you merge them, any change should be done by merged result. If 'on' function can not identify the only element to merge, all unique attributes of the elements that meet requirement will be added to present list element. (Unique means this attribute only appears once in the list element, the others don't have it.) If the attribute appears more than one times, only the first will be added. It's like pandas.DataFrame.merge, but not totally the same. In RiskQuantLib, listA.merge(listB) is not the same from listB.merge(listA), even if you specify the same value of parameter 'how' and 'on'. No matter what value parameter 'how' is given, attributes of elements in anotherList that meet requirement of 'on' will be added as attributes of elements in present list. The attributes who have the same name in both lists won't be copied, which means you can't merge two lists twice. (In the second time you merge them, the first list object has all attributes that anotherList has, so all attributes will be skipped, no attribute will be copied.) If how == 'inner', only the elements in the present list who have relative elements in anotherList will be returned. If how == 'outer', all elements in presentList, plus elements from anotherList that don't have relative elements in present list will be returned. If how == 'left', all elements in presentList will be returned. if how == 'right', the elements in the present list who have relative elements in anotherList, plus elements from anotherList that don't have relative elements in present list will be returned. """ def trySetAttr(obj,attrName,attrValue): try: setattr(obj,attrName,attrValue) except: pass if inplace: leftList = self.all rightList = anotherList else: leftList = self.copy(deep=True).all rightList = copy.deepcopy(anotherList) inner = [{'left':left,'right':right} for left in leftList for right in rightList if on(left,right)] leftInner = list(set([pair['left'] for pair in inner])) rightInner = list(set([pair['right'] for pair in inner])) leftResidual = [i for i in leftList if i not in leftInner] rightResidual = [i for i in rightList if i not in rightInner] if len(leftInner) == len(inner): [[trySetAttr(element['left'],attr,getattr(element['right'],attr,np.nan)) if not hasattr(element['left'],attr) else None for attr in dir(element['right'])] for element in inner] else: for element in inner: [trySetAttr(element['left'], attr, getattr(element['right'], attr, np.nan)) if not hasattr(element['left'], attr) else None for attr in dir(element['right'])] tmpObj = self.new() if how == 'outer': tmpObj.setAll(leftInner+leftResidual+rightResidual) elif how == 'inner': tmpObj.setAll(leftInner) elif how == 'left': tmpObj.setAll(leftInner + leftResidual) elif how == 'right': tmpObj.setAll(leftInner + rightResidual) return tmpObj
[docs] def zip(self, attrNameList : list, *args): """ This function will convert the values of given attribute into a zip object. If you also pass an external list or other iterable object, it will zip that list with the value of given internal attribute. Parameters ---------- attrNameList : str or list The attribute name whose values you want to zip. args : list or iterable External iterable object, whose value is zipped with internal attribute values. Returns ------- zip object """ argsMerged = tuple([self[attrName] for attrName in attrNameList] + [i for i in args]) return zip(*argsMerged)
[docs] def toDict(self, attrNameAsKey : str, attrNameAsValue : str): """ This function will return a dict, given the attribute name whose values are used as dict key, and attribute name whose values are used as dict value. Parameters ---------- attrNameAsKey : str The attribute name whose values you want to use as dict keys. attrNameAsValue : str The attribute name whose values you want to use as dict values. Returns ------- dict """ return dict(self.zip([attrNameAsKey, attrNameAsValue]))
[docs] def toSeries(self, attrNameAsValue : str = '', attrNameAsIndex : str = '', nameString : str = '', index = None): """ This function will return a pandas.Series object, given the attribute name whose values are used as series value, and attribute name whose values are used as series index. You can also pass a string to identify the name of series. If you don't pass any attribute name, this function will use all elements themselves in this list to form a Series. Parameters ---------- attrNameAsValue : str The attribute name whose values you want to use as series value. attrNameAsIndex : str The attribute name whose values you want to use as series index. nameString : str The name of series. index : list or operation The index list. If you do not specify the attribute name used as index, you can pass a list as index manually. Returns ------- pd.Series """ if len(self)==0: return pd.Series(dtype=float, name=nameString) if attrNameAsValue == '': result = pd.Series(self.all, index=index, name=nameString) if index else pd.Series(self.all, name=nameString) elif attrNameAsIndex == '': result = pd.Series(self[attrNameAsValue], name=nameString, index=index) if index else pd.Series(self[attrNameAsValue], name=nameString) else: result = pd.Series(self[attrNameAsValue], index = self[attrNameAsIndex], name = nameString) return result
[docs] def toDF(self, attrNameList : str or list = '', attrNameAsIndex : str = '', index = None): """ This function will return a pandas.DataFrame object, given the attribute name whose values are used as dataframe value, and attribute name whose values are used as dataframe index. If you don't pass any attribute name, this function will use all elements themselves in this list to form a dataframe. Parameters ---------- attrNameList : str or list The attribute name whose values you want to use as dataframe value. attrNameAsIndex : str The attribute name whose values you want to use as dataframe index. index : list or operation The index list. If you do not specify the attribute name used as index, you can pass a list as index manually. Returns ------- pd.DataFrame """ if attrNameList == '': result = pd.DataFrame(self.all, index=index) if index else pd.DataFrame(self.all) return result elif type(attrNameList) == str: attrNameList = [attrNameList] result = pd.DataFrame(self[attrNameList]) if attrNameAsIndex == '': dfIndex = index if index else result.index else: dfIndex = self[attrNameAsIndex] result.index = dfIndex return result
[docs] def toArray(self, attrNameList : str or list = ''): """ This function will return a numpy.ndarray object, given the attribute name whose values are used as array value. If you don't pass any attribute name, this function will use all elements themselves in this list to form an array. Parameters ---------- attrNameList : str or list The attribute name whose values you want to use as array value. Returns ------- np.ndarray """ if attrNameList == '': return np.array(self.all) elif type(attrNameList) == str: attrNameList = [attrNameList] return self.toDF(attrNameList).values
[docs] def toList(self, attrNameList: str or list = ''): """ This function will return a list object, given the attribute name whose values are used as list value. If you don't pass any attribute name, this function will use all elements themselves in this list to form a new list. Parameters ---------- attrNameList : str or list The attribute name whose values you want to use as list value. Returns ------- list """ if attrNameList == '': return self.all elif type(attrNameList) == str: attrNameList = [attrNameList] return [[getattr(ele,attr,np.nan) for attr in attrNameList] for ele in self]
[docs] def fromDF(self, df: pd.DataFrame, code: str = '', name: str = ''): """ This function will convert a dataframe to RiskQuantLib list object. If there is already elements in present list, all elements will be deleted. The column name of dataframe should be in English, and this function will try to set it to element attribute if there exists the attribute with the same name. That is, this function will only set value of registered attribute. If not registered in current list class, it will be skipped. Parameters ---------- df : pd.DataFrame The dataframe you want to pull data from. code : str The column name that you want to mark as 'code' attribute. name : str The column name that you want to mark as 'name' attribute. Returns ------- RiskQuantLib List """ if not hasattr(self,'__elementClass__'): print("The List Must Have Attribute Named __elementClass__ To Call This Function.") return else: self.setAll([]) instrumentNameString = self.__elementClass__.__name__ c_instrumentNameString = instrumentNameString[0].capitalize() + instrumentNameString[1:] if code == '' and name == '': getattr(self, 'add' + c_instrumentNameString + 'Series', lambda x,y:None)(df.index,df.index) elif code!='' and name!='': getattr(self, 'add' + c_instrumentNameString + 'Series', lambda x, y: None)(df[code], df[name]) elif code!='': getattr(self, 'add' + c_instrumentNameString + 'Series', lambda x, y: None)(df[code], df[code]) elif name!='': getattr(self, 'add' + c_instrumentNameString + 'Series', lambda x, y: None)(df[name], df[name]) else: pass def trySetAttr(attrName,pdseries): try: c_attrName = attrName[0].capitalize() + attrName[1:] if hasattr(self,'set'+c_attrName): getattr(self, 'set'+c_attrName, lambda x,y:None)(self['code'],pdseries) else: pass except: pass [trySetAttr(col,df[col]) for col in df.columns] return self
[docs] def addFromDF(self, df : pd.DataFrame, code :str = '', name:str = ''): """ This function will add elements from a dataframe. If there is already elements in present list, all elements will be kept, while new elements will be added. The column name of dataframe should be in English, and this function will try to set it to element attribute if there exists the attribute with the same name. That is, this function will only set value of registered attribute. If not registered in current list class, it will be skipped. Parameters ---------- df : pd.DataFrame The dataframe you want to pull data from. code : str The column name that you want to mark as 'code' attribute. name : str The column name that you want to mark as 'name' attribute. Returns ------- RiskQuantLib List """ tmpList = self.new() tmpList.fromDF(df = df, code=code,name=name) self.setAll(self.all + tmpList.all)
[docs] def fromIterable(self, iterable:list, code :str = '', name:str = ''): """ This function will convert an iterable to RiskQuantLib list object. If there is already elements in present list, all elements will be deleted. This function will only set value of registered attribute. If not registered in current list class, it will be skipped. Parameters ---------- iterable : list The iterable object you want to pull data from. code : str The attribute name of iterable that you want to mark as 'code' in RiskQuantLib List Object. name : str The attribute name of iterable that you want to mark as 'name' in RiskQuantLib List Object. Returns ------- RiskQuantLib List """ if not hasattr(self,'__elementClass__'): print("The List Must Have Attribute Named __elementClass__ To Call This Function.") return else: self.setAll([]) instrumentNameString = self.__elementClass__.__name__ c_instrumentNameString = instrumentNameString[0].capitalize() + instrumentNameString[1:] if code == '' and name == '': getattr(self, 'add' + c_instrumentNameString + 'Series', lambda x,y:None)([str(index) for index,obj in enumerate(iterable)],[str(index) for index,obj in enumerate(iterable)]) elif code!='' and name!='': getattr(self, 'add' + c_instrumentNameString + 'Series', lambda x, y: None)([getattr(obj,code,str(index)) for index,obj in enumerate(iterable)], [getattr(obj,name,str(index)) for index,obj in enumerate(iterable)]) elif code!='': getattr(self, 'add' + c_instrumentNameString + 'Series', lambda x, y: None)([getattr(obj,code,str(index)) for index,obj in enumerate(iterable)], [getattr(obj,code,str(index)) for index,obj in enumerate(iterable)]) elif name!='': getattr(self, 'add' + c_instrumentNameString + 'Series', lambda x, y: None)([getattr(obj,name,str(index)) for index,obj in enumerate(iterable)], [getattr(obj,name,str(index)) for index,obj in enumerate(iterable)]) else: pass def trySetAttr(obj,attrName,attrValue): try: c_attrName = attrName[0].capitalize() + attrName[1:] if hasattr(obj,'set'+c_attrName): getattr(obj, 'set'+c_attrName, lambda x:None)(attrValue) else: pass except: pass [[trySetAttr(instrumentObj,attr,getattr(obj,attr,np.nan)) for attr in dir(obj)] for obj,instrumentObj in zip(iterable,self.all)] return self
[docs] def addFromIterable(self, iterable:list, code :str = '', name:str = ''): """ This function will add elements from an iterable object. If there is already elements in present list, all elements will be kept, while new elements will be added. This function will only set value of registered attribute. If not registered in current list class, it will be skipped. Parameters ---------- iterable : list The iterable object you want to pull data from. code : str The attribute name of iterable that you want to mark as 'code' in RiskQuantLib List Object. name : str The attribute name of iterable that you want to mark as 'name' in RiskQuantLib List Object. Returns ------- RiskQuantLib List """ tmpList = self.new() tmpList.fromIterable(iterable=iterable, code=code,name=name) self.setAll(self.all + tmpList.all)
[docs] def updateAttr(self, attrName : str, codeSeries, valueSeries, byAttr = 'code'): """ This function will update the attribute value of elements whose code appears in given codeSeries. If code of element doesn't show in codeSeries, these elements will be skipped and not changed. If the attribute is not registered, it will be created. Parameters ---------- attrName : str The attribute whose value is exactly what you want to update. codeSeries : list An iterable object that contains codes of elements whose attribute need to be updated. valueSeries : list An iterable object that contains new values of attributes. byAttr : str The attribute that is used to index element in this list. By default, this attribute is code. But you can specify another attribute A, if you do it, this function will update the attribute value of elements whose attribute A appears in given codeSeries. Returns ------- None """ def setAttr(totalList,attrNameNotRegistered,codeSeriesNotRegistered,valueSeriesNotRegistered,byAttrNotRegistered): index = set(codeSeriesNotRegistered) updateList = totalList.filter(lambda x: getattr(x, byAttrNotRegistered, np.nan) in index) setDict = dict(zip(codeSeriesNotRegistered,valueSeriesNotRegistered)) [setattr(i,attrNameNotRegistered,setDict[getattr(i,byAttrNotRegistered,np.nan)]) for i in updateList] newCreated = totalList.filter(lambda x:not hasattr(x,attrNameNotRegistered)) [setattr(i,attrNameNotRegistered,np.nan) for i in newCreated] c_attrName = attrName[0].capitalize() + attrName[1:] getattr(self,'set'+c_attrName,lambda x,y,z,k:setAttr(self,attrName,x,y,z))(codeSeries,valueSeries,byAttr,True)
[docs] def updateAttrFromSeries(self, sr: pd.Series, attrName = '', byAttr = 'code'): """ This function will update attribute value of current list by passed series. The name of passes series should be in English, and if current list has registered attribute whose name is the same with column name, it will be updated. If the attribute is not registered, it will be created. Parameters ---------- sr : pd.Series The series that you want to update data from. attrName : str The attribute whose value is exactly what you want to update. byAttr : str The attribute that is used to index element in this list. By default, this attribute is code. But you can specify another attribute A, if you do it, this function will update the attribute value of elements whose attribute A appears in df[code] or df.index. Returns ------- None """ attrName = sr.name if attrName=='' else attrName if attrName: self.updateAttr(attrName, sr.index, sr.values, byAttr=byAttr) else: raise ValueError("Can not update from a series without name if you don't pass an attrName.")
[docs] def updateAttrFromDF(self, df: pd.DataFrame, code:str = '', byAttr = 'code'): """ This function will update attribute value of current list by passed dataframe. The column name of passes dataframe should be in English, and if current list has registered attribute whose name is the same with column name, it will be updated. If the attribute is not registered, it will be created. Parameters ---------- df : pd.DataFrame The dataframe that you want to update data from. code : str A column name of df that you used to mark rows. Elements whose code is in this column will be updated, and those not in this column will not be influenced. If blank, the index of df will be used as code. byAttr : str The attribute that is used to index element in this list. By default, this attribute is code. But you can specify another attribute A, if you do it, this function will update the attribute value of elements whose attribute A appears in df[code] or df.index. Returns ------- None """ if code == '': [self.updateAttr(col, df.index, df[col], byAttr=byAttr) for col in df.columns] else: [self.updateAttr(col, df[code], df[col], byAttr=byAttr) for col in df.columns]
[docs] def updateAttrFromArray(self, array: np.ndarray, codeList: list, attributeNameList: str or list, byAttr = 'code'): """ This function will update attribute value of current list by passed array. You have to identify that each column of array belongs to which attribute, and each row of array represents which element. The row number of passed array must equal to length of codeList, and the column number of passed array must equal to length of attributeNameList. If the attribute are not registered, the attribute will be created. Parameters ---------- array : np.ndarray The array you which want to update this list from. codeList : list The code of element whose attribute will be updated by this function. Those whose code is not specified in this list will not be influenced. The length of this list must equal to the row number of passed array. attributeNameList : str or list The attribute that you want to update. Those attribute in this list will be updated, and those are not in this list will not be influenced. byAttr : str The attribute that is used to index element in this list. By default, this attribute is code. But you can specify another attribute A, if you do it, this function will update the attribute value of elements whose attribute A appears in given codeList. Returns ------- None """ attributeNameList = [attributeNameList] if type(attributeNameList)==str else attributeNameList [self.updateAttr(attr, codeList, array[:,index], byAttr=byAttr) for index,attr in enumerate(attributeNameList)]
[docs] def updateAttrFromDict(self, attrName : str, codeValueDict : dict, byAttr = 'code'): """ This function will update attribute value of current list by passed dict. The key of passes dict should be code string. If current list has registered attribute whose name is in keys of codeValueDict, it will be updated. If the attribute is not registered, it will be created. Parameters ---------- attrName : str The attribute whose value is exactly what you want to update. codeValueDict : dict A dict that maps code to value. byAttr : str The attribute that is used to index element in this list. By default, this attribute is code. But you can specify another attribute A, if you do it, this function will update the attribute value of elements whose attribute A appears in given codeValueDict. Returns ------- None """ self.updateAttr(attrName,codeValueDict.keys(),codeValueDict.values(), byAttr=byAttr)
#<operation> #</operation>