Skip to content

Commit fe49edb

Browse files
committed
Use timerfd to process bus timeouts on time
sdbus has its own internal timeout system where normally the `sd_bus_get_timeout` function should be used for the maximum time passed to `poll()`. However, Python's asyncio loop does not work with that very well as it has its own internal implementation of polling. Instead create a new timer file descriptor and use it to make asyncio call `SdBus.process()` on given time. This provides a much better performance compared to the `call_later()` implementation as timerfd timeout can be adjusted without allocating any new Python objects.
1 parent cd658bd commit fe49edb

File tree

3 files changed

+66
-3
lines changed

3 files changed

+66
-3
lines changed

src/sdbus/sd_bus_internals.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,9 @@ typedef struct {
334334
sd_bus* sd_bus_ref;
335335
PyObject* bus_fd;
336336
PyObject* loop;
337+
PyObject* timer_fd;
337338
int asyncio_watchers_last_state;
339+
int timer_fd_int;
338340
} SdBusObject;
339341

340342
extern PyType_Spec SdBusType;

src/sdbus/sd_bus_internals_bus.c

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,20 @@
2020
*/
2121
#include <errno.h>
2222
#include <poll.h>
23+
#include <sys/timerfd.h>
24+
#include <time.h>
2325
#include "sd_bus_internals.h"
2426

2527
static void SdBus_dealloc(SdBusObject* self) {
2628
if (NULL != self->loop && NULL != self->bus_fd) {
2729
Py_XDECREF(PyObject_CallMethodObjArgs(self->loop, remove_reader_str, self->bus_fd, NULL));
2830
Py_XDECREF(PyObject_CallMethodObjArgs(self->loop, remove_writer_str, self->bus_fd, NULL));
2931
}
32+
if (NULL != self->timer_fd) {
33+
Py_XDECREF(PyObject_CallMethodObjArgs(self->loop, remove_reader_str, self->bus_fd, NULL));
34+
Py_DECREF(self->timer_fd);
35+
close(self->timer_fd_int);
36+
}
3037
sd_bus_unref(self->sd_bus_ref);
3138
Py_XDECREF(self->bus_fd);
3239
Py_XDECREF(self->loop);
@@ -623,6 +630,10 @@ static PyObject* SdBus_close(SdBusObject* self, PyObject* Py_UNUSED(args)) {
623630
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(self->loop, remove_reader_str, self->bus_fd, NULL)));
624631
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(self->loop, remove_writer_str, self->bus_fd, NULL)));
625632
}
633+
if (NULL != self->timer_fd) {
634+
Py_XDECREF(PyObject_CallMethodObjArgs(self->loop, remove_reader_str, self->bus_fd, NULL));
635+
// TODO: Close timerfd
636+
}
626637
Py_RETURN_NONE;
627638
}
628639

@@ -639,7 +650,45 @@ static inline int sd_bus_get_events_zero_on_closed(SdBusObject* self) {
639650
return events;
640651
};
641652

653+
static inline int sd_bus_get_timeout_uint_max_on_closed(SdBusObject* self, uint64_t* timeout_usec) {
654+
int r = sd_bus_get_timeout(self->sd_bus_ref, timeout_usec);
655+
if (-ENOTCONN == r) {
656+
*timeout_usec = UINT64_MAX;
657+
return 0;
658+
}
659+
return r;
660+
}
661+
642662
static PyObject* SdBus_asyncio_update_fd_watchers(SdBusObject* self) {
663+
PyObject* running_loop = CALL_PYTHON_AND_CHECK(_get_or_bind_loop(self));
664+
PyObject* drive_method CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_GetAttrString((PyObject*)self, "process"));
665+
666+
if (NULL == self->timer_fd) {
667+
self->timer_fd_int = CALL_SD_BUS_AND_CHECK(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC));
668+
if (self->timer_fd_int < 0) {
669+
PyErr_SetFromErrno(PyExc_OSError);
670+
}
671+
PyObject* timer_fd CLEANUP_PY_OBJECT = PyLong_FromLong((int)self->timer_fd_int);
672+
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, add_reader_str, timer_fd, drive_method, NULL)));
673+
Py_INCREF(timer_fd);
674+
self->timer_fd = timer_fd;
675+
}
676+
677+
uint64_t timeout_usec = UINT64_MAX;
678+
CALL_SD_BUS_AND_CHECK(sd_bus_get_timeout_uint_max_on_closed(self, &timeout_usec));
679+
680+
struct itimerspec bus_timer = {0};
681+
if (timeout_usec == UINT64_MAX) {
682+
// Setting bus_timer to zero disarms timer.
683+
} else if (timeout_usec != 0) {
684+
bus_timer.it_value.tv_sec = timeout_usec / 1000000;
685+
bus_timer.it_value.tv_nsec = (timeout_usec % 1000000) * 1000;
686+
} else if (timeout_usec == 0) {
687+
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, call_soon_str, drive_method, NULL)));
688+
}
689+
690+
CALL_SD_BUS_AND_CHECK(timerfd_settime(self->timer_fd_int, TFD_TIMER_ABSTIME, &bus_timer, NULL));
691+
643692
int events_to_watch = CALL_SD_BUS_AND_CHECK(sd_bus_get_events_zero_on_closed(self));
644693
if (events_to_watch == self->asyncio_watchers_last_state) {
645694
// Do not update the watchers because state is the same
@@ -648,9 +697,6 @@ static PyObject* SdBus_asyncio_update_fd_watchers(SdBusObject* self) {
648697
self->asyncio_watchers_last_state = events_to_watch;
649698
}
650699

651-
PyObject* running_loop = CALL_PYTHON_AND_CHECK(_get_or_bind_loop(self));
652-
PyObject* drive_method CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_GetAttrString((PyObject*)self, "process"));
653-
654700
if (NULL == self->bus_fd) {
655701
self->bus_fd = CALL_PYTHON_AND_CHECK(SdBus_get_fd(self, NULL));
656702
}

test/test_sdbus_async.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from sdbus.exceptions import (
3030
DbusFailedError,
3131
DbusFileExistsError,
32+
DbusNoReplyError,
3233
DbusPropertyReadOnlyError,
3334
DbusUnknownObjectError,
3435
SdBusLibraryError,
@@ -721,6 +722,20 @@ async def too_long_wait() -> None:
721722
with self.assertRaises(SdBusLibraryError):
722723
await wait_for(too_long_wait(), timeout=1)
723724

725+
async def test_bus_timerfd(self) -> None:
726+
test_object, test_object_connection = initialize_object()
727+
728+
self.bus.method_call_timeout_usec = 10_000 # 0.01 seconds
729+
730+
loop = get_running_loop()
731+
732+
start = loop.time()
733+
734+
with self.assertRaises(DbusNoReplyError):
735+
await wait_for(test_object_connection.looong_method(), timeout=1)
736+
737+
self.assertAlmostEqual(loop.time() - start, 0.01, delta=0.01)
738+
724739
async def test_signal_queue_wildcard_match(self) -> None:
725740
test_object, test_object_connection = initialize_object()
726741

0 commit comments

Comments
 (0)