developerWorks®
ibm.com/developerWorks/
Comparing Apache Sqoop and IBM InfoSphere Data Replication (IIDR): Moving
incremental data from a relational database management system (RDBMS) into
the Hadoop Distributed File System (HDFS)
Apache® Hadoop is becoming the most popular enterprise choice for big data
analytics, but getting your data into Hadoop is a challenge. Apache® Sqoop is a utility
designed to transfer data between relational databases and the Hadoop Distributed File
System (HDFS). Sqoop can import data from relational databases such as DB2 or Oracle
into HDFS, or export data from HDFS back to relational databases.
IBM® InfoSphere® Data Replication is a replication solution that captures database
changes as they happen in relational database transaction logs and delivers them to target
relational databases, message queues, Hadoop distributed file system (HDFS) or an ETL
solution such as InfoSphere DataStage® based on table mappings configured in the
InfoSphere Data Replication Management Console GUI application.
Overview
Apache Sqoop is a command-line tool for transferring data between relational
databases and Hadoop. Connectors are also available for some NoSQL databases. Sqoop,
similar to other ETL tools, uses schema metadata to infer data types and ensure type-safe
data handling when the data moves from the source to Hadoop. In this article we will
compare Sqoop with IIDR and provide guidance as to how to use the two together.
Prerequisites
To follow this article, basic knowledge about the following is required.
 IIDR replication
 Basic computer technology and terminology
 Familiarity with command-line interfaces such as bash and UNIX commands like
ls/cat.
 Relational database management systems such as DB2/Oracle etc.
 Familiarity with Apache Hadoop framework and operation of Hadoop
Before you can use Sqoop, a release of Hadoop must be installed and configured. If
you don’t already have access to a Hadoop environment, you may want to download
IBM BigInsights Quick Start Edition from
https://www.ibm.com/services/forms/preLogin.do?source=swg-ibmibqsevmw
© Copyright IBM Corporation 2015
Infosphere Data Replication comparison with Apache Sqoop Incremental Import
Trademarks
 Use Cases
Apache Sqoop is designed for bulk transfers of big data: it partitions data sets and
creates Hadoop jobs to process each partition. Sqoop is a JDBC-based utility for
integrating with traditional databases. A Sqoop import moves data from a relational
database system into HDFS (a delimited format can be defined as part of the import
definition). The input to the import process is a database table, which Sqoop reads
row by row into HDFS. The output is a set of files containing a copy of the imported
table. Because the import is performed in parallel, the output is spread across multiple
files. These files may be delimited text files (for example, with commas or tabs
separating each field), or binary Avro or SequenceFiles containing serialized record data.
After manipulating the imported records (for example, with MapReduce or Hive), you
may have a result data set that you can export back to the relational database.
The Sqoop export process reads a set of delimited text files from HDFS in parallel, parses
them into records, and inserts them as new rows in a target database table for
consumption by external applications or users.
High-level architecture
Apache Sqoop import loads data from an RDBMS into the Hadoop Distributed File
System (HDFS).
Apache Sqoop export picks up data from HDFS and loads it into RDBMS tables.
Incremental Import
Apache Sqoop provides an incremental import mode that retrieves only rows newer
than some previously imported set of rows. Sqoop provides two types of
incremental import: append and lastmodified.
Usage and example of append mode
Specify append mode when importing a table where new rows are continually being
added with increasing row ID values. You specify the column containing the row’s ID
with --check-column. Sqoop imports rows where the check column has a value greater
than the one specified with --last-value. At the end of an incremental import, the value
to specify as --last-value for a subsequent import is printed to the screen. When
running a subsequent import, specify --last-value in this way to ensure that you import
only the new or updated data. This is handled automatically when you create the
incremental import as a saved job, which is the preferred mechanism for performing a
recurring incremental import.
sqoop import \
--connect jdbc:db2://192.168.255.129:50000/cdcdb \
--username BIADMIN \
--password passw0rd \
--table TEST7 \
-m 1 \
--incremental append \
--check-column ID \
--last-value 4 \
--direct \
--target-dir sqoopincr
[biadmin@bigdata bin]$ sqoop import --connect jdbc:db2://192.168.255.129:50000/cdcdb \
    --username BIADMIN --password passw0rd --table TEST7 \
    --incremental append --check-column ID --last-value 9 --target-dir sqoopincr
15/03/09 07:49:38 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure.
Consider using -P instead.
15/03/09 07:49:39 INFO manager.SqlManager: Using default fetchSize of 1000
15/03/09 07:49:39 INFO tool.CodeGenTool: Beginning code generation
15/03/09 07:49:39 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "TEST7"
AS t WHERE 1=0
15/03/09 07:49:39 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "TEST7"
AS t WHERE 1=0
15/03/09 07:49:39 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /mnt/BigInsights/opt/ibm/biginsights/IHC
15/03/09 07:49:39 INFO orm.CompilationManager: Found hadoop core jar at: /mnt/BigInsights/opt/ibm/biginsights/IHC/hadoop-core.jar
Note: /tmp/sqoop-biadmin/compile/3368a44b029fb4068a56ff5cd8d04a7e/TEST7.java uses or overrides a
deprecated API.
Note: Recompile with -Xlint:deprecation for details.
15/03/09 07:49:41 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-biadmin/compile/3368a44b029fb4068a56ff5cd8d04a7e/TEST7.jar
15/03/09 07:49:41 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT
MAX(ID) FROM TEST7
15/03/09 07:49:41 INFO tool.ImportTool: Incremental import based on column "ID"
15/03/09 07:49:41 INFO tool.ImportTool: Lower bound value: 9
15/03/09 07:49:41 INFO tool.ImportTool: Upper bound value: 10
15/03/09 07:49:41 INFO db2.DB2ConnManager: importTable entered
15/03/09 07:49:41 INFO db2.DB2ConnManager: getPrimaryKey() tabSchema,tabName=BIADMIN,TEST7
15/03/09 07:49:41 INFO db2.DB2ConnManager: getPrimaryKey() tabSchema,tabName=BIADMIN,TEST7
15/03/09 07:49:41 INFO mapreduce.ImportJobBase: Beginning import of TEST7
15/03/09 07:49:42 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "TEST7"
AS t WHERE 1=0
15/03/09 07:49:43 INFO db2.DB2InputFormat: getSplits for table,mapTasks="TEST7",4
15/03/09 07:49:43 INFO db2.DB2Util: partitioning key not found for BIADMIN.TEST7
15/03/09 07:49:43 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("ID"),
MAX("ID") FROM "TEST7" WHERE ( "ID" > 9 AND "ID" <= 10 )
15/03/09 07:49:44 INFO mapred.JobClient: Running job: job_201502271841_0022
15/03/09 07:49:45 INFO mapred.JobClient: map 0% reduce 0%
15/03/09 07:49:59 INFO mapred.JobClient: map 100% reduce 0%
15/03/09 07:50:00 INFO mapred.JobClient: Job complete: job_201502271841_0022
15/03/09 07:50:00 INFO mapred.JobClient: Counters: 18
15/03/09 07:50:00 INFO mapred.JobClient: File System Counters
15/03/09 07:50:00 INFO mapred.JobClient: FILE: BYTES_WRITTEN=213660
15/03/09 07:50:00 INFO mapred.JobClient: HDFS: BYTES_READ=101
15/03/09 07:50:00 INFO mapred.JobClient: HDFS: BYTES_WRITTEN=35
15/03/09 07:50:00 INFO mapred.JobClient: org.apache.hadoop.mapreduce.JobCounter
15/03/09 07:50:00 INFO mapred.JobClient: TOTAL_LAUNCHED_MAPS=1
15/03/09 07:50:00 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=9097
15/03/09 07:50:00 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
15/03/09 07:50:00 INFO mapred.JobClient: FALLOW_SLOTS_MILLIS_MAPS=0
15/03/09 07:50:00 INFO mapred.JobClient: FALLOW_SLOTS_MILLIS_REDUCES=0
15/03/09 07:50:00 INFO mapred.JobClient: org.apache.hadoop.mapreduce.TaskCounter
15/03/09 07:50:00 INFO mapred.JobClient: MAP_INPUT_RECORDS=1
15/03/09 07:50:00 INFO mapred.JobClient: MAP_OUTPUT_RECORDS=1
15/03/09 07:50:00 INFO mapred.JobClient: SPLIT_RAW_BYTES=101
15/03/09 07:50:00 INFO mapred.JobClient: SPILLED_RECORDS=0
15/03/09 07:50:00 INFO mapred.JobClient: CPU_MILLISECONDS=1040
15/03/09 07:50:00 INFO mapred.JobClient: PHYSICAL_MEMORY_BYTES=268967936
15/03/09 07:50:00 INFO mapred.JobClient: VIRTUAL_MEMORY_BYTES=1465724928
15/03/09 07:50:00 INFO mapred.JobClient: COMMITTED_HEAP_BYTES=1048576000
15/03/09 07:50:00 INFO mapred.JobClient: File Input Format Counters
15/03/09 07:50:00 INFO mapred.JobClient: Bytes Read=0
15/03/09 07:50:00 INFO mapred.JobClient: org.apache.hadoop.mapreduce.lib.output.FileOutputFormat$Counter
15/03/09 07:50:00 INFO mapred.JobClient: BYTES_WRITTEN=35
15/03/09 07:50:00 INFO mapreduce.ImportJobBase: Transferred 35 bytes in 17.8041 seconds (1.9658
bytes/sec)
15/03/09 07:50:00 INFO mapreduce.ImportJobBase: Retrieved 1 records.
15/03/09 07:50:00 INFO util.AppendUtils: Appending to directory sqoopincr
15/03/09 07:50:00 INFO util.AppendUtils: Using found partition 2
15/03/09 07:50:00 INFO tool.ImportTool: Incremental import complete! To run another incremental
import of all data following this import, supply the following arguments:
15/03/09 07:50:00 INFO tool.ImportTool: --incremental append
15/03/09 07:50:00 INFO tool.ImportTool: --check-column ID
15/03/09 07:50:00 INFO tool.ImportTool: --last-value 10
15/03/09 07:50:00 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
The previous incremental import ran when the maximum value of the ID column was 9.
One more row, with ID 10, was added to the table before the import shown above was
run. This run therefore imported only that one new row and printed --last-value 10 in
the output log for use in the next import.
You can inspect the incremental changes in HDFS with the following commands:
[biadmin@bigdata bin]$ hadoop fs -ls -R sqoopincr
-rw-r--r-- 1 biadmin biadmin 36 2015-03-04 10:28 sqoopincr/part-m-00000
-rw-r--r-- 1 biadmin biadmin 34 2015-03-04 10:31 sqoopincr/part-m-00001
-rw-r--r-- 1 biadmin biadmin 35 2015-03-09 07:49 sqoopincr/part-m-00002
[biadmin@bigdata bin]$ hadoop fs -cat sqoopincr/part-m-00002
10,Soma,2015-03-09 07:49:20.962439
This incremental import generated the file part-m-00002, and its content shows that only the one
newly added row (ID 10) was imported. The files part-m-00000 and part-m-00001 are the output of
earlier imports. Each incremental import into HDFS creates a new file containing only the
incremental rows.
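The selection rule behind append mode can be illustrated with a small shell sketch. It is not Sqoop code: the function names select_appended and next_last_value, and the sample rows other than the ID 10 row shown above, are hypothetical. The sketch assumes comma-delimited rows with a numeric ID in the first field. It keeps the rows whose ID is greater than the last value and reports the value to pass as --last-value on the next run.

```shell
# Sketch of the append-mode selection rule; this is NOT Sqoop code.
# Reads comma-delimited rows (ID in field 1) on stdin and prints only
# the rows whose ID is greater than the given last value.
select_appended() {
  local last_value="$1"
  awk -F',' -v last="$last_value" '($1 + 0) > (last + 0)'
}

# Prints the value to use as --last-value next time: the highest ID seen
# on stdin, or the previous last value if no newer row arrived.
next_last_value() {
  local last_value="$1"
  awk -F',' -v max="$last_value" \
    '($1 + 0) > (max + 0) { max = $1 } END { print max }'
}

# Hypothetical sample rows in the shape of TEST7 (ID, name, timestamp).
# Only the ID 10 row is taken from the article; the others are made up.
rows='8,Ann,2015-03-04 10:28:00.000000
9,Raj,2015-03-04 10:31:00.000000
10,Soma,2015-03-09 07:49:20.962439'

printf '%s\n' "$rows" | select_appended 9
printf '%s\n' "$rows" | next_last_value 9
```

With the sample rows, the first command prints only the row with ID 10 and the second prints 10, which mirrors the --last-value 10 hint in the log above.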
Usage and example of lastmodified mode
Use lastmodified mode when rows of the source table may be updated as well as added,
and each such update sets the value of a last-modified timestamp column to the current
timestamp. Rows where the check column holds a timestamp more recent than the
timestamp specified with --last-value are imported into HDFS.
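As a rough illustration of this time-window selection, the shell sketch below filters rows by a lower and an upper timestamp bound. It is not Sqoop code: the function name select_modified and the sample rows other than the ID 10 row are hypothetical. It assumes the timestamp is the third comma-delimited field and that all timestamps, including the bounds, use one fixed-width format, so that string order matches time order. The bounds are the ones that appear in the run that follows, rewritten in that single format.

```shell
# Sketch of the lastmodified selection window; this is NOT Sqoop code.
# Reads comma-delimited rows (timestamp in field 3) on stdin and prints
# the rows with lower <= timestamp < upper. Appending "" forces awk to
# compare the values as strings.
select_modified() {
  local lower="$1"
  local upper="$2"
  awk -F',' -v lo="$lower" -v hi="$upper" \
    '($3 "") >= (lo "") && ($3 "") < (hi "")'
}

# Hypothetical sample rows; only the ID 10 row is taken from the article.
rows='9,Raj,2015-03-04 10:31:00.000000
10,Soma,2015-03-09 07:49:20.962439
11,Lee,2015-03-09 07:56:30.000000'

printf '%s\n' "$rows" |
  select_modified '2015-03-09 07:49:20.962439' '2015-03-09 07:55:59.163688'
```

Only the row with ID 10 falls inside the window. The hypothetical row with ID 11 is stamped after the upper bound, so it would be left for the next run, whose lower bound is this run's upper bound.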
[biadmin@bigdata bin]$ sqoop import --connect jdbc:db2://192.168.255.129:50000/cdcdb \
    --username BIADMIN --password passw0rd --table TEST7 \
    --incremental "lastmodified" --check-column JOINING \
    --last-value "2015-03-09-07.49.20.962439" --target-dir sqoopincrlastmod
15/03/09 07:55:57 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure.
Consider using -P instead.
15/03/09 07:55:57 INFO manager.SqlManager: Using default fetchSize of 1000
15/03/09 07:55:57 INFO tool.CodeGenTool: Beginning code generation
15/03/09 07:55:57 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "TEST7"
AS t WHERE 1=0
15/03/09 07:55:57 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "TEST7"
AS t WHERE 1=0
15/03/09 07:55:57 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /mnt/BigInsights/opt/ibm/biginsights/IHC
15/03/09 07:55:58 INFO orm.CompilationManager: Found hadoop core jar at: /mnt/BigInsights/opt/ibm/biginsights/IHC/hadoop-core.jar
Note: /tmp/sqoop-biadmin/compile/55cec73f5eeb4a054f5c20b9935cc589/TEST7.java uses or overrides a
deprecated API.
Note: Recompile with -Xlint:deprecation for details.
15/03/09 07:55:59 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-biadmin/compile/55cec73f5eeb4a054f5c20b9935cc589/TEST7.jar
15/03/09 07:55:59 INFO tool.ImportTool: Incremental import based on column "JOINING"
15/03/09 07:55:59 INFO tool.ImportTool: Lower bound value: '2015-03-09-07.49.20.962439'
15/03/09 07:55:59 INFO tool.ImportTool: Upper bound value: '2015-03-09 07:55:59.163688'
15/03/09 07:55:59 INFO db2.DB2ConnManager: importTable entered
15/03/09 07:55:59 INFO db2.DB2ConnManager: getPrimaryKey() tabSchema,tabName=BIADMIN,TEST7
15/03/09 07:55:59 INFO db2.DB2ConnManager: getPrimaryKey() tabSchema,tabName=BIADMIN,TEST7
15/03/09 07:55:59 INFO mapreduce.ImportJobBase: Beginning import of TEST7
15/03/09 07:55:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "TEST7"
AS t WHERE 1=0
15/03/09 07:56:00 INFO db2.DB2InputFormat: getSplits for table,mapTasks="TEST7",4
15/03/09 07:56:00 INFO db2.DB2Util: partitioning key not found for BIADMIN.TEST7
15/03/09 07:56:00 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("ID"),
MAX("ID") FROM "TEST7" WHERE ( "JOINING" >= '2015-03-09-07.49.20.962439' AND "JOINING" <
'2015-03-09 07:55:59.163688' )
15/03/09 07:56:01 INFO mapred.JobClient: Running job: job_201502271841_0024
15/03/09 07:56:02 INFO mapred.JobClient: map 0% reduce 0%
15/03/09 07:56:16 INFO mapred.JobClient: map 100% reduce 0%
15/03/09 07:56:17 INFO mapred.JobClient: Job complete: job_201502271841_0024
15/03/09 07:56:17 INFO mapred.JobClient: Counters: 18
15/03/09 07:56:17 INFO mapred.JobClient: File System Counters
15/03/09 07:56:17 INFO mapred.JobClient: FILE: BYTES_WRITTEN=213714
15/03/09 07:56:17 INFO mapred.JobClient: HDFS: BYTES_READ=101
15/03/09 07:56:17 INFO mapred.JobClient: HDFS: BYTES_WRITTEN=35
15/03/09 07:56:17 INFO mapred.JobClient: org.apache.hadoop.mapreduce.JobCounter
15/03/09 07:56:17 INFO mapred.JobClient: TOTAL_LAUNCHED_MAPS=1
15/03/09 07:56:17 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=10337
15/03/09 07:56:17 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
15/03/09 07:56:17 INFO mapred.JobClient: FALLOW_SLOTS_MILLIS_MAPS=0
15/03/09 07:56:17 INFO mapred.JobClient: FALLOW_SLOTS_MILLIS_REDUCES=0
15/03/09 07:56:17 INFO mapred.JobClient: org.apache.hadoop.mapreduce.TaskCounter
15/03/09 07:56:17 INFO mapred.JobClient: MAP_INPUT_RECORDS=1
15/03/09 07:56:17 INFO mapred.JobClient: MAP_OUTPUT_RECORDS=1
15/03/09 07:56:17 INFO mapred.JobClient: SPLIT_RAW_BYTES=101
15/03/09 07:56:17 INFO mapred.JobClient: SPILLED_RECORDS=0
15/03/09 07:56:17 INFO mapred.JobClient: CPU_MILLISECONDS=1020
15/03/09 07:56:17 INFO mapred.JobClient: PHYSICAL_MEMORY_BYTES=272326656
15/03/09 07:56:17 INFO mapred.JobClient: VIRTUAL_MEMORY_BYTES=1465704448
15/03/09 07:56:17 INFO mapred.JobClient: COMMITTED_HEAP_BYTES=1048576000
15/03/09 07:56:17 INFO mapred.JobClient: File Input Format Counters
15/03/09 07:56:17 INFO mapred.JobClient: Bytes Read=0
15/03/09 07:56:17 INFO mapred.JobClient: org.apache.hadoop.mapreduce.lib.output.FileOutputFormat$Counter
15/03/09 07:56:17 INFO mapred.JobClient: BYTES_WRITTEN=35
15/03/09 07:56:17 INFO mapreduce.ImportJobBase: Transferred 35 bytes in 17.722 seconds (1.9749
bytes/sec)
15/03/09 07:56:17 INFO mapreduce.ImportJobBase: Retrieved 1 records.
15/03/09 07:56:17 INFO tool.ImportTool: Incremental import complete! To run another incremental
import of all data following this import, supply the following arguments:
15/03/09 07:56:17 INFO tool.ImportTool: --incremental lastmodified
15/03/09 07:56:17 INFO tool.ImportTool: --check-column JOINING
15/03/09 07:56:17 INFO tool.ImportTool: --last-value 2015-03-09 07:55:59.163688
15/03/09 07:56:17 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
In this lastmodified example, the parameters --check-column JOINING --last-value
"2015-03-09-07.49.20.962439" tell Sqoop to use the JOINING column as the check column. As the
BoundingValsQuery in the log shows, the run imports the rows whose JOINING timestamp is at or after
that value and earlier than the time at which the import started. On completion, Sqoop also prints the
--last-value to supply for the JOINING column in the subsequent import. Another option is to save the
incremental import as a Sqoop job. A saved job retains the last value automatically, which makes it
the preferred mechanism for performing a recurring incremental import.
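A saved job of this kind could be set up along the following lines. This is a minimal sketch, assuming a Sqoop 1 installation where the sqoop job tool is available. The wrapper function names and the job name in the usage comment are hypothetical, and the connection values in the comment are the ones used in the runs above. The -P option prompts for the password instead of placing it on the command line, as the warning in the logs suggests.

```shell
# Sketch only: thin wrappers around the `sqoop job` tool.
# Arguments: job name, JDBC URL, database user, table, check column,
# starting last value, HDFS target directory.
create_append_job() {
  local job_name="$1"
  local jdbc_url="$2"
  local db_user="$3"
  local table="$4"
  local check_col="$5"
  local last_value="$6"
  local target_dir="$7"
  sqoop job --create "$job_name" \
    -- import \
    --connect "$jdbc_url" \
    --username "$db_user" \
    -P \
    --table "$table" \
    --incremental append \
    --check-column "$check_col" \
    --last-value "$last_value" \
    --target-dir "$target_dir"
}

# Runs a saved job; each run continues from the last value retained
# by the previous run.
run_job() {
  sqoop job --exec "$1"
}

# Example (the job name test7_append is hypothetical):
#   create_append_job test7_append \
#     jdbc:db2://192.168.255.129:50000/cdcdb BIADMIN TEST7 ID 10 sqoopincr
#   run_job test7_append
```

The functions only assemble the command line, so nothing runs until they are called on a host where Sqoop is installed. The existing jobs can be listed with sqoop job --list.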
Differences between InfoSphere Data Replication incremental change capture
and Sqoop incremental import
There are major differences between how IIDR captures incremental changes from a
source and delivers them to a target, and how Sqoop incrementally imports data from
relational databases into HDFS:

IIDR can learn about rows deleted from the source database. A Sqoop incremental
import does not capture information about deleted rows.

IIDR provides journal control fields, which are specific pieces of database log
information made available for each replicated row, for example the name of the
user who made the change, the commit timestamp of the change, the action that
occurred on the source (insert, update, or delete), the commit group ID, and a
number of others. These fields can be included in the data written to Hadoop.
Sqoop cannot provide this information.

IIDR generally has a lower impact on the source system because it performs
log-based change capture, whereas Sqoop runs queries directly against the database.

IIDR can capture changes regardless of the design of the source application.
Sqoop can capture incremental changes only in specific situations, namely when the
table has a suitable check column: an increasing row ID for append mode or a
last-modified timestamp for lastmodified mode.
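The point about deleted rows can be seen with two small snapshots of a table. The sketch below is neither Sqoop nor IIDR code, and its function name and sample data are hypothetical. It assumes comma-delimited rows with the key in the first field. An append-style query against the current table returns only the new row, while the deletion becomes visible only when the earlier key set is compared with the current one.

```shell
# Sketch, not Sqoop or IIDR code. Prints the keys (field 1) that appear in
# the earlier snapshot file ($1) but not in the current snapshot file ($2).
missing_keys() {
  awk -F',' '
    FILENAME == ARGV[1] { current[$1] = 1; next }  # first file: current keys
    !($1 in current)    { print $1 }               # second file: old keys now gone
  ' "$2" "$1"
}

# Hypothetical snapshots: row 2 is deleted and row 4 is inserted.
old_snapshot=$(mktemp)
new_snapshot=$(mktemp)
printf '1,Ann\n2,Raj\n3,Lee\n' > "$old_snapshot"
printf '1,Ann\n3,Lee\n4,Soma\n' > "$new_snapshot"

# An append-style query on the current table (ID > 3) returns only row 4.
awk -F',' '($1 + 0) > 3' "$new_snapshot"

# The deletion of row 2 shows up only when the two key sets are compared.
missing_keys "$old_snapshot" "$new_snapshot"

rm -f "$old_snapshot" "$new_snapshot"
```

A log-based tool reads the delete from the transaction log, so it does not need the kind of full key comparison sketched here.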
Using Sqoop and IIDR together
IIDR’s CDC can perform the initial synchronization itself through a process called a
refresh. After the refresh has completed, mirroring is initiated. It is also possible
to use an external tool such as Sqoop to do the initial synchronization and CDC to
perform the mirroring after the external refresh has completed. External tools such as
Sqoop can be advantageous for large initial synchronizations because of their
performance benefits.
When IIDR targets a non-database target such as HDFS, transactions that occur on the
source database while the refresh is running (the in-doubt period) can produce
duplicate records on the target. Duplicates on the target also occur if the client
chooses to do the initial synchronization (refresh) externally, outside of IIDR. For
many clients, having duplicate records in HDFS after a refresh is not an issue. If
duplicate records resulting from a refresh on an active database are not acceptable,
one option is to ensure that the database is quiesced during the refresh. Alternatively,
the client can run a process or utility on the Hadoop side (for example, with Hive or
HBase) to remove duplicate records on the target after the refresh in-doubt period.
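One simple form of such a clean-up can be sketched in shell. This is a minimal sketch that assumes the duplicates are byte-identical lines in delimited text files and that the data is small enough to stream through a single process. The function name is hypothetical, and larger data sets would more likely be handled with a Hive query or a similar job on the cluster.

```shell
# Sketch: drop exact duplicate records from delimited text, keeping the
# first occurrence of each line and preserving the original order.
# Reads stdin and writes stdout.
drop_duplicate_records() {
  awk '!seen[$0]++'
}

# Hypothetical sample with two duplicated records.
printf '10,Soma\n11,Lee\n10,Soma\n12,Ann\n11,Lee\n' | drop_duplicate_records

# On a cluster, the same filter could sit in a pipeline such as the one
# below (both paths are placeholders, not taken from the article):
#   hadoop fs -cat target_dir/part-* | drop_duplicate_records |
#     hadoop fs -put - deduped_dir/part-00000
```

If the replicated rows carry per-change fields such as commit timestamps, two copies of the same source row may not be identical lines, and the comparison would then have to be made on the key columns only.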
Conclusion
This article has described the commands that Apache Sqoop import uses to
incrementally import changed data from an RDBMS into HDFS, and the major
differences between Sqoop and InfoSphere Change Data Capture replication. It has also
described how the two technologies can be used together.