Modified Streaming and Generating Custom Search Command #407

Merged · 18 commits · Nov 3, 2021
47 changes: 47 additions & 0 deletions README.md
@@ -150,6 +150,53 @@ The test suite uses Python's standard library, the built-in `unittest` library,
|/tests | Source for unit tests |
|/utils | Source for utilities shared by the examples and unit tests |

### Customization
* When working with custom search commands such as Custom Streaming Commands or Custom Generating Commands, you may need to add new fields to records based on certain conditions.
* Fields added by assigning to the record dictionary directly may not be preserved in the output (illustrated by the sketch after the examples below).
* Use the ``add_field(record, fieldname, value)`` method from SearchCommand to add a new field and value to a record.
* _**Note:** Using the ``add_field`` method is optional if you are not seeing any issues with field retention._

Do
```python
class CustomStreamingCommand(StreamingCommand):
    def stream(self, records):
        for index, record in enumerate(records):
            if index % 2 == 1:
                self.add_field(record, "odd_record", "true")
            yield record
```

Don't
```python
class CustomStreamingCommand(StreamingCommand):
    def stream(self, records):
        for index, record in enumerate(records):
            if index % 2 == 1:
                record["odd_record"] = "true"
            yield record
```
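
The reason the second form can lose data: the record writer fixes its set of output field names from the first record it writes (see the `internals.py` change further down), so a field that first appears on a later record never reaches the output header unless it has been registered, which is what ``add_field`` does via `custom_fields`. A minimal sketch of that first-record behaviour, using a toy writer purely for illustration (not the SDK's actual class):

```python
class ToyWriter:
    """Illustration only: fixes the header from the first record, like the SDK's record writer."""
    def __init__(self):
        self.fieldnames = None
        self.rows = []

    def write_record(self, record):
        if self.fieldnames is None:
            self.fieldnames = list(record.keys())  # header is fixed here, once
        # only fields present in the header survive
        self.rows.append({name: record.get(name, "") for name in self.fieldnames})

writer = ToyWriter()
writer.write_record({"in_index": "0"})
writer.write_record({"in_index": "1", "odd_record": "true"})  # "odd_record" is silently dropped
print(writer.fieldnames)  # ['in_index']
```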
### Customization for Generating Custom Search Command
* A Generating Custom Search Command generates events using SDK code.
* Use the ``gen_record()`` method from SearchCommand to create a new record, passing the event data as comma-separated key=value keyword arguments (as shown in the example below; a fuller script sketch follows the examples).

Do
```python
@Configuration()
class GeneratorTest(GeneratingCommand):
    def generate(self):
        yield self.gen_record(_time=time.time(), one=1)
        yield self.gen_record(_time=time.time(), two=2)
```

Don't
```python
@Configuration()
class GeneratorTest(GeneratingCommand):
    def generate(self):
        yield {'_time': time.time(), 'one': 1}
        yield {'_time': time.time(), 'two': 2}
```
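
For context, a complete generating command script built around the ``gen_record()`` pattern above might look roughly like the following. The file name, the ``if __name__`` guard, and the ``dispatch`` call follow the SDK's usual command-script pattern but are illustrative here, not part of this change:

```python
#!/usr/bin/env python
# generatortest.py - illustrative sketch of a full generating command script
import sys
import time

from splunklib.searchcommands import dispatch, GeneratingCommand, Configuration


@Configuration()
class GeneratorTest(GeneratingCommand):
    def generate(self):
        # gen_record() registers each field name with the record writer,
        # so '_time', 'one' and 'two' all make it into the output header.
        yield self.gen_record(_time=time.time(), one=1)
        yield self.gen_record(_time=time.time(), two=2)


if __name__ == "__main__":
    dispatch(GeneratorTest, sys.argv, sys.stdin, sys.stdout, __name__)
```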

### Changelog

The [CHANGELOG](CHANGELOG.md) contains a description of changes for each version of the SDK. For the latest version, see the [CHANGELOG.md](https://github.com/splunk/splunk-sdk-python/blob/master/CHANGELOG.md) on GitHub.
15 changes: 11 additions & 4 deletions splunklib/searchcommands/generating_command.py
@@ -213,13 +213,20 @@ def _execute(self, ifile, process):

 def _execute_chunk_v2(self, process, chunk):
     count = 0
+    records = []
     for row in process:
-        self._record_writer.write_record(row)
+        records.append(row)
         count += 1
         if count == self._record_writer._maxresultrows:
-            self._finished = False
-            return
-    self._finished = True
+            break
+
+    for row in records:
+        self._record_writer.write_record(row)
+
+    if count == self._record_writer._maxresultrows:
+        self._finished = False
+    else:
+        self._finished = True

def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout, allow_empty_input=True):
""" Process data.
3 changes: 3 additions & 0 deletions splunklib/searchcommands/internals.py
@@ -508,6 +508,7 @@ def __init__(self, ofile, maxresultrows=None):
self._chunk_count = 0
self._pending_record_count = 0
self._committed_record_count = 0
self.custom_fields = set()

@property
def is_flushed(self):
@@ -572,6 +573,7 @@ def write_record(self, record):

def write_records(self, records):
self._ensure_validity()
records = list(records)
write_record = self._write_record
for record in records:
write_record(record)
@@ -593,6 +595,7 @@ def _write_record(self, record):

if fieldnames is None:
self._fieldnames = fieldnames = list(record.keys())
self._fieldnames.extend([i for i in self.custom_fields if i not in self._fieldnames])
value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames)
self._writerow(list(chain.from_iterable(value_list)))

8 changes: 8 additions & 0 deletions splunklib/searchcommands/search_command.py
@@ -173,6 +173,14 @@ def logging_level(self, value):
raise ValueError('Unrecognized logging level: {}'.format(value))
self._logger.setLevel(level)

def add_field(self, current_record, field_name, field_value):
self._record_writer.custom_fields.add(field_name)
current_record[field_name] = field_value
Comment on lines +176 to +178 (Contributor):
This function should return current_record or else the documentation is incorrect.

Suggested change (add a ``return current_record`` so the method returns the updated record):

    def add_field(self, current_record, field_name, field_value):
        self._record_writer.custom_fields.add(field_name)
        current_record[field_name] = field_value
        return current_record

Contributor:
Good catch @Bre77 - I think in this case the documentation needs to be updated: the current_record should be updated with the new field without needing to return it, since it's essentially pass-by-reference for this usage (I always go back to this article when I need to refresh my memory: https://robertheaton.com/2014/02/09/pythons-pass-by-object-reference-as-explained-by-philip-k-dick/).

So if you all agree can we fix the README.md above to the following?

class CustomStreamingCommand(StreamingCommand):
    def stream(self, records):
        for index, record in enumerate(records):
            if index % 1 == 0:
                self.add_field(record, "odd_record", "true")
            yield record
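
A minimal standalone sketch of the pass-by-object-reference point made above, assuming nothing beyond plain Python dicts (illustrative, not part of the PR):

```python
def add_flag(record, name, value):
    # mutating the dict updates the caller's object; no return value is needed
    record[name] = value

event = {"in_index": "0"}
add_flag(event, "odd_record", "true")
print(event)  # {'in_index': '0', 'odd_record': 'true'}
```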

Contributor:
And to make sure that example we provide is working ^ that's probably a good implementation to add when adding tests for the StreamingCommand / add_field case

Contributor:
I've changed my implementation to using pass by reference (as per the suggested doco update) and everything works as expected.

Contributor:
Good catch @Bre77
I've updated the README accordingly.


def gen_record(self, **record):
self._record_writer.custom_fields |= set(record.keys())
return record

record = Option(doc='''
**Syntax: record=<bool>

27 changes: 26 additions & 1 deletion tests/searchcommands/test_generator_command.py
@@ -40,7 +40,6 @@ def generate(self):
assert expected.issubset(seen)
assert finished_seen


def test_allow_empty_input_for_generating_command():
"""
Passing allow_empty_input for generating command will cause an error
@@ -59,3 +58,29 @@ def generate(self):
except ValueError as error:
assert str(error) == "allow_empty_input cannot be False for Generating Commands"

def test_all_fieldnames_present_for_generated_records():
@Configuration()
class GeneratorTest(GeneratingCommand):
def generate(self):
yield self.gen_record(_time=time.time(), one=1)
yield self.gen_record(_time=time.time(), two=2)
yield self.gen_record(_time=time.time(), three=3)
yield self.gen_record(_time=time.time(), four=4)
yield self.gen_record(_time=time.time(), five=5)

generator = GeneratorTest()
in_stream = io.BytesIO()
in_stream.write(chunky.build_getinfo_chunk())
in_stream.write(chunky.build_chunk({'action': 'execute'}))
in_stream.seek(0)
out_stream = io.BytesIO()
generator._process_protocol_v2([], in_stream, out_stream)
out_stream.seek(0)

ds = chunky.ChunkedDataStream(out_stream)
fieldnames_expected = {'_time', 'one', 'two', 'three', 'four', 'five'}
fieldnames_actual = set()
for chunk in ds:
for row in chunk.data:
fieldnames_actual |= set(row.keys())
assert fieldnames_expected.issubset(fieldnames_actual)
2 changes: 2 additions & 0 deletions tests/searchcommands/test_internals_v2.py
@@ -233,6 +233,8 @@ def test_record_writer_with_random_data(self, save_recording=False):
self.assertGreater(writer._buffer.tell(), 0)
self.assertEqual(writer._total_record_count, 0)
self.assertEqual(writer.committed_record_count, 0)
fieldnames.sort()
writer._fieldnames.sort()
self.assertListEqual(writer._fieldnames, fieldnames)
self.assertListEqual(writer._inspector['messages'], messages)

68 changes: 68 additions & 0 deletions tests/searchcommands/test_streaming_command.py
@@ -27,3 +27,71 @@ def stream(self, records):
output = chunky.ChunkedDataStream(ofile)
getinfo_response = output.read_chunk()
assert getinfo_response.meta["type"] == "streaming"


def test_field_preservation_negative():
@Configuration()
class TestStreamingCommand(StreamingCommand):

def stream(self, records):
for index, record in enumerate(records):
if index % 2 != 0:
record["odd_field"] = True
else:
record["even_field"] = True
yield record

cmd = TestStreamingCommand()
ifile = io.BytesIO()
ifile.write(chunky.build_getinfo_chunk())
data = list()
for i in range(0, 10):
data.append({"in_index": str(i)})
ifile.write(chunky.build_data_chunk(data, finished=True))
ifile.seek(0)
ofile = io.BytesIO()
cmd._process_protocol_v2([], ifile, ofile)
ofile.seek(0)
output_iter = chunky.ChunkedDataStream(ofile).__iter__()
output_iter.next()
output_records = [i for i in output_iter.next().data]

# Assert that count of records having "odd_field" is 0
assert len(list(filter(lambda r: "odd_field" in r, output_records))) == 0
Contributor:
Really nice demonstration of the issue!


# Assert that count of records having "even_field" is 10
assert len(list(filter(lambda r: "even_field" in r, output_records))) == 10


def test_field_preservation_positive():
@Configuration()
class TestStreamingCommand(StreamingCommand):

def stream(self, records):
for index, record in enumerate(records):
if index % 2 != 0:
self.add_field(record, "odd_field", True)
else:
self.add_field(record, "even_field", True)
yield record

cmd = TestStreamingCommand()
ifile = io.BytesIO()
ifile.write(chunky.build_getinfo_chunk())
data = list()
for i in range(0, 10):
data.append({"in_index": str(i)})
ifile.write(chunky.build_data_chunk(data, finished=True))
ifile.seek(0)
ofile = io.BytesIO()
cmd._process_protocol_v2([], ifile, ofile)
ofile.seek(0)
output_iter = chunky.ChunkedDataStream(ofile).__iter__()
output_iter.next()
output_records = [i for i in output_iter.next().data]

# Assert that count of records having "odd_field" is 10
assert len(list(filter(lambda r: "odd_field" in r, output_records))) == 10

# Assert that count of records having "even_field" is 10
assert len(list(filter(lambda r: "even_field" in r, output_records))) == 10