| commit | author | age | ||
| 57af9d | 1 | #!/usr/bin/python3 |
| SP | 2 | import requests |
| 3 | import json | |
| 4 | from time import sleep | |
| 5 | import uuid | |
| 6 | import subprocess | |
| 7 | import os | |
| 8 | import shutil | |
| 9 | import signal | |
| 10 | import sys | |
| 11 | import socket | |
| e08bff | 12 | from threading import Thread, Event |
| 4c9734 | 13 | import re |
| SP | 14 | |
| 8058ab | 15 | |
| SP | 16 | |
# Trisurf version reported to the server when a client registers
# (see get_client_id). This is only a placeholder: when the script is run
# directly it is replaced by the result of getTrisurfVersion().
glob_ts_version = "00000"
| 18 | ||
def parseTrisurfVersion(first_line):
    '''
    Extract the short git hash from one line of `trisurf --version` output.

    Returns the first run of 7 lowercase hex digits (optionally followed by
    "-dirty"), or "unknown version" if the line contains none.
    '''
    version = re.findall(r'[0-9a-f]{7}(?:-dirty)?', first_line)
    if version:
        return version[0]
    return "unknown version"


def getTrisurfVersion():
    '''
    Return the version (short git hash) of the installed trisurf binary.

    Runs `trisurf --version` and parses the first line of its combined
    stdout/stderr with parseTrisurfVersion(). Returns "unknown version" if
    trisurf cannot be started, prints nothing, or prints no recognizable hash.
    '''
    try:
        # Argument list instead of shell=True: no shell is needed to run a
        # fixed command. A missing binary raises OSError here instead of
        # producing a shell error message.
        p = subprocess.Popen(['trisurf', '--version'],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    except OSError:
        return "unknown version"
    output, _ = p.communicate()  # reads all output and waits for exit
    # Only the first line carries the version. partition() tolerates empty
    # output (the old lines[0] raised IndexError), and errors='replace' keeps
    # non-ASCII bytes from raising UnicodeDecodeError.
    first_line = output.partition(b'\n')[0].decode('ascii', errors='replace')
    return parseTrisurfVersion(first_line)
| 57af9d | 28 | |
def get_hostname():
    '''Return this machine's host name, as reported by socket.gethostname().'''
    hostname = socket.gethostname()
    return hostname
| 31 | ||
def get_ip():
    '''
    Return a non-loopback IPv4 address of this host, or "no IP found".

    Strategy:
    1. Take the first address that the local host name resolves to and that
       does not start with "127.".
    2. Otherwise ask the kernel which local address it would use to reach
       8.8.8.8. connect() on a UDP socket only selects a route; no datagram
       is sent.
    3. If both fail (unresolvable host name, no route), return "no IP found".
       The old one-liner raised in these cases, so this fallback was
       unreachable.
    '''
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:  # socket.gaierror / herror are OSError subclasses
        addresses = []
    for ip in addresses:
        if not ip.startswith("127."):
            return ip
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]
    except OSError:
        return "no IP found"
| 34 | ||
def get_client_id(addr, my_ip, my_hostname, subrun, timeout=60):
    '''
    Register this client with the server and return the id it assigns.

    addr        -- base URL of the server
    my_ip       -- IP address reported for this host
    my_hostname -- host name reported for this host
    subrun      -- worker index (ClientThread passes its subid; 0 is the
                   coordinator thread)
    timeout     -- seconds, passed to requests as the connect/read timeout so
                   an unresponsive server cannot block the caller forever

    The trisurf version sent to the server is read from the module-level
    glob_ts_version (reading a global needs no `global` statement).

    Returns the 'id' field of the server's JSON reply.
    Raises ValueError if the server answers with a status other than 200.
    Network errors (including timeouts) from requests propagate.
    '''
    client_auth = {'ip': my_ip, 'hostname': my_hostname, 'subrun': subrun,
                   'trisurf_version': glob_ts_version}
    response = requests.post(addr + "/api/register/", data=client_auth, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("registration failed with HTTP status {}".format(response.status_code))
    return json.loads(response.text)['id']
| 45 | ||
| 46 | ||
def get_run(addr, cid, timeout=60):
    '''
    Ask the server for a run that client `cid` should work on.

    timeout -- seconds, passed to requests as the connect/read timeout so a
               hung server cannot block the worker thread forever

    Returns a tuple (rid, tape, vtu, status) built from the JSON fields
    'id', 'tape', 'lastVTU' and 'status' of the reply.

    Raises:
      ValueError -- on HTTP 400 (ClientThread treats it as an invalid client id
                    and re-registers)
      NameError  -- on any other non-200 status (ClientThread waits and asks
                    again; presumably this means "no run available"). NameError
                    is unusual here but is kept because callers catch exactly
                    this type.
    Network errors (including timeouts) from requests propagate.
    '''
    response = requests.get(addr + "/api/getrun/" + str(cid) + "/", timeout=timeout)
    if response.status_code == 200:
        client_data = json.loads(response.text)
        return (client_data['id'], client_data['tape'],
                client_data['lastVTU'], client_data['status'])
    if response.status_code == 400:
        raise ValueError("server rejected client id {}".format(cid))
    raise NameError("no run obtained (HTTP status {})".format(response.status_code))
| 57af9d | 62 | |
| SP | 63 | |
def ping_run(addr, cid, rid, timeout=60):
    '''
    Tell the server that client `cid` is still working on run `rid`.

    ClientThread calls this to prolong its lease on the run while no new VTU
    file is ready for upload.

    timeout -- seconds, passed to requests as the connect/read timeout so a
               hung server cannot block the worker thread forever

    Raises ValueError if the server answers with a status other than 200.
    Network errors (including timeouts) from requests propagate.
    '''
    client_data = {'client_id': cid, 'run_id': rid}
    response = requests.post(addr + "/api/ping/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("run ping failed with HTTP status {}".format(response.status_code))
| e08bff | 71 | |
def client_ping(addr, cid, timeout=60):
    '''
    Tell the server that client `cid` is alive and return the number of
    concurrent runs the server wants this host to execute.

    timeout -- seconds, passed to requests as the connect/read timeout so a
               hung server cannot block the worker thread forever

    Returns the 'concurrent_runs' field of the server's JSON reply.
    Raises ValueError if the server answers with a status other than 200.
    Network errors (including timeouts) from requests propagate.
    '''
    client_data = {'client_id': cid}
    response = requests.post(addr + "/api/pingclient/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("client ping failed with HTTP status {}".format(response.status_code))
    return json.loads(response.text)['concurrent_runs']
| 81 | ||
| 82 | ||
| 57af9d | 83 | |
def send_error_report(addr, cid, rid, errcode, timeout=60):
    '''
    Report to the server that run `rid` of client `cid` failed.

    errcode -- the error code to report (ClientThread passes trisurf's
               non-zero exit status)
    timeout -- seconds, passed to requests as the connect/read timeout so a
               hung server cannot block the worker thread forever

    Raises ValueError if the server answers with a status other than 200.
    Network errors (including timeouts) from requests propagate.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'error_code': errcode}
    response = requests.post(addr + "/api/reporterr/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("error report rejected with HTTP status {}".format(response.status_code))
| 91 | ||
def upload(addr, cid, rid, vtu, status, timeout=60):
    '''
    Upload the contents of one VTU file for run `rid` of client `cid`.

    vtu     -- the file contents (ClientThread passes the text it read)
    status  -- counter sent as 'status' (ClientThread increments it once per
               uploaded timestep file)
    timeout -- seconds, passed to requests as the connect/read timeout. It
               limits each socket operation, not the whole transfer.

    Raises ValueError if the server answers with a status other than 200.
    Network errors (including timeouts) from requests propagate.
    '''
    client_data = {'client_id': cid, 'run_id': rid, 'lastVTU': vtu, 'status': status}
    response = requests.post(addr + "/api/upload/", data=client_data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError("upload rejected with HTTP status {}".format(response.status_code))
| 99 | ||
def getNewVTU(directory):
    '''
    Return the set of file names in `directory` that look like trisurf
    timestep outputs, i.e. start with "timestep_" and end with ".vtu".
    '''
    return {
        name
        for name in os.listdir(directory)
        if name.endswith(".vtu") and name.startswith("timestep_")
    }
| 106 | ||
| 107 | ||
def removeDir(directory):
    '''
    Recursively delete `directory`. On failure a message is printed instead
    of raising.

    The current working directory is first changed to '/' because it may lie
    inside `directory`. Note that os.chdir affects the whole process, not only
    the calling thread.
    '''
    os.chdir('/')
    try:
        shutil.rmtree(directory)
    except OSError:
        # Best effort: a leftover temp directory must not kill the worker.
        # Only filesystem errors are swallowed (the old bare except also hid
        # unrelated bugs).
        print("Cannot remove directory "+directory+ "\n")
| 115 | ||
| e08bff | 116 | |
| 57af9d | 117 | |
| e08bff | 118 | |
class ClientThread(Thread):
    '''
    Worker thread that registers with the job server, asks it for runs,
    executes trisurf on each run and uploads the VTU files trisurf produces.

    The thread created with subid 0 is the coordinator. Whenever it pings the
    server it receives the number of concurrent runs requested for this host,
    and it spawns or stops additional ClientThread workers (subid 1, 2, ...)
    to match that number.
    '''
    def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
        super(ClientThread,self).__init__()
        self._stop_event = Event()
        self._stop_event.clear()
        self.p=None            # subprocess.Popen of the running trisurf, or None
        self.workingdir=None   # Directory of the current run, or None
        self.conn_address=conn_address
        self.id=subid          # 0 = coordinator, >0 = worker spawned by the coordinator
        self.ip=get_ip()
        self.hostname=get_hostname()
        self.update_seconds=update_seconds  # seconds between checks of a running simulation
        # approximate number of seconds allowed between client pings to the server
        self.max_client_ping_time_elapsed=250
        self.subruns=[]        # extra workers (only the coordinator fills this)

    def stop(self):
        '''Ask the thread to finish; it exits at its next stop check.'''
        self._stop_event.set()

    def isStopped(self):
        '''Return True once stop() has been called.'''
        return self._stop_event.is_set()

    def join(self):
        '''
        Wait for this thread to end. Then stop and join the workers it spawned,
        terminate a trisurf process that is still running, and delete the
        working directory of the interrupted run.

        Call stop() first; otherwise this blocks until the thread exits on its own.
        '''
        print('joining threads')
        super(ClientThread, self).join()
        for sub in self.subruns:
            sub.stop()
            sub.join()
        self._cleanup_run()

    def sleep(self,s):
        '''
        Sleep for `s` seconds in 1-second steps, returning early if stop() is
        called. Returns True if the full time elapsed, False if interrupted.
        '''
        for _ in range(0, s):
            if(self.isStopped()):
                return False
            sleep(1)  # module-level time.sleep, not this method
        return True

    def _cleanup_run(self, terminate=True):
        '''
        Forget the current run. This terminates its trisurf process (unless
        `terminate` is False, e.g. because it already exited), deletes its
        working directory, and resets self.p / self.workingdir to None.
        Safe to call when there is no current run.
        '''
        if self.p is not None:
            if terminate:
                self.p.terminate()
            self.p=None
        if self.workingdir is not None:
            removeDir(self.workingdir.fullpath())
            self.workingdir=None

    def _prepare_run(self, rid, tape, vtu, status):
        '''
        Create a fresh working directory under /tmp for run `rid`, write the
        input files into it, and return the trisurf command line.

        If int(status) == -1, the run starts from the tape only. Otherwise the
        last uploaded VTU is written to initial.vtu and restored, and `status`
        is stored in .status.
        '''
        self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
        self.workingdir.makeifnotexist()
        path=self.workingdir.fullpath()
        with open(os.path.join(path, "tape"), 'w') as f:
            f.write(tape)
        if(int(status)==-1):
            print("[{}] Run id={} :: Starting from tape.".format(self.id, rid))
            return ['trisurf', '--force-from-tape']
        with open(os.path.join(path, ".status"),'w') as f:
            f.write(str(status))  # str(): write() also works if the server sent a number
        with open(os.path.join(path, "initial.vtu"),'w') as f:
            f.write(vtu)
        print("[{}] Run id={} :: Restoring from vtk, last timestep {}".format(self.id,rid,status))
        return ['trisurf', '--restore-from-vtk', 'initial.vtu']

    def subrunsStartStop(self,nr):
        '''
        Coordinator only (id 0): grow or shrink the pool of extra workers so
        that, counting this thread, `nr` workers run in total. Other threads
        ignore the call. The coordinator itself is never stopped, so any nr <= 1
        means "no extra workers".
        '''
        if self.id != 0:
            return
        nr=int(nr)
        while(nr>len(self.subruns)+1):
            #spawning a new worker:
            print("[{}] Spawning a new worker".format(self.id))
            t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds)
            t.start()
            self.subruns.append(t)
        # The `self.subruns and` guard prevents IndexError on self.subruns[-1]
        # when the server asks for 0 (or fewer) concurrent runs.
        while(self.subruns and nr<len(self.subruns)+1):
            print("[{}] Stopping a worker".format(self.id))
            self.subruns[-1].stop()
            self.subruns[-1].join()
            del self.subruns[-1]

    def run(self):
        '''
        Main loop.

        Register with the server, retrying every 10 s, then keep asking for
        runs. Each run is executed by trisurf in its own directory. New
        timestep_*.vtu files are uploaded as they appear, and while none
        appear the lease on the run is renewed. The client is pinged
        periodically and the worker pool is adjusted. Server errors drop back
        to requesting a run or to re-registering.
        '''
        while(not self.isStopped()): #try to register
            try:
                cid=get_client_id(self.conn_address, self.ip, self.hostname, self.id)
            except Exception:
                print("[{}] Could not get CID.".format(self.id))
                self.sleep(10)
                continue
            print("[{}] Connected and got client ID {}.".format(self.id, cid))
            try:
                concurrent_runs=client_ping(self.conn_address,cid)
            except Exception:
                self.sleep(10)
                continue
            client_ping_time_elapsed=0
            self.subrunsStartStop(concurrent_runs)
            while(not self.isStopped()): #successfully registered, now start pinging and searching for job
                try:
                    (rid,tape,vtu,status)=get_run(self.conn_address,cid)
                except NameError:
                    # get_run raises NameError for unexpected statuses (presumably
                    # "no run available"): wait, and ping the client when due.
                    self.sleep(10)
                    client_ping_time_elapsed+=10
                    if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed):
                        try:
                            concurrent_runs=client_ping(self.conn_address,cid)
                            self.subrunsStartStop(concurrent_runs)
                        except Exception:
                            break
                        client_ping_time_elapsed=0
                    # Using break instead of continue would re-register instead,
                    # making the client ping unnecessary (and arguably more robust).
                    continue
                except ValueError:
                    print("[{}] Wrong CID? Getting new CID.".format(self.id))
                    break
                except Exception:
                    print("[{}] Cannot connect. Server down? Retrying....".format(self.id))
                    break
                # Every except branch above leaves this iteration, so get_run succeeded.
                cmd=self._prepare_run(rid, tape, vtu, status)
                # Pass cwd= instead of chdir-ing: the working directory is
                # process-wide, so with several worker threads another thread
                # could change it before Popen and trisurf would start inside
                # a different run's directory.
                self.p=subprocess.Popen(cmd, stdout=subprocess.DEVNULL, cwd=self.workingdir.fullpath())
                s=int(status)
                while(not self.isStopped()):
                    #monitor for new file. If file is present, upload it!
                    newVTU=getNewVTU(self.workingdir.fullpath())
                    if newVTU: #upload
                        try:
                            for nvfile in sorted(newVTU):
                                nv=os.path.join(self.workingdir.fullpath(),nvfile)
                                with open(nv,'r') as f:
                                    fc=f.read()
                                s=s+1
                                print('[{}] Uploading {}.'.format(self.id,nvfile))
                                upload(self.conn_address, cid, rid, fc, s)
                                os.unlink(nv)
                        except Exception as e:
                            print("[{}] Could not upload".format(self.id))
                            print(e)
                            self._cleanup_run()
                            break
                        print("[{}] VTU uploaded.".format(self.id))
                    else: #ping
                        try:
                            ping_run(self.conn_address, cid, rid)
                        except Exception:
                            print("[{}] Could not prolong a lease on the run.".format(self.id))
                            self._cleanup_run()  # stop simulations
                            break
                    #check if trisurf is still running. If not break the innermost loop.
                    sleep(1)
                    if(self.p.poll() is not None): # trisurf exited!
                        print("[{}] Trisurf was stopped with return code {}".format(self.id, self.p.returncode))
                        if(self.p.returncode>0):
                            try:
                                send_error_report(self.conn_address, cid, rid, self.p.returncode)
                            except Exception:
                                print("[{}] Server didn't accept error report".format(self.id))
                        self._cleanup_run(terminate=False)
                        break
                    self.sleep(self.update_seconds-1)
                    client_ping_time_elapsed+=self.update_seconds
                    if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
                        try:
                            concurrent_runs=client_ping(self.conn_address,cid)
                        except Exception:
                            # self.id (was self.sid, an undefined attribute whose
                            # AttributeError killed the thread and leaked trisurf).
                            print("[{}] Could not client ping.".format(self.id))
                            self._cleanup_run()
                            break
                        self.subrunsStartStop(concurrent_runs)
                        client_ping_time_elapsed=0
| 57af9d | 293 | |
| SP | 294 | |
| e08bff | 295 | |
# Copied from the trisurf library so that this client does not depend on the
# library being installed.
class Directory:
    '''
    Class deals with the paths where the simulation is run and data is stored.
    '''
    def __init__(self, maindir=".", simdir=""):
        '''
        Directory() takes two optional parameters, maindir and simdir, and
        stores them. Both default to the current directory.
        '''
        self.maindir=maindir
        self.simdir=simdir

    def fullpath(self):
        '''
        Return the path where the data is stored, i.e. os.path.join(maindir, simdir).
        With the default simdir "" the result ends with a path separator.
        '''
        return os.path.join(self.maindir,self.simdir)

    def exists(self):
        '''Return True if the path given by fullpath() exists, False otherwise.'''
        return os.path.exists(self.fullpath())

    def make(self):
        '''
        Create the directory, including missing parents. On failure print a
        message and call sys.exit(1).

        sys.exit raises SystemExit. Called from a worker thread, it ends only
        that thread. The previous builtin exit() additionally closed stdin.
        '''
        try:
            os.makedirs(self.fullpath())
        except OSError:
            print("Cannot make directory "+self.fullpath()+"\n")
            sys.exit(1)

    def makeifnotexist(self):
        '''Create the directory if it does not exist. Return True if it was created, False if it already existed.'''
        if self.exists():
            return False
        self.make()
        return True

    def remove(self):
        '''
        Remove the directory if it exists. This uses os.rmdir, so only an
        EMPTY directory can be removed (it is not recursive). On failure print
        a message and call sys.exit(1).
        '''
        if(self.exists()):
            try:
                os.rmdir(self.fullpath())
            except OSError:
                print("Cannot remove directory "+self.fullpath()+ "\n")
                sys.exit(1)

    def goto(self):
        '''
        Change the current directory to fullpath(); on failure only print a
        message. WARNING: with relative paths, do not call this repeatedly.
        The current directory is process-wide, so it is shared by all threads.
        '''
        try:
            os.chdir(self.fullpath())
        except OSError:
            print("Cannot go to directory "+self.fullpath()+"\n")
| 357 | ||
| 358 | ||
| 359 | ||
#--- SIGINT and SIGTERM HANDLING ---
def signal_handler(signal,frame):
    '''
    Handler registered for SIGINT and SIGTERM by the __main__ block.

    It stops the ClientThread stored in the module-level name `t`, waits for
    it to clean up, and exits with the signal number as the exit status.
    Note: the parameter `signal` shadows the signal module inside this function.
    '''
    worker = t
    worker.stop()
    worker.join()
    message = "Process ended with signal " + str(signal)
    print(message)
    sys.exit(signal)
#--- END SIGINT and SIGTERM----
| 367 | ||
if __name__ == '__main__':
    # Module scope, so this rebinds the global that get_client_id reports.
    glob_ts_version=getTrisurfVersion()
    # Stop the worker cleanly on `kill` and on Ctrl-C.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Must stay the module-level name `t`: signal_handler looks it up.
    t=ClientThread(update_seconds=100)
    t.start()
    # The main thread only idles so it can receive signals; all work happens in t.
    while True:
        sleep(1000)