- Hadoop as a Data Lake
- The Hadoop Distributed File System (HDFS)
- Direct File Transfer to Hadoop HDFS
- Importing Data from Files into Hive Tables
- Importing Data into Hive Tables Using Spark
- Using Apache Sqoop to Acquire Relational Data
- Using Apache Flume to Acquire Data Streams
- Manage Hadoop Work and Data Flows with Apache Oozie
- Apache Falcon
- What's Next in Data Ingestion?
- Summary
Importing Data into Hive Tables Using Spark
Apache Spark is a modern processing engine that is focused on in-memory processing. Spark’s primary data abstraction is an immutable distributed collection of items called a resilient distributed dataset (RDD). RDDs can be created from Hadoop input formats (such as HDFS files) or by transforming other RDDs. Each dataset in an RDD is divided into logical partitions, which may be transparently computed on different nodes of the cluster.
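As a quick illustration of these ideas, the short PySpark sketch below (variable names are arbitrary, and the commands assume the PySpark shell described later in this section) creates a small RDD from a Python list, derives a new RDD with a map() transformation, and checks how many partitions it has:

>>> rdd = sc.parallelize(range(10), 4)     # distribute a small list over 4 partitions
>>> doubled = rdd.map(lambda x: x * 2)     # transformation produces a new RDD
>>> doubled.getNumPartitions()
4
>>> doubled.collect()                      # gather the results back to the driver
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]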
The other important data abstraction is Spark's DataFrame. A DataFrame is built on top of an RDD, but the data are organized into named columns, similar to a relational database table or to a data frame in R or Python's Pandas package.
Spark DataFrames can be created from different data sources such as the following (a brief sketch appears after the list):
Existing RDDs
Structured data files
JSON datasets
Hive tables
External databases
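As a sketch of the last two items, a DataFrame can be obtained from a Hive table or from an external database over JDBC roughly as follows. The table name, JDBC URL, and credentials below are placeholders, and reading over JDBC also requires the appropriate driver JAR on the Spark classpath:

>>> from pyspark.sql import HiveContext
>>> hc = HiveContext(sc)
>>> df_hive = hc.table("my_hive_table")          # existing Hive table (placeholder name)
>>> df_jdbc = hc.read.format("jdbc").options(
...     url="jdbc:mysql://dbhost:3306/hrdb",     # placeholder JDBC URL
...     dbtable="employees_src",                 # placeholder source table
...     user="dbuser", password="dbpass").load()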
Due to its flexibility and friendly developer API, Spark is often used as part of the process of ingesting data into Hadoop. With Spark, you can read data from a CSV file, an external SQL or NoSQL data store, or another data source, apply transformations to the data, and store the result in HDFS or a Hive table. As with the Hive examples, a full treatment of all Spark import scenarios is beyond the scope of this book. Consult the Apache Spark project page, http://spark.apache.org, for more information.
The following sections provide some basic usage examples of data import using PySpark (Spark via the Python API), although these steps can also be performed with the Scala or Java interfaces to Spark. Each step is explained; however, a full description of the Spark commands and API is beyond the scope of this book.
All the examples assume the PySpark shell (version 1.6) has been started using the following command:
$ pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.2
      /_/

Using Python version 2.7.9 (default, Apr 14 2015 12:54:25)
SparkContext available as sc, HiveContext available as sqlContext.
>>>
Import CSV Files into Hive Using Spark
Comma-separated value (CSV) files and, by extension, other text files with separators can be imported into a Spark DataFrame and then stored as a Hive table using the steps described below. This example shows how to load the data as an RDD, convert it into a DataFrame, and store it in Hive. It is also possible to load CSV files directly into DataFrames using the spark-csv package, as sketched next.
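For instance, if PySpark is started with the spark-csv package available (the package coordinate below is just one example, for Spark 1.6 built against Scala 2.10), a CSV file with a header row can be loaded in a single step:

$ pyspark --packages com.databricks:spark-csv_2.10:1.5.0
>>> df_direct = sqlContext.read.format("com.databricks.spark.csv") \
...     .option("header", "true") \
...     .option("inferSchema", "true") \
...     .load("file:///home/username/names.csv")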
The first step imports functions necessary for Spark DataFrame operations:
>>> from pyspark.sql import HiveContext
>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row
Next, the raw data are imported into a Spark RDD. The input file, names.csv, is located in the user's local file system and does not have to be moved into HDFS prior to use. (The examples assume the local path to the data is /home/username.)
>>> csv_data = sc.textFile("file:///home/username/names.csv")
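Had the file already been copied into HDFS (for example, with hdfs dfs -put names.csv /user/username), the same call could reference the HDFS path instead; the path below is only illustrative and resolves against the cluster's default file system:

>>> csv_data = sc.textFile("/user/username/names.csv")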
That csv_data is an RDD can be confirmed using the type() command:
>>> type(csv_data)
<class 'pyspark.rdd.RDD'>
The comma-separated data are then split using Spark's map() function, which creates a new RDD:
>>> csv_data = csv_data.map(lambda p: p.split(","))
Most CSV files have a header row with the column names. The following steps remove it from the RDD:
>>> header = csv_data.first()
>>> csv_data = csv_data.filter(lambda p: p != header)
The data in the csv_data RDD are put into a Spark SQL DataFrame using the toDF() function. First, however, the data are mapped using the map() function so that every RDD item becomes a Row object representing a row in the new DataFrame. Note the use of int() to cast the employee ID to an integer; all other columns default to strings.
>>> df_csv = csv_data.map(lambda p: Row(EmployeeID = int(p[0]), FirstName = p[1], Title=p[2], State=p[3], Laptop=p[4])).toDF()
The Row() class maps the individual values to named columns in a row; toDF() then transforms the complete dataset into a DataFrame.
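Because pyspark.sql.types was imported earlier, an alternative sketch (not used in the rest of this example) is to declare the column types explicitly in a StructType schema and build the DataFrame with createDataFrame():

>>> schema = StructType([StructField("EmployeeID", IntegerType(), True),
...                      StructField("FirstName", StringType(), True),
...                      StructField("Title", StringType(), True),
...                      StructField("State", StringType(), True),
...                      StructField("Laptop", StringType(), True)])
>>> typed_rows = csv_data.map(lambda p: (int(p[0]), p[1], p[2], p[3], p[4]))
>>> df_csv2 = sqlContext.createDataFrame(typed_rows, schema)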
The structure and data of the first five rows of the df_csv DataFrame are viewed using the following command:
>>> df_csv.show(5)
+----------+---------+------+-----+--------+
|EmployeeID|FirstName|Laptop|State|   Title|
+----------+---------+------+-----+--------+
|        10|   Andrew|    PC|   DE| Manager|
|        11|     Arun|    PC|   NJ| Manager|
|        12|   Harish|   MAC|   NJ|   Sales|
|        13|   Robert|   MAC|   PA| Manager|
|        14|    Laura|   MAC|   PA|Engineer|
+----------+---------+------+-----+--------+
only showing top 5 rows
Similarly, if you’d like to inspect the DataFrame schema, use the printSchema() command:
>>> df_csv.printSchema()
root
 |-- EmployeeID: long (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Laptop: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Title: string (nullable = true)
Finally, to store the DataFrame into a Hive table, use saveAsTable():
>>> from pyspark.sql import HiveContext
>>> hc = HiveContext(sc)
>>> df_csv.write.format("orc").saveAsTable("employees")
Here we create a HiveContext, which provides access to the Hive metastore, and then store the DataFrame as a Hive table (in ORC format) using the saveAsTable() command.
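Once the table has been written, it can be queried from the same HiveContext, for example:

>>> hc.sql("SELECT EmployeeID, FirstName, Title FROM employees LIMIT 3").show()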
Import a JSON File into Hive Using Spark
Spark can import JSON files directly into a DataFrame. The following is a JSON-formatted version of the names.csv file used in the previous example. Note that because EmployeeID is entered as an unquoted number, it will be read as an integer.
{"EmployeeID":10,"FirstName":"Andrew","Title":"Manager","State":"DE","Laptop":"PC"} {"EmployeeID":11,"FirstName":"Arun","Title":"Manager","State":"NJ","Laptop":"PC"} {"EmployeeID":12,"FirstName":"Harish","Title":"Sales","State":"NJ","Laptop":"MAC"}
Also note that Spark expects each line of the file to be a separate JSON object, so loading a pretty-printed (multi-line) JSON document will fail. A possible workaround is sketched below.
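If the source is instead a single pretty-printed file holding a JSON array (the file name names_array.json below is hypothetical), one workaround in Spark 1.6 is to read the whole file with wholeTextFiles(), parse it with Python's json module, and hand the resulting one-object-per-string RDD to read.json():

>>> import json
>>> raw = sc.wholeTextFiles("file:///home/username/names_array.json")
>>> json_strings = raw.flatMap(lambda kv: [json.dumps(r) for r in json.loads(kv[1])])   # kv = (file name, file contents)
>>> df_array = sqlContext.read.json(json_strings)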
The first step imports the needed functions and creates a HiveContext.
>>> from pyspark.sql import HiveContext
>>> hc = HiveContext(sc)
As in the CSV example, the data file is located in the user's local file system:
>>> df_json = hc.read.json("file:///home/username/names.json")
The first five rows of the DataFrame can be viewed using the df_json.show(5) command:
>>> df_json.show(5)
+----------+---------+------+-----+--------+
|EmployeeID|FirstName|Laptop|State|   Title|
+----------+---------+------+-----+--------+
|        10|   Andrew|    PC|   DE| Manager|
|        11|     Arun|    PC|   NJ| Manager|
|        12|   Harish|   MAC|   NJ|   Sales|
|        13|   Robert|   MAC|   PA| Manager|
|        14|    Laura|   MAC|   PA|Engineer|
+----------+---------+------+-----+--------+
only showing top 5 rows
To confirm that EmployeeID was indeed read as an integer (Spark reports it as a long), the df_json.printSchema() command can be used to inspect the DataFrame schema:
>>> df_json.printSchema()
root
 |-- EmployeeID: long (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Laptop: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Title: string (nullable = true)
Similar to the CSV example, storing this DataFrame back to Hive is simple:
>>> df_json.write.format("orc").saveAsTable("employees")
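Note that saveAsTable() raises an error by default if the employees table already exists (as it does if the CSV example above was run first). A save mode can be supplied to change that behavior, for example to replace the existing table:

>>> df_json.write.mode("overwrite").format("orc").saveAsTable("employees")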