This input will read events from a Kafka topic. It uses the high level consumer API provided by Kafka to read messages from the broker. It also maintains the state of what has been consumed using Zookeeper. The default input codec is json
The only required configuration is the topic name. By default it will connect to a Zookeeper running on localhost. All the broker information is read from Zookeeper state
Ideally you should have as many threads as the number of partitions for a perfect balance — more threads than partitions means that some threads will be idle
For more information see http://kafka.apache.org/documentation.html#theconsumer
Kafka consumer configuration: http://kafka.apache.org/documentation.html#consumerconfigs
This plugin supports the following configuration options:
Required configuration options:
kafka { topic_id => ... }
Available configuration options:
Setting | Input type | Required | Default value |
---|---|---|---|
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No |
| ||
No | |||
Yes | |||
No | |||
No |
|
- DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
-
Value can be any of:
ASCII-8BIT
,Big5
,Big5-HKSCS
,Big5-UAO
,CP949
,Emacs-Mule
,EUC-JP
,EUC-KR
,EUC-TW
,GB18030
,GBK
,ISO-8859-1
,ISO-8859-2
,ISO-8859-3
,ISO-8859-4
,ISO-8859-5
,ISO-8859-6
,ISO-8859-7
,ISO-8859-8
,ISO-8859-9
,ISO-8859-10
,ISO-8859-11
,ISO-8859-13
,ISO-8859-14
,ISO-8859-15
,ISO-8859-16
,KOI8-R
,KOI8-U
,Shift_JIS
,US-ASCII
,UTF-8
,UTF-16BE
,UTF-16LE
,UTF-32BE
,UTF-32LE
,Windows-1251
,GB2312
,IBM437
,IBM737
,IBM775
,CP850
,IBM852
,CP852
,IBM855
,CP855
,IBM857
,IBM860
,IBM861
,IBM862
,IBM863
,IBM864
,IBM865
,IBM866
,IBM869
,Windows-1258
,GB1988
,macCentEuro
,macCroatian
,macCyrillic
,macGreek
,macIceland
,macRoman
,macRomania
,macThai
,macTurkish
,macUkraine
,CP950
,CP951
,stateless-ISO-2022-JP
,eucJP-ms
,CP51932
,GB12345
,ISO-2022-JP
,ISO-2022-JP-2
,CP50220
,CP50221
,Windows-1252
,Windows-1250
,Windows-1256
,Windows-1253
,Windows-1255
,Windows-1254
,TIS-620
,Windows-874
,Windows-1257
,Windows-31J
,MacJapanese
,UTF-7
,UTF8-MAC
,UTF-16
,UTF-32
,UTF8-DoCoMo
,SJIS-DoCoMo
,UTF8-KDDI
,SJIS-KDDI
,ISO-2022-JP-KDDI
,stateless-ISO-2022-JP-KDDI
,UTF8-SoftBank
,SJIS-SoftBank
,BINARY
,CP437
,CP737
,CP775
,IBM850
,CP857
,CP860
,CP861
,CP862
,CP863
,CP864
,CP865
,CP866
,CP869
,CP1258
,Big5-HKSCS:2008
,eucJP
,euc-jp-ms
,eucKR
,eucTW
,EUC-CN
,eucCN
,CP936
,ISO2022-JP
,ISO2022-JP2
,ISO8859-1
,CP1252
,ISO8859-2
,CP1250
,ISO8859-3
,ISO8859-4
,ISO8859-5
,ISO8859-6
,CP1256
,ISO8859-7
,CP1253
,ISO8859-8
,CP1255
,ISO8859-9
,CP1254
,ISO8859-10
,ISO8859-11
,CP874
,ISO8859-13
,CP1257
,ISO8859-14
,ISO8859-15
,ISO8859-16
,CP878
,CP932
,csWindows31J
,SJIS
,PCK
,MacJapan
,ASCII
,ANSI_X3.4-1968
,646
,CP65000
,CP65001
,UTF-8-MAC
,UTF-8-HFS
,UCS-2BE
,UCS-4BE
,UCS-4LE
,CP1251
,external
,locale
- There is no default value for this setting.
The character encoding used in this input. Examples include UTF-8
and cp1252
This setting is useful if your log files are in Latin-1
(aka cp1252
)
or in another character set other than UTF-8
.
This only affects plain
format logs since json is UTF-8
already.
- Value type is codec
-
Default value is
"json"
The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
- Value type is string
-
Default value is
nil
A unique id for the consumer; generated automatically if not set.
- Value type is boolean
-
Default value is
true
Option to restart the consumer loop on error
- Value type is number
-
Default value is
0
Time in millis to wait for consumer to restart after an error
- Value type is number
-
Default value is
1
Number of threads to read from the partitions. Ideally you should have as many threads as the number of partitions for a perfect balance. More threads than partitions means that some threads will be idle. Less threads means a single thread could be consuming from more than one partition
- Value type is number
-
Default value is
-1
Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
- DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
- Value type is boolean
-
Default value is
false
- Value type is boolean
-
Default value is
false
Option to add Kafka metadata like topic, message size to the event
- Value type is number
-
Default value is
1048576
The number of byes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch.
- DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
-
Value can be any of:
plain
,json
,json_event
,msgpack_event
- There is no default value for this setting.
The format of input data (plain, json, json_event)
- Value type is string
-
Default value is
"logstash"
A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
- DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
- Value type is string
- There is no default value for this setting.
If format is json
, an event sprintf
string to build what
the display @message
should be given (defaults to the raw JSON).
sprintf
format strings look like %{fieldname}
If format is json_event
, ALL fields except for @type
are expected to be present. Not receiving all fields
will cause unexpected results.
- Value type is number
-
Default value is
20
Internal Logstash queue size used to hold events in memory after it has been read from Kafka
- Value type is number
-
Default value is
2000
Backoff time between retries during rebalance.
- Value type is number
-
Default value is
4
When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up.
- Value type is boolean
-
Default value is
false
Specify whether to jump to beginning of the queue when there is no initial offset in
ZooKeeper, or if an offset is out of range. If this is false
, messages are consumed
from the latest offset
If reset_beginning
is true, the consumer will check ZooKeeper to see if any other group members
are present and active. If not, the consumer deletes any offset information in the ZooKeeper
and starts at the smallest offset. If other group members are present reset_beginning
will not
work and the consumer threads will rejoin the consumer group.
- Value type is array
- There is no default value for this setting.
Add any number of arbitrary tags to your event.
This can help with processing later.
- This is a required setting.
- Value type is string
- There is no default value for this setting.
The topic to consume messages from
- Value type is string
- There is no default value for this setting.
Add a type
field to all events handled by this input.
Types are used mainly for filter activation.
The type is stored as part of the event itself, so you can also use the type to search for it in the web interface.
If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.
- Value type is string
-
Default value is
"localhost:2181"
Specifies the ZooKeeper connection string in the form hostname:port where host and port are
the host and port of a ZooKeeper server. You can also specify multiple hosts in the form
hostname1:port1,hostname2:port2,hostname3:port3
.
The server may also have a ZooKeeper chroot path as part of it’s ZooKeeper connection string
which puts its data under some path in the global ZooKeeper namespace. If so the consumer
should use the same chroot path in its connection string. For example to give a chroot path of
/chroot/path
you would give the connection string as
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
.