Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Make InfluxDBClusterClient thread-safe #265

Merged
merged 1 commit into from
Nov 18, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 50 additions & 15 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from functools import wraps
import json
import socket
import threading
import random
import requests
import requests.exceptions
Expand Down Expand Up @@ -73,7 +74,7 @@ def __init__(self,
proxies=None,
):
"""Construct a new InfluxDBClient object."""
self._host = host
self.__host = host
self._port = port
self._username = username
self._password = password
Expand All @@ -98,7 +99,7 @@ def __init__(self,
else:
self._proxies = proxies

self._baseurl = "{0}://{1}:{2}".format(
self.__baseurl = "{0}://{1}:{2}".format(
self._scheme,
self._host,
self._port)
Expand All @@ -108,6 +109,22 @@ def __init__(self,
'Accept': 'text/plain'
}

# _baseurl and _host are properties to allow InfluxDBClusterClient
# to override them with thread-local variables
@property
def _baseurl(self):
return self._get_baseurl()

def _get_baseurl(self):
return self.__baseurl

@property
def _host(self):
return self._get_host()

def _get_host(self):
return self.__host

@staticmethod
def from_DSN(dsn, **kwargs):
"""Return an instance of :class:`~.InfluxDBClient` from the provided
Expand Down Expand Up @@ -740,6 +757,8 @@ def __init__(self,
self.bad_hosts = [] # Corresponding server has failures in history
self.shuffle = shuffle
host, port = self.hosts[0]
self._hosts_lock = threading.Lock()
self._thread_local = threading.local()
self._client = client_base_class(host=host,
port=port,
username=username,
Expand All @@ -757,6 +776,10 @@ def __init__(self,

setattr(self, method, self._make_func(orig_attr))

self._client._get_host = self._get_host
self._client._get_baseurl = self._get_baseurl
self._update_client_host(self.hosts[0])

@staticmethod
def from_DSN(dsn, client_base_class=InfluxDBClient,
shuffle=True, **kwargs):
Expand Down Expand Up @@ -791,19 +814,29 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
return cluster_client

def _update_client_host(self, host):
self._client._host, self._client._port = host
self._client._baseurl = "{0}://{1}:{2}".format(self._client._scheme,
self._client._host,
self._client._port)
self._thread_local.host, self._thread_local.port = host
self._thread_local.baseurl = "{0}://{1}:{2}".format(
self._client._scheme,
self._client._host,
self._client._port
)

def _get_baseurl(self):
return self._thread_local.baseurl

def _get_host(self):
return self._thread_local.host

def _make_func(self, orig_func):

@wraps(orig_func)
def func(*args, **kwargs):
if self.shuffle:
random.shuffle(self.hosts)
with self._hosts_lock:
if self.shuffle:
random.shuffle(self.hosts)

hosts = self.hosts + self.bad_hosts

hosts = self.hosts + self.bad_hosts
for h in hosts:
bad_host = False
try:
Expand All @@ -815,13 +848,15 @@ def func(*args, **kwargs):
except Exception as e:
# Errors that might caused by server failure, try another
bad_host = True
if h in self.hosts:
self.hosts.remove(h)
self.bad_hosts.append(h)
with self._hosts_lock:
if h in self.hosts:
self.hosts.remove(h)
self.bad_hosts.append(h)
finally:
if not bad_host and h in self.bad_hosts:
self.bad_hosts.remove(h)
self.hosts.append(h)
with self._hosts_lock:
if not bad_host and h in self.bad_hosts:
self.bad_hosts.remove(h)
self.hosts.append(h)

raise InfluxDBServerError("InfluxDB: no viable server!")

Expand Down