From b1b8fc9f25fcff8bc145889abfaefe4ea48bf629 Mon Sep 17 00:00:00 2001
From: Samo Penic <samo.penic@gmail.com>
Date: Tue, 15 May 2018 17:42:19 +0000
Subject: [PATCH] Merge branch 'master' of ssh://git.penic.eu:29418/trisurf-client
---
playground/tsclient.py | 320 +++++++++++++++++++++++++++++++++++++++++++++++++++++
tsclient.py | 1
2 files changed, 321 insertions(+), 0 deletions(-)
diff --git a/playground/tsclient.py b/playground/tsclient.py
new file mode 100755
index 0000000..4b0724c
--- /dev/null
+++ b/playground/tsclient.py
@@ -0,0 +1,320 @@
+#!/usr/bin/python3
+import requests
+import json
+from time import sleep
+import uuid
+import subprocess
+import os
+import shutil
+import signal
+import sys
+import socket
+from threading import Thread, Event
+
+def get_hostname():
+ return socket.gethostname()
+
+def get_ip():
+ return ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0])
+
+def get_client_id(addr, my_ip, my_hostname, subrun):
+ client_auth={'ip':my_ip,'hostname':my_hostname, 'subrun':subrun}
+ response=requests.post(addr+"/api/register/", data=client_auth)
+ if(response.status_code==200):
+ client_data=json.loads(response.text)
+ client_id=client_data['id']
+ return client_id
+ else:
+ raise ValueError
+
+
+def get_run(addr,cid):
+ response=requests.get(addr+"/api/getrun/"+str(cid)+"/")
+ if(response.status_code==200):
+ client_data=json.loads(response.text)
+ rid=client_data['id']
+ tape=client_data['tape']
+ vtu=client_data['lastVTU']
+ status=client_data['status']
+ return (rid,tape,vtu,status)
+ else:
+ print(response.text)
+ if(response.status_code==400):
+ raise ValueError
+ else:
+ raise NameError
+
+
+def ping_run(addr,cid, rid):
+ client_data={'client_id':cid, 'run_id':rid}
+ response=requests.post(addr+"/api/ping/", data=client_data)
+ if(response.status_code==200):
+ return
+ else:
+ raise ValueError
+
+def client_ping(addr,cid):
+ client_data={'client_id':cid}
+ response=requests.post(addr+"/api/pingclient/", data=client_data)
+ if(response.status_code==200):
+ return
+ else:
+ raise ValueError
+
+
+
+def send_error_report(addr,cid, rid,errcode):
+ client_data={'client_id':cid, 'run_id':rid, 'error_code':errcode}
+ response=requests.post(addr+"/api/reporterr/", data=client_data)
+ if(response.status_code==200):
+ return
+ else:
+ raise ValueError
+
+def upload(addr,cid, rid, vtu, status):
+ client_data={'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
+ response=requests.post(addr+"/api/upload/", data=client_data)
+ if(response.status_code==200):
+ return
+ else:
+ raise ValueError
+
+def getNewVTU(directory):
+ fset=set()
+ for file in os.listdir(directory):
+ if file.endswith(".vtu") and file.startswith("timestep_"):
+ fset.add(file)
+ return fset
+
+
+def removeDir(directory):
+ os.chdir('/')
+ try:
+ shutil.rmtree(directory)
+ except:
+ print("Cannot remove directory "+directory+ "\n")
+ return
+
+
+
+
+class ClientThread(Thread):
+
+ def __init__(self,conn_address='http://localhost:8000',subid=0, update_seconds=100):
+ super(ClientThread,self).__init__()
+ self._stop_event = Event()
+ self._stop_event.clear()
+ self.p=None
+ self.workingdir=None
+ self.conn_address=conn_address
+ self.id=subid
+ self.ip=get_ip()
+ self.hostname=get_hostname()
+ self.update_seconds=update_seconds
+
+
+ def stop(self):
+ self._stop_event.set()
+
+ def isStopped(self):
+ return self._stop_event.is_set()
+
+ def join(self):
+ print('joining threads')
+ super(ClientThread, self).join()
+ if self.p is not None:
+ self.p.terminate()
+ if self.workingdir is not None:
+ removeDir(self.workingdir.fullpath())
+
+
+ def sleep(self,s):
+ for i in range(0, s):
+ if(self.isStopped()):
+ return False
+ sleep(1)
+ return True
+
+ def run(self):
+ while(not self.isStopped()):
+ try:
+ cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id)
+ except:
+ print("[{}] Could not get CID.".format(self.id))
+ self.sleep(10)
+ continue
+ #print("Got CID. getting RID.")
+ client_ping_time_elapsed=0
+ while(not self.isStopped()):
+ try:
+ (rid,tape,vtu,status)=get_run(self.conn_address,cid)
+ except NameError:
+ print("[{}] Could not get RID.".format(self.id))
+ self.sleep(10)
+ client_ping_time_elapsed+=10
+ if(client_ping_time_elapsed>=250):
+ try:
+ client_ping(self.conn_address,cid)
+ except:
+ break
+ client_ping_time_elapsed=0
+ #if you put break instead of continue, there is no need to ping client. And it is more robust...
+ continue
+ except ValueError:
+ print("[{}] Wrong CID? Getting new CID.".format(self.id))
+ #self.sleep(10)
+ break
+ except:
+ print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
+ break
+ else:
+ #start separate thread with simulations.
+ self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
+ self.workingdir.makeifnotexist()
+ self.workingdir.goto()
+ with open(self.workingdir.fullpath()+"/tape", 'w') as f:
+ f.write(tape)
+ if(int(status)==-1):
+ cmd=['trisurf', '--force-from-tape']
+ print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
+ else:
+ with open(self.workingdir.fullpath()+"/.status",'w') as f:
+ f.write(status)
+ with open(self.workingdir.fullpath()+"/initial.vtu",'w') as f:
+ f.write(vtu)
+ cmd=['trisurf', '--restore-from-vtk', 'initial.vtu']
+ print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status))
+ self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL)
+ s=int(status)
+ while(not self.isStopped()):
+ #monitor for new file. If file is present, upload it!
+ newVTU=getNewVTU(self.workingdir.fullpath())
+ if newVTU: #upload
+ try:
+ for nv in sorted(newVTU):
+ with open(nv,'r') as f:
+ fc=f.read()
+ s=s+1
+ print('[{}] Uploading {}.'.format(self.id,nv))
+ upload(self.conn_address, cid, rid, fc, s)
+ os.unlink(nv)
+ except:
+ print("[{}] Could not upload".format(self.id))
+ self.p.terminate()
+ removeDir(self.workingdir.fullpath())
+ self.p=None
+ self.workingdir=None
+ break
+ else:
+ print("[{}] VTU uploaded.".format(self.id))
+ else: #ping
+ try:
+ ping_run(self.conn_address, cid, rid)
+ except:
+ print("[{}] Could not ping.".format(self.id))
+ self.p.terminate()
+ self.p=None
+ removeDir(self.workingdir.fullpath())
+ self.workingdir=None
+ #stop simulations
+ break
+ #check if trisurf is still running. If not break the innermost loop.
+ sleep(1)
+ if(self.p.poll() is not None): # trisurf exited!
+ print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
+ if(self.p.returncode>0):
+ try:
+ send_error_report(self.conn_address, cid, rid, self.p.returncode)
+ except:
+ print("[{}] Server didn't accept error report".format(self.id))
+ removeDir(self.workingdir.fullpath())
+ self.workingdir=None
+ self.p=None
+ break
+ self.sleep(self.update_seconds-1)
+
+
+
+#Stolen from trisurf library... Therefore, this client is not dependent on the installation of the library.
+class Directory:
+ '''
+ Class deals with the paths where the simulation is run and data is stored.
+ '''
+ def __init__(self, maindir=".", simdir=""):
+ '''Initialization Directory() takes two optional parameters, namely maindir and simdir. Defaults to current directory. It sets local variables maindir and simdir accordingly.'''
+ self.maindir=maindir
+ self.simdir=simdir
+ return
+
+ def fullpath(self):
+ '''
+ Method returns string of path where the data is stored. It combines values of maindir and simdir as maindir/simdir on Unix.
+ '''
+ return os.path.join(self.maindir,self.simdir)
+
+ def exists(self):
+ ''' Method checks whether the directory specified by fullpath() exists. It return True/False on completion.'''
+ path=self.fullpath()
+ if(os.path.exists(path)):
+ return True
+ else:
+ return False
+
+ def make(self):
+ ''' Method make() creates directory. If it fails it exits the program with error message.'''
+ try:
+ os.makedirs(self.fullpath())
+ except:
+ print("Cannot make directory "+self.fullpath()+"\n")
+ exit(1)
+ return
+
+ def makeifnotexist(self):
+ '''Method makeifnotexist() creates directory if it does not exist.'''
+ if(self.exists()==0):
+ self.make()
+ return True
+ else:
+ return False
+
+ def remove(self):
+ '''Method remove() removes directory recursively. WARNING! No questions asked.'''
+ if(self.exists()):
+ try:
+ os.rmdir(self.fullpath())
+ except:
+ print("Cannot remove directory "+self.fullpath()+ "\n")
+ exit(1)
+ return
+
+ def goto(self):
+ '''
+ Method goto() moves current directory to the one specified by fullpath(). WARNING: when using the relative paths, do not call this function multiple times.
+ '''
+ try:
+ os.chdir(self.fullpath())
+ except:
+ print("Cannot go to directory "+self.fullpath()+"\n")
+ return
+
+
+
+#--- SIGINT and SIGTERM HANDLING ---
+def signal_handler(signal,frame):
+ t.stop()
+ t.join()
+ print("Process ended with signal " +str(signal))
+ sys.exit(signal)
+#--- END SIGINT and SIGTERM----
+
+if __name__ == '__main__':
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ t=ClientThread(update_seconds=100)
+ t.start()
+ #t.join()
+ #print("main")
+ while(True):
+ sleep(1000)
diff --git a/tsclient.py b/tsclient.py
index a736951..492badc 100755
--- a/tsclient.py
+++ b/tsclient.py
@@ -60,6 +60,7 @@
status=client_data['status']
return (rid,tape,vtu,status)
else:
+ print(response.text)
raise ValueError
--
Gitblit v1.8.0