Source code for RiskQuantLib.Tool.threadTool
#!/usr/bin/python
#coding = utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Callable, Tuple, Any, Optional
#<import>
#</import>
[docs]def multiThread(functionList: List[Callable], argsList: List[Tuple] = None, kwargsList: List[Dict] = None, workerNumber: Optional[int] = None) -> List[Any]:
"""
Run callables by multi-thread. It will pass the first element of argList as parameter of the first function, pass
the second element of argList as parameter of the second function, etc. It returns a list of results.
Parameters
----------
functionList : List[Callable]
A list of callables.
argsList : List[Tuple]
A list of parameters, like [(arg1, arg2), (arg3,)]
kwargsList : List[Dict]
A list of parameters, like [{argName1:argValue1, argName2:argValue2}, {argName3:argValue3}]
workerNumber : Optional[int]
Max number of core used.
Returns
-------
List[Any]
The results of every function.
"""
numberThread = len(functionList)
argsList = [tuple() for _ in range(numberThread)] if argsList is None else argsList
kwargsList = [dict() for _ in range(numberThread)] if kwargsList is None else kwargsList
assert numberThread == len(argsList) == len(kwargsList)
results = [None for _ in range(numberThread)]
with ThreadPoolExecutor(max_workers=workerNumber) as executor:
futureDict = {executor.submit(func, *args, **kwargs): i for i, (func, args, kwargs) in enumerate(zip(functionList, argsList, kwargsList))}
for future in as_completed(futureDict):
index = futureDict[future]
try:
results[index] = future.result()
except Exception as e:
results[index] = e
print(f"Task at index {index} ({functionList[index].__name__}) failed with error: {e}")
return results
#<threadTool>
#</threadTool>