Skip to content

Add tests for update-with-start reattach behavior #1000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,12 @@ async def start_workflow(
retries and continue as new.
run_timeout: Timeout of a single workflow run.
task_timeout: Timeout of a single workflow task.
id_reuse_policy: How already-existing IDs are treated.
id_conflict_policy: How already-running workflows of the same ID are
treated. Default is unspecified which effectively means fail the
start attempt. This cannot be set if ``id_reuse_policy`` is set
to terminate if running.
id_conflict_policy: Behavior when a workflow is currently running with the same ID.
Default is UNSPECIFIED, which effectively means fail the start attempt.
Set to USE_EXISTING for idempotent deduplication on workflow ID.
Cannot be set if ``id_reuse_policy`` is set to TERMINATE_IF_RUNNING.
id_reuse_policy: Behavior when a closed workflow with the same ID exists.
Default is ALLOW_DUPLICATE.
retry_policy: Retry policy for the workflow.
cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
memo: Memo for the workflow.
Expand Down Expand Up @@ -2487,9 +2488,6 @@ class WithStartWorkflowOperation(Generic[SelfType, ReturnType]):

Update-With-Start allows you to send an update to a workflow, while starting the
workflow if necessary.

.. warning::
This API is experimental
"""

# Overload for no-param workflow, with_start
Expand Down Expand Up @@ -2657,9 +2655,6 @@ def __init__(
) -> None:
"""Create a WithStartWorkflowOperation.

.. warning::
This API is experimental

See :py:meth:`temporalio.client.Client.start_workflow` for documentation of the
arguments.
"""
Expand Down Expand Up @@ -2700,11 +2695,7 @@ def __init__(
self._used = False

async def workflow_handle(self) -> WorkflowHandle[SelfType, ReturnType]:
"""Wait until workflow is running and return a WorkflowHandle.

.. warning::
This API is experimental
"""
"""Wait until workflow is running and return a WorkflowHandle."""
return await self._workflow_handle


Expand Down
3 changes: 1 addition & 2 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
# TODO: Change back to "default" after next CLI release
DEV_SERVER_DOWNLOAD_VERSION = "v1.3.1-persistence-fix.0"
DEV_SERVER_DOWNLOAD_VERSION = "default"
268 changes: 267 additions & 1 deletion tests/worker/test_update_with_start.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations

import asyncio
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from enum import Enum, IntEnum
from typing import Any, Iterator, Mapping, Optional
from unittest.mock import patch

Expand All @@ -17,12 +18,14 @@
OutboundInterceptor,
StartWorkflowUpdateWithStartInput,
WithStartWorkflowOperation,
WorkflowExecutionStatus,
WorkflowUpdateFailedError,
WorkflowUpdateHandle,
WorkflowUpdateStage,
)
from temporalio.common import (
WorkflowIDConflictPolicy,
WorkflowIDReusePolicy,
)
from temporalio.exceptions import ApplicationError, WorkflowAlreadyStartedError
from temporalio.service import RPCError, RPCStatusCode, ServiceCall
Expand Down Expand Up @@ -859,3 +862,266 @@ async def __call__(
assert err.value.status == RPCStatusCode.INTERNAL
assert err.value.message == "empty details"
assert len(err.value.grpc_status.details) == 0


class ExecutionBehavior(IntEnum):
    """Selects whether a workflow run / update handler returns at once or waits.

    Passed as the single argument to both ``WorkflowWithUpdate.run`` and
    ``WorkflowWithUpdate.update``.
    """

    # Handler returns immediately.
    COMPLETES = 0
    # Handler waits until the corresponding unblock signal is received.
    BLOCKS = 1


@workflow.defn
class WorkflowWithUpdate:
    """Workflow whose run method and update handler can each either return or block.

    Both handlers take an ``ExecutionBehavior``. With ``BLOCKS`` the handler waits
    on an event that is set by the matching signal (``unblock_workflow`` or
    ``unblock_update``); otherwise it returns straight away.
    """

    def __init__(self) -> None:
        # One gate per blocking point; each is opened by its own signal handler.
        self._workflow_gate = asyncio.Event()
        self._update_gate = asyncio.Event()

    @workflow.run
    async def run(self, behavior: ExecutionBehavior) -> str:
        """Optionally wait for ``unblock_workflow``, then return a fresh UUID string."""
        must_wait = behavior == ExecutionBehavior.BLOCKS
        if must_wait:
            await self._workflow_gate.wait()
        run_result = workflow.uuid4()
        return str(run_result)

    # NOTE(review): ABANDON is presumably chosen so the workflow may close while
    # this handler is still blocked — confirm against the SDK documentation.
    @workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)
    async def update(self, behavior: ExecutionBehavior) -> str:
        """Optionally wait for ``unblock_update``, then return a fresh UUID string."""
        must_wait = behavior == ExecutionBehavior.BLOCKS
        if must_wait:
            await self._update_gate.wait()
        update_result = workflow.uuid4()
        return str(update_result)

    @workflow.signal
    async def unblock_workflow(self):
        """Release a ``run`` call that is waiting."""
        self._workflow_gate.set()

    @workflow.signal
    async def unblock_update(self):
        """Release an ``update`` call that is waiting."""
        self._update_gate.set()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Non-Deterministic Workflow Synchronization

The WorkflowWithUpdate class uses asyncio.Event() for synchronization, which is not workflow-safe. This violates Temporal's determinism requirements, leading to non-deterministic replay behavior (e.g., workflows hanging) because asyncio.Event objects do not persist state across replays. Workflows should use workflow-safe primitives like workflow.wait_condition() with boolean flags.

Locations (1)

Fix in Cursor · Fix in Web



@pytest.mark.parametrize(
    "workflow_behavior_name",
    [
        behavior.name
        for behavior in (ExecutionBehavior.COMPLETES, ExecutionBehavior.BLOCKS)
    ],
)
@pytest.mark.parametrize(
    "id_conflict_policy_name",
    [
        policy.name
        for policy in (
            WorkflowIDConflictPolicy.USE_EXISTING,
            WorkflowIDConflictPolicy.FAIL,
        )
    ],
)
@pytest.mark.parametrize(
    "id_reuse_policy_name",
    [
        policy.name
        for policy in (
            WorkflowIDReusePolicy.ALLOW_DUPLICATE,
            WorkflowIDReusePolicy.REJECT_DUPLICATE,
        )
    ],
)
async def test_update_with_start_always_attaches_to_completed_update(
    env: WorkflowEnvironment,
    workflow_behavior_name: str,
    id_conflict_policy_name: str,
    id_reuse_policy_name: str,
):
    """
    Scenario: a workflow already exists and holds a completed update.

    A second update-with-start that reuses the workflow ID and the update ID attaches to that update when
    the workflow is still running; when the workflow has closed it attaches iff the update completed (which
    it has here). Neither the conflict policy nor the id reuse policy changes this (so, for example, we
    attach to an update in an existing workflow even if the conflict policy is FAIL).
    """
    if env.supports_time_skipping:
        pytest.skip("TODO: make update_with_start tests pass under Java test server")

    # Parameters arrive as enum member names; resolve them back to members.
    conflict_policy = WorkflowIDConflictPolicy[id_conflict_policy_name]
    reuse_policy = WorkflowIDReusePolicy[id_reuse_policy_name]
    behavior = ExecutionBehavior[workflow_behavior_name]
    workflow_id = f"workflow-id-{uuid.uuid4()}"
    update_id = f"update-id-{uuid.uuid4()}"

    if behavior == ExecutionBehavior.COMPLETES:
        expected_status = WorkflowExecutionStatus.COMPLETED
    else:
        expected_status = WorkflowExecutionStatus.RUNNING

    async with new_worker(env.client, WorkflowWithUpdate) as worker:

        async def update_with_start():
            # Every call uses the same workflow ID and update ID, with a fresh
            # start operation object.
            operation = WithStartWorkflowOperation(
                WorkflowWithUpdate.run,
                behavior,
                id=workflow_id,
                task_queue=worker.task_queue,
                id_conflict_policy=conflict_policy,
                id_reuse_policy=reuse_policy,
            )
            outcome = await env.client.execute_update_with_start_workflow(
                WorkflowWithUpdate.update,
                ExecutionBehavior.COMPLETES,
                id=update_id,
                start_workflow_operation=operation,
            )
            return await operation.workflow_handle(), outcome

        first_handle, first_result = await update_with_start()
        description = await first_handle.describe()
        assert description.status == expected_status

        # Whether or not the workflow closed, the update exists in the last workflow run and is completed,
        # so the second request attaches to it.
        second_handle, second_result = await update_with_start()
        assert first_handle.first_execution_run_id == second_handle.first_execution_run_id
        assert first_result == second_result


@pytest.mark.parametrize(
    "id_conflict_policy_name",
    [
        policy.name
        for policy in (
            WorkflowIDConflictPolicy.USE_EXISTING,
            WorkflowIDConflictPolicy.FAIL,
        )
    ],
)
@pytest.mark.parametrize(
    "id_reuse_policy_name",
    [
        policy.name
        for policy in (
            WorkflowIDReusePolicy.ALLOW_DUPLICATE,
            WorkflowIDReusePolicy.REJECT_DUPLICATE,
        )
    ],
)
async def test_update_with_start_attaches_to_non_completed_update_in_running_workflow(
    env: WorkflowEnvironment,
    id_conflict_policy_name: str,
    id_reuse_policy_name: str,
):
    """
    Scenario: a workflow is running and holds an update that has not completed.

    A second update-with-start that reuses the workflow ID and the update ID attaches to that update.
    Neither the conflict policy nor the id reuse policy changes this (so, for example, we attach to the
    update in an existing workflow even if the conflict policy is FAIL).
    """
    if env.supports_time_skipping:
        pytest.skip("TODO: make update_with_start tests pass under Java test server")

    # Parameters arrive as enum member names; resolve them back to members.
    conflict_policy = WorkflowIDConflictPolicy[id_conflict_policy_name]
    reuse_policy = WorkflowIDReusePolicy[id_reuse_policy_name]
    workflow_id = f"workflow-id-{uuid.uuid4()}"
    update_id = f"update-id-{uuid.uuid4()}"

    async with new_worker(env.client, WorkflowWithUpdate) as worker:

        async def start_update(update_behavior: ExecutionBehavior):
            # Same workflow ID and update ID on every call; only the update argument varies.
            operation = WithStartWorkflowOperation(
                WorkflowWithUpdate.run,
                ExecutionBehavior.BLOCKS,
                id=workflow_id,
                task_queue=worker.task_queue,
                id_conflict_policy=conflict_policy,
                id_reuse_policy=reuse_policy,
            )
            update_handle = await env.client.start_update_with_start_workflow(
                WorkflowWithUpdate.update,
                update_behavior,
                id=update_id,
                start_workflow_operation=operation,
                wait_for_stage=WorkflowUpdateStage.ACCEPTED,
            )
            return await operation.workflow_handle(), update_handle

        first_wf, first_update = await start_update(ExecutionBehavior.BLOCKS)
        description = await first_wf.describe()
        assert description.status == WorkflowExecutionStatus.RUNNING

        # The workflow is running with the update not-completed. We will attach to the update.
        second_wf, second_update = await start_update(ExecutionBehavior.COMPLETES)
        assert first_wf.first_execution_run_id == second_wf.first_execution_run_id

        # Release the blocked update, then check both handles observe the same result.
        await first_wf.signal(WorkflowWithUpdate.unblock_update)
        first_result = await first_update.result()
        second_result = await second_update.result()
        assert first_result == second_result


@pytest.mark.parametrize(
    "id_conflict_policy_name",
    [
        WorkflowIDConflictPolicy.USE_EXISTING.name,
        WorkflowIDConflictPolicy.FAIL.name,
    ],
)
@pytest.mark.parametrize(
    "id_reuse_policy_name",
    [
        WorkflowIDReusePolicy.ALLOW_DUPLICATE.name,
        WorkflowIDReusePolicy.REJECT_DUPLICATE.name,
    ],
)
async def test_update_with_start_does_not_attach_to_non_completed_update_in_closed_workflow(
    env: WorkflowEnvironment,
    id_conflict_policy_name: str,
    id_reuse_policy_name: str,
):
    """
    A workflow exists but is closed and contains a non-completed update. An update-with-start sent for that workflow
    ID and that update ID does not attach to the update. If the id reuse policy is ALLOW_DUPLICATE then a new
    workflow is started and the update is issued. If the id reuse policy is REJECT_DUPLICATE then the request
    raises WorkflowAlreadyStartedError. The conflict policy does not affect the outcome.
    """
    if env.supports_time_skipping:
        pytest.skip("TODO: make update_with_start tests pass under Java test server")
    client = env.client
    # Parameters arrive as enum member names; resolve them back to members.
    id_conflict_policy = WorkflowIDConflictPolicy[id_conflict_policy_name]
    id_reuse_policy = WorkflowIDReusePolicy[id_reuse_policy_name]
    shared_workflow_id = f"workflow-id-{uuid.uuid4()}"
    shared_update_id = f"update-id-{uuid.uuid4()}"
    async with new_worker(client, WorkflowWithUpdate) as worker:

        def start_op():
            return WithStartWorkflowOperation(
                WorkflowWithUpdate.run,
                ExecutionBehavior.COMPLETES,
                id=shared_workflow_id,
                task_queue=worker.task_queue,
                id_conflict_policy=id_conflict_policy,
                id_reuse_policy=id_reuse_policy,
            )

        # First request: the workflow run completes while its update is still blocked.
        start_op_1 = start_op()
        await client.start_update_with_start_workflow(
            WorkflowWithUpdate.update,
            ExecutionBehavior.BLOCKS,
            id=shared_update_id,
            start_workflow_operation=start_op_1,
            wait_for_stage=WorkflowUpdateStage.ACCEPTED,
        )
        wf_handle_1 = await start_op_1.workflow_handle()
        assert (
            await wf_handle_1.describe()
        ).status == WorkflowExecutionStatus.COMPLETED

        # The workflow closed with the update not-completed. We will start a new workflow and issue the update
        # iff reuse_policy is ALLOW_DUPLICATE. Conflict policy is irrelevant.

        start_op_2 = start_op()

        async def _do_update() -> Any:
            return await client.execute_update_with_start_workflow(
                WorkflowWithUpdate.update,
                ExecutionBehavior.COMPLETES,
                id=shared_update_id,
                start_workflow_operation=start_op_2,
            )

        if id_reuse_policy == WorkflowIDReusePolicy.ALLOW_DUPLICATE:
            await _do_update()
            wf_handle_2 = await start_op_2.workflow_handle()
            assert (
                wf_handle_1.first_execution_run_id != wf_handle_2.first_execution_run_id
            )
        elif id_reuse_policy == WorkflowIDReusePolicy.REJECT_DUPLICATE:
            with pytest.raises(WorkflowAlreadyStartedError):
                await _do_update()
        else:
            # Guard against a new parametrized policy silently passing without assertions.
            pytest.fail(f"Unhandled id_reuse_policy: {id_reuse_policy!r}")