Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 104 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,102 @@ client = Schematic("YOUR_API_KEY")
```

## Async Client
The SDK also exports an async client so that you can make non-blocking calls to our API.

The SDK exports an async client for non-blocking API calls with automatic background event processing. The async client features **lazy initialization** - you can start using it immediately without manual setup.

### Simple Usage (Lazy Initialization)

The easiest way to use the async client - just create and use it directly:

```python
import asyncio
from schematic.client import AsyncSchematic

client = AsyncSchematic("YOUR_API_KEY")
async def main() -> None:
await client.companies.get_company(
company_id="company_id",
async def main():
# Create client - no initialize() needed!
client = AsyncSchematic("YOUR_API_KEY")

# Use immediately - auto-initializes on first call
is_enabled = await client.check_flag(
"new-feature",
company={"id": "company-123"},
user={"id": "user-456"}
)

if is_enabled:
print("New feature is enabled!")

# Track usage
await client.track(
event="feature-used",
company={"id": "company-123"},
user={"id": "user-456"}
)

# Always shutdown when done
await client.shutdown()

asyncio.run(main())
```

### Context Manager (Recommended)

Use the async client as a context manager for automatic lifecycle management:

```python
import asyncio
from schematic.client import AsyncSchematic

async def main():
async with AsyncSchematic("YOUR_API_KEY") as client:
# Client auto-initializes and will auto-shutdown

is_enabled = await client.check_flag(
"feature-flag",
company={"id": "company-123"}
)

await client.identify(
keys={"id": "company-123"},
name="Acme Corp"
)

# Automatic cleanup on exit

asyncio.run(main())
```

### Production Usage (Explicit Control)

For production applications that need precise control over initialization timing:

```python
import asyncio
from schematic.client import AsyncSchematic

# Web application example
client = AsyncSchematic("YOUR_API_KEY")

async def startup():
"""Call during application startup"""
await client.initialize() # Start background tasks now
print("Schematic client ready")

async def shutdown():
"""Call during application shutdown"""
await client.shutdown() # Stop background tasks and flush events
print("Schematic client stopped")

async def handle_request():
"""Handle individual requests"""
# Client is already initialized - this will be fast
is_enabled = await client.check_flag(
"feature-flag",
company={"id": "company-123"}
)
return {"feature_enabled": is_enabled}
```

## Exception Handling
All errors thrown by the SDK will be subclasses of [`ApiError`](./src/schematic/core/api_error.py).

Expand Down Expand Up @@ -106,7 +188,23 @@ client.track(
)
```

This call is non-blocking and there is no response to check.
**Async client:**
```python
import asyncio
from schematic.client import AsyncSchematic

async def main():
async with AsyncSchematic("YOUR_API_KEY") as client:
await client.track(
event="some-action",
user={"user_id": "your-user-id"},
company={"id": "your-company-id"},
)

asyncio.run(main())
```

These calls are non-blocking and there is no response to check.

If you want to record large numbers of the same event at once, or perhaps measure usage in terms of a unit like tokens or memory, you can optionally specify a quantity for your event:

Expand Down
84 changes: 71 additions & 13 deletions src/schematic/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import asyncio
import atexit
import logging
import signal
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -163,7 +161,34 @@ class AsyncSchematicConfig:


class AsyncSchematic(AsyncBaseSchematic):
"""Async Schematic client for feature flags and event tracking.

This client provides async methods for checking feature flags and tracking events.
It automatically initializes on first use and maintains background tasks for
event buffering that require proper cleanup.

IMPORTANT: Always call shutdown() when done, or use as a context manager:

# Recommended patterns:

# 1. Context manager (automatic cleanup):
async with AsyncSchematic(api_key, config) as client:
result = await client.check_flag("my-flag") # Auto-initializes

# 2. Manual (explicit cleanup):
client = AsyncSchematic(api_key, config)
try:
result = await client.check_flag("my-flag") # Auto-initializes
finally:
await client.shutdown() # REQUIRED for proper cleanup

# 3. Web framework (lifecycle managed):
# In startup: client = AsyncSchematic(api_key, config)
# In shutdown: await client.shutdown()
"""

def __init__(self, api_key: str, config: Optional[AsyncSchematicConfig] = None):
self._initialized = False
config = config or AsyncSchematicConfig()
httpx_client = (
AsyncOfflineHTTPClient() if config.offline else config.httpx_client
Expand All @@ -188,11 +213,18 @@ def __init__(self, api_key: str, config: Optional[AsyncSchematicConfig] = None):
LocalCache[bool](DEFAULT_CACHE_SIZE, DEFAULT_CACHE_TTL)
]
self.offline = config.offline
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, self._shutdown_handler)
self._shutdown_requested = False
self._is_shutting_down = False
self._initialized = True

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.shutdown()

async def initialize(self) -> None:
pass
pass

async def check_flag(
self,
Expand Down Expand Up @@ -233,7 +265,7 @@ async def identify(
company: Optional[EventBodyIdentifyCompany] = None,
name: Optional[str] = None,
traits: Optional[Dict[str, Any]] = None,
) -> None:
) -> None:
await self._enqueue_event(
"identify",
EventBodyIdentify(
Expand All @@ -251,7 +283,7 @@ async def track(
user: Optional[Dict[str, str]] = None,
traits: Optional[Dict[str, Any]] = None,
quantity: Optional[int] = None,
) -> None:
) -> None:
await self._enqueue_event(
"track",
EventBodyTrack(
Expand All @@ -275,10 +307,36 @@ async def _enqueue_event(self, event_type: str, body: EventBody) -> None:
def _get_flag_default(self, flag_key: str) -> bool:
return self.flag_defaults.get(flag_key, False)

def _shutdown_handler(self, signum, frame):
self.logger.info(f"Received signal {signum}. Initiating shutdown.")
asyncio.create_task(self.shutdown())

async def shutdown(self) -> None:
await self.event_buffer.stop()
self.logger.info("Shutdown complete.")
"""Properly shutdown the client, flushing any pending events.

This method should be called when you're done using the client to ensure:
- All pending events are flushed to the server
- Background tasks are properly terminated
- Resources are cleaned up

It's safe to call this method multiple times, even if the client was never used.
"""
# Only do the shutdown once
if self._is_shutting_down:
self.logger.debug("Shutdown already in progress, skipping")
return

self._is_shutting_down = True

# If we were never initialized, there's nothing to clean up
if not self._initialized:
self.logger.debug("Client was never initialized, nothing to clean up")
return

self.logger.info("Shutting down AsyncSchematic...")

try:
# Flush and stop the event buffer
await self.event_buffer.stop()
self.logger.info("Shutdown complete.")
except Exception as e:
self.logger.error(f"Error during shutdown: {e}")
finally:
self._shutdown_requested = True

Loading