-
Notifications
You must be signed in to change notification settings - Fork 95
Expand file tree
/
Copy pathhello_change_log_level.py
More file actions
71 lines (54 loc) · 1.99 KB
/
hello_change_log_level.py
File metadata and controls
71 lines (54 loc) · 1.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
Changes the log level of workflow task failures from WARN to ERROR.
Note that the __temporal_error_identifier attribute was added in
version 1.13.0 of the Python SDK.
"""
import asyncio
import logging
import sys
from temporalio import workflow
from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
# --- Begin logging set-up ----------------------------------------------------------
# Root logger configuration: INFO and above, written to stdout, with a
# timestamp, a left-aligned 8-character level name, the logger name and the
# message on each line.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
    level=logging.INFO,
    stream=sys.stdout,
)
class CustomLogFilter(logging.Filter):
    """Raise the level of workflow task failure log records to ERROR.

    The filter never suppresses a record; it only rewrites the level fields of
    records whose ``__temporal_error_identifier`` attribute equals
    ``"WorkflowTaskFailure"``.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Re-level ``record`` in place if it marks a workflow task failure.

        Args:
            record: The log record about to be handled.

        Returns:
            Always ``True``, so the record is kept.
        """
        # Note that the __temporal_error_identifier attribute was added in
        # version 1.13.0 of the Python SDK. It is looked up by string name:
        # written as a dotted attribute inside a class body, the leading
        # double underscore would be name-mangled.
        identifier = getattr(record, "__temporal_error_identifier", None)
        if identifier == "WorkflowTaskFailure":
            record.levelno = logging.ERROR
            record.levelname = logging.getLevelName(logging.ERROR)
        return True
# Attach a fresh filter instance to every handler currently installed on the
# root logger. Handler-level filters also see records that propagate up from
# descendant loggers, which logger-level filters do not.
for handler in logging.getLogger().handlers:
    handler.addFilter(CustomLogFilter())
# --- End logging set-up ----------------------------------------------------------
# Message of the RuntimeError raised by GreetingWorkflow.run.
LOG_MESSAGE = "This error is an experiment to check the log level"
@workflow.defn
class GreetingWorkflow:
    """Workflow whose run method always raises, to produce the failure log record."""

    @workflow.run
    async def run(self):
        """Raise a ``RuntimeError`` carrying ``LOG_MESSAGE``; never returns normally."""
        failure = RuntimeError(LOG_MESSAGE)
        raise failure
async def main():
    """Connect a client, start a worker, and execute ``GreetingWorkflow`` on it.

    Connection keyword arguments are loaded through
    ``ClientConfig.load_client_connect_config()``; ``target_host`` falls back
    to ``localhost:7233`` when the loaded configuration does not set it.
    """
    # The worker and the workflow execution must use the same task queue.
    task_queue = "hello-change-log-level-task-queue"

    connect_kwargs = ClientConfig.load_client_connect_config()
    connect_kwargs.setdefault("target_host", "localhost:7233")
    client = await Client.connect(**connect_kwargs)

    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=[GreetingWorkflow],
    )
    # The worker runs only for the duration of this ``async with`` block.
    async with worker:
        await client.execute_workflow(
            GreetingWorkflow.run,
            id="hello-change-log-level-workflow-id",
            task_queue=task_queue,
        )
if __name__ == "__main__":
    # Script entry point: run the async main() to completion on a new event loop.
    asyncio.run(main())