[API server] handle logs request in coroutine #5366
base: master
Conversation
Signed-off-by: Aylei <[email protected]>
/smoke-test -k test_minimal
/smoke-test
Wow, awesome work @aylei! Left a handful of comments - mostly questions and requests for clarifying comments.
def cancellation_guard(func):
Can we make sure this still works with the type checker? That is, it doesn't erase the types of functions it is applied to. We have this issue with some decorators.
Yeah, I will take a look.
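For reference, a minimal sketch of one common way to keep a decorator type-safe, using ParamSpec so checkers see the wrapped function's exact signature rather than Callable[..., Any] (illustrative only, not the PR's actual code):

```python
import functools
from typing import Callable, TypeVar

from typing_extensions import ParamSpec

P = ParamSpec('P')
T = TypeVar('T')


def cancellation_guard(func: Callable[P, T]) -> Callable[P, T]:
    """Sketch: decorator that preserves the wrapped function's signature."""

    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        # Real guard logic would go here; pass-through for the sketch.
        return func(*args, **kwargs)

    return wrapper
```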
background_tasks.add_task(cancel_task)
This is pretty confusing - wouldn't this immediately cancel the task? Is this deferred for some reason? Can we leave a comment?
From reading the BackgroundTasks doc, I see that "background tasks" don't actually start until after the request is finished. (Contrary to how it sounds, that they would immediately begin in the background.) Still, could we clarify in a comment?
Sure, good catch!
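To illustrate the semantics under discussion, a self-contained sketch (not the PR's code) showing that FastAPI runs background tasks only after the response has been sent, so add_task() defers the cancellation rather than triggering it immediately:

```python
import asyncio

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


@app.get('/logs')
async def logs(background_tasks: BackgroundTasks):
    # Hypothetical long-running tail task.
    task = asyncio.create_task(asyncio.sleep(3600))

    def cancel_task():
        # Runs only after the response completes (e.g. the client
        # disconnected), not at the time add_task() is called.
        task.cancel()

    background_tasks.add_task(cancel_task)
    return {'status': 'tailing'}
```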
sky/utils/env_options.py
ctx = context.get()
if ctx is not None:
    v = ctx.getenv(self.env_var, str(self.default))
else:
    v = os.getenv(self.env_var, str(self.default))
Maybe we can make this entire block into a common helper function.
Also, can we check other uses of getenv? I'm a bit concerned especially about the config env vars.
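One possible shape for such a helper (hypothetical name and import path, assuming context.get() returns None outside an async context as in this PR's diff):

```python
import os
from typing import Optional

from sky.utils import context  # assumed import path for this PR's context module


def context_getenv(name: str, default: Optional[str] = None) -> Optional[str]:
    """Reads an env var from the request context if set, else the process env."""
    ctx = context.get()
    if ctx is not None:
        return ctx.getenv(name, default)
    return os.getenv(name, default)
```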
futs.append(
    pool.apply_async(pipe,
                     (proc.stdout, ctx.output_stream(sys.stdout))))
if proc.stderr is not None:
    futs.append(
        pool.apply_async(pipe,
                         (proc.stderr, ctx.output_stream(sys.stderr))))
Shouldn't we update process_subprocess_stream? Seems like both that code and this code are reading from proc.stdout / proc.stderr, which I'd expect to cause issues. No?
Good catch! It works coincidentally because the code path of sky logs always sets process_stream=False, so process_subprocess_stream and pipe_and_wait_process will never be called at the same time in the current code base. I did not update process_subprocess_stream because it can only run in the main thread now:

skypilot/sky/skylet/log_lib.py
Line 127 in 7b804da
# Do not launch a thread for stdout as the rich.status does not

I think we can assert process_stream=False when ctx is not None for now, and refine process_subprocess_stream when we broaden the usage of async context.
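A sketch of that proposed guard (hypothetical placement and import path, not the merged code):

```python
from sky.utils import context  # assumed import path


def check_stream_supported(process_stream: bool) -> None:
    """Hypothetical guard: process_subprocess_stream is not context-aware yet,
    so reject process_stream=True whenever we run inside an async context."""
    ctx = context.get()
    if ctx is not None:
        assert not process_stream, (
            'process_stream=True is not supported in an async context yet.')
```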
@@ -390,6 +393,114 @@ def _request_execution_wrapper(request_id: str,
    logger.info(f'Request {request_id} finished')
async def execute_request(request: api_requests.Request): |
Can we rename to clarify that this uses a coroutine rather than the typical request submission?
Suggested change:
- async def execute_request(request: api_requests.Request):
+ async def execute_request_coroutine(request: api_requests.Request):
The async keyword implies the same semantics here as "coroutine", and there would be a lint failure if execute_request is not called with await or asyncio primitives. For requests that are executed in the process executor, we use schedule_request. So maybe it is okay to keep the current name for brevity, wdyt?
I still would like some distinction. If not in the name, in a comment, but I think the name is better.
My concern is not that this function must be run in a coroutine. As you say, that's clear from the async keyword. It's more about clarifying the type of request (a "coroutine" request vs a normal executor-based request).
The main concern: If I am new to skypilot and look at this function, I might expect that all requests are going to be executed using this function. In fact, only coroutine requests use this. For other requests, there is a totally different code path that the executor uses to execute the request. I may not realize that the executor does not run requests using coroutines.
    request.log_path.touch()
    return request


def schedule_request(
TODO: refactor schedule_request to take a Request, for consistency with the prepare_request / execute_request flow. That way we can remove all the args and move them/the docstring to prepare_request
Agreed! I also considered the same approach and postponed it to make this PR more focused; follow-up: #5434
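A rough sketch of how that follow-up could look (names and signatures are illustrative, not the code tracked in #5434):

```python
from sky.server.requests import requests as api_requests  # assumed path


def prepare_request(request_id: str,
                    request_name: str) -> api_requests.Request:
    """Builds and persists the Request record (per-arg docs move here)."""
    ...


def schedule_request(request: api_requests.Request) -> None:
    """Enqueues a Request previously built by prepare_request()."""
    ...
```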
Closes #4767
This PR includes the minimal changes that move /logs handling to a coroutine: /logs is now handled in uvicorn's event loop.

Though the task is now executed directly in the uvicorn process, we still maintain a request record for each logs request to keep the behavior consistent: users can still cancel a log request with sky api cancel and retrieve the log again with sky api logs (see the conceptual sketch after the follow-ups list).

Follow-ups:
- sky jobs log
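A conceptual sketch of the new flow, running a logs request as a coroutine while keeping its request record updated; the helpers tail_logs and set_status are hypothetical stand-ins, not the actual diff:

```python
import asyncio


async def tail_logs(request) -> None:
    """Placeholder for the coroutine that streams log lines to the client."""
    await asyncio.sleep(0)


async def execute_request(request) -> None:
    """Runs a logs request on uvicorn's event loop, updating the request
    record so `sky api cancel` and `sky api logs` keep working."""
    request.set_status('RUNNING')
    try:
        await tail_logs(request)
        request.set_status('SUCCEEDED')
    except asyncio.CancelledError:
        # `sky api cancel` cancels the asyncio task, which lands here.
        request.set_status('CANCELLED')
        raise
```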
Benchmark
python tests/load_tests/test_load_on_server.py -n 100 --apis tail_logs -c kubernetes under low server concurrency, on a 1c2g machine (1 long worker + 2 short workers): there is a 7x improvement on average. The bottleneck in this PR is that each log task runs in a dedicated thread and there is only 1 uvicorn worker process, so GIL contention keeps the 100 log threads from being fully concurrent.
python tests/load_tests/test_load_on_server.py -n 100 --apis tail_logs -c aws under unlimited-concurrency local mode (burstable workers), on a 4c16g machine. Resources: about 10x memory efficiency. However, the test found that logs on an AWS instance are significantly slower than logs on a Kubernetes instance (I switched the benchmark env to AWS EC2 for accurate resource usage accounting). This might be related to more RPCs/CPU cycles touched by the AWS code path; I leave this as a follow-up since it is not actually relevant to this PR.
Tests
Tested (run the relevant ones):
- bash format.sh
- /smoke-test (CI) or pytest tests/test_smoke.py (local)
- /smoke-test -k test_name (CI) or pytest tests/test_smoke.py::test_name (local)
- /quicktest-core (CI) or pytest tests/smoke_tests/test_backward_compat.py (local)