Skip to content

Commit

Permalink
add row limit/offset to Tru.get_records_and_feedback (#1232)
Browse files Browse the repository at this point in the history
* fix up

* format

* add eager join

* fix order as per PR comments

* format

* typo

* format and wording

* changed how apps in evaluation page get retrieved

* format

---------

Co-authored-by: Josh Reini <[email protected]>
  • Loading branch information
sfc-gh-pmardziel and sfc-gh-jreini authored Jun 27, 2024
1 parent 3ae409f commit 65828bd
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 49 deletions.
3 changes: 2 additions & 1 deletion docs/docs_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ notebook <7

black >= 24.4.2

# Formatting packages pinned for consistency with format checker.
yapf == 0.32.0
isort == 5.10.1
isort == 5.10.1
11 changes: 10 additions & 1 deletion trulens_eval/examples/experimental/dummy_example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"from trulens_eval import Feedback\n",
"from trulens_eval import Tru\n",
"from trulens_eval.feedback.provider.hugs import Dummy\n",
"from trulens_eval.schema import FeedbackMode\n",
"from trulens_eval.schema.feedback import FeedbackMode\n",
"from trulens_eval.tru_custom_app import TruCustomApp\n",
"from trulens_eval.utils.threading import TP\n",
"\n",
Expand All @@ -76,6 +76,15 @@
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tru.get_records_and_feedback(limit=10)[0]"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down
8 changes: 7 additions & 1 deletion trulens_eval/trulens_eval/database/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,19 @@ def get_apps(self) -> Iterable[JSON]:
@abc.abstractmethod
def get_records_and_feedback(
self,
app_ids: Optional[List[mod_types_schema.AppID]] = None
app_ids: Optional[List[mod_types_schema.AppID]] = None,
offset: Optional[int] = None,
limit: Optional[int] = None
) -> Tuple[pd.DataFrame, Sequence[str]]:
"""Get records fom the database.
Args:
app_ids: If given, retrieve only the records for the given apps.
Otherwise all apps are retrieved.
offset: Database row offset.
limit: Limit on rows (records) returned.
Returns:
A dataframe with the records.
Expand Down
6 changes: 5 additions & 1 deletion trulens_eval/trulens_eval/database/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from trulens_eval.schema import app as mod_app_schema
from trulens_eval.schema import feedback as mod_feedback_schema
from trulens_eval.schema import record as mod_record_schema
from trulens_eval.schema import types as mod_types_schema
from trulens_eval.utils.json import json_str_of_obj

TYPE_JSON = Text
Expand Down Expand Up @@ -222,6 +221,7 @@ class Record(base):
backref=backref('records', cascade="all,delete"),
primaryjoin='AppDefinition.app_id == Record.app_id',
foreign_keys=app_id,
order_by="(Record.ts,Record.record_id)"
)

@classmethod
Expand Down Expand Up @@ -282,6 +282,8 @@ class FeedbackResult(base):
backref=backref('feedback_results', cascade="all,delete"),
primaryjoin='Record.record_id == FeedbackResult.record_id',
foreign_keys=record_id,
order_by=
"(FeedbackResult.last_ts,FeedbackResult.feedback_result_id)"
)

feedback_definition = relationship(
Expand All @@ -290,6 +292,8 @@ class FeedbackResult(base):
primaryjoin=
"FeedbackDefinition.feedback_definition_id == FeedbackResult.feedback_definition_id",
foreign_keys=feedback_definition_id,
order_by=
"(FeedbackResult.last_ts,FeedbackResult.feedback_result_id)"
)

@classmethod
Expand Down
116 changes: 93 additions & 23 deletions trulens_eval/trulens_eval/database/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
import numpy as np
import pandas as pd
from pydantic import Field
from sqlalchemy import create_engine
from sqlalchemy import Engine
from sqlalchemy import func
from sqlalchemy import select
import sqlalchemy as sa
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text as sql_text
Expand Down Expand Up @@ -75,7 +72,7 @@ class SQLAlchemyDB(DB):
session_params: dict = Field(default_factory=dict)
"""Sqlalchemy-related session."""

engine: Optional[Engine] = None
engine: Optional[sa.Engine] = None
"""Sqlalchemy engine."""

session: Optional[sessionmaker] = None
Expand Down Expand Up @@ -113,7 +110,7 @@ def __init__(
)

def _reload_engine(self):
self.engine = create_engine(**self.engine_params)
self.engine = sa.create_engine(**self.engine_params)
self.session = sessionmaker(self.engine, **self.session_params)

@classmethod
Expand Down Expand Up @@ -415,7 +412,7 @@ def get_feedback_defs(
"""See [DB.get_feedback_defs][trulens_eval.database.base.DB.get_feedback_defs]."""

with self.session.begin() as session:
q = select(self.orm.FeedbackDefinition)
q = sa.select(self.orm.FeedbackDefinition)
if feedback_definition_id:
q = q.filter_by(feedback_definition_id=feedback_definition_id)
fb_defs = (row[0] for row in session.execute(q))
Expand Down Expand Up @@ -484,9 +481,9 @@ def _feedback_query(
limit: Optional[int] = None
):
if count:
q = func.count(self.orm.FeedbackResult.feedback_result_id)
q = sa.func.count(self.orm.FeedbackResult.feedback_result_id)
else:
q = select(self.orm.FeedbackResult)
q = sa.select(self.orm.FeedbackResult)

if record_id:
q = q.filter_by(record_id=record_id)
Expand Down Expand Up @@ -515,7 +512,7 @@ def _feedback_query(
q = q.limit(limit)

if shuffle:
q = q.order_by(func.random())
q = q.order_by(sa.func.random())

return q

Expand Down Expand Up @@ -573,7 +570,9 @@ def get_feedback(

def get_records_and_feedback(
self,
app_ids: Optional[List[str]] = None
app_ids: Optional[List[str]] = None,
offset: Optional[int] = None,
limit: Optional[int] = None
) -> Tuple[pd.DataFrame, Sequence[str]]:
"""See [DB.get_records_and_feedback][trulens_eval.database.base.DB.get_records_and_feedback]."""

Expand All @@ -582,18 +581,42 @@ def get_records_and_feedback(
# for large databases without the use of pagination.

with self.session.begin() as session:
stmt = select(self.orm.AppDefinition).options(
joinedload(self.orm.AppDefinition.records)\
.joinedload(self.orm.Record.feedback_results)
)
stmt = sa.select(self.orm.Record)
# NOTE: We are selecting records here because offset and limit need
# to be with respect to those rows instead of AppDefinition or
# FeedbackResult rows.

if app_ids:
stmt = stmt.where(self.orm.AppDefinition.app_id.in_(app_ids))
stmt = stmt.where(self.orm.Record.app_id.in_(app_ids))

stmt = stmt.options(joinedload(self.orm.Record.feedback_results))
# NOTE(piotrm): The joinedload here makes it so that the
# feedback_results get loaded eagerly instead if lazily when
# accessed later.

# TODO(piotrm): The subsequent logic in helper methods end up
# reading all of the records and feedback_results in order to create
# a DataFrame so there is no reason to not eagerly get all of this
# data. Ideally, though, we would be making some sort of lazy
# DataFrame and then could use the lazy join feature of sqlalchemy.

stmt = stmt.order_by(self.orm.Record.ts, self.orm.Record.record_id)
# NOTE: feedback_results order is governed by the order_by on the
# orm.FeedbackResult.record backref definition. Here, we need to
# order Records as we did not use an auto join to retrieve them. If
# records were to be retrieved from AppDefinition.records via auto
# join, though, the orm backref ordering would be able to take hold.

stmt = stmt.limit(limit).offset(offset)

ex = session.execute(stmt).unique() # unique needed for joinedload
apps = (row[0] for row in ex)
ex = session.execute(stmt).unique()
# unique needed for joinedload above.

return AppsExtractor().get_df_and_cols(apps)
records = [rec[0] for rec in ex]
# TODO: Make the iteration of records lazy in some way. See
# TODO(piotrm) above.

return AppsExtractor().get_df_and_cols(records=records)


# Use this Perf for missing Perfs.
Expand Down Expand Up @@ -699,6 +722,8 @@ def _extract(_cost_json: Union[str, dict]) -> Tuple[int, float]:


class AppsExtractor:
"""Utilities for creating dataframes from orm instances."""

app_cols = ["app_id", "app_json", "type"]
rec_cols = [
"record_id", "input", "output", "tags", "record_json", "cost_json",
Expand All @@ -711,9 +736,33 @@ def __init__(self):
self.feedback_columns = set()

def get_df_and_cols(
self, apps: Iterable[orm.AppDefinition]
self,
apps: Optional[List[orm.AppDefinition]] = None,
records: Optional[List[orm.Record]] = None
) -> Tuple[pd.DataFrame, Sequence[str]]:
df = pd.concat(self.extract_apps(apps))
"""Produces a records dataframe which joins in information from apps and
feedback results.
Args:
apps: If given, includes all records of all of the apps in this
iterable.
records: If given, includes only these records. Mutually exclusive
with `apps`.
"""

assert apps is None or records is None, "`apps` and `records` are mutually exclusive"

if apps is not None:
df = pd.concat(self.extract_apps(apps))

elif records is not None:
apps = set(record.app for record in records)
df = pd.concat(self.extract_apps(apps=apps, records=records))

else:
raise ValueError("'apps` or `records` must be provided")

df["latency"] = _extract_latency(df["perf_json"])
df.reset_index(
drop=True, inplace=True
Expand All @@ -722,14 +771,35 @@ def get_df_and_cols(
return df, list(self.feedback_columns)

def extract_apps(
self, apps: Iterable[orm.AppDefinition]
self,
apps: Iterable[orm.AppDefinition],
records: Optional[List[orm.Record]] = None
) -> Iterable[pd.DataFrame]:
"""
Creates record rows with app information.
TODO: The means for enumerating records in this method is not ideal as
it does a lot of filtering.
"""

yield pd.DataFrame(
[], columns=self.app_cols + self.rec_cols
) # prevent empty iterator
for _app in apps:
try:
if _recs := _app.records:
if records is None:
# If records not provided, get all of them for `_app`.
_recs = _app.records
else:
# Otherwise get only the ones in `records`. WARNING: Avoid
# using _app.records here as doing so might get all of the
# records even the ones not in `records`
_recs = (
record for record in records
if record.app_id == _app.app_id
)

if _recs:
df = pd.DataFrame(data=self.extract_records(_recs))

for col in self.app_cols:
Expand Down
37 changes: 17 additions & 20 deletions trulens_eval/trulens_eval/pages/Evaluations.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@
tru = Tru()
lms = tru.db

df_results, feedback_cols = lms.get_records_and_feedback([])

# TODO: remove code redundancy / redundant database calls
feedback_directions = {
(
Expand Down Expand Up @@ -157,33 +155,32 @@ def extract_metadata(row: pd.Series) -> str:
return str(record_data["meta"])


if df_results.empty:
st.write("No records yet...")
apps = list(app['app_id'] for app in lms.get_apps())

if "app" in st.session_state:
app = st.session_state.app
else:
apps = list(df_results.app_id.unique())
if "app" in st.session_state:
app = st.session_state.app
else:
app = apps
app = apps

st.query_params['app'] = app
st.query_params['app'] = app

options = st.multiselect("Filter Applications", apps, default=app)
options = st.multiselect("Filter Applications", apps, default=app)

if len(options) == 0:
st.header("All Applications")
app_df = df_results
df_results, feedback_cols = lms.get_records_and_feedback(app_ids=options)

elif len(options) == 1:
st.header(options[0])
if len(options) == 0:
st.header("All Applications")

app_df = df_results[df_results.app_id.isin(options)]
elif len(options) == 1:
st.header(options[0])

else:
st.header("Multiple Applications Selected")
else:
st.header("Multiple Applications Selected")

app_df = df_results[df_results.app_id.isin(options)]
if df_results.empty:
st.write("No records yet...")
else:
app_df = df_results

tab1, tab2 = st.tabs(["Records", "Feedback Functions"])

Expand Down
12 changes: 10 additions & 2 deletions trulens_eval/trulens_eval/tru.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,20 @@ def get_apps(self) -> List[serial.JSONized[mod_app_schema.AppDefinition]]:

def get_records_and_feedback(
self,
app_ids: Optional[List[mod_types_schema.AppID]] = None
app_ids: Optional[List[mod_types_schema.AppID]] = None,
offset: Optional[int] = None,
limit: Optional[int] = None
) -> Tuple[pandas.DataFrame, List[str]]:
"""Get records, their feeback results, and feedback names.
Args:
app_ids: A list of app ids to filter records by. If empty or not given, all
apps' records will be returned.
offset: Record row offset.
limit: Limit on the number of records to return.
Returns:
Dataframe of records with their feedback results.
Expand All @@ -705,7 +711,9 @@ def get_records_and_feedback(
if app_ids is None:
app_ids = []

df, feedback_columns = self.db.get_records_and_feedback(app_ids)
df, feedback_columns = self.db.get_records_and_feedback(
app_ids, offset=offset, limit=limit
)

return df, feedback_columns

Expand Down

0 comments on commit 65828bd

Please sign in to comment.