-
Notifications
You must be signed in to change notification settings - Fork 95
Expand file tree
/
Copy pathhello_parallel_activity.py
More file actions
69 lines (59 loc) · 2.3 KB
/
hello_parallel_activity.py
File metadata and controls
69 lines (59 loc) · 2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from typing import List
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
@activity.defn
def say_hello_activity(name: str) -> str:
    """Build the greeting for a single person.

    Args:
        name: Who to greet.

    Returns:
        The greeting text containing ``name``.
    """
    greeting = f"Hello, {name}!"
    return greeting
@workflow.defn
class SayHelloWorkflow:
    """Workflow that greets a fixed set of users by running activities concurrently."""

    @workflow.run
    async def run(self) -> List[str]:
        """Run ``say_hello_activity`` once per user, all at the same time.

        Returns:
            The greetings for ``user1`` through ``user5``, sorted ascending.
        """
        # Create every activity call up front (user1..user5, in that order),
        # each with a 5-second start-to-close timeout, and await them together
        # so they run at the same time instead of one after another.
        pending = [
            workflow.execute_activity(
                say_hello_activity,
                f"user{number}",
                start_to_close_timeout=timedelta(seconds=5),
            )
            for number in range(1, 6)
        ]
        results = await asyncio.gather(*pending)

        # The activities may finish in any order, but asyncio.gather returns
        # results in argument order. The sort is kept so the documented
        # "sorted list" return contract does not depend on scheduling order.
        # sorted() already returns a new list, so no extra list() is needed.
        return sorted(results)
async def main():
    """Connect to Temporal, host a worker, and run ``SayHelloWorkflow`` once.

    The workflow result is printed to standard output.
    """
    # The worker and the workflow start request must use the same task queue.
    task_queue = "hello-parallel-activity-task-queue"

    # Load the client connection settings; if they carry no "target_host",
    # fall back to localhost:7233.
    connect_options = ClientConfig.load_client_connect_config()
    connect_options.setdefault("target_host", "localhost:7233")
    temporal_client = await Client.connect(**connect_options)

    # say_hello_activity is a plain (non-async) function, so the worker is
    # given a thread pool to use as its activity executor.
    worker = Worker(
        temporal_client,
        task_queue=task_queue,
        workflows=[SayHelloWorkflow],
        activities=[say_hello_activity],
        activity_executor=ThreadPoolExecutor(10),
    )

    async with worker:
        # The worker runs for the duration of this block. Here the same
        # process also acts as the client that starts the workflow; in many
        # production setups the client lives in a separate process.
        result = await temporal_client.execute_workflow(
            SayHelloWorkflow.run,
            id="hello-parallel-activity-workflow-id",
            task_queue=task_queue,
        )
        print(f"Result: {result}")
# Run the sample only when this file is executed as a script, not when it is
# imported; asyncio.run drives the async main() to completion.
if __name__ == "__main__":
    asyncio.run(main())