Skip to content

Added support to process empty records #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions splunklib/searchcommands/generating_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# under the License.

from __future__ import absolute_import, division, print_function, unicode_literals
import sys

from .decorators import ConfigurationSetting
from .search_command import SearchCommand
Expand Down Expand Up @@ -220,6 +221,35 @@ def _execute_chunk_v2(self, process, chunk):
return
self._finished = True

def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout, allow_empty_input=True):
""" Process data.

:param argv: Command line arguments.
:type argv: list or tuple

:param ifile: Input data file.
:type ifile: file

:param ofile: Output data file.
:type ofile: file

:param allow_empty_input: For generating commands, it must be true. Doing otherwise will cause an error.
:type allow_empty_input: bool

:return: :const:`None`
:rtype: NoneType

"""

# Generating commands are expected to run on an empty set of inputs as the first command being run in a search,
# also this class implements its own separate _execute_chunk_v2 method which does not respect allow_empty_input
# so ensure that allow_empty_input is always True

if not allow_empty_input:
raise ValueError("allow_empty_input cannot be False for Generating Commands")
else:
return super(GeneratingCommand, self).process(argv=argv, ifile=ifile, ofile=ofile, allow_empty_input=True)

# endregion

# region Types
Expand Down
23 changes: 16 additions & 7 deletions splunklib/searchcommands/search_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def __init__(self):
self._default_logging_level = self._logger.level
self._record_writer = None
self._records = None
self._allow_empty_input = True

def __str__(self):
text = ' '.join(chain((type(self).name, str(self.options)), [] if self.fieldnames is None else self.fieldnames))
Expand Down Expand Up @@ -413,7 +414,7 @@ def prepare(self):
"""
pass

def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout):
def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout, allow_empty_input=True):
""" Process data.

:param argv: Command line arguments.
Expand All @@ -425,10 +426,16 @@ def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout):
:param ofile: Output data file.
:type ofile: file

:param allow_empty_input: Allow empty input records for the command, if False an Error will be returned if empty chunk body is encountered when read
:type allow_empty_input: bool

:return: :const:`None`
:rtype: NoneType

"""

self._allow_empty_input = allow_empty_input

if len(argv) > 1:
self._process_protocol_v1(argv, ifile, ofile)
else:
Expand Down Expand Up @@ -965,13 +972,14 @@ def _execute_v2(self, ifile, process):
def _execute_chunk_v2(self, process, chunk):
metadata, body = chunk

if len(body) <= 0:
return
if len(body) <= 0 and not self._allow_empty_input:
raise ValueError(
"No records found to process. Set allow_empty_input=True in dispatch function to move forward "
"with empty records.")

records = self._read_csv_records(StringIO(body))
self._record_writer.write_records(process(records))


def _report_unexpected_error(self):

error_type, error, tb = sys.exc_info()
Expand Down Expand Up @@ -1063,8 +1071,7 @@ def iteritems(self):
SearchMetric = namedtuple('SearchMetric', ('elapsed_seconds', 'invocation_count', 'input_count', 'output_count'))



def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys.stdout, module_name=None):
def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys.stdout, module_name=None, allow_empty_input=True):
""" Instantiates and executes a search command class

This function implements a `conditional script stanza <https://docs.python.org/2/library/__main__.html>`_ based on the value of
Expand All @@ -1087,6 +1094,8 @@ def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys
:type output_file: :code:`file`
:param module_name: Name of the module calling :code:`dispatch` or :const:`None`.
:type module_name: :code:`basestring`
:param allow_empty_input: Allow empty input records for the command, if False an Error will be returned if empty chunk body is encountered when read
:type allow_empty_input: bool
:returns: :const:`None`

**Example**
Expand Down Expand Up @@ -1124,4 +1133,4 @@ def stream(records):
assert issubclass(command_class, SearchCommand)

if module_name is None or module_name == '__main__':
command_class().process(argv, input_file, output_file)
command_class().process(argv, input_file, output_file, allow_empty_input)
17 changes: 17 additions & 0 deletions tests/searchcommands/test_generator_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,21 @@ def generate(self):
assert finished_seen


def test_allow_empty_input_for_generating_command():
"""
Passing allow_empty_input for generating command will cause an error
"""
@Configuration()
class GeneratorTest(GeneratingCommand):
def generate(self):
for num in range(1, 3):
yield {"_index": num}
generator = GeneratorTest()
in_stream = io.BytesIO()
out_stream = io.BytesIO()

try:
generator.process([], in_stream, out_stream, allow_empty_input=False)
except ValueError as error:
assert str(error) == "allow_empty_input cannot be False for Generating Commands"

56 changes: 56 additions & 0 deletions tests/searchcommands/test_search_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,62 @@ def test_process_scpv2(self):
r'\{(' + inspector + r',' + finished + r'|' + finished + r',' + inspector + r')\}')

self.assertEqual(command.protocol_version, 2)

# 5. Different scenarios with allow_empty_input flag, default is True
# Test preparation
dispatch_dir = os.path.join(basedir, 'recordings', 'scpv2', 'Splunk-6.3', 'countmatches.dispatch_dir')
logging_configuration = os.path.join(basedir, 'apps', 'app_with_logging_configuration', 'logging.conf')
logging_level = 'ERROR'
record = False
show_configuration = True

getinfo_metadata = metadata.format(
dispatch_dir=encode_string(dispatch_dir),
logging_configuration=encode_string(logging_configuration)[1:-1],
logging_level=logging_level,
record=('true' if record is True else 'false'),
show_configuration=('true' if show_configuration is True else 'false'))

execute_metadata = '{"action":"execute","finished":true}'
command = TestCommand()
result = BytesIO()
argv = ['some-external-search-command.py']

# Scenario a) Empty body & allow_empty_input=False ==> Assert Error

execute_body = '' # Empty body
input_file = build_command_input(getinfo_metadata, execute_metadata, execute_body)
try:
command.process(argv, input_file, ofile=result, allow_empty_input=False) # allow_empty_input=False
except SystemExit as error:
self.assertNotEqual(0, error.code)
self.assertTrue(result.getvalue().decode("UTF-8").__contains__("No records found to process. Set "
"allow_empty_input=True in dispatch "
"function to move forward with empty "
"records."))
else:
self.fail('Expected SystemExit, not a return from TestCommand.process: {}\n'.format(
result.getvalue().decode('utf-8')))

# Scenario b) Empty body & allow_empty_input=True ==> Assert Success

execute_body = '' # Empty body
input_file = build_command_input(getinfo_metadata, execute_metadata, execute_body)
result = BytesIO()

try:
command.process(argv, input_file, ofile=result) # By default allow_empty_input=True
except SystemExit as error:
self.fail('Unexpected exception: {}: {}'.format(type(error).__name__, error))

expected = (
'chunked 1.0,68,0\n'
'{"inspector":{"messages":[["INFO","test command configuration: "]]}}\n'
'chunked 1.0,17,0\n'
'{"finished":true}'
)

self.assertEquals(result.getvalue().decode("UTF-8"), expected)
return

_package_directory = os.path.dirname(os.path.abspath(__file__))
Expand Down