kafka

This input reads events from a Kafka topic. It uses the high-level consumer API provided by Kafka to read messages from the broker, and it tracks what has been consumed using ZooKeeper. The default input codec is json.

The only required configuration is the topic name. By default the input connects to a ZooKeeper instance running on localhost. All broker information is read from ZooKeeper state.

Ideally you should have as many threads as partitions for a perfect balance. More threads than partitions means that some threads will be idle.

For more information see http://kafka.apache.org/documentation.html#theconsumer

Kafka consumer configuration: http://kafka.apache.org/documentation.html#consumerconfigs

 

Synopsis

This plugin supports the following configuration options:

Required configuration options:

kafka {
    topic_id => ...
}
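
As a rough illustration of how the required setting fits into a full pipeline, the sketch below reads from a topic and prints each event. The topic name "app-logs" is a hypothetical placeholder, and the stdout output is included only to make the events visible.

```
input {
  kafka {
    # Hypothetical topic name; replace with your own topic
    topic_id => "app-logs"
  }
}

output {
  # Print decoded events to the console for inspection
  stdout { codec => rubydebug }
}
```

With no other settings, the input uses the defaults described under Details, including a ZooKeeper at localhost:2181 and the json codec.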

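Available configuration options:

Setting                      Input type   Required   Default value
add_field                    hash         No         {}
charset (deprecated)         string       No         none
codec                        codec        No         "json"
consumer_id                  string       No         nil
consumer_restart_on_error    boolean      No         true
consumer_restart_sleep_ms    number       No         0
consumer_threads             number       No         1
consumer_timeout_ms          number       No         -1
debug (deprecated)           boolean      No         false
decorate_events              boolean      No         false
fetch_message_max_bytes      number       No         1048576
format (deprecated)          string       No         none
group_id                     string       No         "logstash"
message_format (deprecated)  string       No         none
queue_size                   number       No         20
rebalance_backoff_ms         number       No         2000
rebalance_max_retries        number       No         4
reset_beginning              boolean      No         false
tags                         array        No         none
topic_id                     string       Yes        none
type                         string       No         none
zk_connect                   string       No         "localhost:2181"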

Details

 

add_field

  • Value type is hash
  • Default value is {}

Add a field to an event.
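
A minimal sketch of add_field, assuming you want to record where events came from. The topic name, field names, and values are hypothetical.

```
input {
  kafka {
    topic_id  => "app-logs"            # hypothetical topic
    add_field => {
      "source_system" => "kafka"       # illustrative field and value
      "environment"   => "staging"     # illustrative field and value
    }
  }
}
```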

charset (DEPRECATED)

  • DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
  • Value can be any of: ASCII-8BIT, Big5, Big5-HKSCS, Big5-UAO, CP949, Emacs-Mule, EUC-JP, EUC-KR, EUC-TW, GB18030, GBK, ISO-8859-1, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, ISO-8859-10, ISO-8859-11, ISO-8859-13, ISO-8859-14, ISO-8859-15, ISO-8859-16, KOI8-R, KOI8-U, Shift_JIS, US-ASCII, UTF-8, UTF-16BE, UTF-16LE, UTF-32BE, UTF-32LE, Windows-1251, GB2312, IBM437, IBM737, IBM775, CP850, IBM852, CP852, IBM855, CP855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM869, Windows-1258, GB1988, macCentEuro, macCroatian, macCyrillic, macGreek, macIceland, macRoman, macRomania, macThai, macTurkish, macUkraine, CP950, CP951, stateless-ISO-2022-JP, eucJP-ms, CP51932, GB12345, ISO-2022-JP, ISO-2022-JP-2, CP50220, CP50221, Windows-1252, Windows-1250, Windows-1256, Windows-1253, Windows-1255, Windows-1254, TIS-620, Windows-874, Windows-1257, Windows-31J, MacJapanese, UTF-7, UTF8-MAC, UTF-16, UTF-32, UTF8-DoCoMo, SJIS-DoCoMo, UTF8-KDDI, SJIS-KDDI, ISO-2022-JP-KDDI, stateless-ISO-2022-JP-KDDI, UTF8-SoftBank, SJIS-SoftBank, BINARY, CP437, CP737, CP775, IBM850, CP857, CP860, CP861, CP862, CP863, CP864, CP865, CP866, CP869, CP1258, Big5-HKSCS:2008, eucJP, euc-jp-ms, eucKR, eucTW, EUC-CN, eucCN, CP936, ISO2022-JP, ISO2022-JP2, ISO8859-1, CP1252, ISO8859-2, CP1250, ISO8859-3, ISO8859-4, ISO8859-5, ISO8859-6, CP1256, ISO8859-7, CP1253, ISO8859-8, CP1255, ISO8859-9, CP1254, ISO8859-10, ISO8859-11, CP874, ISO8859-13, CP1257, ISO8859-14, ISO8859-15, ISO8859-16, CP878, CP932, csWindows31J, SJIS, PCK, MacJapan, ASCII, ANSI_X3.4-1968, 646, CP65000, CP65001, UTF-8-MAC, UTF-8-HFS, UCS-2BE, UCS-4BE, UCS-4LE, CP1251, external, locale
  • There is no default value for this setting.

The character encoding used in this input. Examples include UTF-8 and cp1252.

This setting is useful if your log files are in Latin-1 (ISO-8859-1), Windows cp1252, or another character set other than UTF-8.

This only affects plain format logs since json is UTF-8 already.

codec

  • Value type is codec
  • Default value is "json"

The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
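
If the messages on the topic are plain text rather than JSON, the codec can be overridden. The sketch below assumes Latin-1 encoded text messages and a hypothetical topic name; the plain codec's own charset option takes the place of the deprecated charset setting on the input.

```
input {
  kafka {
    topic_id => "legacy-text-logs"     # hypothetical topic
    # Decode each message as plain text instead of the default json
    codec => plain {
      charset => "ISO-8859-1"          # assumption: messages are Latin-1
    }
  }
}
```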

consumer_id

  • Value type is string
  • Default value is nil

A unique id for the consumer; generated automatically if not set.

consumer_restart_on_error

  • Value type is boolean
  • Default value is true

Whether to restart the consumer loop on error.

consumer_restart_sleep_ms

  • Value type is number
  • Default value is 0

Time in milliseconds to wait before restarting the consumer after an error.
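
The two restart settings are typically used together. This sketch keeps restarts enabled but pauses between attempts; the 5000 ms value and the topic name are illustrative assumptions, not recommendations.

```
input {
  kafka {
    topic_id                  => "app-logs"   # hypothetical topic
    consumer_restart_on_error => true         # the default, shown for clarity
    consumer_restart_sleep_ms => 5000         # wait 5 seconds before restarting
  }
}
```

A non-zero sleep avoids restarting in a tight loop when the underlying problem persists.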

consumer_threads

  • Value type is number
  • Default value is 1

Number of threads to read from the partitions. Ideally you should have as many threads as partitions for a perfect balance. More threads than partitions means that some threads will be idle. Fewer threads than partitions means that a single thread may consume from more than one partition.
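
For example, assuming a hypothetical topic that was created with four partitions, a matching thread count would look like this:

```
input {
  kafka {
    topic_id         => "app-logs"   # hypothetical topic with 4 partitions
    consumer_threads => 4            # one thread per partition
  }
}
```

If several Logstash instances share the same group, the partitions are divided among all of them, so the thread count per instance would usually be lower.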

consumer_timeout_ms

  • Value type is number
  • Default value is -1

Throw a timeout exception to the consumer if no message is available for consumption after the specified interval. The default of -1 means the consumer blocks indefinitely.

debug (DEPRECATED)

  • DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
  • Value type is boolean
  • Default value is false

decorate_events

  • Value type is boolean
  • Default value is false

Option to add Kafka metadata, such as the topic and message size, to the event.
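
A short sketch of enabling the metadata, with a hypothetical topic name. The exact field names added to the event are not listed here, so inspect the output (for example with the rubydebug codec) to see them.

```
input {
  kafka {
    topic_id        => "app-logs"   # hypothetical topic
    decorate_events => true         # attach Kafka metadata to each event
  }
}

output {
  stdout { codec => rubydebug }     # shows the added metadata fields
}
```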

fetch_message_max_bytes

  • Value type is number
  • Default value is 1048576

The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows, or else it is possible for the producer to send messages larger than the consumer can fetch.
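
As a sketch, suppose the brokers have been configured to accept messages of up to 2 MiB (an assumption for this example). The consumer's fetch size would then need to be raised to at least the same value:

```
input {
  kafka {
    topic_id                => "large-events"   # hypothetical topic
    # 2 * 1024 * 1024 bytes; must be >= the broker's maximum message size
    fetch_message_max_bytes => 2097152
  }
}
```

Because this many bytes may be held in memory per partition, larger values increase the consumer's memory use.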

format (DEPRECATED)

  • DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
  • Value can be any of: plain, json, json_event, msgpack_event
  • There is no default value for this setting.

The format of input data (plain, json, json_event, msgpack_event).

group_id

  • Value type is string
  • Default value is "logstash"

A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id, multiple processes indicate that they are all part of the same consumer group.
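
A minimal sketch of sharing a topic between several Logstash instances: each instance runs the same configuration with the same group_id. The group and topic names are hypothetical.

```
input {
  kafka {
    topic_id => "app-logs"             # hypothetical topic
    # Every instance using this group_id joins the same consumer group
    group_id => "logstash-indexers"    # hypothetical group name
  }
}
```

An instance configured with a different group_id would form a separate group and track its own offsets.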

message_format (DEPRECATED)

  • DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
  • Value type is string
  • There is no default value for this setting.

If format is json, an event sprintf string used to build the displayed @message (defaults to the raw JSON). sprintf format strings look like %{fieldname}.

If format is json_event, ALL fields except for @type are expected to be present. Not receiving all fields will cause unexpected results.

queue_size

  • Value type is number
  • Default value is 20

Size of the internal Logstash queue used to hold events in memory after they have been read from Kafka.

rebalance_backoff_ms

  • Value type is number
  • Default value is 2000

Backoff time between retries during rebalance.

rebalance_max_retries

  • Value type is number
  • Default value is 4

When a new consumer joins a consumer group, the set of consumers attempts to "rebalance" the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place, the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up.
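
The two rebalance settings work together: the retry count bounds the number of attempts and the backoff sets the pause between them. The values below are illustrative assumptions for a group whose membership changes often, not recommended defaults.

```
input {
  kafka {
    topic_id              => "app-logs"   # hypothetical topic
    rebalance_max_retries => 8            # allow more attempts than the default 4
    rebalance_backoff_ms  => 5000         # wait 5 seconds between attempts
  }
}
```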

reset_beginning

  • Value type is boolean
  • Default value is false

Specify whether to jump to the beginning of the queue when there is no initial offset in ZooKeeper, or if an offset is out of range. If this is false, messages are consumed from the latest offset.

If reset_beginning is true, the consumer checks ZooKeeper to see if any other group members are present and active. If not, the consumer deletes any offset information in ZooKeeper and starts at the smallest offset. If other group members are present, reset_beginning will not work and the consumer threads will rejoin the consumer group.
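
One way to replay a topic from its earliest available messages, sketched under the assumption that no other member of the group is running. The topic and group names are hypothetical.

```
input {
  kafka {
    topic_id        => "app-logs"          # hypothetical topic
    group_id        => "logstash-replay"   # hypothetical group with no other active members
    reset_beginning => true                # discard stored offsets and start at the smallest offset
  }
}
```

Leaving this set to true means stored offsets are discarded whenever the consumer starts alone, so it is usually switched back to false after the replay.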

tags

  • Value type is array
  • There is no default value for this setting.

Add any number of arbitrary tags to your event.

This can help with processing later.
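
A brief sketch with hypothetical tag and topic names:

```
input {
  kafka {
    topic_id => "app-logs"             # hypothetical topic
    tags     => ["kafka", "ingest"]    # illustrative tags
  }
}
```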

topic_id

  • This is a required setting.
  • Value type is string
  • There is no default value for this setting.

The topic to consume messages from.

type

  • Value type is string
  • There is no default value for this setting.

Add a type field to all events handled by this input.

Types are used mainly for filter activation.

The type is stored as part of the event itself, so you can also use the type to search for it in the web interface.

If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.
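
The sketch below shows a type being used for filter activation. The type value "applog", the topic name, and the tag added by the filter are all hypothetical.

```
input {
  kafka {
    topic_id => "app-logs"   # hypothetical topic
    type     => "applog"     # illustrative type value
  }
}

filter {
  # Only events carrying the type set above pass through this block
  if [type] == "applog" {
    mutate { add_tag => ["from_kafka"] }
  }
}
```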

zk_connect

  • Value type is string
  • Default value is "localhost:2181"

Specifies the ZooKeeper connection string in the form hostname:port, where hostname and port are the host and port of a ZooKeeper server. You can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.

The server may also have a ZooKeeper chroot path as part of its ZooKeeper connection string, which puts its data under some path in the global ZooKeeper namespace. If so, the consumer should use the same chroot path in its connection string. For example, to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
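
Put together, a connection string for a three-node ZooKeeper ensemble with a chroot path might look like the following. The hostnames, topic name, and the /kafka chroot are hypothetical.

```
input {
  kafka {
    topic_id   => "app-logs"   # hypothetical topic
    # The chroot path is appended once, after the last host:port pair
    zk_connect => "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/kafka"
  }
}
```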