Source code for RiskQuantLib.Tool.fileTool

#!/usr/bin/python
#coding = utf-8

import time, os, datetime
import pandas as pd
#<import>
#</import>


[docs]def modifyDateIsToday(filePath:str,mode='M'): """ This function will return a bool value, showing whether the target file is modified on today. """ if mode == 'M': modifyDate = os.path.getmtime(filePath) elif mode=='C': modifyDate = os.path.getctime(filePath) else: print("Mode Error") return datetimeModifyDate = datetime.datetime.utcfromtimestamp(modifyDate).strftime("%Y--%m--%d") datetimeDateNow = datetime.datetime.now().strftime("%Y--%m--%d") if datetimeModifyDate == datetimeDateNow: return True else: return False
[docs]def waitForFile(filePath:str,fileNameKeyWord:str): """ This function will wait for the change or modification of a file. """ fileListInPath = [i for i in os.listdir(filePath) if i.find(fileNameKeyWord)!=-1] while len(fileListInPath)==0: time.sleep(2) fileListInPath = [i for i in os.listdir(filePath) if i.find(fileNameKeyWord)!=-1] return 0
[docs]def dumpVariable(variable,filePath:str): """ Use python module pickle to dump variable. """ import pickle as pkl pkl.dump(variable,open(filePath,"wb"))
[docs]def loadVariable(filePath:str): """ Use python module pickle to load variable. """ import pickle as pkl return pkl.load(open(filePath,"rb"))
[docs]def loadCsv(filePath:str, index_col=0): """ Use pandas.read_csv to load csv file, default set the first column as index. """ return pd.read_csv(filePath, index_col=index_col)
[docs]def loadCsvTimeSeries(filePath:str, index_col=0, converters=None): """ Use pandas.read_csv to load csv file, default set the first column as index, and set its datatype as pandas.Timestamp. """ converters = {0: pd.Timestamp} if converters is None else converters return pd.read_csv(filePath, index_col=index_col, converters=converters)
[docs]def loadCsvDict(dirPath:str, index_col=0): """ Read all csv file in the target directory, return a dict whose keys are file names and values are pandas.DataFrames. """ return {i:loadCsv(dirPath+os.sep+i, index_col=index_col) for i in os.listdir(dirPath) if os.path.splitext(i)[-1] == '.csv'}
[docs]def loadCsvTimeSeriesDict(dirPath:str, index_col=0, converters=None): """ Read all csv file in the target directory, return a dict whose keys are file names and values are pandas.DataFrames. Default set the first column of each file as index, and set its datatype as pandas.Timestamp. """ converters = {0: pd.Timestamp} if converters is None else converters return {i:loadCsvTimeSeries(dirPath+os.sep+i, index_col=index_col, converters=converters) for i in os.listdir(dirPath) if os.path.splitext(i)[-1] == '.csv'}
[docs]def loadExcel(filePath:str): """ Use pandas.read_excel to load csv file. """ return pd.read_excel(filePath)
[docs]def loadExcelDict(dirPath:str): """ Read all excel file in the target directory, return a dict whose keys are file names and values are pandas.DataFrames. """ return {i:loadExcel(dirPath+os.sep+i) for i in os.listdir(dirPath) if os.path.splitext(i)[-1] in {'.xlsx', '.xls'}}
[docs]def dumpDictToJson(dictVariable:dict,filePath:str): """ Dump dict to json file. """ import json json.dump(dictVariable,open(filePath,"w",encoding='UTF-8'),ensure_ascii=False)
[docs]def deleteFile(filePath:str): """ Delete a file if it exists. """ if os.path.isfile(filePath): os.remove(filePath) print('Delete File: ', filePath) else: print('Delete Failed, File Does Not Exist: ', filePath)
[docs]def deleteFileWithConfirm(filePath:str): """ Before Delete a file, ask for confirmation. """ from RiskQuantLib.Tool.decoratorTool import confirmer confirmer()(deleteFile)(filePath)
[docs]def clearCachePklFile(filePath:str): """ Delete all '.pkl' files in filePath. """ print("Clearing Cache in "+filePath) fileListInPath = [i for i in os.listdir(filePath) if i.find('.pkl')!=-1] while len(fileListInPath)!=0: [deleteFile(i) for i in fileListInPath] fileListInPath = [i for i in os.listdir(filePath) if i.find('.pkl')!=-1] time.sleep(2)
#<fileTool> #</fileTool>
[docs]class fileReceiver: def __init__(self, targetFilePath): import socket self.broadcast = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.broadcast.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.broadcast.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.listen = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.listen.settimeout(1) self.listen.bind(('',9004)) self.hostname = socket.gethostname() self.fileAlreadyReceived = [] self.fileNeglected = [] self.fileReceiveFinished = False self.targetFilePath = targetFilePath def __del__(self): self.broadcast.close() self.listen.close()
[docs] def sendOnLineInfo(self): self.broadcast.sendto(('OnLine->'+self.hostname).encode('utf-8'), ('255.255.255.255', 9003))
[docs] def sendIPInfo(self): self.broadcast.sendto(('IPInfo->' + self.hostname).encode('utf-8'), ('255.255.255.255', 9006))
[docs] def receiveFileInfo(self): try: fileInfo, address = self.listen.recvfrom(1024) except: fileInfo = None address = None if fileInfo and address: fileInfo = fileInfo.decode("utf-8") fileInfoList = fileInfo.split('->') senderName = fileInfoList[1] fileName = fileInfoList[2] fileSize = int(fileInfoList[3]) return address,senderName,fileName,fileSize else: return '','','',0
[docs] def receiveFileContent(self,address,fileName,fileSize): import socket self.receive = socket.socket() self.receive.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.receive.settimeout(15) self.receive.bind(('', 9005)) self.receive.listen(5) connectEstablished = False while not connectEstablished: self.sendIPInfo() connect, _ = self.receive.accept() connectEstablished = True if connect else False totalFileSize = fileSize with open(self.targetFilePath+os.sep+fileName, 'wb') as targetFile: chunkSize = 4096 while fileSize > 0: if fileSize < chunkSize: chunkSize = fileSize data = connect.recv(chunkSize) targetFile.write(data) fileSize -= len(data) percentage = min(1 - fileSize / totalFileSize, 1) print("\r"+"Download "+fileName+": "+"".join(["=" for i in range(int(50*percentage))])+">"+str(int(100*percentage))+"%",end="") print("") if os.path.exists(self.targetFilePath+os.sep+fileName) and os.path.getsize(self.targetFilePath+os.sep+fileName)!=0: print('File Received Successfully') self.fileReceiveFinished = True self.fileAlreadyReceived.append((address,fileName)) else: print('File Received Failed') self.receive.close()
[docs] def receiveFile(self): address, senderName, fileName, fileSize = self.receiveFileInfo() if address and senderName and fileName and fileSize and senderName!=self.hostname and ((address[0],fileName) not in self.fileAlreadyReceived + self.fileNeglected): receiveFile = input("Do you want to receive " + fileName + " from " + senderName + " ? (Y/N)") if receiveFile.lower() == 'y' or receiveFile == '': print("Preparing File, This May Take A While, Please Wait Until All Processes Finish...") self.receiveFileContent(address[0], fileName, fileSize) else: self.fileNeglected.append((address[0],fileName))
[docs] def run(self, timeOut = 100): print("Start Receiving File") startTime = time.time() while time.time() - startTime <= timeOut and not self.fileReceiveFinished: try: self.receiveFile() except: pass if len(self.fileNeglected)+len(self.fileAlreadyReceived)==0: print("Can Not Find Any Sender")
#<fileReceiver> #</fileReceiver>
[docs]class fileSender: def __init__(self, fileName): import socket self.broadcast = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.broadcast.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.broadcast.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.listen = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.listen.settimeout(1) self.listen.bind(('',9006)) self.hostname = socket.gethostname() self.filePath = fileName self.fileName = os.path.basename(fileName) self.fileSize = os.path.getsize(fileName) self.fileAlreadySent = [] self.fileNeglected = [] self.fileSendFinished = False def __del__(self): self.broadcast.close() self.listen.close()
[docs] def sendOnLineInfo(self): self.broadcast.sendto(('OnLine->'+self.hostname).encode('utf-8'), ('255.255.255.255', 9003))
[docs] def receiveIPInfo(self): try: IPInfo, address = self.listen.recvfrom(1024) except: IPInfo = None address = None if IPInfo and address: IPInfo = IPInfo.decode("utf-8") IPInfoList = IPInfo.split('->') receiverName = IPInfoList[1] return address,receiverName else: return '',''
[docs] def sendFileInfo(self): self.broadcast.sendto(('FileInfo->'+self.hostname+'->'+self.fileName+'->'+str(self.fileSize)).encode('utf-8'),('255.255.255.255', 9004))
[docs] def sendFileContent(self,address): import socket self.send = socket.socket() self.send.connect((address, 9005)) with open(self.filePath, 'rb') as targetFile: line = targetFile.read() self.send.sendall(line) print('File Sent: ' + self.fileName) self.fileSendFinished = True self.fileAlreadySent.append((address,self.fileName)) self.send.close()
[docs] def sendFile(self): self.sendFileInfo() address,receiverName = self.receiveIPInfo() if address and receiverName and receiverName!=self.hostname and ((address[0],self.fileName) not in self.fileAlreadySent + self.fileNeglected): sendFile = input("Do you confirm to send " + self.fileName + " to " + receiverName + " ? (Y/N)") if sendFile.lower() == 'y' or sendFile == '': print("Preparing File, This May Take A While, Please Wait Until Receiver Has Finished All Processes...") self.sendFileContent(address[0]) else: self.fileNeglected.append((address[0],self.fileName))
[docs] def run(self, timeOut = 100): print("Start Sending File") startTime = time.time() while time.time() - startTime <= timeOut and not self.fileSendFinished: try: self.sendFile() except: pass if len(self.fileNeglected)+len(self.fileAlreadySent)==0: print("Can Not Find Any Receiver")
#<fileSender> #</fileSender>
[docs]class systemGuardian: def __init__(self, path:str, call_back_function = lambda x:x): if os.path.isdir(path) or os.path.isfile(path): self._cachedStamp = 0 self.path = path self.call_back_function = call_back_function
[docs] def watch(self): if hasattr(self, 'path') and (os.path.isdir(self.path) or os.path.isfile(self.path)): stamp = os.stat(self.path).st_mtime if stamp != self._cachedStamp: self._cachedStamp = stamp return self.call_back_function(self.path)
[docs] def tryWatch(self): try: return self.watch() except Exception as e: pass
[docs] def start(self): try: while True: self.watch() except KeyboardInterrupt: pass
#<systemGuardian> #</systemGuardian>
[docs]class systemWatcher: def __init__(self, monitorPath: str or list, call_back_function_on_file = lambda x:x, call_back_function_on_dir = lambda x:x, call_back_function_on_any_change = lambda x:x, withFormat:bool = False, monitorFormat:set = {}): self.monitorPath = monitorPath self.call_back_function_on_file = call_back_function_on_file self.call_back_function_on_dir = call_back_function_on_dir self.call_back_function_on_any_change = call_back_function_on_any_change self.withFormat = withFormat self.monitorFormat = monitorFormat self.fileGuardian = [] self.dirGuardian = [] self.validatedFilePath = [] self.validatedDirPath = [] self.scanMonitorPath(monitorPath) self.createGuardian()
[docs] def scanMonitorPath(self, monitorPath): monitorPath = [monitorPath] if type(monitorPath)==str else monitorPath self.validatedFilePath = [i for i in monitorPath if os.path.isfile(i) and (not self.withFormat or os.path.splitext(i)[-1] in self.monitorFormat)] self.validatedDirPath = [i for i in monitorPath if os.path.isdir(i)] recursivePath = [([os.path.join(dirPath,dir) for dir in dirs],[os.path.join(dirPath,file) for file in files if not self.withFormat or os.path.splitext(file)[-1] in self.monitorFormat]) for rootPath in self.validatedDirPath for dirPath, dirs, files in os.walk(rootPath)] [self.validatedDirPath.extend(i[0]) for i in recursivePath] [self.validatedFilePath.extend(i[1]) for i in recursivePath]
[docs] def createGuardian(self): fileGuardianAlreadyExist = set([i.path for i in self.fileGuardian]) dirGuardianAlreadyExist = set([i.path for i in self.dirGuardian]) newFileGuardian = [systemGuardian(path) for path in self.validatedFilePath if path not in fileGuardianAlreadyExist] newDirGuardian = [systemGuardian(path) for path in self.validatedDirPath if path not in dirGuardianAlreadyExist] self.fileGuardian.extend(newFileGuardian) self.dirGuardian.extend(newDirGuardian) self.allGuardian = self.fileGuardian + self.dirGuardian
[docs] def watch(self): fileInfo = [i.tryWatch() for i in self.fileGuardian] updatedFile = [i for i in fileInfo if type(i)==str and os.path.isfile(i)] self.call_back_function_on_file_result = [self.call_back_function_on_file(i) for i in updatedFile] dirInfo = [i.tryWatch() for i in self.dirGuardian] updatedDir = [i for i in dirInfo if type(i)==str and os.path.isdir(i)] self.call_back_function_on_dir_result = [self.call_back_function_on_dir(i) for i in updatedDir] self.call_back_function_on_any_change_result = self.call_back_function_on_any_change(updatedDir+updatedFile) if len(updatedDir+updatedFile)!=0 else None
[docs] def start(self): try: while True: self.scanMonitorPath(self.monitorPath) self.createGuardian() self.watch() except KeyboardInterrupt: pass
#<systemWatcher> #</systemWatcher>