Got the basic structure of the worker functional with a job system
This commit is contained in:
parent
cc02fc9974
commit
d04df55582
4
.gitignore
vendored
4
.gitignore
vendored
@ -4,4 +4,6 @@ include/
|
||||
lib/
|
||||
lib64
|
||||
pyvenv.cfg
|
||||
.vscode/
|
||||
.vscode/
|
||||
waster.log
|
||||
__pycache__/
|
||||
@ -1,5 +1,6 @@
|
||||
import time, uuid, signal, random, os
|
||||
import time, signal, random, os, json
|
||||
from enum import Enum
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
class JobStatus(Enum):
|
||||
QUEUED = 1
|
||||
@ -9,37 +10,59 @@ class JobStatus(Enum):
|
||||
class TimeWaster:
|
||||
def __init__(self):
|
||||
self.running = True
|
||||
self.jobs: dict = {}
|
||||
self.jobs: list[JobResult] = []
|
||||
if (os.path.exists("waster.log")): os.remove("waster.log")
|
||||
self.logfd = os.open("waster.log", os.O_CREAT | os.O_WRONLY)
|
||||
signal.signal(signal.SIGUSR1, self.handleCancelJob)
|
||||
|
||||
def handleNewJob(self):
|
||||
msg = os.read(0, 1024)
|
||||
if (msg) == "job":
|
||||
jobId: uuid.UUID = uuid.uuid4()
|
||||
self.jobs[jobId] = JobResult(jobId, JobStatus.QUEUED, "exists I guess")
|
||||
os.write(1, self.jobs[jobId])
|
||||
msgs = msg.decode().split("<END>")
|
||||
msgs.pop()
|
||||
for message in msgs:
|
||||
self.log(f"Got job: \"{message}\"")
|
||||
data = json.loads(message)
|
||||
self.jobs.append(JobResult(data["ID"], JobStatus.QUEUED, "exists i guess"))
|
||||
# os.write(1, self.jobs[jobId])
|
||||
|
||||
def handleCancelJob(self):
|
||||
pass
|
||||
def handleCancelJob(self, signum, stack):
|
||||
while (len(self.jobs) > 0):
|
||||
canceled: JobResult = self.jobs.pop(0)
|
||||
self.log(f"Job Canceled: {canceled.id}")
|
||||
self.back = True
|
||||
|
||||
def quit(self):
|
||||
self.running = False
|
||||
|
||||
def main(self):
|
||||
while self.running:
|
||||
if len(self.jobs) == 0:
|
||||
self.back = False
|
||||
if (len(self.jobs) == 0):
|
||||
self.handleNewJob()
|
||||
else:
|
||||
job: JobResult = self.jobs[self.jobs.keys[0]]
|
||||
if (self.back): continue
|
||||
job: JobResult = self.jobs[0]
|
||||
if (self.back): continue
|
||||
time.sleep(random.randrange(1, 10) / 10)
|
||||
if (self.back): continue
|
||||
job.status = JobStatus.FINISHED
|
||||
job.result = "done i guess"
|
||||
os.write(1, job)
|
||||
job.result = f"done i guess"
|
||||
os.write(1, f"{{\"ID\":{job.id},\"status\":{job.status.value},\"result\":\"{job.result}\"}}<END>".encode())
|
||||
self.jobs.remove(job)
|
||||
self.log(f"Finished Job: {job.id}")
|
||||
|
||||
def log(self, msg: str):
|
||||
os.write(self.logfd, f"{msg}\n".encode())
|
||||
|
||||
class JobResult:
|
||||
def __init__(self, uuid: uuid.UUID, status: JobStatus, result) -> None:
|
||||
self.uuid = uuid
|
||||
def __init__(self, id: int, status: JobStatus, result) -> None:
|
||||
self.id = id
|
||||
self.status = status
|
||||
self.result = result
|
||||
|
||||
if __name__ == "__main__":
|
||||
TimeWaster().main()
|
||||
# while (True):
|
||||
# data = os.read(0, 100).decode()
|
||||
# print(data)
|
||||
# if (data == "QUIT"): break
|
||||
|
||||
@ -42,5 +42,6 @@ class TestClient:
|
||||
print("MESSAGE DISCARDED\n")
|
||||
else:
|
||||
print("MESSAGE UP TO DATE. ACCEPTED\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
client = TestClient()
|
||||
|
||||
31
server.py
31
server.py
@ -18,21 +18,36 @@ class WebSocketServer:
|
||||
|
||||
async def handleConnection(self, connection: ServerConnection) -> None: #TODO: Make this actually do something
|
||||
print(f"{connection.remote_address} Connected")
|
||||
stdio: tuple(int, int) = os.pipe2()
|
||||
worker: subprocess.Popen = subprocess.Popen(["python3", "TimeWaster.py"], stdin=stdio[0], stdout=stdio[1])
|
||||
worker: asyncio.subprocess.Process = await asyncio.create_subprocess_exec("python3", "-u", "TimeWaster.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||||
connected: bool = True
|
||||
task: asyncio.Task = None
|
||||
asyncio.create_task(self.sendResponse(worker, connection))
|
||||
asyncio.create_task(self.readErr(worker))
|
||||
while (connected):
|
||||
raw: str = await connection.recv()
|
||||
os.write(stdio[1], bytes("job"))
|
||||
if task != None and task.cancel(): print("TASK CANCELED")
|
||||
task = asyncio.create_task(self.respond(raw, connection))
|
||||
print(f"Got data: {raw}")
|
||||
worker.send_signal(signal.SIGUSR1) #Cancel any existing jobs
|
||||
worker.stdin.write(f"{raw}<END>".encode())
|
||||
await worker.stdin.drain()
|
||||
print(f"Wrote data")
|
||||
|
||||
async def sendResponse(self, proc: asyncio.subprocess.Process, connection: ServerConnection):
|
||||
while (True):
|
||||
print("Running sendResponse")
|
||||
data = (await proc.stdout.readuntil("<END>".encode()))[:-5]
|
||||
print(f"Read \"{data.decode()}\"")
|
||||
# await self.respond(data, connection)
|
||||
task = asyncio.create_task(self.respond(data, connection))
|
||||
|
||||
async def readErr(self, proc: asyncio.subprocess.Process):
|
||||
data = await proc.stderr.read()
|
||||
print(f"ERROR: {data.decode()}")
|
||||
exit(1)
|
||||
|
||||
async def respond(self, raw: str, connection: ServerConnection) -> None:
|
||||
message = json.loads(raw)
|
||||
print(f"Received: {message["message"]} width id {message["ID"]}")
|
||||
print(f"Received: \'{message["result"]}\' with id \'{message["ID"]}\'")
|
||||
await asyncio.sleep(random.randrange(1, 10) / 10)
|
||||
response = {"ID": message["ID"], "message": f"received: {message["message"]}"}
|
||||
response = {"ID": message["ID"], "message": f"received: {message["result"]}"}
|
||||
await connection.send(json.dumps(response))
|
||||
print("Server replied")
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user