-
Notifications
You must be signed in to change notification settings - Fork 95
Expand file tree
/
Copy pathrecord_loader_activity.py
More file actions
59 lines (40 loc) · 1.49 KB
/
record_loader_activity.py
File metadata and controls
59 lines (40 loc) · 1.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from dataclasses import dataclass
from typing import List
from temporalio import activity
@dataclass
class GetRecordsInput:
    """Request parameters for the ``get_records`` activity.

    Attributes:
        page_size: Maximum number of records to return in one call.
        offset: Id of the first record to return.
        max_offset: Exclusive upper bound on returned record ids;
            ``get_records`` rejects values above the loader's record count.
    """

    page_size: int
    offset: int
    max_offset: int
@dataclass
class SingleRecord:
    """One record handed out for processing.

    Attributes:
        id: Integer identifier of the record.
    """

    id: int
@dataclass
class GetRecordsOutput:
    """Result returned by the ``get_records`` activity.

    Attributes:
        records: The page of records produced for the request.
    """

    records: List[SingleRecord]
class RecordLoader:
    """Activity implementations that load records from an external data source.

    In this sample no real source is consulted: the loader is configured
    with a fake total record count and fabricates records on request.
    """

    def __init__(self, record_count: int):
        # Total reported by get_record_count and used as the upper limit
        # for max_offset in get_records.
        self.record_count = record_count

    @activity.defn
    async def get_record_count(self) -> int:
        """Return the total number of records.

        Used to partition processing across parallel sliding windows.
        The sample implementation simply hands back the fake value that was
        supplied when the loader was constructed during worker initialization.
        """
        return self.record_count

    @activity.defn
    async def get_records(self, input: GetRecordsInput) -> GetRecordsOutput:
        """Return one page of records from the external data source.

        The sample fabricates records whose ids are the consecutive integers
        starting at ``input.offset`` and stopping before
        ``input.offset + input.page_size`` or ``input.max_offset``,
        whichever is smaller.

        Raises:
            ValueError: If ``input.max_offset`` exceeds the record count.
        """
        if input.max_offset > self.record_count:
            raise ValueError(
                f"max_offset({input.max_offset}) > record_count({self.record_count})"
            )

        # Clamp the end of the page so it never runs past max_offset.
        stop = input.offset + input.page_size
        if stop > input.max_offset:
            stop = input.max_offset

        page = [SingleRecord(id=record_id) for record_id in range(input.offset, stop)]
        return GetRecordsOutput(records=page)