Add Strands Agent Session Manager #872
base: main
Signed-off-by: yaron2 <schneider.yaron@live.com>
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #872 +/- ##
==========================================
+ Coverage 86.63% 86.97% +0.34%
==========================================
Files 84 103 +19
Lines 4473 6900 +2427
==========================================
+ Hits 3875 6001 +2126
- Misses 598 899 +301
    dapr_client: DaprClient,
    ttl: Optional[int] = None,
    consistency: ConsistencyLevel = DAPR_CONSISTENCY_EVENTUAL,
    **kwargs: Any,
I'd remove this to avoid confusion; it's not being used.
Agree, removed
def _get_manifest_key(self, session_id: str) -> str:
    """Get session manifest key (tracks agent_ids for deletion)."""
    session_id = _identifier.validate(session_id, _identifier.Identifier.SESSION)
    return f'{session_id}:manifest'
Move this where the rest of the _key methods are please
Done
session_key = self._get_session_key(session.session_id)

# Check if session already exists
existing = self.read_session(session.session_id)
if existing is not None:
    raise SessionException(f'Session {session.session_id} already exists')

# Write session data
session_dict = session.to_dict()
self._write_state(session_key, session_dict)
return session
Would this require some sort of locking mechanism? If we get two concurrent calls to create_session, both might end up writing the state, and both callers would then use the session even though only one should.
All existing Strands session manager implementations (both in-tree and community-contributed) assume the manager is not thread safe and do not coordinate locking. This is most likely because of how Strands agents encapsulate session lifecycles and keep access sequential.
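To make the concern in this thread concrete, here is a minimal sketch of the check-then-write interleaving the reviewer describes. `InMemoryStore` and `interleaved_create` are hypothetical stand-ins written for illustration only; they are not part of the Dapr SDK or of this PR, and the interleaving is replayed by hand rather than with real threads.

```python
from typing import Dict, Optional


class InMemoryStore:
    """Hypothetical stand-in for a key/value state store (not the Dapr API)."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def save(self, key: str, value: str) -> None:
        self._data[key] = value


def interleaved_create(store: InMemoryStore, session_id: str) -> Dict[str, bool]:
    """Replay two create_session-style calls whose steps interleave."""
    # Step 1: both callers run the existence check before either one writes.
    a_sees_existing = store.get(session_id) is not None
    b_sees_existing = store.get(session_id) is not None

    # Step 2: both checks passed, so both callers go on to write.
    created: Dict[str, bool] = {}
    if not a_sees_existing:
        store.save(session_id, 'written by A')
        created['A'] = True
    if not b_sees_existing:
        store.save(session_id, 'written by B')
        created['B'] = True
    return created


store = InMemoryStore()
result = interleaved_create(store, 'session-1')
final_value = store.get('session-1')
```

Both callers report success and the second write silently replaces the first. Whether this interleaving can occur in practice depends on the author's point above, namely that Strands keeps access to a session manager sequential.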
def _write_state(self, key: str, data: Dict[str, Any]) -> None:
    """Write JSON state to Dapr.

    Args:
        key: State store key.
        data: Dictionary to serialize and store.

    Raises:
        SessionException: If write fails.
    """
    try:
        content = json.dumps(data, ensure_ascii=False)
        self._dapr_client.save_state(
            store_name=self._state_store_name,
            key=key,
            value=content,
            state_metadata=self._get_write_metadata(),
            options=self._get_state_options(),
        )
    except Exception as e:
        raise SessionException(f'Failed to write state key {key}: {e}') from e
Do you think it'd make sense to make use of etags to make sure we don't overwrite states?
I believe this would be tied to the locking question. I audited how the Strands built-in S3 file manager works, and it also does not use etags/record versioning even though it supports it. Again, I suspect this is because of the way Strands handles the access and lifecycle of a session manager in a way that makes this a non-issue.
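For readers unfamiliar with the etag idea raised here, the following is a rough sketch of optimistic concurrency: a write succeeds only when it carries the etag the store currently holds. `VersionedStore` and `EtagMismatch` are hypothetical names invented for this illustration. They model the general technique in memory and are not the Dapr client API or the code in this PR.

```python
from typing import Dict, Optional, Tuple


class EtagMismatch(Exception):
    """Raised when a write carries a stale etag."""


class VersionedStore:
    """Hypothetical in-memory store with etag checks (not the Dapr API)."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, int]] = {}

    def get(self, key: str) -> Tuple[Optional[str], Optional[str]]:
        """Return (value, etag), or (None, None) when the key is absent."""
        if key not in self._data:
            return None, None
        value, version = self._data[key]
        return value, str(version)

    def save(self, key: str, value: str, etag: Optional[str]) -> str:
        """Write only if etag matches the stored version; return the new etag."""
        _, current = self.get(key)
        if current != etag:
            raise EtagMismatch(f'{key}: expected etag {current}, got {etag}')
        version = 1 if current is None else int(current) + 1
        self._data[key] = (value, version)
        return str(version)


store = VersionedStore()
store.save('session-1', 'created', etag=None)

# Two writers read the same version, then both try to update it.
_, etag_a = store.get('session-1')
_, etag_b = store.get('session-1')
new_etag = store.save('session-1', 'update from A', etag_a)

try:
    store.save('session-1', 'update from B', etag_b)
    stale_write_rejected = False
except EtagMismatch:
    stale_write_rejected = True

# A second "create" (etag=None) on an existing key is rejected as well.
try:
    store.save('session-1', 'created again', etag=None)
    duplicate_create_rejected = False
except EtagMismatch:
    duplicate_create_rejected = True
```

In this model the stale update and the duplicate create both fail instead of overwriting state, which is the behavior the reviewer is asking about. The cost is that callers must handle the rejection, typically by re-reading and retrying.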
    Raises:
        SessionException: If creation fails.
    """
    messages_key = self._get_messages_key(session_id, agent_id)
Have you considered using a unique key per message? This would simplify read operations so we won't need to fetch all messages every time, but we'd need to keep track of all messages created somewhere so we can delete them all when deleting a session... What do you think?
We have encountered this key/value data-structure question in both agent and non-agent workloads. In the end, it's preferable to optimize for the fewest I/O operations, especially when it comes to writing state, not just to avoid split-brain but also to optimize for cost. DynamoDB will likely be used a lot with Strands, and we don't want to incur two write operations for every message.
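The trade-off discussed in this thread can be sketched by counting store operations for the two layouts. Everything below is an assumption made for illustration: `CountingStore`, the helper functions, and the key names (`s1:a1:messages`, `:index`) are hypothetical and do not come from the PR.

```python
import json
from typing import Any, Dict


class CountingStore:
    """Hypothetical in-memory key/value store that counts reads and writes."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}
        self.reads = 0
        self.writes = 0

    def get(self, key: str) -> Any:
        self.reads += 1
        raw = self.data.get(key)
        return json.loads(raw) if raw is not None else None

    def save(self, key: str, value: Any) -> None:
        self.writes += 1
        self.data[key] = json.dumps(value)


def append_single_key(store: CountingStore, messages_key: str, message: dict) -> None:
    """One key holds the whole message list: one read and one write per append."""
    messages = store.get(messages_key) or []
    messages.append(message)
    store.save(messages_key, messages)


def append_per_message_key(
    store: CountingStore, prefix: str, message_id: int, message: dict
) -> None:
    """One key per message, plus an index key so the session can be deleted later."""
    store.save(f'{prefix}:{message_id}', message)
    index = store.get(f'{prefix}:index') or []
    index.append(message_id)
    store.save(f'{prefix}:index', index)


single = CountingStore()
per_message = CountingStore()
for i in range(3):
    msg = {'message_id': i, 'text': f'hello {i}'}
    append_single_key(single, 's1:a1:messages', msg)
    append_per_message_key(per_message, 's1:a1:messages', i, msg)
```

In this model the single-key layout performs one write per appended message, while the per-message layout performs two (the message plus the index), which matches the cost argument in the reply. The sketch counts operations only; it does not model payload size, and the single-key layout rewrites a growing list on every append.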
@@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
Missing copyright header.
Added
@@ -0,0 +1,553 @@
"""Dapr state store session manager for distributed storage."""
Missing copyright header.
Added
[metadata]
url = https://dapr.io/
author = Dapr Authors
author_email = daprweb@microsoft.com
👀
🤷‍♂️
Signed-off-by: yaron2 <schneider.yaron@live.com>
Signed-off-by: yaron2 <schneider.yaron@live.com>
@acroca All feedback and questions addressed. Examples seem to be failing regardless.
Adds the ability to store Strands Agents session state with Dapr.