|
4 | 4 | import java.util.List;
|
5 | 5 |
|
6 | 6 | /**
|
7 |
| - * Common base class for all Splunk Input types. |
8 |
| - * Currently just has shared logic for queuing up events. |
| 7 | + * Common base class for all Splunk Input types. Currently just has shared logic |
| 8 | + * for queuing up events. |
9 | 9 | *
|
10 | 10 | * @author Damien Dallimore damien@dtdsoftware.com
|
11 |
| - * |
| 11 | + * |
12 | 12 | */
|
13 | 13 | public abstract class SplunkInput {
|
14 | 14 |
|
15 |
| - //data size multipliers |
| 15 | + // data size multipliers |
16 | 16 | private static final int KB = 1024;
|
17 |
| - private static final int MB = KB*1024; |
18 |
| - private static final int GB = MB*1024; |
19 |
| - |
20 |
| - //default to 500K |
21 |
| - private long maxQueueSize = 500*KB; |
22 |
| - //default. If true,queue will get emptied when it fills up to accommodate new data. |
| 17 | + private static final int MB = KB * 1024; |
| 18 | + private static final int GB = MB * 1024; |
| 19 | + |
| 20 | + // default to 500K |
| 21 | + private long maxQueueSize = 500 * KB; |
| 22 | + // default. If true,queue will get emptied when it fills up to accommodate |
| 23 | + // new data. |
23 | 24 | private boolean dropEventsOnQueueFull = false;
|
24 |
| - |
25 |
| - //Using this collection structure to implement the FIFO queue |
26 |
| - private List <String>queue = new ArrayList<String>(); |
27 |
| - |
| 25 | + |
| 26 | + // Using this collection structure to implement the FIFO queue |
| 27 | + private List<String> queue = new ArrayList<String>(); |
| 28 | + |
28 | 29 | private long currentQueueSizeInBytes = 0;
|
29 |
| - |
| 30 | + |
30 | 31 | /**
|
31 |
| - * Add an event to the tail of the FIFO queue subject to there being capacity |
| 32 | + * Add an event to the tail of the FIFO queue subject to there being |
| 33 | + * capacity |
| 34 | + * |
32 | 35 | * @param event
|
33 | 36 | */
|
34 |
| - protected void enqueue(String event){ |
35 |
| - |
| 37 | + protected void enqueue(String event) { |
| 38 | + |
36 | 39 | long eventSize = event.getBytes().length;
|
37 |
| - |
38 |
| - if(queueHasCapacity(eventSize)){ |
39 |
| - queue.add(event); |
40 |
| - currentQueueSizeInBytes += eventSize; |
41 |
| - } |
42 |
| - else if(dropEventsOnQueueFull){ |
| 40 | + |
| 41 | + if (queueHasCapacity(eventSize)) { |
| 42 | + queue.add(event); |
| 43 | + currentQueueSizeInBytes += eventSize; |
| 44 | + } else if (dropEventsOnQueueFull) { |
43 | 45 | queue.clear();
|
44 | 46 | queue.add(event);
|
45 | 47 | currentQueueSizeInBytes = eventSize;
|
46 |
| - |
47 |
| - } |
48 |
| - else{ |
49 |
| - //bummer , queue is full up |
50 |
| - |
| 48 | + |
| 49 | + } else { |
| 50 | + // bummer , queue is full up |
| 51 | + |
51 | 52 | }
|
52 | 53 | }
|
53 |
| - |
| 54 | + |
54 | 55 | /**
|
55 | 56 | * True if the queue has capacity for adding an event of the given size
|
| 57 | + * |
56 | 58 | * @param eventSize
|
57 | 59 | * @return
|
58 | 60 | */
|
59 | 61 | private boolean queueHasCapacity(long eventSize) {
|
60 |
| - |
61 |
| - return (currentQueueSizeInBytes+eventSize) <= maxQueueSize; |
| 62 | + |
| 63 | + return (currentQueueSizeInBytes + eventSize) <= maxQueueSize; |
62 | 64 | }
|
63 | 65 |
|
64 | 66 | /**
|
65 | 67 | * True if there are pending events in the queue
|
| 68 | + * |
66 | 69 | * @return
|
67 | 70 | */
|
68 |
| - protected boolean queueContainsEvents(){ |
| 71 | + protected boolean queueContainsEvents() { |
69 | 72 | return !queue.isEmpty();
|
70 | 73 | }
|
71 |
| - |
| 74 | + |
72 | 75 | /**
|
73 |
| - * Remove an event from the head of the FIFO queue or null if there are no items in the queue |
| 76 | + * Remove an event from the head of the FIFO queue or null if there are no |
| 77 | + * items in the queue |
| 78 | + * |
74 | 79 | * @return
|
75 | 80 | */
|
76 |
| - protected String dequeue(){ |
77 |
| - |
78 |
| - if(queueContainsEvents()){ |
79 |
| - String event = queue.remove(0); |
80 |
| - currentQueueSizeInBytes -= event.getBytes().length; |
81 |
| - if(currentQueueSizeInBytes < 0){ |
82 |
| - currentQueueSizeInBytes = 0; |
83 |
| - } |
84 |
| - return event; |
| 81 | + protected String dequeue() { |
| 82 | + |
| 83 | + if (queueContainsEvents()) { |
| 84 | + String event = queue.remove(0); |
| 85 | + currentQueueSizeInBytes -= event.getBytes().length; |
| 86 | + if (currentQueueSizeInBytes < 0) { |
| 87 | + currentQueueSizeInBytes = 0; |
| 88 | + } |
| 89 | + return event; |
85 | 90 | }
|
86 | 91 | return null;
|
87 | 92 | }
|
88 |
| - |
| 93 | + |
89 | 94 | /**
|
90 |
| - * Set the queue size from the configured property String value. |
91 |
| - * If parsing fails , the default of 500KB will be used. |
| 95 | + * Set the queue size from the configured property String value. If parsing |
| 96 | + * fails , the default of 500KB will be used. |
92 | 97 | *
|
93 |
| - * @param rawProperty in format [<integer>|<integer>[KB|MB|GB]] |
| 98 | + * @param rawProperty |
| 99 | + * in format [<integer>|<integer>[KB|MB|GB]] |
94 | 100 | */
|
95 |
| - public void setMaxQueueSize(String rawProperty) { |
96 |
| - |
| 101 | + public void setMaxQueueSize(String rawProperty) { |
| 102 | + |
97 | 103 | int multiplier;
|
98 | 104 | int factor;
|
99 |
| - |
100 |
| - if(rawProperty.endsWith("KB")){ |
| 105 | + |
| 106 | + if (rawProperty.endsWith("KB")) { |
101 | 107 | multiplier = KB;
|
102 |
| - } |
103 |
| - else if(rawProperty.endsWith("MB")){ |
| 108 | + } else if (rawProperty.endsWith("MB")) { |
104 | 109 | multiplier = MB;
|
105 |
| - } |
106 |
| - else if(rawProperty.endsWith("GB")){ |
| 110 | + } else if (rawProperty.endsWith("GB")) { |
107 | 111 | multiplier = GB;
|
108 |
| - } |
109 |
| - else{ |
| 112 | + } else { |
110 | 113 | return;
|
111 | 114 | }
|
112 | 115 | try {
|
113 |
| - factor = Integer.parseInt(rawProperty.substring(0,rawProperty.length()-2)); |
| 116 | + factor = Integer.parseInt(rawProperty.substring(0, |
| 117 | + rawProperty.length() - 2)); |
114 | 118 | } catch (NumberFormatException e) {
|
115 | 119 | return;
|
116 | 120 | }
|
117 |
| - setMaxQueueSize(factor*multiplier); |
118 |
| - |
| 121 | + setMaxQueueSize(factor * multiplier); |
| 122 | + |
119 | 123 | }
|
120 |
| - |
| 124 | + |
121 | 125 | public long getMaxQueueSize() {
|
122 | 126 | return maxQueueSize;
|
123 | 127 | }
|
| 128 | + |
124 | 129 | /**
|
125 | 130 | * Max queue size in bytes
|
| 131 | + * |
126 | 132 | * @param maxQueueSize
|
127 | 133 | */
|
128 | 134 | public void setMaxQueueSize(long maxQueueSize) {
|
129 | 135 | this.maxQueueSize = maxQueueSize;
|
130 | 136 | }
|
| 137 | + |
131 | 138 | public boolean isDropEventsOnQueueFull() {
|
132 | 139 | return dropEventsOnQueueFull;
|
133 | 140 | }
|
| 141 | + |
134 | 142 | /**
|
135 | 143 | * If true,queue will get emptied when it fills up to accommodate new data.
|
| 144 | + * |
136 | 145 | * @param dropEventsOnQueueFull
|
137 | 146 | */
|
138 | 147 | public void setDropEventsOnQueueFull(boolean dropEventsOnQueueFull) {
|
139 | 148 | this.dropEventsOnQueueFull = dropEventsOnQueueFull;
|
140 | 149 | }
|
141 |
| - |
142 |
| - |
| 150 | + |
143 | 151 | }
|
0 commit comments