Bulk Load CSV Files into Elasticsearch Indices

Loading Data into an Elasticsearch Index from a CSV File

Many developers have CSV files, or other fixed-length or delimited text files, that they need to bulk load into Elasticsearch. Common solutions for importing into Elasticsearch include:

  • using Logstash to format your CSV files and import them into Elasticsearch
  • reading and parsing fixed-length or delimited files with scripting languages such as Python or PHP and then loading them into Elasticsearch using the Elasticsearch API
  • using a data feed API like Flex.io to regularly bulk load CSV files into Elasticsearch, with a few lines of code

Logstash is a great solution for loading log files into Elasticsearch, but you can quickly run into problems processing files that aren’t in a log format—particularly if they don’t include timestamps or other date information. To work around these problems, you can use the Elasticsearch API to upload data into Elasticsearch.

Working with the Elasticsearch API does require some developer know-how, plus the effort of setting up the servers, bash scripts, cron jobs, type mappings and other integration and processing “glue” needed to regularly run scripts that load the data into Elasticsearch.
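
To make that concrete, here is a rough sketch of what such a script might look like in Python, using the official Elasticsearch client and its bulk helper. The host, index name and file path below are placeholders for illustration only:

import csv

from elasticsearch import Elasticsearch, helpers

# Placeholders: point these at your own cluster and file.
ES_HOST = "http://localhost:9200"
INDEX_NAME = "contacts"
CSV_PATH = "contacts.csv"

es = Elasticsearch(ES_HOST)

# Drop any existing index so a re-run replaces the old data.
if es.indices.exists(index=INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

def generate_actions():
    """Yield one bulk-index action per CSV row."""
    with open(CSV_PATH, newline="") as f:
        for row in csv.DictReader(f):
            yield {"_index": INDEX_NAME, "_source": row}

helpers.bulk(es, generate_actions())
es.indices.refresh(index=INDEX_NAME)
print(es.count(index=INDEX_NAME)["count"], "documents indexed")

That covers a one-off load; the scheduling, credential handling and per-file type mappings mentioned above are the parts you would still need to build around it.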

Thankfully, third-party APIs have come to the rescue. This tutorial walks through a simple example of how to use the Flex.io API to bulk upload CSV files to your Elasticsearch instance and refresh the indices on a periodic basis.

Demonstration: Example and Code

Before jumping into the tutorial, here is a GitHub repo of what we’re about to build:

Fork the Source Code | Source Data: File 1, File 2, File 3

Here’s an example of an Elasticsearch query running from the browser using the Contacts file from the Source Data:

Elasticsearch in browser example
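
If you want to reproduce this kind of query yourself, Elasticsearch’s URI search works directly from the browser; for example, something like http://your-es-host:9200/contacts.csv/_search?q=some_field:some_value&pretty, where the host, index and field names are placeholders for your own setup.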

Summary: This data feed pulls CSV files from a Dropbox folder, formats the contents and uploads the data into Elasticsearch indices, one index per file. In this setup, subsequent runs will overwrite the existing indices with new data.

Let’s Start Building the Elasticsearch Data Feed

In this tutorial, we’ll do the following:

  1. Create a connection to your data store and to Elasticsearch.
  2. Access the CSV files from your data storage location.
  3. Build the loop to read files from the store and write to Elasticsearch.
  4. Pull it all together and deploy the pipe.

To get started, you’ll need a Flex.io account, a cloud storage account containing your CSV files (we’ll use Dropbox) and an Elasticsearch instance.

Step 1: Create your Data and Elasticsearch Connections

For this tutorial we’ll keep it simple and access CSV files from a cloud storage account (in this case, a Dropbox folder) and upload them to an Elasticsearch instance (in this case, hosted on AWS).

The Flex.io application has a Connections keychain for storing credentials and referencing them in your code via an alias. Here is a guide on setting up a connection in Flex.io.

Here are the default aliases we’ll reference in our code snippets below:

  • Dropbox Connection Alias: tutorial-dropbox
  • Elasticsearch Connection Alias: tutorial-elasticsearch

Once your connections are set up, you’re ready to create your data feed. Simply swap out the default aliases with your own connection aliases.

Step 2: Access your Files from Data Storage

As a first step, let’s confirm we have access to our data store. For this tutorial, we have a Dropbox folder called es that contains the three CSV files we’re going to transfer.

Dropbox CSV folder

Here’s the pipe to get the list of files from the Dropbox folder es using the connection alias from Step 1:

Flexio.pipe()
.list('/tutorial-dropbox/es/*.csv')

If you run this pipe as is, you’ll get a result like this listing all of the CSV files in that folder:

[
  {
    "name": "ipaddress.csv",
    "path": "/tutorial-dropbox/es/ipaddress.csv",
    "size": 6167,
    "modified": "2018-03-05T17:47:44Z",
    "type": "FILE"
  },
  {
    "name": "people.csv",
    "path": "/tutorial-dropbox/es/people.csv",
    "size": 134644,
    "modified": "2018-03-05T17:48:11Z",
    "type": "FILE"
  },
  {
    "name": "contacts.csv",
    "path": "/tutorial-dropbox/es/contacts.csv",
    "size": 381686,
    "modified": "2018-03-05T17:49:18Z",
    "type": "FILE"
  }
]
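
As an aside, if you ever want to double-check this listing outside of Flex.io, the official Dropbox SDK for Python can produce the same information; the access token and folder path below are placeholders:

import dropbox

# Placeholders: use your own Dropbox access token and folder.
ACCESS_TOKEN = "YOUR_DROPBOX_ACCESS_TOKEN"
FOLDER = "/es"

dbx = dropbox.Dropbox(ACCESS_TOKEN)

# List the folder and print the name, size and modification time of each CSV file.
for entry in dbx.files_list_folder(FOLDER).entries:
    if isinstance(entry, dropbox.files.FileMetadata) and entry.name.endswith(".csv"):
        print(entry.name, entry.size, entry.server_modified)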

Step 3: Create the Loop to Read Files and Write to Elasticsearch

Now that we have confirmed our data access to Dropbox, we’ll create the loop that reads each of the files and writes them to Elasticsearch:

Flexio.pipe()
.list('/tutorial-dropbox/es/*.csv')
.foreach('file : input',
  Flexio.pipe()
    .read('${file.path}')
    .convert('csv', 'table')
    .write('/tutorial-elasticsearch/${file.name}')
)

This code uses a foreach task to loop through the files in our list and process them. For each file, the read task accesses the file, and a convert task converts it from CSV into a table format, which enables us to query by field within Elasticsearch. Finally, a write task uploads each file to its own index. And that’s about it! Run your pipe and your files will be uploaded.
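
Once the upload has run, you can query those fields directly against your cluster. Here is a small sketch using the official Python client; the host, index and field name are placeholders, since the actual fields depend on the headers in your CSV files:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder host

# Hypothetical field name; use a column that actually exists in your CSV.
response = es.search(
    index="contacts.csv",
    query={"match": {"last_name": "smith"}},
)

for hit in response["hits"]["hits"]:
    print(hit["_source"])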

Once you’ve run your pipe, you can view your files by utilizing the list task again, but this time with your Elasticsearch connection:

Flexio.pipe()
.list('/tutorial-elasticsearch')

Running the pipe will give you a result like this:

[
  {
    "name": "contacts.csv",
    "path": "/tutorial-elasticsearch/contacts.csv",
    "size": null,
    "modified": null,
    "type": "FILE"
  },
  {
    "name": "ipaddress.csv",
    "path": "/tutorial-elasticsearch/ipaddress.csv",
    "size": null,
    "modified": null,
    "type": "FILE"
  },
  {
    "name": "people.csv",
    "path": "/tutorial-elasticsearch/people.csv",
    "size": null,
    "modified": null,
    "type": "FILE"
  }
]
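
You can also confirm the uploads directly against your cluster, for instance with the cat indices API via the Python client (the host below is a placeholder):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder host

# Print each index name with its document count.
for index in es.cat.indices(format="json"):
    print(index["index"], index["docs.count"])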

Step 4: Deploy the CSV to Elasticsearch Data Feed

Now that you have your CSV data in Elasticsearch, you can deploy this pipe to run whenever you have new data. With the setup in this tutorial, each run simply overwrites the previous indices with the new data.

The pipe can be saved in your code or in the Flex.io app. It can be called via an API endpoint or scheduled to run as needed. Click here for a guide on Flex.io deployment options.

Additional Permutations

To extend the tutorial above, here are some additional permutations you can try:

Load data from an API like Twilio or GitHub into Elasticsearch

Bulk loading to Elasticsearch from a CSV file is quite useful. However, you might also want to copy data from a JSON source, like you’d get from an API. Here’s an example pipe that connects to Twilio’s call logs (alias: tutorial-twilio), converts the JSON into a table format and then writes it out to an Elasticsearch index called call-log:

Flexio.pipe()
.read("/tutorial-twilio/calls")
.convert("json", "table")
.write("/tutorial-elasticsearch/call-log")

Load data from a database like MySQL or Postgres into Elasticsearch

In addition to bulk loading from CSV or an API, you might want to load from a database like Postgres or MySQL into Elasticsearch. Here’s an example pipe that connects to Postgres (alias: tutorial-postgres) and copies a table called contacts to Elasticsearch:

Flexio.pipe()
.read("/tutorial-postgres/contacts")
.write("/tutorial-elasticsearch/contacts")

Pre-process with Python and pandas before bulk loading into Elasticsearch

In addition to straight table copies, you might want to add some processing or cleaning steps. Python is an excellent language for data munging, and the pandas library is particularly useful. In the example below, we take the Twilio-to-Elasticsearch pipe above and add a simple preprocessing step that lowercases everything before uploading to Elasticsearch:

Flexio.pipe()
.read("/tutorial-twilio/calls")
.convert("json", "table")
.python(`
import pandas

def flexio_handler(context):
    # Read the converted table, lowercase every value and pass it on.
    df = pandas.read_csv(context.input)
    df = df.apply(lambda x: x.astype(str).str.lower())
    context.output.write(df.to_string() + " ")
`)
.write("/tutorial-elasticsearch/call-log")

Need Any Help?

We hope you found this tutorial useful. If you have any questions, shoot us a note using the chat button below. We’re happy to help and look forward to seeing what you can build!