From d04df555820ee03378e12c5b5428f100e7a477c1 Mon Sep 17 00:00:00 2001 From: nolan Date: Tue, 21 Oct 2025 23:18:15 -0400 Subject: [PATCH] Got the basic structure of the worker functional with a job system --- .gitignore | 4 +++- TimeWaster.py | 51 +++++++++++++++++++++++++++++++++++++-------------- client.py | 1 + server.py | 31 +++++++++++++++++++++++-------- 4 files changed, 64 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index 2e4062b..3e30575 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,6 @@ include/ lib/ lib64 pyvenv.cfg -.vscode/ \ No newline at end of file +.vscode/ +waster.log +__pycache__/ \ No newline at end of file diff --git a/TimeWaster.py b/TimeWaster.py index 1a246d3..222614d 100644 --- a/TimeWaster.py +++ b/TimeWaster.py @@ -1,5 +1,6 @@ -import time, uuid, signal, random, os +import time, signal, random, os, json from enum import Enum +from uuid import UUID, uuid4 class JobStatus(Enum): QUEUED = 1 @@ -9,37 +10,59 @@ class JobStatus(Enum): class TimeWaster: def __init__(self): self.running = True - self.jobs: dict = {} + self.jobs: list[JobResult] = [] + if (os.path.exists("waster.log")): os.remove("waster.log") + self.logfd = os.open("waster.log", os.O_CREAT | os.O_WRONLY) + signal.signal(signal.SIGUSR1, self.handleCancelJob) def handleNewJob(self): msg = os.read(0, 1024) - if (msg) == "job": - jobId: uuid.UUID = uuid.uuid4() - self.jobs[jobId] = JobResult(jobId, JobStatus.QUEUED, "exists I guess") - os.write(1, self.jobs[jobId]) + msgs = msg.decode().split("") + msgs.pop() + for message in msgs: + self.log(f"Got job: \"{message}\"") + data = json.loads(message) + self.jobs.append(JobResult(data["ID"], JobStatus.QUEUED, "exists i guess")) + # os.write(1, self.jobs[jobId]) - def handleCancelJob(self): - pass + def handleCancelJob(self, signum, stack): + while (len(self.jobs) > 0): + canceled: JobResult = self.jobs.pop(0) + self.log(f"Job Canceled: {canceled.id}") + self.back = True def quit(self): self.running = False def main(self): while self.running: - if len(self.jobs) == 0: + self.back = False + if (len(self.jobs) == 0): self.handleNewJob() else: - job: JobResult = self.jobs[self.jobs.keys[0]] + if (self.back): continue + job: JobResult = self.jobs[0] + if (self.back): continue time.sleep(random.randrange(1, 10) / 10) + if (self.back): continue job.status = JobStatus.FINISHED - job.result = "done i guess" - os.write(1, job) + job.result = f"done i guess" + os.write(1, f"{{\"ID\":{job.id},\"status\":{job.status.value},\"result\":\"{job.result}\"}}".encode()) + self.jobs.remove(job) + self.log(f"Finished Job: {job.id}") + + def log(self, msg: str): + os.write(self.logfd, f"{msg}\n".encode()) class JobResult: - def __init__(self, uuid: uuid.UUID, status: JobStatus, result) -> None: - self.uuid = uuid + def __init__(self, id: int, status: JobStatus, result) -> None: + self.id = id self.status = status self.result = result if __name__ == "__main__": TimeWaster().main() + # while (True): + # data = os.read(0, 100).decode() + # print(data) + # if (data == "QUIT"): break diff --git a/client.py b/client.py index c009343..ac5f72f 100644 --- a/client.py +++ b/client.py @@ -42,5 +42,6 @@ class TestClient: print("MESSAGE DISCARDED\n") else: print("MESSAGE UP TO DATE. ACCEPTED\n") + if __name__ == "__main__": client = TestClient() diff --git a/server.py b/server.py index 31592e9..0e2d3fd 100644 --- a/server.py +++ b/server.py @@ -18,21 +18,36 @@ class WebSocketServer: async def handleConnection(self, connection: ServerConnection) -> None: #TODO: Make this actually do something print(f"{connection.remote_address} Connected") - stdio: tuple(int, int) = os.pipe2() - worker: subprocess.Popen = subprocess.Popen(["python3", "TimeWaster.py"], stdin=stdio[0], stdout=stdio[1]) + worker: asyncio.subprocess.Process = await asyncio.create_subprocess_exec("python3", "-u", "TimeWaster.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) connected: bool = True - task: asyncio.Task = None + asyncio.create_task(self.sendResponse(worker, connection)) + asyncio.create_task(self.readErr(worker)) while (connected): raw: str = await connection.recv() - os.write(stdio[1], bytes("job")) - if task != None and task.cancel(): print("TASK CANCELED") - task = asyncio.create_task(self.respond(raw, connection)) + print(f"Got data: {raw}") + worker.send_signal(signal.SIGUSR1) #Cancel any existing jobs + worker.stdin.write(f"{raw}".encode()) + await worker.stdin.drain() + print(f"Wrote data") + async def sendResponse(self, proc: asyncio.subprocess.Process, connection: ServerConnection): + while (True): + print("Running sendResponse") + data = (await proc.stdout.readuntil("".encode()))[:-5] + print(f"Read \"{data.decode()}\"") + # await self.respond(data, connection) + task = asyncio.create_task(self.respond(data, connection)) + + async def readErr(self, proc: asyncio.subprocess.Process): + data = await proc.stderr.read() + print(f"ERROR: {data.decode()}") + exit(1) + async def respond(self, raw: str, connection: ServerConnection) -> None: message = json.loads(raw) - print(f"Received: {message["message"]} width id {message["ID"]}") + print(f"Received: \'{message["result"]}\' with id \'{message["ID"]}\'") await asyncio.sleep(random.randrange(1, 10) / 10) - response = {"ID": message["ID"], "message": f"received: {message["message"]}"} + response = {"ID": message["ID"], "message": f"received: {message["result"]}"} await connection.send(json.dumps(response)) print("Server replied")