# Pipeline

Pipeline is an entity that handles data. It consists of an input plugin, a list of action plugins, and an output plugin. The input plugin sends data to the `pipeline.In` controller, where it is validated: empty data is discarded, and the data size is checked (the behaviour for overlong logs is defined by the `cut_off_event_by_limit` setting). The data is then checked by the antispammer if it is enabled. After all checks have passed, the data is converted to the `Event` structure, limited by the `EventPool`, and decoded depending on the [pipeline settings](#settings). The event is sent to a stream, which is handled by `processors`: in the processors the event is passed through the list of action plugins and then sent to the output plugin. The output plugin commits the `Event` by calling the `pipeline.Commit` function; after the commit is finished, the data is considered processed. More details and the architecture are presented on the [architecture page](/docs/architecture.md).
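
For orientation, here is a minimal sketch of a pipeline configuration in the same fragment style as the examples below. The plugin names and parameters (`file` input with `watching_dir`, the `json_decode` action, `stdout` output) are illustrative; check each plugin's page for the real options:

```yaml
input:                   # reads data and sends it to pipeline.In
  type: file
  watching_dir: /var/log
actions:                 # each event passes through this list in order
  - type: json_decode
    field: log
output:                  # commits the event once it is delivered
  type: stdout
pipeline:                # pipeline settings described in this document
  capacity: 1024
```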

## Settings

**`capacity`** *`int`* *`default=1024`*

Capacity of the `EventPool`. No more than `capacity` events can be processed at the same time. It can be considered one of the rate limiting tools, but its primary role is to control the amount of RAM used by File.d.

<br>

**`avg_log_size`** *`int`* *`default=4096`*

Expected average size of the input logs in bytes. Used in the standard event pool to release buffer memory when its size exceeds this value.

<br>

**`max_event_size`** *`int`* *`default=0`*

Maximum allowed size of the input logs in bytes. If set to 0, logs of any size are allowed. If set to a value greater than 0, logs larger than `max_event_size` are discarded unless `cut_off_event_by_limit` is set to `true`.

<br>

**`cut_off_event_by_limit`** *`bool`* *`default=false`*

Flag indicating whether to cut logs which have exceeded `max_event_size`. If set to `true`, huge logs are cut and only the first `max_event_size` bytes are passed further. If set to `false`, huge logs are discarded. Only works if `max_event_size` is greater than 0, otherwise it does nothing. Useful when there are huge logs which affect the logging system but it is preferable to deliver them at least partially.

<br>

**`cut_off_event_by_limit_field`** *`string`*

Field to add to a log if it was cut by `max_event_size`. E.g. with `cut_off_event_by_limit_field: _cropped`, if the log was cut, the output event will have the field `"_cropped":true`. Only works if `cut_off_event_by_limit` is set to `true` and `max_event_size` is greater than 0. Useful for marking cut logs.
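
A sketch combining the three size-related settings, in the fragment style used in this document (the values are arbitrary): logs longer than 32 KiB are cut to the limit and marked with a `_cropped` field instead of being dropped.

```yaml
pipeline:
  max_event_size: 32768                   # cut or discard logs longer than 32 KiB
  cut_off_event_by_limit: true            # cut instead of discarding
  cut_off_event_by_limit_field: _cropped  # mark cut logs with "_cropped": true
```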

<br>

**`decoder`** *`string`* *`default=auto`*

Which decoder to use on every log from the input plugin. Defaults to `auto`, meaning the decoder suggested by the input plugin is used. Currently the `json` decoder is suggested most of the time; the only exception is the [k8s input plugin](/plugin/input/k8s/README.md) with a non-docker CRI type, in which case the `cri` decoder is suggested. The full list of decoders is available on the [decoders page](/decoder/readme.md).

<br>

**`decoder_params`** *`map[string]any`*

Additional parameters for the chosen decoder. The parameter list varies per decoder and can be found on the [decoders page](/decoder/readme.md).
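
For example, forcing the `json` decoder with extra parameters might look like the sketch below; the `json_max_fields_size` key is shown only as a plausible illustration of the shape, so check the decoders page for the actual parameter names:

```yaml
pipeline:
  decoder: json
  decoder_params:
    json_max_fields_size:   # illustrative parameter name, see the decoders page
      message: 1024
```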

<br>

**`stream_field`** *`string`* *`default=stream`*

Which field in the log indicates the `stream`. Mostly used for distinguishing `stdout` from `stderr` in k8s logs.

<br>

**`maintenance_interval`** *`string`* *`default=5s`*

How often to perform maintenance. Maintenance includes antispammer maintenance and metric cleanup, metric holder maintenance, increasing basic pipeline metrics with accumulated deltas, and logging pipeline stats. The value must be passed in duration format (`<number>(ms|s|m|h)`).

<br>

**`event_timeout`** *`string`* *`default=30s`*

How long an event can be processed in the action plugins and block a stream in the streamer before it is marked as a timeout event and the stream is unlocked, so that the whole pipeline does not get stuck. The value must be passed in duration format (`<number>(ms|s|m|h)`).
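
Both interval settings accept the duration format described above. A small sketch with arbitrary values:

```yaml
pipeline:
  maintenance_interval: 10s  # run maintenance every 10 seconds
  event_timeout: 1m          # events stuck in actions for 1 minute are timed out
```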

<br>

**`antispam_threshold`** *`int`* *`default=0`*

Threshold value for the [antispammer](/pipeline/antispam/README.md#antispammer) to ban sources. If set to 0, the antispammer is disabled. If set to a value greater than 0, the antispammer is enabled and bans sources which write `antispam_threshold` or more logs during the `maintenance_interval` time.

<br>

**`antispam_exceptions`** *`[]`[antispam.Exception](/pipeline/antispam/README.md#exception-parameters)*

The list of antispammer exceptions. If a log matches at least one of the exceptions, it is not counted by the antispammer.
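
A sketch of an antispam setup in the fragment style of this document; the exception rule fields shown here (`name`, `mode`, `values`) are an assumption about the exception shape, so consult the [antispam README](/pipeline/antispam/README.md#exception-parameters) for the real parameters:

```yaml
pipeline:
  antispam_threshold: 3000   # ban sources writing 3000+ logs per maintenance_interval
  antispam_exceptions:
    - name: allow_healthz    # illustrative rule: do not count health-check logs
      mode: contains         # assumed modes: prefix|contains|suffix
      values: ['/healthz']
```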

<br>

**`meta_cache_size`** *`int`* *`default=1024`*

Number of entries in the metadata cache.

<br>

**`source_name_meta_field`** *`string`*

The metadata field used to retrieve the name or origin of a data source. You can use it for antispam. Metadata is configured via the `meta` parameter in the input plugin. For example:

```yaml
input:
  type: k8s
  meta:
    pod_namespace: '{{ .pod_name }}.{{ .namespace_name }}'
pipeline:
  antispam_threshold: 2000
  source_name_meta_field: pod_namespace
```

<br>

**`is_strict`** *`bool`* *`default=false`*

Whether to fatal on a decoding error.

<br>

**`metric_hold_duration`** *`string`* *`default=30m`*

The amount of time a metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in duration format (`<number>(ms|s|m|h)`).

<br>

**`pool`** *`string`* *`options=std|low_memory`*

Type of the `EventPool`. The `std` pool is the original pool with a slice of `Event` pointers and slices of free-event indicators. The `low_memory` pool is a leveled pool based on multiple `sync.Pool` instances for events of different sizes. The latter is experimental.
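
For example, switching to the experimental leveled pool:

```yaml
pipeline:
  pool: low_memory  # sync.Pool-based leveled event pool instead of std
```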

<br>

## Datetime parse formats

Most of the plugins which parse datetimes call the `pipeline.ParseTime` function. It accepts datetime layouts the same way as Go `time.Parse` (a reference datetime like `2006-01-02T15:04:05.999999999Z07:00`), except for unix timestamp formats, which can only be specified via aliases.

For convenience, there are aliases for some datetime formats:

+ `ansic` - Mon Jan _2 15:04:05 2006
+ `unixdate` - Mon Jan _2 15:04:05 MST 2006
+ `rubydate` - Mon Jan 02 15:04:05 -0700 2006
+ `rfc822` - 02 Jan 06 15:04 MST
+ `rfc822z` - 02 Jan 06 15:04 -0700
+ `rfc850` - Monday, 02-Jan-06 15:04:05 MST
+ `rfc1123` - Mon, 02 Jan 2006 15:04:05 MST
+ `rfc1123z` - Mon, 02 Jan 2006 15:04:05 -0700
+ `rfc3339` - 2006-01-02T15:04:05Z07:00
+ `rfc3339nano` - 2006-01-02T15:04:05.999999999Z07:00
+ `kitchen` - 3:04PM
+ `stamp` - Jan _2 15:04:05
+ `stampmilli` - Jan _2 15:04:05.000
+ `stampmicro` - Jan _2 15:04:05.000000
+ `stampnano` - Jan _2 15:04:05.000000000
+ `nginx_errorlog` - 2006/01/02 15:04:05
+ `unixtime` - unix timestamp in seconds: 1739959880
+ `unixtimemilli` - unix timestamp in milliseconds: 1739959880999
+ `unixtimemicro` - unix timestamp in microseconds: 1739959880999999 (e.g. `journalctl` writes the timestamp in that format to the `__REALTIME_TIMESTAMP` field when using the json output format)
+ `unixtimenano` - unix timestamp in nanoseconds: 1739959880999999999

**Note**: when using `unixtime(|milli|micro|nano)`, if the value is a float, its integer part is always treated as seconds and its fractional part as fractions of a second.
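
As an illustration, aliases can be used wherever a plugin accepts a datetime format. A sketch using the `convert_date` action; the parameter names are an assumption here, so check that plugin's README:

```yaml
actions:
  - type: convert_date                          # parses the time field of each event
    source_formats: ['rfc3339nano', 'unixtime'] # accepted input formats, by alias
    target_format: 'rfc822'                     # format to write back
```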

## Match modes

> Note: consider using [DoIf match rules](/pipeline/doif/README.md) instead, since they are an advanced version of match modes.

#### And

`match_mode: and` — matches fields with the AND operator
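
A sketch of the `and` mode with the `discard` action: the action runs only if every listed field matches (the field names and values are illustrative):

```yaml
actions:
  - type: discard
    match_mode: and
    match_fields:              # both conditions must hold for the event to be discarded
      k8s_namespace: kube-system
      level: debug
```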