Skip to content

fix: cancel workflow when grpc task is cancelled

Jan Provaznik requested to merge jp-task-cancel into main

When client closes connection, grpc closes the task which handles the connection (so asyncio.CancelledError is raised). When this happens, we should make sure that also async Workflow task is cancelled otherwise it keeps running (mostly just gets stuck on waiting for another message in inbox).

Reproducer:

  • you can use following patch to better show that workflow task is stuck on waitinf for response from executor:
diff --git a/duo_workflow_service/executor/action.py b/duo_workflow_service/executor/action.py
index debdcbe..c382eaa 100644
--- a/duo_workflow_service/executor/action.py
+++ b/duo_workflow_service/executor/action.py
@@ -3,12 +3,22 @@ from typing import Any, Dict
 
 from contract import contract_pb2
 
+import structlog
+log = structlog.stdlib.get_logger("tool exec")
+
 
 async def _execute_action(metadata: Dict[str, Any], action: contract_pb2.Action):
     outbox: asyncio.Queue = metadata["outbox"]
     inbox: asyncio.Queue = metadata["inbox"]
 
     await outbox.put(action)
-    event: contract_pb2.ClientEvent = await inbox.get()
+    while True:
+       try:
+           event: contract_pb2.ClientEvent = await asyncio.wait_for(inbox.get(), timeout=5)
+           break
+       except asyncio.exceptions.TimeoutError:
+          log.error("timeout")
+          pass
+    log.error("in action - event received")
     inbox.task_done()
     return event.actionResponse.response
  1. start duo workflow service and run some workflow (by starting executor with some goal)
  2. monitor service's log: without this MR applied, "timeout" messages should keep popping up in log even after you cancel executor command before it's finished

Related to #69 (closed)

Edited by Jan Provaznik

Merge request reports

Loading