Skip to content

Commit 8d7098e

Browse files
committed
Merge pull request influxdata#133 from savoirfairelinux/pythonic
Pythonic changes (Thanks @gst!)
2 parents 3ac7549 + beda89d commit 8d7098e

File tree

4 files changed

+168
-151
lines changed

4 files changed

+168
-151
lines changed

influxdb/_dataframe_client.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
DataFrame client for InfluxDB
4+
"""
5+
import math
6+
import warnings
7+
8+
from .client import InfluxDBClient
9+
10+
import pandas as pd
11+
12+
13+
class DataFrameClient(InfluxDBClient):
14+
"""
15+
The ``DataFrameClient`` object holds information necessary to connect
16+
to InfluxDB. Requests can be made to InfluxDB directly through the client.
17+
The client reads and writes from pandas DataFrames.
18+
"""
19+
20+
EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
21+
22+
def write_points(self, data, *args, **kwargs):
23+
"""
24+
Write to multiple time series names.
25+
26+
:param data: A dictionary mapping series names to pandas DataFrames
27+
:param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
28+
or 'u'.
29+
:param batch_size: [Optional] Value to write the points in batches
30+
instead of all at one time. Useful for when doing data dumps from
31+
one database to another or when doing a massive write operation
32+
:type batch_size: int
33+
"""
34+
35+
batch_size = kwargs.get('batch_size')
36+
time_precision = kwargs.get('time_precision', 's')
37+
if batch_size:
38+
kwargs.pop('batch_size') # don't hand over to InfluxDBClient
39+
for key, data_frame in data.items():
40+
number_batches = int(math.ceil(
41+
len(data_frame) / float(batch_size)))
42+
for batch in range(number_batches):
43+
start_index = batch * batch_size
44+
end_index = (batch + 1) * batch_size
45+
data = [self._convert_dataframe_to_json(
46+
name=key,
47+
dataframe=data_frame.ix[start_index:end_index].copy(),
48+
time_precision=time_precision)]
49+
super(DataFrameClient, self).write_points(data,
50+
*args, **kwargs)
51+
return True
52+
else:
53+
data = [self._convert_dataframe_to_json(
54+
name=key, dataframe=dataframe, time_precision=time_precision)
55+
for key, dataframe in data.items()]
56+
return super(DataFrameClient, self).write_points(data,
57+
*args, **kwargs)
58+
59+
def write_points_with_precision(self, data, time_precision='s'):
60+
"""
61+
DEPRECATED. Write to multiple time series names
62+
63+
"""
64+
warnings.warn(
65+
"write_points_with_precision is deprecated, and will be removed "
66+
"in future versions. Please use "
67+
"``DataFrameClient.write_points(time_precision='..')`` instead.",
68+
FutureWarning)
69+
return self.write_points(data, time_precision='s')
70+
71+
def query(self, query, time_precision='s', chunked=False):
72+
"""
73+
Quering data into a DataFrame.
74+
75+
:param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
76+
or 'u'.
77+
:param chunked: [Optional, default=False] True if the data shall be
78+
retrieved in chunks, False otherwise.
79+
80+
"""
81+
results = super(DataFrameClient, self).query(query, database=database)
82+
if len(results) > 0:
83+
return self._to_dataframe(results, time_precision)
84+
else:
85+
return results
86+
87+
def _to_dataframe(self, json_result, time_precision):
88+
dataframe = pd.DataFrame(data=json_result['points'],
89+
columns=json_result['columns'])
90+
if 'sequence_number' in dataframe.keys():
91+
dataframe.sort(['time', 'sequence_number'], inplace=True)
92+
else:
93+
dataframe.sort(['time'], inplace=True)
94+
pandas_time_unit = time_precision
95+
if time_precision == 'm':
96+
pandas_time_unit = 'ms'
97+
elif time_precision == 'u':
98+
pandas_time_unit = 'us'
99+
dataframe.index = pd.to_datetime(list(dataframe['time']),
100+
unit=pandas_time_unit,
101+
utc=True)
102+
del dataframe['time']
103+
return dataframe
104+
105+
def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'):
106+
if not isinstance(dataframe, pd.DataFrame):
107+
raise TypeError('Must be DataFrame, but type was: {}.'
108+
.format(type(dataframe)))
109+
if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or
110+
isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)):
111+
raise TypeError('Must be DataFrame with DatetimeIndex or \
112+
PeriodIndex.')
113+
dataframe.index = dataframe.index.to_datetime()
114+
if dataframe.index.tzinfo is None:
115+
dataframe.index = dataframe.index.tz_localize('UTC')
116+
dataframe['time'] = [self._datetime_to_epoch(dt, time_precision)
117+
for dt in dataframe.index]
118+
data = {'name': name,
119+
'columns': [str(column) for column in dataframe.columns],
120+
'points': list([list(x) for x in dataframe.values])}
121+
return data
122+
123+
def _datetime_to_epoch(self, datetime, time_precision='s'):
124+
seconds = (datetime - self.EPOCH).total_seconds()
125+
if time_precision == 's':
126+
return seconds
127+
elif time_precision == 'm' or time_precision == 'ms':
128+
return seconds * 1000
129+
elif time_precision == 'u':
130+
return seconds * 1000000

influxdb/client.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,9 @@ def format_query_response(response):
113113
if 'columns' in row.keys() and 'values' in row.keys():
114114
for value in row['values']:
115115
item = {}
116-
current_col = 0
117-
for field in value:
118-
item[row['columns'][current_col]] = field
119-
current_col += 1
116+
for cur_col, field in enumerate(value):
117+
item[row['columns'][cur_col]] = field
118+
cur_col += 1
120119
items.append(item)
121120
return series
122121

@@ -237,8 +236,7 @@ def write_points(self,
237236
time_precision=None,
238237
database=None,
239238
retention_policy=None,
240-
*args,
241-
**kwargs):
239+
):
242240
"""
243241
Write to multiple time series names.
244242
@@ -261,12 +259,12 @@ def _write_points(self,
261259
database,
262260
retention_policy):
263261
if time_precision not in ['n', 'u', 'ms', 's', 'm', 'h', None]:
264-
raise Exception(
262+
raise ValueError(
265263
"Invalid time precision is given. "
266264
"(use 'n', 'u', 'ms', 's', 'm' or 'h')")
267265

268266
if self.use_udp and time_precision and time_precision != 's':
269-
raise Exception(
267+
raise ValueError(
270268
"InfluxDB only supports seconds precision for udp writes"
271269
)
272270

@@ -325,7 +323,7 @@ def create_retention_policy(
325323
query_string = \
326324
"CREATE RETENTION POLICY %s ON %s " \
327325
"DURATION %s REPLICATION %s" % \
328-
(name, (database or self._database), duration, replication)
326+
(name, database or self._database, duration, replication)
329327

330328
if default is True:
331329
query_string += " DEFAULT"
@@ -344,7 +342,7 @@ def get_list_series(self, database=None):
344342
"""
345343
Get the list of series
346344
"""
347-
return self.query("SHOW SERIES", database=(database or self._database))
345+
return self.query("SHOW SERIES", database=database)
348346

349347
def get_list_users(self):
350348
"""

influxdb/dataframe_client.py

Lines changed: 12 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -2,140 +2,18 @@
22
"""
33
DataFrame client for InfluxDB
44
"""
5-
import math
6-
import warnings
75

8-
from .client import InfluxDBClient
6+
__all__ = ['DataFrameClient']
97

108
try:
11-
import pandas as pd
12-
except ImportError:
13-
pd = None
14-
15-
16-
class DataFrameClient(InfluxDBClient):
17-
"""
18-
The ``DataFrameClient`` object holds information necessary to connect
19-
to InfluxDB. Requests can be made to InfluxDB directly through the client.
20-
The client reads and writes from pandas DataFrames.
21-
"""
22-
23-
def __init__(self, *args, **kwargs):
24-
super(DataFrameClient, self).__init__(*args, **kwargs)
25-
if not pd:
26-
raise ImportError(
27-
'DataFrameClient requires Pandas'
28-
)
29-
30-
self.EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
31-
32-
def write_points(self, data, *args, **kwargs):
33-
"""
34-
Write to multiple time series names.
35-
36-
:param data: A dictionary mapping series names to pandas DataFrames
37-
:param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
38-
or 'u'.
39-
:param batch_size: [Optional] Value to write the points in batches
40-
instead of all at one time. Useful for when doing data dumps from
41-
one database to another or when doing a massive write operation
42-
:type batch_size: int
43-
"""
44-
45-
batch_size = kwargs.get('batch_size')
46-
time_precision = kwargs.get('time_precision', 's')
47-
if batch_size:
48-
kwargs.pop('batch_size') # don't hand over to InfluxDBClient
49-
for key, data_frame in data.items():
50-
number_batches = int(math.ceil(
51-
len(data_frame) / float(batch_size)))
52-
for batch in range(number_batches):
53-
start_index = batch * batch_size
54-
end_index = (batch + 1) * batch_size
55-
data = [self._convert_dataframe_to_json(
56-
name=key,
57-
dataframe=data_frame.ix[start_index:end_index].copy(),
58-
time_precision=time_precision)]
59-
InfluxDBClient.write_points(self, data, *args, **kwargs)
60-
return True
61-
else:
62-
data = [self._convert_dataframe_to_json(
63-
name=key, dataframe=dataframe, time_precision=time_precision)
64-
for key, dataframe in data.items()]
65-
return InfluxDBClient.write_points(self, data, *args, **kwargs)
66-
67-
def write_points_with_precision(self, data, time_precision='s'):
68-
"""
69-
DEPRECATED. Write to multiple time series names
70-
71-
"""
72-
warnings.warn(
73-
"write_points_with_precision is deprecated, and will be removed "
74-
"in future versions. Please use "
75-
"``DataFrameClient.write_points(time_precision='..')`` instead.",
76-
FutureWarning)
77-
return self.write_points(data, time_precision='s')
78-
79-
def query(self, query, time_precision='s', chunked=False):
80-
"""
81-
Quering data into a DataFrame.
82-
83-
:param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
84-
or 'u'.
85-
:param chunked: [Optional, default=False] True if the data shall be
86-
retrieved in chunks, False otherwise.
87-
88-
"""
89-
result = InfluxDBClient.query(self,
90-
query=query,
91-
time_precision=time_precision,
92-
chunked=chunked)
93-
if len(result['results'][0]) > 0:
94-
return self._to_dataframe(result['results'][0], time_precision)
95-
else:
96-
return result
97-
98-
def _to_dataframe(self, json_result, time_precision):
99-
dataframe = pd.DataFrame(data=json_result['points'],
100-
columns=json_result['columns'])
101-
if 'sequence_number' in dataframe.keys():
102-
dataframe.sort(['time', 'sequence_number'], inplace=True)
103-
else:
104-
dataframe.sort(['time'], inplace=True)
105-
pandas_time_unit = time_precision
106-
if time_precision == 'm':
107-
pandas_time_unit = 'ms'
108-
elif time_precision == 'u':
109-
pandas_time_unit = 'us'
110-
dataframe.index = pd.to_datetime(list(dataframe['time']),
111-
unit=pandas_time_unit,
112-
utc=True)
113-
del dataframe['time']
114-
return dataframe
115-
116-
def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'):
117-
if not isinstance(dataframe, pd.DataFrame):
118-
raise TypeError('Must be DataFrame, but type was: {}.'
119-
.format(type(dataframe)))
120-
if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or
121-
isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)):
122-
raise TypeError('Must be DataFrame with DatetimeIndex or \
123-
PeriodIndex.')
124-
dataframe.index = dataframe.index.to_datetime()
125-
if dataframe.index.tzinfo is None:
126-
dataframe.index = dataframe.index.tz_localize('UTC')
127-
dataframe['time'] = [self._datetime_to_epoch(dt, time_precision)
128-
for dt in dataframe.index]
129-
data = {'name': name,
130-
'columns': [str(column) for column in dataframe.columns],
131-
'points': list([list(x) for x in dataframe.values])}
132-
return data
133-
134-
def _datetime_to_epoch(self, datetime, time_precision='s'):
135-
seconds = (datetime - self.EPOCH).total_seconds()
136-
if time_precision == 's':
137-
return seconds
138-
elif time_precision == 'm' or time_precision == 'ms':
139-
return seconds * 1000
140-
elif time_precision == 'u':
141-
return seconds * 1000000
9+
import pandas
10+
del pandas
11+
except ImportError as err:
12+
from .client import InfluxDBClient
13+
14+
class DataFrameClient(InfluxDBClient):
15+
def __init__(self, *a, **kw):
16+
raise ImportError("DataFrameClient requires Pandas "
17+
"which couldn't be imported: %s" % err)
18+
else:
19+
from ._dataframe_client import DataFrameClient

tests/influxdb/dataframe_client_test.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,13 +211,24 @@ def test_query_with_empty_result(self):
211211
assert result == []
212212

213213
def test_list_series(self):
214-
response = [
215-
{
216-
'columns': ['time', 'name'],
217-
'name': 'list_series_result',
218-
'points': [[0, 'seriesA'], [0, 'seriesB']]
219-
}
220-
]
214+
response = {
215+
'results': [
216+
{
217+
'series': [{
218+
'columns': ['id'],
219+
'name': 'seriesA',
220+
'values': [[0]],
221+
}]
222+
},
223+
{
224+
'series': [{
225+
'columns': ['id'],
226+
'name': 'seriesB',
227+
'values': [[1]],
228+
}]
229+
},
230+
]
231+
}
221232
with _mocked_session('get', 200, response):
222233
cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
223234
series_list = cli.get_list_series()

0 commit comments

Comments
 (0)