test(ci): make test suites v3 ready #1063

Merged
15 commits merged on Jan 14, 2025
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
@@ -90,7 +90,13 @@ jobs:
rm -rf .env

echo "::group::Run server"
- TELEMETRY_ENABLED=false LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT=http://localhost:9090 LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d

+ TELEMETRY_ENABLED=false \
+ LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT=http://localhost:9090 \
+ LANGFUSE_INGESTION_QUEUE_DELAY_MS=10 \
+ LANGFUSE_INGESTION_CLICKHOUSE_WRITE_INTERVAL_MS=10 \
+ docker compose up -d

echo "::endgroup::"

# Add this step to check the health of the container
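Note on the new variables: `LANGFUSE_INGESTION_QUEUE_DELAY_MS=10` and `LANGFUSE_INGESTION_CLICKHOUSE_WRITE_INTERVAL_MS=10` shrink the v3 server's asynchronous ingestion delays so events become queryable almost immediately after the API accepts them, which the integration tests depend on. The container health check mentioned above could look like this minimal Python sketch; the `/api/public/health` path, the compose default of localhost:3000, and the timeout are assumptions for illustration, not part of this PR:

```python
import time

import httpx


def wait_for_server(base_url: str = "http://localhost:3000", timeout: float = 60.0) -> None:
    """Poll the Langfuse health endpoint until the container responds, or fail."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # /api/public/health is assumed here; adjust if the deployment differs.
            response = httpx.get(f"{base_url}/api/public/health")
            if response.status_code == 200:
                return
        except httpx.TransportError:
            pass  # Server not accepting connections yet
        time.sleep(1)
    raise RuntimeError(f"Langfuse server at {base_url} not healthy after {timeout}s")
```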
@@ -149,7 +155,7 @@ jobs:
- name: Run the automated tests
run: |
python --version
- poetry run pytest -s -v --log-cli-level=INFO
+ poetry run pytest -n auto -s -v --log-cli-level=INFO

all-tests-passed:
# This allows us to have a branch protection rule for tests and deploys with matrix
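On the pytest change above: `-n auto` is provided by the pytest-xdist plugin, which spreads tests across one worker per CPU core, so the plugin must be in the dev dependencies and the tests must not share mutable state. A common isolation pattern is to namespace resources with pytest-xdist's real `worker_id` fixture; a minimal sketch follows (the fixture name `isolated_trace_name` and the naming scheme are illustrative, not from this PR):

```python
import pytest


@pytest.fixture
def isolated_trace_name(worker_id: str) -> str:
    """Namespace test resources per xdist worker to avoid cross-worker collisions.

    worker_id is provided by pytest-xdist: "gw0", "gw1", ... under -n,
    or "master" when running without parallelism.
    """
    return f"test-trace-{worker_id}"


def test_trace_creation(isolated_trace_name):
    # Each parallel worker operates on its own uniquely named trace.
    assert isolated_trace_name.startswith("test-trace-")
```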
51 changes: 31 additions & 20 deletions langfuse/callback/langchain.py
@@ -1,9 +1,9 @@
- from collections import defaultdict
- import httpx
import logging
import typing
import warnings
+ from collections import defaultdict

+ import httpx
import pydantic

try: # Test that langchain is installed before proceeding
@@ -15,35 +15,36 @@
)
from typing import Any, Dict, List, Optional, Sequence, Union, cast
from uuid import UUID, uuid4

from langfuse.api.resources.ingestion.types.sdk_log_body import SdkLogBody
from langfuse.client import (
+ StatefulGenerationClient,
StatefulSpanClient,
StatefulTraceClient,
- StatefulGenerationClient,
)
from langfuse.extract_model import _extract_model_name
+ from langfuse.types import MaskFunction
from langfuse.utils import _get_timestamp
from langfuse.utils.base_callback_handler import LangfuseBaseCallbackHandler
- from langfuse.types import MaskFunction

try:
from langchain.callbacks.base import (
BaseCallbackHandler as LangchainBaseCallbackHandler,
)
from langchain.schema.agent import AgentAction, AgentFinish
from langchain.schema.document import Document
- from langchain_core.outputs import (
-     ChatGeneration,
-     LLMResult,
- )
from langchain_core.messages import (
AIMessage,
BaseMessage,
ChatMessage,
+ FunctionMessage,
HumanMessage,
SystemMessage,
ToolMessage,
- FunctionMessage,
)
+ from langchain_core.outputs import (
+     ChatGeneration,
+     LLMResult,
+ )
except ImportError:
raise ModuleNotFoundError(
@@ -149,7 +150,9 @@ def on_llm_new_token(

self.updated_completion_start_time_memo.add(run_id)

- def get_langchain_run_name(self, serialized: Optional[Dict[str, Any]], **kwargs: Any) -> str:
+ def get_langchain_run_name(
+     self, serialized: Optional[Dict[str, Any]], **kwargs: Any
+ ) -> str:
"""Retrieve the name of a serialized LangChain runnable.

The prioritization for the determination of the run name is as follows:
@@ -1055,16 +1058,24 @@ def _parse_usage_model(usage: typing.Union[pydantic.BaseModel, dict]):
]

usage_model = usage.copy() # Copy all existing key-value pairs
- for model_key, langfuse_key in conversion_list:
-     if model_key in usage_model:
-         captured_count = usage_model.pop(model_key)
-         final_count = (
-             sum(captured_count)
-             if isinstance(captured_count, list)
-             else captured_count
-         )  # For Bedrock, the token count is a list when streamed
- 
-         usage_model[langfuse_key] = final_count  # Translate key and keep the value

+ # Skip OpenAI usage types as they are handled server side
+ if not all(
+     openai_key in usage_model
+     for openai_key in ["prompt_tokens", "completion_tokens", "total_tokens"]
+ ):
+     for model_key, langfuse_key in conversion_list:
+         if model_key in usage_model:
+             captured_count = usage_model.pop(model_key)
+             final_count = (
+                 sum(captured_count)
+                 if isinstance(captured_count, list)
+                 else captured_count
+             )  # For Bedrock, the token count is a list when streamed
+ 
+             usage_model[langfuse_key] = (
+                 final_count  # Translate key and keep the value
+             )

if isinstance(usage_model, dict):
if "input_token_details" in usage_model:
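Net effect of the `_parse_usage_model` change above: OpenAI-style usage payloads (`prompt_tokens` / `completion_tokens` / `total_tokens`) are now passed through untouched for server-side handling, and key translation only applies to other providers such as Bedrock. A self-contained sketch of the same guard, with an illustrative two-entry conversion list (the real handler's list is longer and not shown in this diff):

```python
def translate_usage(usage_model: dict) -> dict:
    """Translate provider-specific usage keys, skipping OpenAI-style payloads."""
    # Illustrative subset of the conversion list; the real handler covers more keys.
    conversion_list = [("input_tokens", "input"), ("output_tokens", "output")]

    openai_keys = ["prompt_tokens", "completion_tokens", "total_tokens"]
    if all(key in usage_model for key in openai_keys):
        return usage_model  # Handled server side; do not rewrite the keys.

    for model_key, langfuse_key in conversion_list:
        if model_key in usage_model:
            captured = usage_model.pop(model_key)
            # Bedrock reports per-chunk counts as a list when streaming.
            usage_model[langfuse_key] = sum(captured) if isinstance(captured, list) else captured
    return usage_model


# OpenAI-style payload passes through unchanged:
assert translate_usage({"prompt_tokens": 3, "completion_tokens": 5, "total_tokens": 8})["prompt_tokens"] == 3

# Bedrock-style streamed counts are summed and renamed:
assert translate_usage({"input_tokens": [1, 2, 3]})["input"] == 6
```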
5 changes: 5 additions & 0 deletions tests/api_wrapper.py
@@ -1,4 +1,5 @@
import os
+ from time import sleep

import httpx

@@ -11,23 +12,27 @@ def __init__(self, username=None, password=None, base_url=None):
self.BASE_URL = base_url if base_url else os.environ["LANGFUSE_HOST"]

def get_observation(self, observation_id):
+ sleep(1)
url = f"{self.BASE_URL}/api/public/observations/{observation_id}"
response = httpx.get(url, auth=self.auth)
return response.json()

def get_scores(self, page=None, limit=None, user_id=None, name=None):
+ sleep(1)
params = {"page": page, "limit": limit, "userId": user_id, "name": name}
url = f"{self.BASE_URL}/api/public/scores"
response = httpx.get(url, params=params, auth=self.auth)
return response.json()

def get_traces(self, page=None, limit=None, user_id=None, name=None):
+ sleep(1)
params = {"page": page, "limit": limit, "userId": user_id, "name": name}
url = f"{self.BASE_URL}/api/public/traces"
response = httpx.get(url, params=params, auth=self.auth)
return response.json()

def get_trace(self, trace_id):
+ sleep(1)
url = f"{self.BASE_URL}/api/public/traces/{trace_id}"
response = httpx.get(url, auth=self.auth)
return response.json()
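The added `sleep(1)` calls account for v3's asynchronous ingestion pipeline: the API acknowledges an event before it is queryable, so each read waits a beat. A fixed sleep keeps the wrapper simple, though it either wastes a second per call or flakes when ingestion takes longer; a polling helper like this sketch is one alternative (the function name and retry budget are invented for illustration):

```python
import time

import httpx


def get_with_retry(url: str, auth, attempts: int = 10, delay: float = 0.5) -> dict:
    """Poll a read endpoint until the asynchronously ingested resource appears."""
    for attempt in range(attempts):
        response = httpx.get(url, auth=auth)
        if response.status_code == 200:
            return response.json()
        time.sleep(delay)  # Not visible yet; ingestion pipeline may still be flushing.
    response.raise_for_status()
    return response.json()
```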