Add support for async predictors #1813

Closed · wants to merge 31 commits into from

Changes from 24 commits

Commits (31)
3277023
run CI for this branch the same way as for main
technillogue Jan 16, 2024
a76a012
async runner (#1352)
technillogue Jan 16, 2024
b947c90
support async predict functions (#1350)
technillogue Oct 30, 2023
7e63a48
AsyncConcatenateIterator
technillogue May 7, 2024
40180a8
create event loop before predictor setup (#1366)
technillogue Jan 16, 2024
78879e4
minimal async worker (#1410)
technillogue Jan 22, 2024
7624116
Mux prediction events (#1405)
technillogue Feb 12, 2024
a616653
replace requests with httpx and factor out clients (#1574, #1707, #1714)
technillogue Mar 29, 2024
5904b60
implement mp.Connection with async streams (#1640)
technillogue May 7, 2024
ca14e0d
optimize webhook serialization and logging (#1651)
technillogue May 8, 2024
2be1715
tweak names and style
technillogue May 8, 2024
da1e997
omnibus actual concurrency and major refactor (#1530)
technillogue May 16, 2024
f77c205
function to emit metrics (#1649)
technillogue May 17, 2024
652074a
predict_time_share metric (#1643, #1683)
technillogue May 22, 2024
840cb3c
log traceback properly (#1734)
technillogue Jun 12, 2024
3875d00
add batch size metric (#1750)
technillogue Jun 18, 2024
403d52e
fix various lints
technillogue Jul 2, 2024
16a5811
Poison model healthcheck on shutdown
nickstenning Jul 3, 2024
0955c92
Propagate trace context to webhook and upload requests
aron Jul 4, 2024
66c7ed7
[async] Include prediction id upload request (#1788)
aron Jul 17, 2024
c856770
Passing tests
mattt Jul 18, 2024
52ba22f
Revert "run CI for this branch the same way as for main"
mattt Jul 18, 2024
7b69bc4
Implement webhook and file tests
mattt Jul 19, 2024
bd6bdf2
Add missing tests for data: URIs
mattt Jul 19, 2024
b215cea
Delete .tool-versions
mattt Jul 22, 2024
560b665
Add pylint tool settings to pyproject.toml
mattt Jul 22, 2024
59d0809
Ignore pylint warnings
mattt Jul 22, 2024
46c4980
Resolve pylint warnings
mattt Jul 22, 2024
a73266f
Remove duplicate and unused WorkerState declaration
mattt Jul 22, 2024
a9cf05d
Refactor get_weights_argument
mattt Jul 22, 2024
835b658
fix shutdown bugs (#1819, #1843)
technillogue Jul 23, 2024
2 changes: 1 addition & 1 deletion .tool-versions
@@ -1 +1 @@
-golang 1.20
+golang 1.21.0
14 changes: 14 additions & 0 deletions docs/metrics.md
@@ -0,0 +1,14 @@
# Metrics

Prediction objects have a `metrics` field, which normally includes `predict_time` and `total_time`. Official language models also report metrics such as `input_token_count`, `output_token_count`, `tokens_per_second`, and `time_to_first_token`. Custom metrics emitted from Cog are currently ignored when running on Replicate; official Replicate-published models are the only exception. When running outside of Replicate, you can emit custom metrics like this:


```python
import cog
from cog import BasePredictor, Path

class Predictor(BasePredictor):
    def predict(self, width: int, height: int) -> Path:
        """Run a single prediction on the model"""
        cog.emit_metric(name="pixel_count", value=width * height)
        ...  # run the model and return an output Path
```
13 changes: 9 additions & 4 deletions pkg/config/config.go
@@ -57,16 +57,21 @@ type Build struct {
 	pythonRequirementsContent []string
 }
 
+type Concurrency struct {
+	Max int `json:"max,omitempty" yaml:"max"`
+}
+
 type Example struct {
 	Input  map[string]string `json:"input" yaml:"input"`
 	Output string            `json:"output" yaml:"output"`
 }
 
 type Config struct {
-	Build   *Build `json:"build" yaml:"build"`
-	Image   string `json:"image,omitempty" yaml:"image"`
-	Predict string `json:"predict,omitempty" yaml:"predict"`
-	Train   string `json:"train,omitempty" yaml:"train"`
+	Build       *Build       `json:"build" yaml:"build"`
+	Image       string       `json:"image,omitempty" yaml:"image"`
+	Predict     string       `json:"predict,omitempty" yaml:"predict"`
+	Train       string       `json:"train,omitempty" yaml:"train"`
+	Concurrency *Concurrency `json:"concurrency,omitempty" yaml:"concurrency"`
 }
 
 func DefaultConfig() *Config {
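
For context, a minimal sketch of how the new setting might appear in `cog.yaml`. Only the `concurrency.max` key comes from this diff; the surrounding keys and the value 4 are illustrative assumptions:

```yaml
# cog.yaml — illustrative values, not taken from this PR
build:
  python_version: "3.11"
predict: "predict.py:Predictor"
concurrency:
  max: 4  # allow up to 4 predictions in flight at once
```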
5 changes: 0 additions & 5 deletions pkg/config/data/config_schema_v1.0.json
@@ -154,11 +154,6 @@
           "$id": "#/properties/concurrency/properties/max",
           "type": "integer",
           "description": "The maximum number of concurrent predictions."
-        },
-        "default_target": {
-          "$id": "#/properties/concurrency/properties/default_target",
-          "type": "integer",
-          "description": "The default target for number of concurrent predictions. This setting can be used by an autoscaler to determine when to scale a deployment of a model up or down."
         }
       }
     }
7 changes: 5 additions & 2 deletions pyproject.toml
@@ -10,11 +10,13 @@ authors = [{ name = "Replicate", email = "[email protected]" }]
 license.file = "LICENSE"
 urls."Source" = "https://github.com/replicate/cog"
 
-requires-python = ">=3.7"
+requires-python = ">=3.8"
 dependencies = [
   # intentionally loose. perhaps these should be vendored to not collide with user code?
   "attrs>=20.1,<24",
   "fastapi>=0.75.2,<0.99.0",
+  # we may not need http2
+  "httpx[http2]>=0.21.0,<1",
   "pydantic>=1.9,<2",
   "PyYAML",
   "requests>=2,<3",
@@ -27,14 +29,15 @@ dependencies = [
 optional-dependencies = { "dev" = [
   "black",
   "build",
-  "httpx",
   'hypothesis<6.80.0; python_version < "3.8"',
   'hypothesis; python_version >= "3.8"',
+  "respx",
   'numpy<1.22.0; python_version < "3.8"',
   'numpy; python_version >= "3.8"',
   "pillow",
   "pyright==1.1.347",
   "pytest",
+  "pytest-asyncio",
   "pytest-httpserver",
   "pytest-rerunfailures",
   "pytest-xdist",
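
With `requests` replaced by `httpx` (commit a616653), readiness can be probed with the new client. A sketch, assuming a cog server running locally on the default port 5000 and the `/ready` endpoint added later in this diff:

```python
import httpx

# hypothetical readiness probe; port 5000 is cog's default serve port
resp = httpx.get("http://localhost:5000/ready", timeout=1.0)
print(resp.status_code, resp.text)
```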
12 changes: 11 additions & 1 deletion python/cog/__init__.py
@@ -1,7 +1,15 @@
 from pydantic import BaseModel
 
 from .predictor import BasePredictor
-from .types import ConcatenateIterator, File, Input, Path, Secret
+from .server.worker import emit_metric
+from .types import (
+    AsyncConcatenateIterator,
+    ConcatenateIterator,
+    File,
+    Input,
+    Path,
+    Secret,
+)
 
 try:
     from ._version import __version__
@@ -14,8 +22,10 @@
     "BaseModel",
     "BasePredictor",
     "ConcatenateIterator",
+    "AsyncConcatenateIterator",
     "File",
     "Input",
     "Path",
     "Secret",
+    "emit_metric",
 ]
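
Together, `AsyncConcatenateIterator` and `emit_metric` outline the new user-facing surface. A minimal sketch of an async streaming predictor, assuming the async API mirrors the existing synchronous `ConcatenateIterator` pattern (the sleeps are stand-ins for real async work):

```python
import asyncio

from cog import AsyncConcatenateIterator, BasePredictor, emit_metric

class Predictor(BasePredictor):
    async def setup(self) -> None:
        # the event loop now exists before predictor setup (commit 40180a8),
        # so setup itself can await async initialization
        await asyncio.sleep(0)  # stand-in for real async setup

    async def predict(self, prompt: str) -> AsyncConcatenateIterator[str]:
        """Stream back the words of the prompt, one at a time."""
        tokens = prompt.split()
        for token in tokens:
            await asyncio.sleep(0)  # stand-in for real async model calls
            yield token + " "
        emit_metric(name="output_token_count", value=len(tokens))
```

With `concurrency.max` greater than 1, several such predictions can be interleaved on the same event loop while each awaits its model calls.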
22 changes: 20 additions & 2 deletions python/cog/command/ast_openapi_schema.py
@@ -147,6 +147,24 @@
       "summary": "Healthcheck"
     }
   },
+  "/ready": {
+    "get": {
+      "summary": "Ready",
+      "operationId": "ready_ready_get",
+      "responses": {
+        "200": {
+          "description": "Successful Response",
+          "content": {
+            "application/json": {
+              "schema": {
+                "title": "Response Ready Ready Get"
+              }
+            }
+          }
+        }
+      }
+    }
+  },
   "/predictions": {
     "post": {
       "description": "Run a single prediction on the model",
@@ -372,7 +390,7 @@ def get_call_name(call: ast.Call) -> str:
 def parse_args(tree: ast.AST) -> "list[tuple[ast.arg, ast.expr | types.EllipsisType]]":
     """Parse argument, default pairs from a file with a predict function"""
     predict = find(tree, "predict")
-    assert isinstance(predict, ast.FunctionDef)
+    assert isinstance(predict, (ast.FunctionDef, ast.AsyncFunctionDef))
     args = predict.args.args  # [-len(defaults) :]
     # use Ellipsis instead of None here to distinguish a default of None
     defaults = [...] * (len(args) - len(predict.args.defaults)) + predict.args.defaults
@@ -449,7 +467,7 @@ def parse_return_annotation(
     tree: ast.AST, fn: str = "predict"
 ) -> "tuple[JSONDict, JSONDict]":
     predict = find(tree, fn)
-    if not isinstance(predict, ast.FunctionDef):
+    if not isinstance(predict, (ast.FunctionDef, ast.AsyncFunctionDef)):
        raise ValueError("Could not find predict function")
     annotation = predict.returns
     if not annotation:
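
The widened `isinstance` checks matter because Python's `ast` module parses `async def` into a distinct node type, so the old checks rejected async predict functions outright. A self-contained check of that behavior:

```python
import ast

tree = ast.parse("async def predict(prompt: str) -> str: ...")
node = tree.body[0]

# async functions parse as ast.AsyncFunctionDef, not ast.FunctionDef,
# which is why the single-type isinstance check failed for async predictors
print(isinstance(node, ast.FunctionDef))       # False
print(isinstance(node, ast.AsyncFunctionDef))  # True
```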
86 changes: 0 additions & 86 deletions python/cog/files.py

This file was deleted.

26 changes: 1 addition & 25 deletions python/cog/json.py
@@ -1,13 +1,10 @@
-import io
 from datetime import datetime
 from enum import Enum
 from types import GeneratorType
-from typing import Any, Callable
+from typing import Any
 
 from pydantic import BaseModel
 
-from .types import Path
-
 
 def make_encodeable(obj: Any) -> Any:
     """
@@ -39,24 +36,3 @@ def make_encodeable(obj: Any) -> Any:
     if isinstance(obj, np.ndarray):
         return obj.tolist()
     return obj
-
-
-def upload_files(obj: Any, upload_file: Callable[[io.IOBase], str]) -> Any:
-    """
-    Iterates through an object from make_encodeable and uploads any files.
-
-    When a file is encountered, it will be passed to upload_file. Any paths will be opened and converted to files.
-    """
-    # skip four isinstance checks for fast text models
-    if type(obj) == str:  # noqa: E721
-        return obj
-    if isinstance(obj, dict):
-        return {key: upload_files(value, upload_file) for key, value in obj.items()}
-    if isinstance(obj, list):
-        return [upload_files(value, upload_file) for value in obj]
-    if isinstance(obj, Path):
-        with obj.open("rb") as f:
-            return upload_file(f)
-    if isinstance(obj, io.IOBase):
-        return upload_file(obj)
-    return obj
1 change: 1 addition & 0 deletions python/cog/logging.py
@@ -86,4 +86,5 @@ def setup_logging(*, log_level: int = logging.NOTSET) -> None:
 
     # Reconfigure log levels for some overly chatty libraries
     logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
+    # FIXME: no more urllib3(?)
    logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)