Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0ed7149
Add test workflow and schema-aligned test mocks
devin-ai-integration[bot] Dec 29, 2024
c118de7
Fix linting issues: remove unused imports and add noqa comment
devin-ai-integration[bot] Dec 29, 2024
645bede
Fix CI: explicitly install pytest before running tests
devin-ai-integration[bot] Dec 29, 2024
d8bea3d
Add PyJWT to dev dependencies for authentication tests
devin-ai-integration[bot] Dec 30, 2024
901ecc2
Fix auth: ensure consistent string handling in JWT encoding
devin-ai-integration[bot] Dec 30, 2024
9db43e2
Fix auth: add input validation for JWT creation
devin-ai-integration[bot] Dec 30, 2024
d668fe4
Fix client: update auth token parameter name and add validation
devin-ai-integration[bot] Dec 30, 2024
23829dd
Fix tests: properly mock response.text for json.loads
devin-ai-integration[bot] Dec 30, 2024
fb7f7fe
Fix tests: improve mock_request fixture setup and remove redundant st…
devin-ai-integration[bot] Dec 30, 2024
980815d
Fix tests: update header assertions to match client headers
devin-ai-integration[bot] Dec 30, 2024
2abcb6c
Fix tests: improve mock request verification with detailed assertions
devin-ai-integration[bot] Dec 30, 2024
ee0fb6a
Fix tests: update all test assertions to use consistent verification …
devin-ai-integration[bot] Dec 30, 2024
5526835
Fix client: ensure headers are properly passed through in _request me…
devin-ai-integration[bot] Dec 30, 2024
6b2135e
Fix tests: update mock_request fixture to properly handle headers
devin-ai-integration[bot] Dec 30, 2024
4ad4190
Fix tests: ensure mock_request returns headers in expected format
devin-ai-integration[bot] Dec 30, 2024
ae29f7e
Fix tests: update header assertions to handle httpx.Headers objects
devin-ai-integration[bot] Dec 30, 2024
398633c
Fix lint: remove unused ANY import
devin-ai-integration[bot] Dec 30, 2024
c10a414
Fix tests: update all header assertions to use consistent case-insens…
devin-ai-integration[bot] Dec 30, 2024
ed15b11
Fix client: wrap FileRange fields under 'range' key in read_source_code
devin-ai-integration[bot] Dec 30, 2024
5844c02
Fix tests: update remaining header assertions to use case-insensitive…
devin-ai-integration[bot] Dec 30, 2024
bd7544b
Fix client: return full health check response instead of boolean
devin-ai-integration[bot] Dec 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Run Tests and Lint

on:
pull_request:
branches: [ main ]

jobs:
build-and-test:
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@v3

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'

- name: Install dependencies
run: |
pip install --upgrade pip
pip install -e .[dev]

- name: Lint with ruff
run: |
ruff check .

- name: Run tests
run: |
pip install pytest
pytest --maxfail=1 --disable-warnings -q
20 changes: 15 additions & 5 deletions lsproxy/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,20 @@ def base64url_encode(data):

padding = b"="
encoded = base64.b64encode(data).replace(b"+", b"-").replace(b"/", b"_")
return encoded.rstrip(padding)
return encoded.rstrip(padding).decode("utf-8")


def create_jwt(payload, secret):
# Validate inputs
if not isinstance(payload, dict):
raise TypeError("Payload must be a dictionary")
if not payload:
raise ValueError("Payload cannot be empty")
if not isinstance(secret, str):
raise TypeError("Secret must be a string")
if not secret:
raise ValueError("Secret cannot be empty")

# Create JWT header
header = {"typ": "JWT", "alg": "HS256"}

Expand All @@ -24,10 +34,10 @@ def create_jwt(payload, secret):
encoded_payload = base64url_encode(payload)

# Create signature
signing_input = encoded_header + b"." + encoded_payload
signature = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
signing_input = f"{encoded_header}.{encoded_payload}"
signature = hmac.new(secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256).digest()
encoded_signature = base64url_encode(signature)

# Combine all parts
jwt = signing_input + b"." + encoded_signature
return jwt.decode("utf-8")
jwt = f"{signing_input}.{encoded_signature}"
return jwt
32 changes: 23 additions & 9 deletions lsproxy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

# Only import type hints for Modal if type checking
if TYPE_CHECKING:
import modal
import modal # noqa: F401

from .models import (
DefinitionResponse,
Expand Down Expand Up @@ -37,16 +37,27 @@ def __init__(
timeout: float = 10.0,
auth_token: Optional[str] = None,
):
if auth_token == "":
raise ValueError("Token cannot be empty")
if auth_token is None:
raise ValueError("Token cannot be None")

self._client.base_url = base_url
self._client.timeout = timeout
headers = {"Content-Type": "application/json"}
if auth_token:
headers["Authorization"] = f"Bearer {auth_token}"
headers["Authorization"] = f"Bearer {auth_token}"
self._client.headers = headers

def _request(self, method: str, endpoint: str, **kwargs) -> httpx.Response:
"""Make HTTP request with retry logic and better error handling."""
try:
# Ensure headers from client are included in the request
if "headers" in kwargs:
headers = {**self._client.headers, **kwargs["headers"]}
else:
headers = self._client.headers
kwargs["headers"] = headers

response = self._client.request(method, endpoint, **kwargs)
response.raise_for_status()
return response
Expand Down Expand Up @@ -104,7 +115,7 @@ def read_source_code(self, request: FileRange) -> ReadSourceCodeResponse:
f"Expected FileRange, got {type(request).__name__}. Please use FileRange model to construct the request."
)
response = self._request(
"POST", "/workspace/read-source-code", json=request.model_dump()
"POST", "/workspace/read-source-code", json={"range": request.model_dump()}
)
return ReadSourceCodeResponse.model_validate_json(response.text)

Expand Down Expand Up @@ -243,14 +254,17 @@ def initialize_with_modal(

return client

def check_health(self) -> bool:
"""Check if the server is healthy and ready."""
def check_health(self) -> dict:
"""Check if the server is healthy and ready.

Returns:
dict: Health check response containing status and supported languages
"""
try:
response = self._request("GET", "/system/health")
health_data = response.json()
return health_data.get("status") == "ok"
return response.json()
except Exception:
return False
return {"status": "error"}

def close(self):
"""Close the HTTP client and cleanup Modal resources if present."""
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ include = ["lsproxy*"]
[project.optional-dependencies]
dev = [
"ruff>=0.3.7",
"PyJWT>=2.8.0",
]
modal = [
"modal>=0.56.4",
Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Unit tests for the lsproxy package."""
72 changes: 72 additions & 0 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Unit tests for authentication utilities."""
import pytest
import jwt
from datetime import datetime, timedelta, timezone

from lsproxy.auth import create_jwt, base64url_encode


@pytest.fixture
def sample_payload():
"""Create a sample JWT payload."""
return {
"sub": "test-user",
"exp": int((datetime.now(timezone.utc) + timedelta(hours=1)).timestamp())
}


@pytest.fixture
def sample_secret():
"""Create a sample secret key."""
return "test-secret-key-1234"


def test_base64url_encode():
"""Test base64url encoding."""
# Test basic encoding
assert base64url_encode(b"test") == "dGVzdA"

# Test padding removal
assert base64url_encode(b"t") == "dA"
assert base64url_encode(b"te") == "dGU"
assert base64url_encode(b"tes") == "dGVz"

# Test URL-safe characters
assert "+" not in base64url_encode(b"???")
assert "/" not in base64url_encode(b"???")


def test_create_jwt(sample_payload, sample_secret):
"""Test JWT creation."""
token = create_jwt(sample_payload, sample_secret)

# Verify token structure
assert isinstance(token, str)
assert len(token.split(".")) == 3

# Verify token can be decoded
decoded = jwt.decode(token, sample_secret, algorithms=["HS256"])
assert decoded["sub"] == sample_payload["sub"]
assert decoded["exp"] == sample_payload["exp"]
Comment on lines +39 to +50
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Consider adding tests for edge cases such as payloads with special characters or very large payloads to ensure the create_jwt function handles them correctly. [enhancement]

Suggested change
def test_create_jwt(sample_payload, sample_secret):
"""Test JWT creation."""
token = create_jwt(sample_payload, sample_secret)
# Verify token structure
assert isinstance(token, str)
assert len(token.split(".")) == 3
# Verify token can be decoded
decoded = jwt.decode(token, sample_secret, algorithms=["HS256"])
assert decoded["sub"] == sample_payload["sub"]
assert decoded["exp"] == sample_payload["exp"]
def test_create_jwt(sample_payload, sample_secret):
"""Test JWT creation."""
token = create_jwt(sample_payload, sample_secret)
# Verify token structure
assert isinstance(token, str)
assert len(token.split(".")) == 3
# Verify token can be decoded
decoded = jwt.decode(token, sample_secret, algorithms=["HS256"])
assert decoded["sub"] == sample_payload["sub"]
assert decoded["exp"] == sample_payload["exp"]
# Test with special characters in payload
special_payload = {"sub": "user!@#$%^&*()"}
token = create_jwt(special_payload, sample_secret)
decoded = jwt.decode(token, sample_secret, algorithms=["HS256"])
assert decoded["sub"] == special_payload["sub"]
# Test with large payload
large_payload = {"sub": "user" * 1000}
token = create_jwt(large_payload, sample_secret)
decoded = jwt.decode(token, sample_secret, algorithms=["HS256"])
assert decoded["sub"] == large_payload["sub"]





def test_create_jwt_invalid_payload():
"""Test JWT creation with invalid payload."""
with pytest.raises(TypeError):
create_jwt("not a dict", "secret")

with pytest.raises(ValueError):
create_jwt({}, "secret") # Empty payload


def test_create_jwt_invalid_secret():
"""Test JWT creation with invalid secret."""
payload = {"sub": "test"}

with pytest.raises(ValueError):
create_jwt(payload, "") # Empty secret

with pytest.raises(TypeError):
create_jwt(payload, None) # None secret
Loading
Loading