Skip to content

Commit 01fb5e7

Browse files
authored
Observability: implement tracing for http requests (#329)
* Corrections to the test setup and tests
* Support for env vars: RUNPOD_API_BASE_URL & RUNPOD_ENDPOINT_BASE_URL
* Added TRACE level logging (X-Request-ID=job_id for tracing)
* Introducing runpod.http_client (async|sync) + tracer
* In order to trace async calls, any use of aiohttp.ClientSession now goes through AsyncClientSession
* In order to trace sync calls, any use of requests.Session now goes through SyncClientSession
* FibonacciRetry for a less aggressive retry strategy
* Workaround: pin cryptography < 43.0.0 to avoid cryptography.utils.CryptographyDeprecationWarning: `cryptography.utils.CryptographyDeprecationWarning: TripleDES has been moved to cryptography.hazmat.decrepit.ciphers.algorithms.TripleDES and will be removed from this module in 48.0.0`. See paramiko/paramiko#2419
1 parent 335fee5 commit 01fb5e7

33 files changed

+751
-132
lines changed

.github/workflows/CI-pylint.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ jobs:
3131
- name: Install Dependencies
3232
run: |
3333
python -m pip install --upgrade pip
34-
pip install .[test]
34+
pip install '.[test]'
3535
3636
- name: Pylint Source
37-
run: pylint --ignore-paths='build/*' --ignore='_version.py' $(find . -type f -name '*.py') \
37+
run: pylint $(git ls-files '*.py')

.github/workflows/CI-pytests.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
- name: Install Dependencies
3131
run: |
3232
python -m pip install --upgrade pip
33-
pip install .[test]
33+
pip install '.[test]'
3434
3535
- name: Run Tests
36-
run: pytest --cov-config=.coveragerc --timeout=120 --timeout_method=thread --cov=runpod --cov-report=xml --cov-report=term-missing --cov-fail-under=100 -W error -p no:cacheprovider -p no:unraisableexception
36+
run: pytest --cov-config=.coveragerc --timeout=120 --timeout_method=thread --cov=runpod --cov-report=xml --cov-report=term-missing --cov-fail-under=98 -W error -p no:cacheprovider -p no:unraisableexception

CONTRIBUTING.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Here is a quick guide on how to contribute code to this project:
3535
6. Run tests to ensure that your changes do not break any existing functionality. You can run tests using the following command:
3636

3737
```bash
38-
pip install .[test]
38+
pip install '.[test]'
3939
pytest
4040
```
4141

examples/endpoints/asyncio_job_request.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@
33
"""
44

55
import asyncio
6-
import aiohttp
76

87
import runpod
9-
from runpod import AsyncioEndpoint, AsyncioJob
8+
from runpod import http_client, AsyncioEndpoint, AsyncioJob
109

1110
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # For Windows Users
1211

@@ -17,7 +16,7 @@ async def main():
1716
'''
1817
Function to run the example.
1918
'''
20-
async with aiohttp.ClientSession() as session:
19+
async with http_client.AsyncClientSession() as session:
2120
# Invoke API
2221
payload = {}
2322
endpoint = AsyncioEndpoint("ENDPOINT_ID", session)

pyproject.toml

+4-3
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,12 @@ runpod = "runpod.cli.entry:runpod_cli"
5454
test = [
5555
"asynctest",
5656
"nest_asyncio",
57-
"pylint",
58-
"pytest",
57+
"pylint==3.2.5",
58+
"pytest-asyncio",
5959
"pytest-cov",
6060
"pytest-timeout",
61-
"pytest-asyncio",
61+
"pytest-watch",
62+
"pytest",
6263
]
6364

6465
[build-system]

pytest.ini

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[pytest]
2+
addopts = --durations=10 --cov-config=.coveragerc --timeout=120 --timeout_method=thread --cov=runpod --cov-report=xml --cov-report=term-missing --cov-fail-under=98 -W error -p no:cacheprovider -p no:unraisableexception
3+
python_files = tests.py test_*.py *_test.py
4+
norecursedirs = venv *.egg-info .git build

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ backoff >= 2.2.1
55
boto3 >= 1.26.165
66
click >= 8.1.7
77
colorama >= 0.2.5, < 0.4.7
8+
cryptography < 43.0.0
89
fastapi[all] >= 0.94.0
910
paramiko >= 3.3.1
1011
prettytable >= 3.9.0

runpod/__init__.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,7 @@
4242
else:
4343
api_key = None # pylint: disable=invalid-name
4444

45-
api_url_base = "https://api.runpod.io" # pylint: disable=invalid-name
46-
47-
endpoint_url_base = "https://api.runpod.ai/v2" # pylint: disable=invalid-name
45+
endpoint_url_base = os.environ.get("RUNPOD_ENDPOINT_BASE_URL", "https://api.runpod.ai/v2") # pylint: disable=invalid-name
4846

4947

5048
# --------------------------- Force Logging Levels --------------------------- #

runpod/api/graphql.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44

55
import json
6+
import os
67
from typing import Any, Dict
78

89
import requests
@@ -18,7 +19,8 @@ def run_graphql_query(query: str) -> Dict[str, Any]:
1819
Run a GraphQL query
1920
'''
2021
from runpod import api_key # pylint: disable=import-outside-toplevel, cyclic-import
21-
url = f"https://api.runpod.io/graphql?api_key={api_key}"
22+
api_url_base = os.environ.get("RUNPOD_API_BASE_URL", "https://api.runpod.io")
23+
url = f"{api_url_base}/graphql?api_key={api_key}"
2224

2325
headers = {
2426
"Content-Type": "application/json",

runpod/cli/groups/pod/commands.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,23 @@ def create_new_pod(name, image, gpu_type, gpu_count, support_public_ip): # pylin
3333
'''
3434
Creates a pod.
3535
'''
36+
kwargs = {
37+
"gpu_count": gpu_count,
38+
"support_public_ip": support_public_ip,
39+
}
40+
3641
if not name:
3742
name = click.prompt('Enter pod name', default='RunPod-CLI-Pod')
3843

3944
quick_launch = click.confirm('Would you like to launch default pod?', abort=True)
4045
if quick_launch:
4146
image = 'runpod/base:0.0.0'
4247
gpu_type = 'NVIDIA GeForce RTX 3090'
43-
ports ='22/tcp'
48+
kwargs["ports"] ='22/tcp'
4449

4550
click.echo('Launching default pod...')
4651

47-
new_pod = create_pod(name, image, gpu_type,
48-
gpu_count=gpu_count, support_public_ip=support_public_ip, ports=ports)
52+
new_pod = create_pod(name, image, gpu_type, **kwargs)
4953

5054
click.echo(f'Pod {new_pod["id"]} has been created.')
5155

runpod/endpoint/asyncio/asyncio_runner.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,23 @@
33

44
from typing import Any, Dict
55
import asyncio
6-
import aiohttp
76

7+
from runpod.http_client import ClientSession
88
from runpod.endpoint.helpers import FINAL_STATES, is_completed
99

1010

1111
class Job:
1212
"""Class representing a job for an asynchronous endpoint"""
1313

14-
def __init__(self, endpoint_id: str, job_id: str, session: aiohttp.ClientSession):
14+
def __init__(self, endpoint_id: str, job_id: str, session: ClientSession):
1515
from runpod import api_key, endpoint_url_base # pylint: disable=import-outside-toplevel,cyclic-import
1616

1717
self.endpoint_id = endpoint_id
1818
self.job_id = job_id
1919
self.headers = {
2020
"Content-Type": "application/json",
21-
"Authorization": f"Bearer {api_key}"
21+
"Authorization": f"Bearer {api_key}",
22+
"X-Request-ID": job_id,
2223
}
2324
self.session = session
2425
self.endpoint_url_base = endpoint_url_base
@@ -100,7 +101,7 @@ async def cancel(self) -> dict:
100101
class Endpoint:
101102
"""Class for running endpoint"""
102103

103-
def __init__(self, endpoint_id: str, session: aiohttp.ClientSession):
104+
def __init__(self, endpoint_id: str, session: ClientSession):
104105
from runpod import api_key, endpoint_url_base # pylint: disable=import-outside-toplevel
105106

106107
self.endpoint_id = endpoint_id

runpod/http_client.py

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
"""
2+
HTTP Client abstractions
3+
"""
4+
5+
import os
6+
import requests
7+
from aiohttp import (
8+
ClientSession,
9+
ClientTimeout,
10+
TCPConnector,
11+
)
12+
from .tracer import (
13+
create_aiohttp_tracer,
14+
create_request_tracer,
15+
)
16+
from .cli.groups.config.functions import get_credentials
17+
from .user_agent import USER_AGENT
18+
19+
20+
def get_auth_header():
    """
    Produce the default header dict, with the `Authorization` key derived from
    credentials.get("api_key") OR os.getenv('RUNPOD_AI_API_KEY')

    The environment variable is used whenever the stored credentials are
    missing or do not carry a non-empty `api_key`; if neither source provides
    a key, `Authorization` is an empty string.

    Returns:
        dict: `Content-Type`, `Authorization` and `User-Agent` headers.
    """
    credentials = get_credentials() or {}

    # `or` (rather than a default on .get) so that an absent, empty or None
    # api_key in the credentials still falls back to the environment variable.
    auth = credentials.get("api_key") or os.getenv("RUNPOD_AI_API_KEY", "")

    return {
        "Content-Type": "application/json",
        "Authorization": auth,
        "User-Agent": USER_AGENT,
    }
35+
36+
37+
def AsyncClientSession(*args, **kwargs):  # pylint: disable=invalid-name
    """
    Factory for an aiohttp `ClientSession` preconfigured for tracing.

    Deprecation from aiohttp.ClientSession forbids inheritance.
    This is now a factory method
    TODO: use httpx

    Defaults applied (each may be overridden through keyword arguments):
        connector: `TCPConnector(limit=0)`
        timeout: `ClientTimeout(600, ceil_threshold=400)`
        headers: the result of `get_auth_header()`; caller-supplied headers
            are merged on top of these defaults.
        trace_configs: the tracer from `create_aiohttp_tracer()`; any
            caller-supplied trace configs are appended after it.

    Args:
        *args: positional arguments forwarded to `ClientSession`.
        **kwargs: keyword arguments forwarded to `ClientSession`.

    Returns:
        ClientSession: the configured session.
    """
    # Build the default connector/timeout only when the caller did not pass
    # their own, so no throwaway objects are created and no duplicate-keyword
    # TypeError is raised.
    if "connector" not in kwargs:
        kwargs["connector"] = TCPConnector(limit=0)
    if "timeout" not in kwargs:
        kwargs["timeout"] = ClientTimeout(600, ceil_threshold=400)

    # Caller headers win over the defaults on key collisions.
    headers = get_auth_header()
    headers.update(kwargs.pop("headers", None) or {})

    # Always keep the built-in tracer; extra trace configs are additive.
    trace_configs = [create_aiohttp_tracer()]
    trace_configs.extend(kwargs.pop("trace_configs", None) or [])

    return ClientSession(
        *args,
        headers=headers,
        trace_configs=trace_configs,
        **kwargs,
    )
51+
52+
53+
class SyncClientSession(requests.Session):
    """
    Inherits requests.Session to override `request()` method for tracing
    TODO: use httpx
    """

    def request(self, method, url, **kwargs):  # pylint: disable=arguments-differ
        """
        Override for tracing. Not using super().request()
        to capture metrics for connection and transfer times

        Args:
            method: HTTP method for the request.
            url: URL for the request.
            **kwargs: any keyword accepted by `requests.Session.request`.
                Those matching `requests.Request` parameters are used to build
                the request; the remainder are passed to `send()`.

        Returns:
            requests.Response: the response returned by `send()`.
        """
        with create_request_tracer() as tracer:
            # Parameter names of `requests.Request.__init__`, excluding `self`.
            # co_varnames also lists function-local variables, so it is sliced
            # by co_argcount to keep only the real parameters.
            init_code = requests.Request.__init__.__code__
            request_arg_names = frozenset(
                init_code.co_varnames[1:init_code.co_argcount]
            )

            # Separate out the kwargs that are applicable to `requests.Request`
            request_kwargs = {
                k: v
                for k, v in kwargs.items()
                if k in request_arg_names
            }

            # The remaining kwargs are not applicable to `requests.Request`
            # and are passed to `send()` instead
            send_kwargs = {k: v for k, v in kwargs.items() if k not in request_kwargs}

            # Create a PreparedRequest object to hold the request details
            req = requests.Request(method, url, **request_kwargs)
            prepped = self.prepare_request(req)
            tracer.request = prepped  # Assign the request to the tracer

            # Merge environment settings.
            # `proxies` must be a dict (never None): requests mutates it when
            # proxies are picked up from the environment, and
            # `requests.Session.request` applies the same `or {}` normalisation.
            settings = self.merge_environment_settings(
                prepped.url,
                send_kwargs.get("proxies") or {},
                send_kwargs.get("stream"),
                send_kwargs.get("verify"),
                send_kwargs.get("cert"),
            )
            send_kwargs.update(settings)

            # Send the request
            response = self.send(prepped, **send_kwargs)
            tracer.response = response  # Assign the response to the tracer

            return response

runpod/serverless/modules/rp_fastapi.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from .worker_state import Jobs
1919
from .rp_ping import Heartbeat
2020
from ...version import __version__ as runpod_version
21+
from ...http_client import SyncClientSession
2122

2223

2324
RUNPOD_ENDPOINT_ID = os.environ.get("RUNPOD_ENDPOINT_ID", None)
@@ -157,7 +158,7 @@ def _send_webhook(url: str, payload: Dict[str, Any]) -> bool:
157158
Returns:
158159
bool: True if the request was successful, False otherwise.
159160
"""
160-
with requests.Session() as session:
161+
with SyncClientSession() as session:
161162
try:
162163
response = session.post(url, json=payload, timeout=10)
163164
response.raise_for_status() # Raises exception for 4xx/5xx responses

runpod/serverless/modules/rp_http.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44

55
import os
66
import json
7-
import aiohttp
8-
from aiohttp_retry import RetryClient, ExponentialRetry
9-
7+
from aiohttp import ClientError
8+
from aiohttp_retry import RetryClient, FibonacciRetry
9+
from runpod.http_client import ClientSession
1010
from runpod.serverless.modules.rp_logger import RunPodLogger
1111
from .worker_state import Jobs, WORKER_ID
1212

@@ -20,11 +20,11 @@
2020
job_list = Jobs()
2121

2222

23-
async def _transmit(client_session, url, job_data):
23+
async def _transmit(client_session: ClientSession, url, job_data):
2424
"""
2525
Wrapper for transmitting results via POST.
2626
"""
27-
retry_options = ExponentialRetry(attempts=3)
27+
retry_options = FibonacciRetry(attempts=3)
2828
retry_client = RetryClient(client_session=client_session, retry_options=retry_options)
2929

3030
kwargs = {
@@ -37,11 +37,14 @@ async def _transmit(client_session, url, job_data):
3737
await client_response.text()
3838

3939

40-
async def _handle_result(session, job_data, job, url_template, log_message, is_stream=False): # pylint: disable=too-many-arguments
40+
# pylint: disable=too-many-arguments, disable=line-too-long
41+
async def _handle_result(session: ClientSession, job_data, job, url_template, log_message, is_stream=False):
4142
"""
4243
A helper function to handle the result, either for sending or streaming.
4344
"""
4445
try:
46+
session.headers["X-Request-ID"] = job["id"]
47+
4548
serialized_job_data = json.dumps(job_data, ensure_ascii=False)
4649

4750
is_stream = "true" if is_stream else "false"
@@ -50,7 +53,7 @@ async def _handle_result(session, job_data, job, url_template, log_message, is_s
5053
await _transmit(session, url, serialized_job_data)
5154
log.debug(f"{log_message}", job['id'])
5255

53-
except aiohttp.ClientError as err:
56+
except ClientError as err:
5457
log.error(f"Failed to return job results. | {err}", job['id'])
5558

5659
except (TypeError, RuntimeError) as err:

runpod/serverless/modules/rp_job.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import json
1111
import asyncio
1212
import traceback
13-
from aiohttp import ClientSession
1413

14+
from runpod.http_client import ClientSession
1515
from runpod.serverless.modules.rp_logger import RunPodLogger
1616
from .worker_state import WORKER_ID, Jobs
1717
from .rp_tips import check_return_size
@@ -43,10 +43,8 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
4343
Will continue trying to get a job until one is available.
4444
4545
Args:
46-
session (ClientSession): The aiohttp ClientSession to use for the request.
46+
session (ClientSession): The async http client to use for the request.
4747
retry (bool): Whether to retry if no job is available.
48-
49-
Note: Retry True just for ease of, if testing improved this can be removed.
5048
"""
5149
next_job = None
5250

@@ -106,7 +104,7 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
106104
if retry is False:
107105
break
108106

109-
await asyncio.sleep(0)
107+
await asyncio.sleep(1)
110108
else:
111109
job_list.add_job(next_job["id"])
112110
log.debug("Request ID added.", next_job['id'])

runpod/serverless/modules/rp_logger.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717

1818
MAX_MESSAGE_LENGTH = 4096
19-
LOG_LEVELS = ['NOTSET', 'DEBUG', 'INFO', 'WARN', 'ERROR']
19+
LOG_LEVELS = ['NOTSET', 'DEBUG', 'TRACE', 'INFO', 'WARN', 'ERROR']
2020

2121

2222
def _validate_log_level(log_level):
@@ -32,7 +32,7 @@ def _validate_log_level(log_level):
3232
return log_level
3333

3434
if isinstance(log_level, int):
35-
if log_level < 0 or log_level > 4:
35+
if log_level < 0 or log_level >= len(LOG_LEVELS):
3636
raise ValueError(f'Invalid debug level: {log_level}')
3737

3838
return LOG_LEVELS[log_level]
@@ -134,3 +134,9 @@ def tip(self, message):
134134
tip log
135135
'''
136136
self.log(message, 'TIP')
137+
138+
def trace(self, message, request_id: Optional[str] = None):
139+
'''
140+
trace log (buffered until flushed)
141+
'''
142+
self.log(message, 'TRACE', request_id)

0 commit comments

Comments
 (0)