From e08bfff865c6deff8251d71d30cf9c3ec62c7fac Mon Sep 17 00:00:00 2001
From: Samo Penic <samo.penic@gmail.com>
Date: Sat, 02 Jun 2018 20:32:21 +0000
Subject: [PATCH] changed the directory
---
tsclient.py | 332 +++++++++++++++++++++++++++++++++++++++----------------
1 files changed, 234 insertions(+), 98 deletions(-)
diff --git a/tsclient.py b/tsclient.py
index 492badc..6d2be9e 100755
--- a/tsclient.py
+++ b/tsclient.py
@@ -4,34 +4,12 @@
from time import sleep
import uuid
import subprocess
-from trisurf import trisurf
import os
import shutil
import signal
import sys
import socket
-CONNECT_ADDR='http://localhost:8000'
-
-p=None
-workingdir=None
-
-
-
-#--- SIGINT and SIGTERM HANDLING ---
-def signal_handler(signal,frame):
- global p
- global wirkingdir
- if p is not None:
- p.terminate()
- if(workingdir is not None):
- removeDir(workingdir.fullpath())
- print("Process ended with signal " +str(signal))
- sys.exit(0)
-signal.signal(signal.SIGINT, signal_handler)
-signal.signal(signal.SIGTERM, signal_handler)
-#--- END SIGINT and SIGTERM----
-
-
+from threading import Thread, Event
def get_hostname():
return socket.gethostname()
@@ -39,8 +17,8 @@
def get_ip():
return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0])
-def get_client_id(addr, my_ip, my_hostname):
- client_auth={'ip':my_ip,'hostname':my_hostname}
+def get_client_id(addr, my_ip, my_hostname, subrun):
+ client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun}
response=requests.post(addr+"/api/register/", data=client_auth)
if(response.status_code==200):
client_data=json.loads(response.text)
@@ -61,7 +39,10 @@
return (rid,tape,vtu,status)
else:
print(response.text)
- raise ValueError
+ if(response.status_code==400):
+ raise ValueError
+ else:
+ raise NameError
def ping_run(addr,cid, rid):
@@ -71,6 +52,16 @@
return
else:
raise ValueError
+
+def client_ping(addr,cid):
+    '''POST the client id `cid` to the /api/pingclient/ endpoint of server `addr`.
+    Returns None on HTTP 200 and raises ValueError on any other status code.'''
+    payload={'client_id':cid}
+    reply=requests.post(addr+"/api/pingclient/", data=payload)
+    if(reply.status_code!=200):
+        raise ValueError
+
+
def send_error_report(addr,cid, rid,errcode):
client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode}
@@ -104,81 +95,226 @@
print("Cannot remove directory "+directory+ "\n")
return
+
-while(True):
- try:
- cid=get_client_id(CONNECT_ADDR, get_ip(),get_hostname())
- except:
- print("Cannot get CID.")
- sleep(10)
- continue
- print("Got CID. getting RID.")
- while(True):
- try:
- (rid,tape,vtu,status)=get_run(CONNECT_ADDR,cid)
- except:
- print("Could not get RID.")
- sleep(10)
- break
- else:
- #start separate thread with simulations.
- workingdir=trisurf.Directory('/tmp/ts_'+str(uuid.uuid4()))
- workingdir.makeifnotexist()
- workingdir.goto()
- with open(workingdir.fullpath()+"/tape", 'w') as f:
- f.write(tape)
- if(int(status)==-1):
- cmd=['trisurf', '--force-from-tape']
- print("Run id="+str(rid)+ " :: Starting from tape")
- else:
- with open(workingdir.fullpath()+"/.status",'w') as f:
- f.write(status)
- with open(workingdir.fullpath()+"/initial.vtu",'w') as f:
- f.write(vtu)
- cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
- print("Run id="+str(rid)+ " :: Restoring from vtk, last timestep "+status)
- p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL)
- s=int(status)
- while(True):
- #monitor for new file. If file is present, upload it!
- newVTU=getNewVTU(workingdir.fullpath())
- if newVTU: #upload
- try:
- for nv in sorted(newVTU):
- with open(nv,'r') as f:
- fc=f.read()
- s=s+1
- print('Uploading '+nv)
- upload(CONNECT_ADDR, cid, rid, fc, s)
- os.unlink(nv)
- except:
- print("Could not upload")
- p.terminate()
- removeDir(workingdir.fullpath())
- break
- else:
- print("VTU uploaded")
- else: #ping
- try:
- ping_run(CONNECT_ADDR, cid, rid)
- except:
- print("Could not ping")
- p.terminate()
- removeDir(workingdir.fullpath())
- #stop simulations
- break
- #check if trisurf is still running. If not break the highest level loop.
- sleep(1)
- if(p.poll() is not None): # trisurf exited!
- print("Trisurf was stopped with return code {}".format(p.returncode))
- if(p.returncode>0):
+
+class ClientThread(Thread):  # worker thread: registers with the server, fetches runs, runs trisurf and uploads its VTU files
+
+ def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):  # conn_address: server base URL; subid: sent as 'subrun' on registration and used as log prefix; update_seconds: polling period of run()
+ super(ClientThread,self).__init__()
+ self._stop_event = Event()  # set by stop(); polled by run() and sleep()
+ self._stop_event.clear()  # a freshly created Event is already unset
+ self.p=None  # Popen handle of the running trisurf process, None while nothing is running
+ self.workingdir=None  # Directory of the current run, None while no run is active
+ self.conn_address=conn_address
+ self.id=subid
+ self.ip=get_ip()
+ self.hostname=get_hostname()
+ self.update_seconds=update_seconds
+
+
+ def stop(self):  # only requests termination; run() notices it at its next isStopped() check
+ self._stop_event.set()
+
+ def isStopped(self):  # True once stop() has been called
+ return self._stop_event.is_set()
+
+ def join(self):  # NOTE(review): unlike Thread.join() this override accepts no timeout argument
+ print('joining threads')
+ super(ClientThread, self).join()
+ if self.p is not None:  # run() was stopped while trisurf was still running
+ self.p.terminate()
+ if self.workingdir is not None:  # remove the working directory of the interrupted run
+ removeDir(self.workingdir.fullpath())
+
+
+ def sleep(self,s):  # waits up to s seconds in 1-second steps; returns False as soon as a stop was requested, True otherwise
+ for i in range(0, s):
+ if(self.isStopped()):
+ return False
+ sleep(1)
+ return True
+
+ def run(self):  # three nested loops: obtain a client id, obtain a run, supervise the trisurf process of that run
+ while(not self.isStopped()):
+ try:
+ cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id)
+ except:
+ print("[{}] Could not get CID.".format(self.id))
+ self.sleep(10)
+ continue
+ #print("Got CID. getting RID.")
+ client_ping_time_elapsed=0  # seconds spent waiting for a run since the last client ping
+ while(not self.isStopped()):
+ try:
+ (rid,tape,vtu,status)=get_run(self.conn_address,cid)
+ except NameError:  # get_run() raises NameError when the failure status is not 400
+ print("[{}] Could not get RID.".format(self.id))
+ self.sleep(10)
+ client_ping_time_elapsed+=10
+ if(client_ping_time_elapsed>=250):  # ping the server after every 250 counted seconds of waiting
try:
- send_error_report(CONNECT_ADDR, cid, rid, p.returncode)
+ client_ping(self.conn_address,cid)
except:
- print("Server didn't accept error report")
- removeDir(workingdir.fullpath())
+ break  # ping failed: leave this loop so that the outer loop registers again
+ client_ping_time_elapsed=0
+ #if you put break instead of continue, there is no need to ping client. And it is more robust...
+ continue
+ except ValueError:  # get_run() raises ValueError on status 400
+ print("[{}] Wrong CID? Getting new CID.".format(self.id))
+ #self.sleep(10)
break
- sleep(100)
+ except:  # any other exception raised while requesting a run
+ print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
+ break
+ else:
+ #start a separate trisurf process with the simulation.
+ self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))  # fresh, uniquely named working directory per run
+ self.workingdir.makeifnotexist()
+ self.workingdir.goto()  # NOTE(review): goto() calls os.chdir(), which affects the whole process, not only this thread
+ with open(self.workingdir.fullpath()+"/tape", 'w') as f:
+ f.write(tape)
+ if(int(status)==-1):  # status -1: start from the tape instead of restoring a VTU snapshot
+ cmd=['trisurf', '--force-from-tape']
+ print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
+ else:
+ with open(self.workingdir.fullpath()+"/.status",'w') as f:
+ f.write(status)
+ with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f:
+ f.write(vtu)
+ cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
+ print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status))
+ self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL)  # no cwd= given, so trisurf runs in the process's current directory
+ s=int(status)  # timestep counter, incremented before each upload
+ while(not self.isStopped()):
+ #monitor for new file. If file is present, upload it!
+ newVTU=getNewVTU(self.workingdir.fullpath())
+ if newVTU: #upload
+ try:
+ for nv in sorted(newVTU):
+ with open(nv,'r') as f:
+ fc=f.read()
+ s=s+1
+ print('[{}] Uploading {}.'.format(self.id,nv))
+ upload(self.conn_address, cid, rid, fc, s)
+ os.unlink(nv)  # delete the local file once it has been uploaded
+ except:
+ print("[{}] Could not upload".format(self.id))
+ self.p.terminate()
+ removeDir(self.workingdir.fullpath())
+ self.p=None  # reset so that join() does not clean up a second time
+ self.workingdir=None
+ break
+ else:
+ print("[{}] VTU uploaded.".format(self.id))
+ else: #ping
+ try:
+ ping_run(self.conn_address, cid, rid)
+ except:
+ print("[{}] Could not ping.".format(self.id))
+ self.p.terminate()
+ self.p=None
+ removeDir(self.workingdir.fullpath())
+ self.workingdir=None
+ #stop simulations
+ break
+ #check if trisurf is still running. If not break the innermost loop.
+ sleep(1)
+ if(self.p.poll() is not None): # trisurf exited!
+ print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
+ if(self.p.returncode>0):  # only positive return codes are reported to the server
+ try:
+ send_error_report(self.conn_address, cid, rid, self.p.returncode)
+ except:
+ print("[{}] Server didn't accept error report".format(self.id))
+ removeDir(self.workingdir.fullpath())
+ self.workingdir=None
+ self.p=None
+ break
+ self.sleep(self.update_seconds-1)  # with the sleep(1) above: one poll per update_seconds, cut short by stop()
-
+
+#Copied from the trisurf library so that this client does not depend on the library being installed.
+class Directory:
+    '''Class deals with the paths where the simulation is run and data is stored.'''
+    def __init__(self, maindir=".", simdir=""):
+        '''Store the two path components; maindir defaults to "." and simdir to "".'''
+        self.maindir=maindir
+        self.simdir=simdir
+
+    def fullpath(self):
+        '''Return maindir and simdir joined with os.path.join, i.e. maindir/simdir on Unix.'''
+        return os.path.join(self.maindir,self.simdir)
+
+    def exists(self):
+        '''Return True if the path given by fullpath() exists, False otherwise.'''
+        return os.path.exists(self.fullpath())
+
+    def make(self):
+        '''
+        Create the directory fullpath(), including missing parent directories.
+        On failure print an error message and raise SystemExit with status 1.
+        '''
+        try:
+            os.makedirs(self.fullpath())
+        except OSError:
+            print("Cannot make directory "+self.fullpath()+"\n")
+            sys.exit(1)
+
+    def makeifnotexist(self):
+        '''
+        Create the directory only if it does not exist yet.
+        Return True when it was created and False when it was already there.
+        '''
+        if(self.exists()):
+            return False
+        self.make()
+        return True
+
+    def remove(self):
+        '''
+        Remove the directory and everything below it. WARNING! No questions asked.
+        Do nothing if it does not exist; on failure print an error message and
+        raise SystemExit with status 1.
+        '''
+        if(self.exists()):
+            try:
+                # os.rmdir() only removes empty directories; rmtree is recursive as documented.
+                shutil.rmtree(self.fullpath())
+            except OSError:
+                print("Cannot remove directory "+self.fullpath()+ "\n")
+                sys.exit(1)
+
+    def goto(self):
+        '''
+        Change the current working directory of the process to fullpath() and
+        print a message if that fails. WARNING: with relative paths, do not call
+        this method more than once.
+        '''
+        try:
+            os.chdir(self.fullpath())
+        except OSError:
+            print("Cannot go to directory "+self.fullpath()+"\n")
+
+
+
+#--- SIGINT and SIGTERM HANDLING ---
+def signal_handler(signal,frame):  # `signal` is the received signal number; inside this function it shadows the signal module
+ t.stop()  # `t` is the module-global ClientThread assigned in the __main__ section
+ t.join()  # ClientThread.join() waits for run() to finish, then terminates trisurf and removes the working directory
+ print("Process ended with signal " +str(signal))
+ sys.exit(signal)  # the signal number is passed on as the exit status
+#--- END SIGINT and SIGTERM----
+
+if __name__ == '__main__':
+    # Create the worker before installing the handlers: signal_handler() uses the
+    # global `t`, so a signal arriving while it is still unassigned would raise NameError.
+    t=ClientThread(update_seconds=100)
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+    t.start()
+    # Python runs signal handlers in the main thread only, so keep that thread
+    # alive and idle; signal_handler() ends the process via sys.exit().
+    while(True):
+        sleep(1000)
--
Gitblit v1.8.0