Skip to content

Commit

Permalink
wip partly functional prototype. As ORM querysets are not thread-safe…
Browse files Browse the repository at this point in the history
…, transforming log_entries with material or personnel fails
  • Loading branch information
claasga committed Nov 18, 2024
1 parent 065ba91 commit 002d679
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 62 deletions.
25 changes: 19 additions & 6 deletions backend/dps_training_k/game/channel_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,10 @@ def subscribe_to_exercise(cls, exercise, subscriber, send_past_logs=False):
else:
cls._exercise_subscribers[exercise] = {subscriber}
if send_past_logs:
past_logs = models.LogEntry.objects.filter(exercise=exercise)
past_logs = models.LogEntry.objects.filter(exercise=exercise).order_by("pk")
for log_entry in past_logs:
subscriber.receive_log_entry(log_entry)
# subscriber.receive_log_entry(past_logs[0])

@classmethod
def unsubscribe_from_exercise(cls, exercise, subscriber):
Expand All @@ -336,9 +337,13 @@ def unsubscribe_from_exercise(cls, exercise, subscriber):

@classmethod
def _puplish_obj(cls, obj, exercise):
if not obj.is_valid():
return

print("Publishing obj")
if exercise in cls._exercise_subscribers:
for subscriber in cls._exercise_subscribers[exercise]:
async_to_sync(subscriber.receive_log_entry(obj))
subscriber.receive_log_entry(obj)


class LogEntryDispatcher(Observable, ChannelNotifier):
Expand Down Expand Up @@ -480,7 +485,6 @@ def create_trainer_log(cls, patient_instance, changes, is_updated):
changes_set = set(changes) if changes else set()
category = models.LogEntry.CATEGORIES.PATIENT
type = None
message = None
content = {"name": patient_instance.name, "code": str(patient_instance.code)}

if not is_updated:
Expand All @@ -500,7 +504,7 @@ def create_trainer_log(cls, patient_instance, changes, is_updated):
content["location_type"] = current_location.frontend_model_name()
content["location_name"] = str(current_location.name)

if message:
if content:
if patient_instance.area:
models.LogEntry.objects.create(
exercise=cls.get_exercise(patient_instance),
Expand Down Expand Up @@ -599,7 +603,7 @@ def create_trainer_log(cls, personnel, changes, is_updated):
patient_instance=current_location,
is_dirty=True,
)
if isinstance(current_location, models.Area):
elif isinstance(current_location, models.Area):
log_entry = models.LogEntry.objects.create(
exercise=cls.get_exercise(personnel),
category=models.LogEntry.CATEGORIES.PERSONNEL,
Expand All @@ -608,7 +612,7 @@ def create_trainer_log(cls, personnel, changes, is_updated):
area=current_location,
is_dirty=True,
)
if isinstance(current_location, models.Lab):
elif isinstance(current_location, models.Lab):
log_entry = models.LogEntry.objects.create(
exercise=cls.get_exercise(personnel),
category=models.LogEntry.CATEGORIES.PERSONNEL,
Expand All @@ -618,7 +622,16 @@ def create_trainer_log(cls, personnel, changes, is_updated):
)
if log_entry:
log_entry.personnel.add(personnel)
# print("Finalizing log entry")
# print(personnel)
# print(log_entry.personnel)
# print(log_entry.personnel.all())
# print("Settin dirty to False")
log_entry.set_dirty(False)
# print(log_entry.personnel.all())

#
# print("Dirty is false")

@classmethod
def delete_and_notify(cls, personnel, *args, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ def handle(self, *args, **options):
else:
PatientInstance.objects.get(frontend_id=123456).delete()

user, created = User.objects.get_or_create(username="test", user_type=User.UserType.TRAINER)
user, created = User.objects.get_or_create(
username="test", user_type=User.UserType.TRAINER
)
if created:
user.set_password("password")
user.save()
Expand Down Expand Up @@ -58,9 +60,7 @@ def handle(self, *args, **options):
"patient_instance": self.patient,
},
)

TrainerConsumer.handle_start_exercise(_, self.exercise)

# TrainerConsumer.handle_start_exercise(_, self.exercise)
self.stdout.write(
self.style.SUCCESS("Successfully added debug_exercise to the database")
)
3 changes: 2 additions & 1 deletion backend/dps_training_k/game/models/exercise.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ def start_exercise(self):
from ..channel_notifications import LogEntryDispatcher

test_log_runner = LogRuleRunner(self, LogEntryDispatcher)
print("LogRuleRunner created")
# print("LogRuleRunner created")
test_log_runner.start_log_rule()
# print("LogRuleRunner started")

def save(self, *args, **kwargs):
changes = kwargs.get("update_fields", None)
Expand Down
1 change: 1 addition & 0 deletions backend/dps_training_k/game/models/log_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def generate_local_id(self, exercise):
def is_valid(self):
if self.timestamp and self.local_id and not self.is_dirty:
return True
print("not valid")
return False

def set_dirty(self, new_dirty):
Expand Down
4 changes: 2 additions & 2 deletions backend/dps_training_k/game/templmon/kdps.sig
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
assign_personnel(string, string)
unassign_personnel(string)
assigned_personnel(string, string)
unassigned_personnel(string)
personnel_arrived(string, string)

patient_action_started(string, string)
Expand Down
60 changes: 32 additions & 28 deletions backend/dps_training_k/game/templmon/log_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,43 @@ class LogRuleRunner:
loop = asyncio.new_event_loop()

def __init__(self, exercise, log):
print("init called with ", str(log))
self.log = log
self.exercise = exercise
self.monpoly_started_event = asyncio.Event()

def __del__(self):
pass

def receive_log_entry(self, log_entry):

asyncio.run_coroutine_threadsafe(self._receive_log_entry(log_entry), self.loop)

async def _receive_log_entry(self, log_entry):
print("*******************************************")
print("Received log entry: ", log_entry.pk)
monpolified_log_entry = transform(log_entry)
self.monpoly.stdin.write(monpolified_log_entry.encode())
await self.monpoly.stdin.drain()
print(f"Received log entry: {log_entry}")
await self.monpoly_started_event.wait() # Wait until monpoly is ready
print("Monpoly is ready")
try:
monpolified_log_entry = transform(log_entry)
except Exception as e:
raise e
print(f"Monpolified log entry: {monpolified_log_entry}")
if self.monpoly.stdin:
try:
encoded = monpolified_log_entry.encode()
self.monpoly.stdin.write(encoded)
await self.monpoly.stdin.drain()
except Exception as e:
raise e
else:
raise Exception("Monpoly is not running")
print("Log entry sent")

async def read_output(self, process):
self.monpoly_started_event.set()
while True:
line = await process.stdout.readline()
if not line:
print("process terminated")
break
print(f"Received: {line.decode('utf-8')[:-1]}")

Expand All @@ -79,47 +94,36 @@ async def _launch_monpoly(
sig_path,
"-formula",
mfotl_path,
"-verbose",
"" if rewrite else "-no_rw",
env=os.environ.copy(), # Ensure the environment variables are passed
stdin=asyncio.subprocess.PIPE, # Allow writing to stdin
stdout=asyncio.subprocess.PIPE, # Capture stdout
stderr=asyncio.subprocess.PIPE, # Optionally capture stderr
)
"""loop.call_soon_threadsafe(
lambda: asyncio.create_task(self.read_output(self.monpoly))
)"""
asyncio.run_coroutine_threadsafe(self.read_output(self.monpoly), loop)
self.monpoly_started_event.wait()
print("MonPoly instance: ")
print(self.monpoly)
return self.monpoly
asyncio.create_task(self.read_output(self.monpoly))
self.monpoly_started_event.set() # Signal that monpoly is ready

def start_log_rule(self):
print(
"Current state of the loop: Is it running?" + str(self.loop.is_running())
if self.loop
else "No loop"
)

def launch_listener_loop(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
loop.run_forever()

self.monpoly_started_event = threading.Event()
if (not self.loop) or (not self.loop.is_running()):
threading.Thread(target=launch_listener_loop, args=(self.loop,)).start()
print("started loop")
print("Trying to run monpoly thread")

base_dir = os.path.dirname(os.path.abspath(__file__))
mfotl_path = os.path.join(base_dir, "personnel_check.mfotl")
sig_path = os.path.join(base_dir, "kdps.sig")
asyncio.run(
asyncio.run_coroutine_threadsafe(
self._launch_monpoly(
self.loop,
mfotl_path,
sig_path,
)
),
self.loop,
)
print("Succeeded starting monpoly")
self.log.subscribe_to_exercise(self.exercise, self, send_past_logs=True)
print("Subscribed to exercise")

# startup extra asyncio_thread as loop
# asyncio.run(lauch_monpoly())
Expand All @@ -132,4 +136,4 @@ def launch_listener_loop(loop: asyncio.AbstractEventLoop):
#
#
#
# atexitstuff
# atexitstuff"""
12 changes: 10 additions & 2 deletions backend/dps_training_k/game/templmon/log_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,20 @@ def transform(log_entry: le.LogEntry):
log_str = f"@{timestamp} "

if log_type == MonpolyLogEntry.ASSIGNED_PERSONNEL:
personnel_id = log_entry.personnel[0].pk
print(f"field: {log_entry.personnel}")
print(f"Queryset: {log_entry.personnel.all()}")
print(f"Entry: {log_entry.personnel.all().first()}")
print(f"key: {log_entry.personnel.all().first().pk}")
personnel_id = log_entry.personnel.all().first().pk
patient_id = log_entry.patient_instance.pk
log_str += f"assigned_personnel({personnel_id}, {patient_id})"
# log_str += f"assigned_personnel(144, {patient_id})"

elif log_type == MonpolyLogEntry.UNASSIGNED_PERSONNEL:
personnel_id = log_entry.personnel[0].pk
personnel_id = log_entry.personnel.all().first().pk
log_str += f"unassigned_personnel({personnel_id})"
# log_str += f"unassigned_personnel(144)"

elif log_type == MonpolyLogEntry.PATIENT_ARRIVED:
patient_id = log_entry.patient_instance.pk
area_id = log_entry.area.pk
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ONCE assigned_personnel(a,b)
4 changes: 2 additions & 2 deletions backend/dps_training_k/game/templmon/personnel_check.mfotl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(personnel_count <- CNT personnel_id;patient_id
(NOT unassign_personnel(personnel_id))
(NOT unassigned_personnel(personnel_id))
SINCE[0,*]
assign_personnel(personnel_id, patient_id))
assigned_personnel(personnel_id, patient_id))
AND
(personnel_count >= 4)
40 changes: 23 additions & 17 deletions backend/dps_training_k/game/templmon/subprocess_test copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,27 @@

tasks = []

log = """@10 assign_personnel(per_n_1, pat_n_1)
@13 assign_personnel(per_n_2, pat_n_1)
@16 assign_personnel(per_n_3, pat_n_1)
@19 assign_personnel(per_n_4, pat_n_1)
@19.5 unknown_log_type(lalalalal)
@20 assign_personnel(per_n_5, pat_n_1)
@21 unassign_personnel(per_n_1)
@22 unassign_personnel(per_n_2)
@30 assign_personnel(per_n_1, pat_n_1)
@31 assign_personnel(per_n_2, pat_n_1)
@32 unassign_personnel(per_n_3)
@33 assign_personnel(per_n_3, pat_n_2)
@35 assign_personnel(per_n_6, pat_n_2)
@37 assign_personnel(per_n_7, pat_n_2)
@39 assign_personnel(per_n_8, pat_n_2)"""

# log = """@10 assigned_personnel(per_n_1, pat_n_1)
# @13 assigned_personnel(per_n_2, pat_n_1)
# @16 assigned_personnel(per_n_3, pat_n_1)
# @19 assigned_personnel(per_n_4, pat_n_1)
# @19.5 unknown_log_type(lalalalal)
# @20 assigned_personnel(per_n_5, pat_n_1)
# @21 unassigned_personnel(per_n_1)
# @22 unassigned_personnel(per_n_2)
# @30 assigned_personnel(per_n_1, pat_n_1)
# @31 assigned_personnel(per_n_2, pat_n_1)
# @32 unassigned_personnel(per_n_3)
# @33 assigned_personnel(per_n_3, pat_n_2)
# @35 assigned_personnel(per_n_6, pat_n_2)
# @37 assigned_personnel(per_n_7, pat_n_2)
# @39 assigned_personnel(per_n_8, pat_n_2)"""
log = """@70 assigned_personnel(hans, peter)
@71 assigned_personnel(hans, peter)
@72 assigned_personnel(hans, peter)
@73 assigned_personnel(hans, peter)
@74 assigned_personnel(hans, peter)
"""
# Get the absolute path of the current directory
base_dir = os.path.dirname(os.path.abspath(__file__))

Expand Down Expand Up @@ -57,7 +62,8 @@ async def main():
"-sig",
os.path.join(base_dir, "kdps.sig"),
"-formula",
os.path.join(base_dir, "personnel_check.mfotl"),
os.path.join(base_dir, "personnel_check copy.mfotl"),
"-verbose",
)
)

Expand Down

0 comments on commit 002d679

Please sign in to comment.