Pushing Relational Data to Elasticsearch using Logstash JDBC input plugin

I recently wanted to push existing relational data to Elasticsearch and analyse it using Kibana, so I tried out different approaches and found a few ways of doing that.

  1. Using Logstash JDBC input plugin
  2. Using Kafka connect JDBC
  3. Using Elasticsearch JDBC input plugin

Here I will be discussing the use of the Logstash JDBC input plugin to push data from an Oracle database to Elasticsearch. This plugin was created as a way to ingest data from any database with a JDBC interface into Logstash. I assume you already have an Elasticsearch instance up and running. I will be doing this on Windows with Logstash 5.5.1.

Logstash is an open source, server-side data processing pipeline. We can use it to ingest data from multiple sources, transform it and send it to Elasticsearch. You can download Logstash here.

After downloading and extracting Logstash you will see its folder structure (the bin directory is the one we will run it from).

Now you have to create a config file to tell Logstash how the data ingestion should happen. Let’s make a configuration file named logstash-config.conf.

The configuration file should contain an input section and an output section.
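As a sketch, the overall shape of the file is just these two blocks (the actual options come later):

```conf
# logstash-config.conf - overall structure
input {
  jdbc {
    # database connection and query options go here
  }
}

output {
  elasticsearch {
    # Elasticsearch connection options go here
  }
}
```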

The following are the essential configuration options in the input plugin section for accessing the database.

We have to specify the location of the JDBC driver library with the jdbc_driver_library option, since the plugin does not ship with JDBC drivers. If you use MySQL you have to point it at the MySQL JDBC driver; I will be using the JDBC driver for Oracle. The driver class (the jdbc_driver_class option) is the entry-point class of the JDBC driver. When using Oracle, you have to put “Java::” before the driver class.

In MySQL, the driver class is com.mysql.jdbc.Driver.

In Oracle, it is oracle.jdbc.driver.OracleDriver.

The connection string is the URL and port (together with the SID or service name) of the server that runs the Oracle database.

The completed input section will look like the following. We can use # to write comments inside the configuration file.
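A minimal input section for Oracle might look like this. The driver path, connection string and credentials below are placeholders; replace them with your own values:

```conf
input {
  jdbc {
    # path to the Oracle JDBC driver jar (placeholder path)
    jdbc_driver_library => "C:/jdbc/ojdbc7.jar"
    # note the Java:: prefix required for Oracle
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # host:port:SID of the database server (placeholder)
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:XE"
    jdbc_user => "my_user"
    jdbc_password => "my_password"
    # the query whose result set will be pushed
    statement => "select * from People"
  }
}
```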

That is all the required configuration in the input section to access the Oracle database.

Now the second part is pushing the data to Elasticsearch. Those settings are specified in the output section of the config file.

Let’s say you have Elasticsearch running locally on port 9200. Then the output configuration will be as follows.
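A minimal output section could be as simple as this (the index name people is just an example):

```conf
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "people"
  }
}
```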

Let’s say you want to push all the rows in the “People” table to Elasticsearch (you can use any other table in your database). This People table contains id (an auto-increment primary key), first name, last name, city, address and row_update_time columns. The following will be the full configuration.
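Put together, a full configuration along those lines might look like this. The connection details and index name are placeholders for your own values:

```conf
input {
  jdbc {
    jdbc_driver_library => "C:/jdbc/ojdbc7.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:XE"
    jdbc_user => "my_user"
    jdbc_password => "my_password"
    statement => "select * from People"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "people"
  }
  stdout {
    codec => rubydebug
  }
}
```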

What does the stdout section do inside the output section?

stdout is an output plugin in Logstash. It prints to the stdout of the shell running Logstash. We can use different codecs inside stdout, such as rubydebug, which outputs event data using the awesome_print library and is handy for visualising the structure of the data. We can also use the json codec to print event data in JSON format.

Now you can run Logstash from its directory on the command line using the following command.

bin\logstash -f logstash-config.conf

Then output like the following can be seen on the command line, along with the executed query (select * from People, in my example).

But here, only the rows present at the time this runs will be sent to Elasticsearch. If the People table changes after the data has been pushed, those changes will not be present in Elasticsearch. Usually, though, we want Elasticsearch to hold the up-to-date version of the data.

Therefore, what are the solutions for that?

  1. We can run the same configuration file again and again using a scheduler. (This sends all the data every time, both changed and unchanged rows.)
  2. We can send only the changed data in the table.

First Approach

In the first scenario, the configuration file will look as follows.
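A sketch of such a configuration, again with placeholder connection details:

```conf
input {
  jdbc {
    jdbc_driver_library => "C:/jdbc/ojdbc7.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:XE"
    jdbc_user => "my_user"
    jdbc_password => "my_password"
    statement => "select * from People"
    # six-field cron expression: run every second
    schedule => "* * * * * *"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "people"
    # use the table's primary key as the Elasticsearch document id
    document_id => "%{id}"
  }
  stdout {
    codec => rubydebug
  }
}
```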

In order to run it again and again I have used the schedule option. Here I have specified it to run Logstash every second. You can find more scheduling examples, such as every 5 or 10 minutes, here.

Why have I used a document_id option?

Elasticsearch creates a unique id for every document in order to identify it. Since we run the configuration file again and again, the same documents would be created again and again, which should be prevented. The document_id option does exactly that: here I have told the plugin to use the primary key (the id field) of the People table as the document id. Elasticsearch does not create multiple documents with the same document id, so instead of duplicates, the existing document gets replaced by the newest one. This way, both newly added and changed records end up in Elasticsearch.
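In the output section this is a single extra line; the index name is again just an example:

```conf
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "people"
    # %{id} interpolates the id field of each event
    document_id => "%{id}"
  }
}
```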

Second Approach

In the second scenario, in order to incrementally update the data in Elasticsearch, we need a column in the database that can be used as a reference. Since the People table has a row_update_time column which gets updated whenever a row changes, I have used that.

Following is an example of incrementally updating the data in Elasticsearch using the row_update_time column.
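A sketch of the input section for this, with the same placeholder connection details as before:

```conf
input {
  jdbc {
    jdbc_driver_library => "C:/jdbc/ojdbc7.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:XE"
    jdbc_user => "my_user"
    jdbc_password => "my_password"
    # :sql_last_value is filled in by the plugin on every run
    statement => "select * from People where row_update_time > :sql_last_value"
    schedule => "* * * * * *"
  }
}
```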

Here I have used the built-in :sql_last_value parameter. It holds the value that is used as the reference for selecting which data to retrieve. Initially it is set to Thursday, 1 January 1970 (the epoch). After the first run it jumps to the current time, and after that it advances by the period specified in the schedule option.

When we run Logstash with the above config file, it will print queries like the following to stdout, with the data in between.

select * from People where row_update_time>'2017-07-04 13:08:08'

select * from People where row_update_time>'2017-07-04 13:08:09'

select * from People where row_update_time>'2017-07-04 13:08:10'

That means the sql_last_value parameter gets updated automatically according to the schedule (in this case it has been scheduled to run every second, so the value of sql_last_value is incremented by one second each time it runs).

Where is that sql_last_value saved?

It gets saved automatically into a metadata file called .logstash_jdbc_last_run. On Windows its location is C:/Users/%USERNAME%, whereas on Linux it is inside the home folder.

Instead of advancing :sql_last_value according to the scheduler, we can also configure Logstash to retrieve only the rows whose row_update_time is greater than the row_update_time of the record that was pushed last. This is very suitable when row_update_time is set at the application level instead of the database level (Logstash could run the query at the very moment records are being saved to the database, so we might otherwise miss some records).

Following is the configuration for that. We use the use_column_value option (together with tracking_column) to tell Logstash to use the value of a column as the reference.
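A sketch of the relevant input options (driver and connection options are the same placeholders as earlier, omitted here for brevity):

```conf
input {
  jdbc {
    # plus the jdbc_driver_* and connection options shown earlier
    statement => "select * from People where row_update_time > :sql_last_value"
    # track a column's value instead of the run time
    use_column_value => true
    tracking_column => "row_update_time"
    tracking_column_type => "timestamp"
    schedule => "* * * * * *"
  }
}
```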

row_update_time is a timestamp column, so we tell that to Logstash in advance using the tracking_column_type option, and we specify the column that should be tracked using the tracking_column option. Then :sql_last_value contains the row_update_time of the record that was pushed last. Therefore, even though I run it with the same schedule, I will get queries like the following with data in between.

select * from People where row_update_time>'2017-07-04 13:08:08'
select * from People where row_update_time>'2017-07-04 13:08:08'
select * from People where row_update_time>'2017-07-04 13:08:08'
select * from People where row_update_time>'2017-07-04 13:08:09'

Let’s say you only want to push the newly added rows, not the changed rows. Then, instead of using the row_update_time column, you can use the id column as follows.
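The same tracking pattern, switched to the numeric id column (driver and connection placeholders omitted as before):

```conf
input {
  jdbc {
    # plus the jdbc_driver_* and connection options shown earlier
    statement => "select * from People where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    schedule => "* * * * * *"
  }
}
```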

In this case, :sql_last_value is automatically set to 0 at first (since the tracking column is of a numeric type).

There is another option that may be useful. Let’s say that when you run Logstash, you want to fetch the data from the beginning again. In that case you can use the clean_run option in the jdbc plugin section. Its default value is false, and since we usually do not want to send the same data again, it is better to leave it at its default (if there is a system failure and Logstash stops running, on restart it will continue from the value in the .logstash_jdbc_last_run file instead of starting from the beginning).

You can also specify a custom file in which the :sql_last_value gets saved, using the last_run_metadata_path option.

This is very important if you want to run two SQL queries at the same time, each with its own tracking parameter. For example,
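A sketch with two jdbc blocks, each keeping its own state file (the metadata file paths are placeholders):

```conf
input {
  jdbc {
    # plus the jdbc_driver_* and connection options shown earlier
    statement => "select * from People where row_update_time > :sql_last_value"
    use_column_value => true
    tracking_column => "row_update_time"
    tracking_column_type => "timestamp"
    # separate state file for this query (placeholder path)
    last_run_metadata_path => "C:/logstash/people_by_time_last_run"
    schedule => "* * * * * *"
  }
  jdbc {
    # plus the jdbc_driver_* and connection options shown earlier
    statement => "select * from People where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # separate state file for this query (placeholder path)
    last_run_metadata_path => "C:/logstash/people_by_id_last_run"
    schedule => "* * * * * *"
  }
}
```

Without separate last_run_metadata_path values, both queries would read and write the same .logstash_jdbc_last_run file and overwrite each other’s state.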

Now you know all the necessary information to work with Logstash JDBC input plugin. Just try 😉.

COO (AkvaSoft)