Using Apache Sqoop to Acquire Relational Data
In many enterprise environments, much of the data required for data science applications resides in database management systems such as Oracle, MySQL, PostgreSQL, or DB2. Before we can use this data in a data science application, we need to ingest it into Hadoop.
Sqoop is a tool designed to transfer data between Hadoop and relational databases. You can use Sqoop to import data from a relational database management system (RDBMS) into the Hadoop Distributed File System (HDFS) or export data from Hadoop back into an RDBMS.
Sqoop can be used with any JDBC-compliant database and has been tested on Microsoft SQL Server, PostgreSQL, MySQL, and Oracle. The remainder of this section gives a brief overview of how Sqoop works with Hadoop, followed by a basic Sqoop example walk-through. To explore Sqoop more fully, consult the Sqoop project website at http://sqoop.apache.org.
Data Import and Export with Sqoop
Figure 4.2 describes the process of importing data into HDFS using Sqoop, which includes two steps. In the first step, Sqoop examines the database to gather the necessary metadata for the data to be imported. The second step is a map-only (no reduce step) Hadoop job that Sqoop submits to the cluster. This job performs the actual data transfer using the metadata captured in the previous step. Note that each node doing the import must have access to the database.

Figure 4.2 Two-step Apache Sqoop data import method.
The imported data is saved in an HDFS directory. By default, Sqoop names the directory after the imported table, or the user can specify an alternative directory where the files should be written. By default, these files contain comma-delimited fields, with new lines separating different records. You can override the format in which data is copied by explicitly specifying the field separator and record terminator characters. Once placed in HDFS, the data is ready for further processing.
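The two-step import can be sketched in a few lines of Python. This is only an illustration under stated assumptions: an in-memory SQLite database stands in for the RDBMS, the function names `gather_metadata` and `import_table` are hypothetical, and real Sqoop performs the second step as map tasks on the cluster rather than in a single process.

```python
import sqlite3

def gather_metadata(conn, table):
    """Step 1: inspect the database to learn the table's column names."""
    rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    return [row[1] for row in rows]  # row[1] holds the column name

def import_table(conn, table, field_sep=",", record_sep="\n"):
    """Step 2: read every row and emit delimited text records."""
    columns = gather_metadata(conn, table)
    query = f"SELECT {', '.join(columns)} FROM {table}"
    lines = []
    for row in conn.execute(query):
        # SQL NULL values are written as the literal text "null"
        fields = ["null" if value is None else str(value) for value in row]
        lines.append(field_sep.join(fields))
    return record_sep.join(lines) + record_sep

# SQLite stand-in for the source database (an assumption for this sketch)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE City (ID INTEGER, Name TEXT, CountryCode TEXT)")
conn.executemany(
    "INSERT INTO City VALUES (?, ?, ?)",
    [(1810, "Montréal", "CAN"), (1811, "Calgary", None)],
)
text = import_table(conn, "City")
print(text, end="")
```

Passing a different `field_sep` or `record_sep` mirrors the idea of overriding the default delimiters.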
Data export from the cluster works in a similar fashion. The export is done in two steps as shown in Figure 4.3. Like the import process, the first step is to examine the database for metadata, followed by the export step that is again a map-only Hadoop job to write the data to the target database. Sqoop divides the input dataset into splits and then uses individual map tasks to push the splits to the database. Again, this process assumes the map tasks have access to the database.

Figure 4.3 Two-step Sqoop data export method.
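The way an export divides its input among map tasks can be sketched as follows. This is a simplified illustration, not Sqoop's implementation: the helper names `make_splits` and `export_split` are hypothetical, the records are a short hand-written sample, and an in-memory SQLite table stands in for the target database.

```python
import sqlite3

def make_splits(records, num_tasks):
    """Divide the records into num_tasks contiguous, near-equal splits."""
    size, extra = divmod(len(records), num_tasks)
    splits, start = [], 0
    for i in range(num_tasks):
        end = start + size + (1 if i < extra else 0)
        splits.append(records[start:end])
        start = end
    return splits

def export_split(conn, table, split):
    """One 'map task': parse its delimited records and insert them."""
    for line in split:
        fields = line.split(",")
        placeholders = ", ".join("?" for _ in fields)
        conn.execute(f"INSERT INTO {table} VALUES ({placeholders})", fields)

records = [
    "1,Kabul,AFG",
    "2,Qandahar,AFG",
    "3,Herat,AFG",
    "4,Mazar-e-Sharif,AFG",
    "5,Amsterdam,NLD",
]
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE CityExport (ID INTEGER, Name TEXT, CountryCode TEXT)")

splits = make_splits(records, 2)
for split in splits:  # on a cluster these would run in parallel
    export_split(conn, "CityExport", split)
conn.commit()
count = conn.execute("SELECT COUNT(*) FROM CityExport").fetchone()[0]
print(count)
```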
Apache Sqoop Version Changes
Two versions of Sqoop are in general use within the Hadoop ecosystem. Many users have found the features removed in version 2 to be useful and continue to use the first version. Sqoop version 2 will be used for the examples.
Sqoop version 1 uses specialized connectors to access external database systems. These are often optimized for particular RDBMSs or written for systems that do not support JDBC (Java Database Connectivity). Connectors are plug-in components based on Sqoop’s extension framework and can be added to any existing Sqoop installation. Once a connector is installed, Sqoop can use it to efficiently transfer data between Hadoop and the external store supported by the connector. By default, Sqoop version 1 includes connectors for various popular databases such as MySQL, PostgreSQL, Oracle, SQL Server, and DB2. Sqoop version 1 also supports direct transfer to and from the RDBMS for HBase or Hive.
In order to streamline the Sqoop input methods (the issues cited were increasingly complex command lines, security, and the need to understand too many low-level issues), Sqoop version 2 no longer supports specialized connectors or direct import into HBase or Hive or direct data transfer from Hive or HBase to your RDBMS. There are more generalized ways to accomplish these tasks in version 2. All import and export is done through the JDBC interface. Table 4.1 summarizes the changes. Due to these changes, any new development should be done with attention to Sqoop version 2 capabilities.
Table 4.1 Apache Sqoop version comparison.
Feature | Sqoop Version 1 | Sqoop Version 2
Connectors for all major | Supported | Not supported. Use the generic
Kerberos Security | Supported | Not supported
Data transfer from | Supported | Not supported. First import data
Data transfer from Hive | Not supported. First export data | Not supported. First export data
Using Sqoop V2: A Basic Example
To better understand how to use Sqoop in practice, we’re going to demonstrate how to configure and use Sqoop version 2 via a simple example. The example can then be extended as needed to explore the other capabilities offered by Apache Sqoop. More detailed information can be found at the Sqoop website at http://sqoop.apache.org.
The following steps will be performed:
Download and load sample MySQL data
Add Sqoop user permissions for local machine and cluster
Import data from MySQL to HDFS
Export data from HDFS to MySQL
Step 1: Download a Sample MySQL Database
For this example, we assume MySQL is installed on the Sqoop node and will use the world example database from the MySQL site (http://dev.mysql.com/doc/world-setup/en/index.html). The database has three tables:
Country—Information about countries of the world.
City—Information about some of the cities in those countries.
CountryLanguage—Languages spoken in each country.
To get the database, use wget to download and then extract the file:
$ wget http://downloads.mysql.com/docs/world.sql.gz
$ gunzip world.sql.gz
Next, log into MySQL (assumes you have privileges to create a database) and import that database by entering the following commands:
$ mysql -u root -p
mysql> CREATE DATABASE world;
mysql> USE world;
mysql> SOURCE world.sql;
mysql> SHOW TABLES;
+-----------------+
| Tables_in_world |
+-----------------+
| City            |
| Country         |
| CountryLanguage |
+-----------------+
3 rows in set (0.01 sec)
The following MySQL commands will let you see the details for each table (output omitted because of space considerations):
mysql> SHOW CREATE TABLE Country;
mysql> SHOW CREATE TABLE City;
mysql> SHOW CREATE TABLE CountryLanguage;
Step 2: Add Sqoop User Permissions for Local Machine and Cluster
Sqoop often needs to talk to MySQL from the Hadoop cluster, so MySQL permissions must be added to allow these conversations. Depending on your installation, you may need to add several privileges for Sqoop requests based on the location (hosts or IP addresses) from which the requests originate. For example, the following permissions were assigned for this example.
mysql> GRANT ALL PRIVILEGES ON world.* TO 'sqoop'@'localhost' IDENTIFIED BY 'sqoop';
mysql> GRANT ALL PRIVILEGES ON world.* TO 'sqoop'@'_HOSTNAME_' IDENTIFIED BY 'sqoop';
mysql> GRANT ALL PRIVILEGES ON world.* TO 'sqoop'@'_SUBNET_' IDENTIFIED BY 'sqoop';
mysql> FLUSH PRIVILEGES;
mysql> quit
The _HOSTNAME_ is the name of the host on which a user has logged in. The _SUBNET_ is the subnet of the cluster (for example, 10.0.0.%, which covers the 10.0.0.0/24 network). These permissions allow any node in the cluster to execute MySQL commands as user sqoop. For the purposes of this example, the password for user sqoop is “sqoop.”
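To see why a host value such as 10.0.0.% covers every node in the subnet, the wildcard matching can be sketched in Python. This is a simplified model, assuming only the two LIKE-style wildcards (% for any run of characters, _ for exactly one character); it treats the host as a plain string and ignores netmask notation, name resolution, and case handling. The function name `host_matches` is hypothetical.

```python
import re

def host_matches(host, pattern):
    """Return True if host matches a MySQL-style account host pattern."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")   # any run of characters, possibly empty
        elif ch == "_":
            parts.append(".")    # exactly one character
        else:
            parts.append(re.escape(ch))
    return re.fullmatch("".join(parts), host) is not None

print(host_matches("10.0.0.17", "10.0.0.%"))
print(host_matches("10.0.1.17", "10.0.0.%"))
```

A request from 10.0.0.17 falls under the subnet grant, while one from 10.0.1.17 does not and would need its own grant.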
Next, log in as user sqoop to test the MySQL permissions.
$ mysql -u sqoop -p
mysql> USE world;
mysql> SHOW TABLES;
+-----------------+
| Tables_in_world |
+-----------------+
| City            |
| Country         |
| CountryLanguage |
+-----------------+
3 rows in set (0.01 sec)
mysql> quit
Step 3: Import Data Using Sqoop
As a check of Sqoop’s capability to read the MySQL database, we can use Sqoop to list the databases in MySQL.
Enter the following commands. The results are after the warnings at the end of the output. Note the use of local _HOSTNAME_ in the JDBC statement. Extra notifications have been removed from the output (represented by ...).
$ sqoop list-databases --connect jdbc:mysql://_HOSTNAME_/world --username sqoop --password sqoop
...
information_schema
test
world
In a similar fashion, Sqoop can connect to MySQL and list the tables in the world database.
$ sqoop list-tables --connect jdbc:mysql://_HOSTNAME_/world --username sqoop --password sqoop
...
City
Country
CountryLanguage
In order to import data, we need to make a directory in HDFS:
$ hdfs dfs -mkdir sqoop-mysql-import
The following command will import the Country table into HDFS:
$ sqoop import --connect jdbc:mysql://_HOSTNAME_/world --username sqoop --password sqoop --table Country -m 1 --target-dir /user/username/sqoop-mysql-import/country
The option --table signifies the table to import, --target-dir is a subdirectory of the directory created above, and -m 1 tells Sqoop to use a single map task (which is enough in our example since it is only a small table) to import the data.
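When imports are launched from a script, it can help to assemble the command as a list of arguments instead of one long string. The following is a minimal sketch: the function name `sqoop_import_argv` and its parameters are hypothetical, and it only builds and prints the command shown above without running it.

```python
import shlex

def sqoop_import_argv(host, database, user, password, table, target_dir, mappers=1):
    """Build the argument list for the 'sqoop import' command used in this example."""
    return [
        "sqoop", "import",
        "--connect", f"jdbc:mysql://{host}/{database}",
        "--username", user,
        "--password", password,
        "--table", table,
        "-m", str(mappers),
        "--target-dir", target_dir,
    ]

argv = sqoop_import_argv(
    host="_HOSTNAME_",
    database="world",
    user="sqoop",
    password="sqoop",
    table="Country",
    target_dir="/user/username/sqoop-mysql-import/country",
)
print(shlex.join(argv))
```

On a node where Sqoop is installed, the list could be passed to `subprocess.run(argv, check=True)`. Keeping each option and its value as separate list items avoids the kind of missing-space mistake that is easy to make in a hand-typed command line.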
The import can be confirmed by examining HDFS:
$ hdfs dfs -ls sqoop-mysql-import/country
Found 2 items
-rw-r--r--   2 username hdfs          0 2014-08-18 16:47 sqoop-mysql-import/country/_SUCCESS
-rw-r--r--   2 username hdfs      31490 2014-08-18 16:47 sqoop-mysql-import/country/part-m-00000
The file can be viewed using the hdfs dfs -cat command:
$ hdfs dfs -cat sqoop-mysql-import/country/part-m-00000
ABW,Aruba,North America,Caribbean,193.0,null,103000,78.4,828.0,793.0,Aruba,Nonmetropolitan Territory of The Netherlands,Beatrix,129,AW
...
ZWE,Zimbabwe,Africa,Eastern Africa,390757.0,1980,11669000,37.8,5951.0,8670.0,Zimbabwe,Republic,Robert G. Mugabe,4068,ZW
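Downstream code usually needs to turn each imported line back into fields. A minimal sketch, assuming the default comma delimiter with no quoting or escaping (so a field that itself contains a comma would be split incorrectly) and treating the literal text null as a missing value; the function name `parse_record` is hypothetical:

```python
def parse_record(line, field_sep=","):
    """Split one imported record into fields, mapping 'null' to None."""
    fields = line.rstrip("\n").split(field_sep)
    return [None if field == "null" else field for field in fields]

# First record from the Country import shown above
line = ("ABW,Aruba,North America,Caribbean,193.0,null,103000,78.4,828.0,793.0,"
        "Aruba,Nonmetropolitan Territory of The Netherlands,Beatrix,129,AW")
record = parse_record(line)
print(len(record), record[0], record[5])
```

Note that a genuine string value of "null" cannot be told apart from a database NULL in this format.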
To make Sqoop commands more convenient, an options file may be created and used in the command line. This file will help you avoid having to rewrite the same options. For example, a file called world-options.txt with the following contents will include the import command, --connect, --username, and --password options:
import
--connect
jdbc:mysql://_HOSTNAME_/world
--username
sqoop
--password
sqoop
An import like the preceding one, here for the City table, can be performed with the following shorter line:
$ sqoop --options-file world-options.txt --table City -m 1 --target-dir /user/username/sqoop-mysql-import/city
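An options file lists the tool name and each option or value on its own line. The sketch below generates the contents of world-options.txt and shows how its entries can be thought of as being spliced into the command line ahead of the remaining arguments. The helper names are hypothetical, and the expansion is a mental model rather than Sqoop's actual parser.

```python
def options_file_text(tool, options):
    """Render an options file: the tool name, then one option or value per line."""
    lines = [tool]
    for flag, value in options:
        lines.extend([flag, value])
    return "\n".join(lines) + "\n"

def expand_options_file(text, extra_args):
    """Model the effective command: file entries first, then the extra arguments."""
    tokens = [
        line.strip()
        for line in text.splitlines()
        if line.strip() and not line.lstrip().startswith("#")  # skip blanks and comments
    ]
    return ["sqoop"] + tokens + list(extra_args)

text = options_file_text("import", [
    ("--connect", "jdbc:mysql://_HOSTNAME_/world"),
    ("--username", "sqoop"),
    ("--password", "sqoop"),
])
argv = expand_options_file(text, ["--table", "City", "-m", "1"])
print(text, end="")
print(argv)
```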
It is also possible to include an SQL query in the import step. For example, suppose we want just the cities in Canada:
SELECT ID,Name from City WHERE CountryCode='CAN'
Then we can include the --query option in the Sqoop import request. In the following query example, a single mapper task is designated with the -m 1 option:
$ sqoop --options-file world-options.txt -m 1 --target-dir /user/username/sqoop-mysql-import/canada-city --query "SELECT ID,Name from City WHERE CountryCode='CAN' AND \$CONDITIONS"
Inspecting the results shows only cities from Canada are imported.
$ hdfs dfs -cat sqoop-mysql-import/canada-city/part-m-00000
1810,Montréal
1811,Calgary
1812,Toronto
...
1856,Sudbury
1857,Kelowna
1858,Barrie
Since there was only one mapper process, only one copy of the query needed to be run on the database. The results are also reported in a single file (part-m-00000). Multiple mappers can be used to process the query if the --split-by option is used. The --split-by option is a way to parallelize the SQL query. Each parallel task runs a subset of the main query, with results partitioned by bounding conditions inferred by Sqoop. Your query must include the token $CONDITIONS; this is a placeholder that Sqoop replaces with a unique condition expression for each mapper task, based on the --split-by column. Sqoop will try to create balanced sub-queries based on the range of the split column, which is typically the primary key. However, it may be necessary to split on another column if your primary key is not uniformly distributed.
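The idea behind $CONDITIONS can be sketched as follows. This is an approximation of the concept, not Sqoop's actual splitting code: the function names are hypothetical, the split column is assumed to be an integer, and the bounds 1810 and 1858 are assumed to be the smallest and largest IDs, taken from the first and last rows of the output shown above.

```python
def split_conditions(column, lo, hi, num_mappers):
    """Return one bounding condition per mapper over the range [lo, hi]."""
    if num_mappers <= 1:
        return ["1 = 1"]  # a single mapper needs no restriction
    bounds = [lo + (hi - lo) * i // num_mappers for i in range(num_mappers + 1)]
    conditions = []
    for i in range(num_mappers):
        # the last range is closed so the maximum value is included
        upper_op = "<=" if i == num_mappers - 1 else "<"
        conditions.append(
            f"{column} >= {bounds[i]} AND {column} {upper_op} {bounds[i + 1]}"
        )
    return conditions

def mapper_queries(query, column, lo, hi, num_mappers):
    """Substitute each mapper's condition for the $CONDITIONS placeholder."""
    return [
        query.replace("$CONDITIONS", f"({condition})")
        for condition in split_conditions(column, lo, hi, num_mappers)
    ]

query = "SELECT ID,Name from City WHERE CountryCode='CAN' AND $CONDITIONS"
queries = mapper_queries(query, "ID", 1810, 1858, 4)
for q in queries:
    print(q)
```

Equal-width ranges give balanced work only when the column values are spread evenly, which is why a skewed key may call for a different split column.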
The following example will help illustrate the --split-by option. First, remove the results of the previous query.
$ hdfs dfs -rm -r -skipTrash sqoop-mysql-import/canada-city
Next, run the query using four mappers (-m 4) where we split by the ID number (--split-by ID).
$ sqoop --options-file world-options.txt -m 4 --target-dir /user/username/sqoop-mysql-import/canada-city --query "SELECT ID,Name from City WHERE CountryCode='CAN' AND \$CONDITIONS" --split-by ID
If we look at the number of results files, we find four files corresponding to the four mappers we requested in the command. There is no need to combine these files into one entity because all Hadoop tools can manage multiple files as input.
$ hdfs dfs -ls sqoop-mysql-import/canada-city
Found 5 items
-rw-r--r--   2 username hdfs          0 2014-08-18 21:31 sqoop-mysql-import/canada-city/_SUCCESS
-rw-r--r--   2 username hdfs        175 2014-08-18 21:31 sqoop-mysql-import/canada-city/part-m-00000
-rw-r--r--   2 username hdfs        153 2014-08-18 21:31 sqoop-mysql-import/canada-city/part-m-00001
-rw-r--r--   2 username hdfs        186 2014-08-18 21:31 sqoop-mysql-import/canada-city/part-m-00002
-rw-r--r--   2 username hdfs        182 2014-08-18 21:31 sqoop-mysql-import/canada-city/part-m-00003
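Treating a directory of part files as one dataset is straightforward outside Hadoop as well. The sketch below assumes the directory has been copied to the local file system (for example with hdfs dfs -get) and uses a temporary directory with made-up sample files so that it can run anywhere; `read_part_files` is a hypothetical helper name.

```python
import tempfile
from pathlib import Path

def read_part_files(directory):
    """Read all part-m-* files in name order and return their records."""
    records = []
    for part in sorted(Path(directory).glob("part-m-*")):
        with part.open(encoding="utf-8") as handle:
            records.extend(line.rstrip("\n") for line in handle if line.strip())
    return records

# Stand-in for a local copy of the import directory
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "_SUCCESS").write_text("", encoding="utf-8")
    (Path(tmp) / "part-m-00000").write_text("1810,Montréal\n1811,Calgary\n", encoding="utf-8")
    (Path(tmp) / "part-m-00001").write_text("1812,Toronto\n", encoding="utf-8")
    records = read_part_files(tmp)

print(records)
```

The _SUCCESS marker is skipped because it does not match the part-m-* pattern.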
Step 4: Export Data Using Sqoop
The first step when exporting data with Sqoop is to create tables in the target database system for the exported data. This example uses two tables for each exported table: one to hold the exported data (e.g., CityExport) and one to stage the exported data before it is moved into place (e.g., CityExportStaging).
Using the following MySQL commands, you can create the tables:
mysql> USE world;
mysql> CREATE TABLE `CityExport` (
  `ID` int(11) NOT NULL AUTO_INCREMENT,
  `Name` char(35) NOT NULL DEFAULT '',
  `CountryCode` char(3) NOT NULL DEFAULT '',
  `District` char(20) NOT NULL DEFAULT '',
  `Population` int(11) NOT NULL DEFAULT '0',
  PRIMARY KEY (`ID`));
mysql> CREATE TABLE `CityExportStaging` (
  `ID` int(11) NOT NULL AUTO_INCREMENT,
  `Name` char(35) NOT NULL DEFAULT '',
  `CountryCode` char(3) NOT NULL DEFAULT '',
  `District` char(20) NOT NULL DEFAULT '',
  `Population` int(11) NOT NULL DEFAULT '0',
  PRIMARY KEY (`ID`));
Next, create a cities-export-options.txt file similar to the world-options.txt file created above, using the export instead of import command. The following will export the cities data we imported above back into MySQL:
$ sqoop --options-file cities-export-options.txt --table CityExport --staging-table CityExportStaging --clear-staging-table -m 4 --export-dir /user/username/sqoop-mysql-import/city
Finally, to make sure everything worked, check the table in MySQL to see if the cities are in the table.
mysql> SELECT * FROM CityExport LIMIT 10;
+----+----------------+-------------+---------------+------------+
| ID | Name           | CountryCode | District      | Population |
+----+----------------+-------------+---------------+------------+
|  1 | Kabul          | AFG         | Kabol         |    1780000 |
|  2 | Qandahar       | AFG         | Qandahar      |     237500 |
|  3 | Herat          | AFG         | Herat         |     186800 |
|  4 | Mazar-e-Sharif | AFG         | Balkh         |     127800 |
|  5 | Amsterdam      | NLD         | Noord-Holland |     731200 |
|  6 | Rotterdam      | NLD         | Zuid-Holland  |     593321 |
|  7 | Haag           | NLD         | Zuid-Holland  |     440900 |
|  8 | Utrecht        | NLD         | Utrecht       |     234323 |
|  9 | Eindhoven      | NLD         | Noord-Brabant |     201843 |
| 10 | Tilburg        | NLD         | Noord-Brabant |     193238 |
+----+----------------+-------------+---------------+------------+
10 rows in set (0.00 sec)
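The purpose of the staging table used in this step can be sketched with a small Python example. It is an illustration under assumptions: SQLite stands in for MySQL, the tables are reduced to two columns, and `export_with_staging` is a hypothetical helper. The point is that rows are first loaded into the staging table and only then moved to the target table in a single transaction, so a failed load does not leave the target partially filled.

```python
import sqlite3

def export_with_staging(conn, rows, target, staging):
    """Load rows into the staging table, then move them to the target atomically."""
    conn.execute(f"DELETE FROM {staging}")  # comparable to --clear-staging-table
    conn.executemany(f"INSERT INTO {staging} VALUES (?, ?)", rows)
    conn.commit()
    with conn:  # one transaction: commits on success, rolls back on error
        conn.execute(f"INSERT INTO {target} SELECT * FROM {staging}")
        conn.execute(f"DELETE FROM {staging}")

conn = sqlite3.connect(":memory:")
for name in ("CityExport", "CityExportStaging"):
    conn.execute(f"CREATE TABLE {name} (ID INTEGER PRIMARY KEY, Name TEXT)")

export_with_staging(
    conn,
    [(1, "Kabul"), (2, "Qandahar")],
    target="CityExport",
    staging="CityExportStaging",
)
exported = conn.execute("SELECT COUNT(*) FROM CityExport").fetchone()[0]
left_in_staging = conn.execute("SELECT COUNT(*) FROM CityExportStaging").fetchone()[0]
print(exported, left_in_staging)
```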
Some Handy Clean-up Commands
If you are not very familiar with MySQL, the following commands may be helpful for cleaning up the examples.
To remove a table in MySQL:
mysql> DROP TABLE `CityExportStaging`;
To remove the data in a table:
mysql> DELETE FROM CityExportStaging;
To clean up imported files:
$ hdfs dfs -rm -r -skipTrash sqoop-mysql-import/{country,city,canada-city}