Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for async predictors #1813

Closed
wants to merge 31 commits into from
Closed

Add support for async predictors #1813

wants to merge 31 commits into from

Conversation

mattt
Copy link
Contributor

@mattt mattt commented Jul 18, 2024

Rebase of #1752 and #1512

* have runner return asyncio.Task instead of AsyncFuture
* make tests async and fix them
* delete remaining runner thread code :)
* review changes to tests and server

(reverts commit 828eee9)

Signed-off-by: technillogue <[email protected]>
this is the first step towards supporting continuous batching and concurrent predictions. eventually, you will be configure it so your predict function will be called concurrently

* bare minimum to support async predict
* add async tests

Signed-off-by: technillogue <[email protected]>
Signed-off-by: technillogue <[email protected]>
* conditionally create the event loop if predictor is async, and add a path for hypothetical async setup
* don't use async for predict loop if predict is not async
* add test cases for shared loop across setup and predict + asyncio.run in setup

(reverts commit b533c6b)

* lints

Signed-off-by: technillogue <[email protected]>
* async Worker._wait and its consequences
* AsyncPipe so that we can process idempotent endpoint and cancellation rather than _wait blocking the event loop
* test_prediction_cancel can be flaky on some machines
* separate _process_list to be less surprising than isasyncgen
* sleep wasn't needed
* suggestions from review
* suggestions from review
* even more suggestions from review

---------

Signed-off-by: technillogue <[email protected]>
Co-authored-by: Nick Stenning <[email protected]>

* format

Signed-off-by: technillogue <[email protected]>
* race utility for racing awaitables
* start mux, tag events with id, read pipe in a task, get events from mux
* use async pipe for async child loop
* _shutting_down vs _terminating
* race with shutdown event
* keep reading events during shutdown, but call terminate after the last Done
* emit heartbeats from mux.read
* don't use _wait. instead, setup reads event from the mux too
* worker semaphore and prediction ctx
* where _wait used to raise a fatal error, have _read_events set an error on Mux, and then Mux.read can raise the error in the right context. otherwise, the exception is stuck in a task and doesn't propagate correctly
* fix event loop errors for <3.9
* keep track of predictions in flight explicitly and use that to route logs
* don't wait for executor shutdown
* progress: check for cancelation in task done_handler
* let mux check if child is alive and set mux shutdown after leaving read event loop
* close pipe when exiting
* predict requires IDLE or PROCESSING
* try adding a BUSY state distinct from PROCESSING when we no longer have capacity
* move resetting events to setup() instead of _read_events()

previously this was in _read_events because it's a coroutine that will have the correct event loop. however, _read_events actually gets created in a task, which can run *after* the first mux.read call by setup. since setup is now the first async entrypoint in worker and in tests, we can safely move it there

* state_from_predictions_in_flight instead of checking the value of semaphore
* make prediction_ctx "private"

Signed-off-by: technillogue <[email protected]>
@mattt mattt requested a review from a team July 18, 2024 21:15
@mattt mattt marked this pull request as draft July 18, 2024 21:23
@mattt mattt force-pushed the mattt/rebase-async branch from e7931e1 to 89ab80c Compare July 19, 2024 12:12
technillogue and others added 16 commits July 19, 2024 05:13
* input downloads, output uploads, and webhooks are now handled by ClientManager, which persists for the lifetime of runner, allowing us to reuse connections, which may significantly help with large uploads.
* although I was originally going to drop output_file_prefix, it's not actually hard to maintain. the behavior is changed now and objects are uploaded as soon as they're outputted rather than after the prediction is completed.
* there's an ugly hack with uploading an empty body to get the redirect instead of making api time out from trying to upload an 140GB file. that can be fixed by implemented an MPU endpoint and/or a "fetch upload url" endpoint.
* the behavior of the non-indempotent endpoint is changed; the id is now randomly generated if it's not provided in the body. this isn't strictly required for this change alone, but is hard to carve out.
* the behavior of Path is changed significantly. see https://www.notion.so/replicate/Cog-Setup-Path-Problem-2fc41d40bcaf47579ccd8b2f4c71ee24

Co-authored-by: Mattt <[email protected]>

* format
* stick a %s on line 190 clients.py (#1707)
* local upload server can be called cluster.local in addition to .internal (#1714)

Signed-off-by: technillogue <[email protected]>
* wip

* some tweaks

* ignore some type errors

* test connection roundtrip

* add notes from python source code

Signed-off-by: technillogue <[email protected]>
* optimize webhook serialization and logging
* optimize logging by binding structlog proxies
* fix tests

---------

Signed-off-by: technillogue <[email protected]>
Signed-off-by: technillogue <[email protected]>
* add concurrency to config

* more descriptive names for predict functions

* don't cancel from signal handler if a loop is running. expose worker busy state to runner

* move handle_event_stream to PredictionEventHandler

* make setup and canceling work

* keep track of multiple runner prediction tasks to make idempotent endpoint return the same result and fix tests somewhat

* drop Runner._result, comments

* move create_event_handler into PredictionEventHandler.__init__

* break out Path.validate into value_to_path and inline get_filename and File.validate

* split out URLPath into BackwardsCompatibleDataURLTempFilePath and URLThatCanBeConvertedToPath with the download part of URLFile inlined

* let's make DataURLTempFilePath also use convert and move value_to_path back to Path.validate

* prediction->request

* split up predict/inner/prediction_ctx into enter_predict/exit_predict/prediction_ctx/inner_async_predict/predict/good_predict as one way to do it. however, exposing all of those for runner predict enter/coro exit still sucks, but this is still an improvement

* bigish change: inline predict_and_handle_errors

* inline make_error_handler into setup

* move runner.setup into runner.Runner.setup

* add concurrency to config in go

* try explicitly using prediction_ctx __enter__ and __exit__

* relax setup argument requirement to str

* glom worker into runner

* add logging message

* fix prediction retry and improve logging

* split out handle_event

* use CURL_CA_BUNDLE for file upload

* dubious upload fix

* skip worker and webhook tests since those were erroring on removed imports. fix or xfail runner tests

* validate prediction response to raise errors, but return the unvalidated output to avoid converting urls to File/Path

* expose concurrency in healthcheck

* mediocre logging that works like print

* COG_DISABLE_CANCEL to ignore cancelations

* COG_CONCURRENCY_OVERRIDE

* add ready probe as an http route

* encode webhooks only after knowing they will be sent, and bail our of upload type checks early for strs

* don't validate outputs

* add AsyncConcatenateIterator

* should_exit is not actually used by http

* format

* codecov

* describe the remaining problems with this PR and add comments about cancelation and validation

* add a test

* fix test (#1669)

* fix config schema

* allow setting both max and target concurrency in cog.yaml (#1672)

* drop default_target (#1685)

---------
Signed-off-by: technillogue <[email protected]>
Co-authored-by: Mattt <[email protected]>
* function to emit metrics
* add metrics docs

---------

Signed-off-by: technillogue <[email protected]>
predict_time_share tracks the portion of the worker's processing time that was dedicated to each individual prediction. the "cost" of each second is split across the predictions running during that second.

Co-authored-by: Zeke Sikelianos <[email protected]>

predict_time_share needs to be set before sending the completed webhook (#1683)

allow disabling time share metric with COG_DISABLE_TIME_SHARE_METRIC

Signed-off-by: technillogue <[email protected]>
* log traceback correctly
* use repr(exception) instead of str(exception) if str(exception) is blank

---------

Signed-off-by: technillogue <[email protected]>
ignore ruff lints added in the new ruff version

Fix broken `make go-test` command

This was due to conflicts in the dependencies

  === Errors
  Error: ../../../go/pkg/mod/github.com/anaskhan96/[email protected]/soup.go:20:2: missing go.sum entry for module providing package golang.org/x/net/html (imported by github.com/anaskhan96/soup); to add:
  	go get github.com/anaskhan96/[email protected]
  Error: ../../../go/pkg/mod/github.com/anaskhan96/[email protected]/soup.go:21:2: missing go.sum entry for module providing package golang.org/x/net/html/charset (imported by github.com/anaskhan96/soup); to add:
  	go get github.com/anaskhan96/[email protected]

Running `go mod tidy` fixes the issue and this commit contains the
updated go.mod and go.sum files.

Add fixes for ruff issues (#1799)
We have a problem in production where a broken model is not correctly
shutting down when requested, which means that director comes back up,
sees a healthy model (status READY/BUSY) and starts sending it new
predictions, even though it's supposed to be shutting down.

For now, try and improve the situation by poisoning the model
healthcheck on shutdown. This doesn't solve the underlying problem but
it should stop us losing more predictions to a known-broken pod.
Based on the implementation in #1698 for sync cog.

If the request to /predict contains headers `traceparent` and
`tracestate` defined by w3c Trace Context[^1] then these headers are
forwarded on to the webhook and upload calls.

This allows observability systems to link requests passing through cog.

[^1]: https://www.w3.org/TR/trace-context/

Signed-off-by: technillogue <[email protected]>
* Cast TraceContext into Mapping[str, str] to fix linter

* Include prediction id upload request

Based on #1667

This PR introduces two small changes to the file upload interface.

1. We now allow downstream services to include the destination of the
asset in a `Location` header, rather than assuming that it's the same as
the final upload url (either the one passed via `--upload-url` or the
result of a 307 redirect response.

2. We now include the `X-Prediction-Id` header in upload request, this
allows the downstream client to potentially do configuration/routing
based on the prediction ID. This ID should be considered unsafe and
needs to be validated by the downstream service.

* Extract ChunkFileReader into top-level class

---------

Co-authored-by: Mattt Zmuda <[email protected]>
@mattt mattt force-pushed the mattt/rebase-async branch from 89ab80c to 7b69bc4 Compare July 19, 2024 12:13
@mattt mattt marked this pull request as ready for review July 19, 2024 12:16
mattt added 2 commits July 19, 2024 05:20
Go version is already specified by go.mod
# webhooks

async def send_webhook(
self, url: str, response: Dict[str, Any], event: WebhookEvent
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@technillogue event is unused. Is that intentional?

@mattt mattt force-pushed the mattt/rebase-async branch from a60baee to fccd7ec Compare July 22, 2024 11:37
@mattt mattt force-pushed the mattt/rebase-async branch from fccd7ec to a9cf05d Compare July 22, 2024 11:52
@technillogue technillogue mentioned this pull request Jul 24, 2024
@mattt mattt mentioned this pull request Jul 31, 2024
* start with just changing Exception to BaseException to catch cancellation
* add much more shutdown logging

move runner.terminate into runner.shutdown after waiting for predictions to complete

* move runner.terminate into shutdown, make it async, and document the behavior of Server.stop, should_exit, force_exit, and app shutdown handler,
* fix tests

---------

Signed-off-by: technillogue <[email protected]>
@aron aron marked this pull request as draft October 16, 2024 09:37
@aron aron removed the request for review from a team October 16, 2024 09:37
@aron
Copy link
Contributor

aron commented Oct 16, 2024

Closing this for now, until it has an owner to pull it over the line.

@aron aron closed this Oct 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants