From d357f502ab18a6c5574e7b0e573c19a01c7dfa32 Mon Sep 17 00:00:00 2001
From: Samo Penic <samo.penic@gmail.com>
Date: Mon, 04 Jun 2018 22:01:00 +0000
Subject: [PATCH] Multithreading and client pinging resolved
---
tsclient.py | 40 ++++++++++++++++++++++++++++++++--------
1 files changed, 32 insertions(+), 8 deletions(-)
diff --git a/tsclient.py b/tsclient.py
index ed061b7..e658635 100755
--- a/tsclient.py
+++ b/tsclient.py
@@ -58,8 +58,8 @@
response=requests.post(addr+"/api/pingclient/", data=client_data)
if(response.status_code==200):
client_data=json.loads(response.text)
-
- return
+
+ return client_data['concurrent_runs']
else:
raise ValueError
@@ -101,7 +101,6 @@
class ClientThread(Thread):
-
def __init__(self,conn_address='http://beti.trisurf.eu',subid=0, update_seconds=100):
super(ClientThread,self).__init__()
self._stop_event = Event()
@@ -115,6 +114,8 @@
self.update_seconds=update_seconds
self.max_client_ping_time_elapsed=250
+ self.subruns=[]
+
def stop(self):
self._stop_event.set()
@@ -124,6 +125,9 @@
def join(self):
print('joining threads')
super(ClientThread, self).join()
+ for sub in self.subruns:
+ sub.stop()
+ sub.join()
if self.p is not None:
self.p.terminate()
if self.workingdir is not None:
@@ -136,6 +140,19 @@
return False
sleep(1)
return True
+
+ def subrunsStartStop(self,nr):
+ while(self.id==0 and nr>len(self.subruns)+1):
+ #spawning a new worker:
+ print("[{}] Spawning a new worker".format(self.id))
+ t=ClientThread(conn_address=self.conn_address, subid=len(self.subruns)+1,update_seconds=self.update_seconds)
+ t.start()
+ self.subruns.append(t)
+ while(self.id==0 and nr<len(self.subruns)+1):
+ print("[{}] Stopping a worker".format(self.id))
+ self.subruns[-1].stop()
+ self.subruns[-1].join()
+ del self.subruns[-1]
def run(self):
while(not self.isStopped()): #try to register
@@ -147,6 +164,8 @@
continue
#print("Got CID. getting RID.")
client_ping_time_elapsed=0
+ concurrent_runs=client_ping(self.conn_address,cid)
+ self.subrunsStartStop(concurrent_runs)
while(not self.isStopped()): #successfully registered, now start pinging and searching for job
try:
(rid,tape,vtu,status)=get_run(self.conn_address,cid)
@@ -156,7 +175,8 @@
client_ping_time_elapsed+=10
if(client_ping_time_elapsed>=self.max_client_ping_time_elapsed):
try:
- client_ping(self.conn_address,cid)
+ concurrent_runs=client_ping(self.conn_address,cid)
+ self.subrunsStartStop(concurrent_runs)
except:
break
client_ping_time_elapsed=0
@@ -174,6 +194,7 @@
self.workingdir=Directory('/tmp/ts_'+str(uuid.uuid4()))
self.workingdir.makeifnotexist()
self.workingdir.goto()
+ #print("[{}] Using directory {}".format(self.id, self.workingdir.fullpath()))
with open(self.workingdir.fullpath()+"/tape", 'w') as f:
f.write(tape)
if(int(status)==-1):
@@ -193,15 +214,17 @@
newVTU=getNewVTU(self.workingdir.fullpath())
if newVTU: #upload
try:
- for nv in sorted(newVTU):
+ for nvfile in sorted(newVTU):
+ nv=os.path.join(self.workingdir.fullpath(),nvfile)
with open(nv,'r') as f:
fc=f.read()
s=s+1
- print('[{}] Uploading {}.'.format(self.id,nv))
+ print('[{}] Uploading {}.'.format(self.id,nvfile))
upload(self.conn_address, cid, rid, fc, s)
os.unlink(nv)
- except:
+ except Exception as e:
print("[{}] Could not upload".format(self.id))
+ print(e)
self.p.terminate()
removeDir(self.workingdir.fullpath())
self.p=None
@@ -236,7 +259,8 @@
self.sleep(self.update_seconds-1)
client_ping_time_elapsed+=self.update_seconds
if(client_ping_time_elapsed>self.max_client_ping_time_elapsed-self.update_seconds/2):
- client_ping(self.conn_address,cid)
+ concurrent_runs=client_ping(self.conn_address,cid)
+ self.subrunsStartStop(concurrent_runs)
client_ping_time_elapsed=0
--
Gitblit v1.8.0