
Commit d7806d7

Author: juanroesel (committed)
Added mechanism to monitor the task queue and log metrics to Prometheus
1 parent e413c65 commit d7806d7

File tree

5 files changed: +365 −70 lines

llama_cpp/_utils.py

Lines changed: 59 additions & 4 deletions
@@ -1,6 +1,7 @@
 import os
 import sys
 import psutil
+import asyncio
 import subprocess
 
 from typing import Any, Dict, List, Tuple, Union
@@ -12,6 +13,7 @@
 STDOUT_FILENO = 1
 STDERR_FILENO = 2
 
+
 class suppress_stdout_stderr(object):
     # NOTE: these must be "saved" here to avoid exceptions when using
     # this context manager inside of a __del__ method
@@ -88,6 +90,7 @@ def get_cpu_usage(pid) -> float:
     process = psutil.Process(pid)
     return process.cpu_percent()
 
+
 def get_ram_usage(pid) -> float:
     """
     RAM usage in MiB by the current process.
@@ -97,12 +100,19 @@ def get_ram_usage(pid) -> float:
     ram_usage = ram_info.rss / (1024 * 1024)  # Convert to MiB
     return ram_usage
 
+
 def get_gpu_info_by_pid(pid) -> float:
     """
     GPU memory usage by the current process (if GPU is available)
     """
     try:
-        gpu_info = subprocess.check_output(["nvidia-smi", "--query-compute-apps=pid,used_memory", "--format=csv,noheader"]).decode("utf-8")
+        gpu_info = subprocess.check_output(
+            [
+                "nvidia-smi",
+                "--query-compute-apps=pid,used_memory",
+                "--format=csv,noheader",
+            ]
+        ).decode("utf-8")
         gpu_info = gpu_info.strip().split("\n")
         for info in gpu_info:
             gpu_pid, gpu_ram_usage = info.split(", ")
@@ -112,14 +122,59 @@ def get_gpu_info_by_pid(pid) -> float:
         pass
     return 0.0
 
+
 def get_gpu_general_info() -> Tuple[float, float, float]:
     """
     GPU general info (if GPU is available)
     """
     try:
-        gpu_info = subprocess.check_output(["nvidia-smi", "--query-gpu=utilization.gpu,memory.used,memory.free", "--format=csv,noheader"]).decode("utf-8")
-        gpu_utilization, gpu_memory_used, gpu_memory_free = gpu_info.strip().split("\n")[0].split(", ")
-        return tuple(float(tup.split()[0]) for tup in [gpu_utilization, gpu_memory_used, gpu_memory_free])
+        gpu_info = subprocess.check_output(
+            [
+                "nvidia-smi",
+                "--query-gpu=utilization.gpu,memory.used,memory.free",
+                "--format=csv,noheader",
+            ]
+        ).decode("utf-8")
+        gpu_utilization, gpu_memory_used, gpu_memory_free = (
+            gpu_info.strip().split("\n")[0].split(", ")
+        )
+        return tuple(
+            float(tup.split()[0])
+            for tup in [gpu_utilization, gpu_memory_used, gpu_memory_free]
+        )
     except (subprocess.CalledProcessError, FileNotFoundError):
         pass
     return 0.0, 0.0, 0.0
+
+
+async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]):
+    """
+    An asynchronous function that monitors the task queue and updates
+    a shared status dictionary with the number of tasks that have not
+    started and the number of tasks that are currently running.
+    It recursively calls itself to continuously monitor the task queue.
+    NOTE: There will always be 4 tasks running in the task queue:
+    - LifespanOn.main: Main application coroutine
+    - Server.serve: Server coroutine
+    - monitor_task_queue: Task queue monitoring coroutine
+    - RequestReponseCycle.run_asgi: ASGI single cycle coroutine
+    Any upcoming requests will be added to the task queue in the form of
+    another RequestReponseCycle.run_asgi coroutine.
+    """
+    all_tasks = asyncio.all_tasks()
+
+    # Get count of all running tasks
+    _all_tasks = [task for task in all_tasks if task._state == "PENDING"]
+    status_dict["running_tasks_count"] = len(_all_tasks)
+    # Get basic metadata of all running tasks
+    status_dict["running_tasks"] = {
+        task.get_name(): str(task.get_coro())
+        .encode("ascii", errors="ignore")
+        .strip()
+        .decode("ascii")
+        for task in all_tasks
+    }
+
+    asyncio.create_task(
+        monitor_task_queue(status_dict)
+    )  # pass status_dict to the next task
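
For context, here is a minimal sketch of how the new `monitor_task_queue` coroutine might be wired into a server and surfaced to Prometheus. It is not part of this commit: the FastAPI app, the `/metrics` route, and the `llama_running_tasks` gauge name are assumptions made purely for illustration.

```python
# Illustrative only -- the app, route, and gauge names below are assumptions,
# not code from this commit.
import asyncio
from typing import Dict, Union

from fastapi import FastAPI, Response
from prometheus_client import CONTENT_TYPE_LATEST, Gauge, generate_latest

from llama_cpp._utils import monitor_task_queue

app = FastAPI()
task_queue_status: Dict[str, Union[int, float]] = {}

# Hypothetical gauge exposing the pending-task count collected by the monitor.
RUNNING_TASKS = Gauge("llama_running_tasks", "Number of pending asyncio tasks")


@app.on_event("startup")
async def start_task_queue_monitor() -> None:
    # The monitor re-schedules itself via asyncio.create_task,
    # so it only needs to be kicked off once at startup.
    asyncio.create_task(monitor_task_queue(task_queue_status))


@app.get("/metrics")
async def metrics() -> Response:
    RUNNING_TASKS.set(task_queue_status.get("running_tasks_count", 0))
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
```

Note that `monitor_task_queue` re-creates itself immediately after each pass; a caller that only needs periodic samples may want to add an `await asyncio.sleep(...)` between iterations.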

llama_cpp/llama.py

Lines changed: 7 additions & 8 deletions
@@ -38,13 +38,13 @@
 import llama_cpp.llama_cpp as llama_cpp
 import llama_cpp.llama_chat_format as llama_chat_format
 
-from llama_cpp.llama_metrics import Metrics, MetricsExporter
+from llama_cpp.llama_metrics import RequestMetrics, MetricsExporter
 
 from llama_cpp._utils import (
     get_cpu_usage,
     get_ram_usage,
     get_gpu_info_by_pid,
-    get_gpu_general_info,
+    get_gpu_general_info
 )
 
 from llama_cpp.llama_speculative import LlamaDraftModel
@@ -938,7 +938,7 @@ def decode_batch(seq_sizes: List[int]):
             return output, total_tokens
         else:
             return output
-    
+
     def _create_completion(
         self,
         prompt: Union[str, List[int]],
@@ -972,7 +972,6 @@ def _create_completion(
     ]:
         assert self._ctx is not None
         assert suffix is None or suffix.__class__ is str
-
         # Variables required for metric collection
         _metrics_dict = {}
         _ttft_start = time.time()
@@ -1464,8 +1463,8 @@ def logit_bias_processor(
         }
 
         # Log metrics to Prometheus
-        _all_metrics = Metrics(**_metrics_dict)
-        self.metrics.log_metrics(_all_metrics, labels=_labels)
+        _all_metrics = RequestMetrics(**_metrics_dict)
+        self.metrics.log_request_metrics(_all_metrics, labels=_labels)
 
         return
 
@@ -1585,8 +1584,8 @@ def logit_bias_processor(
         }
 
         # Log metrics to Prometheus
-        _all_metrics = Metrics(**_metrics_dict)
-        self.metrics.log_metrics(_all_metrics, labels=_labels)
+        _all_metrics = RequestMetrics(**_metrics_dict)
+        self.metrics.log_request_metrics(_all_metrics, labels=_labels)
 
         yield {
             "id": completion_id,

0 commit comments
