Overview of Hive#
Basic Architecture of Hive#
-
User Interface
: CLI, JDBC/ODBC, Web UI layer -
Thrift Server
: Supports multiple language programs to manipulate Hive -
Driver
: Driver, Compiler, Optimizer, ExecutorThe core of Hive is the driver engine, which consists of four parts:
- Interpreter: Converts Hive SQL statements into an abstract syntax tree (AST)
- Compiler: Converts the abstract syntax tree into a logical execution plan
- Optimizer: Optimizes the logical execution plan
- Executor: Calls the underlying execution framework to execute the logical plan
-
Metadata Storage System
: The metadata in Hive typically includes the names of tables, the columns and partitions of tables and their attributes, the attributes of the tables (internal and external tables), and the directory where the table data is stored.The Metastore is by default in the built-in
Derby
database, which is not suitable for multi-user operations, and the data storage directory is not fixed. The database follows Hive, making it extremely inconvenient to manage.
Solution: Typically, we store it in a MySQL database that we create ourselves (locally or remotely), and Hive interacts with MySQL through the MetaStore service.
Tables in Hive#
Tables in Hive correspond to specified directories on HDFS. When querying data, the entire table is scanned by default, which consumes a lot of time and performance.
Partitioned Tables and Bucketed Tables#
Partitioning#
Divides the data table into multiple partitions based on one or more columns. Partitions are subdirectories of the table directory on HDFS, and data is stored in subdirectories according to partitions. If the WHERE clause of the query contains partition conditions, it directly searches from that partition instead of scanning the entire table directory. A reasonable partition design can greatly improve query speed and performance.
Creating a Partitioned Table
In Hive, you can create a partitioned table using the PARTITIONED BY
clause.
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- Partitioned by department number
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';
Loading Data into a Partitioned Table
When loading data into a partitioned table, you must specify the partition where the data is located.
# Load data for department number 20 into the table
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# Load data for department number 30 into the table
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)
Bucketing#
Not all datasets can form reasonable partitions, and having too many partitions is not necessarily better; excessive partition conditions may lead to many partitions without data. Bucketing is a finer-grained division relative to partitioning, distinguishing the entire data content based on the hash value of a certain column attribute.
Creating a Bucketed Table
In Hive, we can specify the bucketing column using CLUSTERED BY
and specify the sorting reference column within the buckets using SORTED BY
.
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS -- Hashing employee numbers into four buckets
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';
Loading Data into a Bucketed Table
--1. Set enforce bucketing, not needed in Hive 2.x
set hive.enforce.bucketing = true;
--2. Import data
INSERT INTO TABLE emp_bucket SELECT * FROM emp; -- Here, the emp table is a regular employee table
Internal Tables and External Tables#
Differences
- When creating a table: When creating an internal table, the data is moved to the path pointed to by the data warehouse; when creating an external table, only the path of the data is recorded, and no changes are made to the data's location.
- When deleting a table: When deleting a table, the metadata and data of an internal table are deleted together, while an external table only deletes metadata and does not delete data. Thus, external tables are relatively safer, and data organization is more flexible, making it easier to share source data.
- When creating a table, an additional
external
keyword is added for external tables.
Usage Choices
- If all data processing is done in Hive, it is preferable to choose internal tables.
- The scenario for using external tables is mainly when sharing data sources; external tables can access the initial data stored in HDFS, and then the data can be transformed through Hive and stored in internal tables.
Custom UDF#
Reference link: Hive UDF
UDF (User Defined Function) mainly includes:
UDF (user-defined function)
: One input, one output, given one parameter, outputs a processed data.UDAF (user-defined aggregate function)
: Multiple inputs, one output, belongs to aggregate functions, similar to count, sum, etc.UDTF (user-defined table function)
: One input, multiple outputs, belongs to one parameter, returns a list as a result.
Optimization#
(1) Data Skew#
Causes
- Uneven key distribution
- Characteristics of the business data itself
- SQL statements causing data skew
Solutions
-
Set
hive.map.aggr=true
andhive.groupby.skewindata=true
. -
When there is data skew, perform load balancing. When
hive.groupby.skewindata=true
is set, the generated query plan will have two MR Jobs. In the first MR Job, the output results of the Map will be randomly distributed to the Reduce, with each Reduce performing partial aggregation and outputting results. This way, the results of the same Group By Key may be distributed to different Reduce, achieving load balancing; the second MR Job distributes the preprocessed data results according to Group By Key to the Reduce (this process ensures that the same Group By Key is distributed to the same Reduce), and finally completes the final aggregation operation. -
SQL statement adjustments:
Choose the table with the most uniform distribution of join keys as the driving table
. Perform column pruning and filter operations to reduce the data volume when joining two tables.Join small tables
: Use map join to load small dimension tables (with fewer than 1000 records) into memory first, completing the Reduce on the Map side.Join large tables
: Change the null key into a string plus a random number, distributing the skewed data to different reduces, as null values cannot be associated, which does not affect the final result after processing.Count distinct large numbers of identical special values
: When counting distinct values, handle cases where values are null separately. If calculating count distinct, you can directly filter and add 1 to the final result. If other calculations are needed, you can first handle records with null values separately and then union them with other calculation results.
(2) General Settings#
hive.optimize.cp=true
: Column pruninghive.optimize.prunner
: Partition pruninghive.limit.optimize.enable=true
: Optimize LIMIT n statementshive.limit.row.max.size=1000000
hive.limit.optimize.limit.file=10
: Maximum number of files
(3) Local Mode (Small Tasks)#
Enable local mode hive> set hive.exec.mode.local.auto=true
-
The input data size of the job must be less than the parameter:
hive.exec.mode.local.auto.inputbytes.max (default 128MB)
-
The number of maps in the job must be less than the parameter:
hive.exec.mode.local.auto.tasks.max (default 4)
-
The number of reduces in the job must be 0 or 1.
(4) Concurrent Execution#
Enable parallel computation hive> set hive.exec.parallel=true
Related parameter hive.exec.parallel.thread.number
: The number of jobs allowed to run concurrently in a single SQL computation.
(5) Strict Mode#
Mainly to prevent a group of SQL queries from greatly increasing the pressure on the cluster.
Enable strict mode: hive> set hive.mapred.mode = strict
Some Restrictions:
- For partitioned tables, a WHERE clause must be added for filtering partition fields.
- The ORDER BY statement must include a limit output restriction.
- Limit execution of Cartesian product queries.
(6) Speculative Execution#
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;
(7) Grouping#
-
Two aggregate functions cannot have different DISTINCT columns; the following expression is incorrect:
INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;
-
The SELECT statement can only have columns from GROUP BY or aggregate functions.
-
hive.multigroupby.singlemar=true
: When multiple GROUP BY statements have the same grouping columns, they will be optimized into one MR task.
(8) Aggregation#
Enable map aggregation hive> set hive.map.aggr=true
.
Related Parameters
-
hive.groupby.mapaggr.checkinterval
: The number of rows processed during aggregation on the map side (default: 100000). -
hive.map.aggr.hash.min.reduction
: The minimum ratio for aggregation (if the amount of data after aggregation / 100000 is greater than this configuration of 0.5, aggregation will not occur). -
hive.map.aggr.hash.percentmemory
: The maximum amount of memory used for aggregation on the map side. -
hive.map.aggr.hash.force.flush.memory.threshold
: The maximum usable content of the hash table during aggregation on the map side; exceeding this value will trigger a flush. -
hive.groupby.skewindata
: Whether to optimize data skew produced by Group By, default is false.
(9) Merging Small Files#
hive.merge.mapfiles=true
: Merge map output.hive.merge.mapredfiles=false
: Merge reduce output.hive.merge.size.per.task=256*1000*1000
: Size of files to merge.hive.mergejob.maponly=true
: If CombineHiveInputFormat is supported, generate a task that only executes the merge on the Map side.hive.merge.smallfiles.avgsize=16000000
: When the average size of files is less than this value, an MR task will be triggered to execute the merge.
(10) Custom Map/Reduce Numbers#
Parameters related to the number of Maps
-
mapred.max.split.size
: The maximum value of a split, i.e., the maximum value of each map processing file. -
mapred.min.split.size.per.node
: The minimum value of a split on a node. -
mapred.min.split.size.per.rack
: The minimum value of a split on a rack.
Parameters related to the number of Reduces
-
mapred.reduce.tasks
: Force the specification of the number of reduce tasks. -
hive.exec.reducers.bytes.per.reducer
: The amount of data processed by each reduce task. -
hive.exec.reducers.max
: The maximum number of reduces per task [Map count >= Reduce count].
(11) Using Indexes:#
hive.optimize.index.filter
: Automatically use indexes.hive.optimize.index.groupby
: Use aggregate indexes to optimize GROUP BY operations.
Supported Storage Formats#
ORC and Parquet have outstanding overall performance and are widely used; they are recommended.
-
TextFile
: Stored as plain text files. This is the default file storage format for Hive. This storage method does not compress data, resulting in large disk overhead and high data parsing overhead. -
SequenceFile
: SequenceFile is a binary file provided by the Hadoop API that serializes data in the form of <key,value> into files. This binary file internally uses Hadoop's standard Writable interface for serialization and deserialization. It is compatible with MapFile in the Hadoop API. In Hive, SequenceFile inherits from Hadoop API's SequenceFile, but its key is empty, using value to store the actual value to avoid additional sorting operations during the map phase of MR. -
RCFile
: The RCFile file format is an open-source Hive file storage format from Facebook, which first divides the table into several row groups, storing the data in each row group by column, with each column's data stored separately. -
ORC Files
: ORC is an extension of RCFile to some extent and is an optimization of RCFile. -
Avro Files
: Avro is a data serialization system designed to support applications that exchange large amounts of data. Its main features include: support for binary serialization, allowing for convenient and fast processing of large amounts of data; friendly to dynamic languages, with mechanisms provided by Avro that allow dynamic languages to easily handle Avro data. -
Parquet
: Parquet is a columnar storage format aimed at analytical business, implemented based on the Dremel data model and algorithms. It achieves efficient compression by column and special encoding techniques, thereby reducing storage space while improving IO efficiency.
Common Operation Commands#
Common DDL Operations#
Reference: LanguageManual DDL
View Data List: show databases;
Use Database: USE database_name;
Create Database:
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name -- DATABASE|SCHEMA are equivalent
[COMMENT database_comment] -- Database comment
[LOCATION hdfs_path] -- Location stored on HDFS
[WITH DBPROPERTIES (property_name=property_value, ...)]; -- Specify additional properties
View Database Information:
DESC DATABASE [EXTENDED] db_name; -- EXTENDED indicates whether to display additional properties
Delete Database:
-- The default behavior is RESTRICT; if there are tables in the database, deletion will fail.
-- To delete the database and its tables, use CASCADE for cascading deletion.
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT | CASCADE];
Create Table#
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name -- Table name
[(col_name data_type [COMMENT col_comment],
... [constraint_specification])] -- Column name, column data type
[COMMENT table_comment] -- Table description
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] -- Partition rules for partitioned tables
[
CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
] -- Bucketing rules for bucketed tables
[SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
[STORED AS DIRECTORIES]
] -- Specify skewed columns and values
[
[ROW FORMAT row_format]
[STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
] -- Specify row delimiters, storage file formats, or use custom storage formats
[LOCATION hdfs_path] -- Specify the storage location of the table
[TBLPROPERTIES (property_name=property_value, ...)] -- Specify table properties
[AS select_statement]; -- Create table from query results
Support for Creating Tables from Query Results:
CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';
Modify Table#
Rename Table:
ALTER TABLE table_name RENAME TO new_table_name;
Modify Column:
ALTER TABLE table_name [PARTITION partition_spec]
CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST | AFTER column_name] [CASCADE | RESTRICT];
-- Example
-- Modify field name and type
ALTER TABLE emp_temp CHANGE empno empno_new INT;
-- Modify the name of the field sal and place it after the empno field
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2) AFTER ename;
-- Add a comment to the field
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'this is column mgr';
Add Column:
ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT 'home address');
Clear Table/Delete Table#
Clear Table:
-- Clear all data from the table or specified partition of the table
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value, ...)];
Currently, only internal tables can perform the TRUNCATE
operation; external tables will throw an exception Cannot truncate non-managed table XXXX
.
Delete Table:
DROP TABLE [IF EXISTS] table_name [PURGE];
- Internal tables: Will delete both the table's metadata and the data on HDFS.
- External tables: Will only delete the table's metadata and not the data on HDFS.
- When deleting tables referenced by views, no warning will be given (but the view is already invalid and must be deleted or recreated by the user).
Others#
View List of Views:
SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards']; -- Supported only in Hive 2.2.0 +
View List of Table Partitions:
SHOW PARTITIONS table_name;
View Create Statement of Table/View:
SHOW CREATE TABLE ([db_name.]table_name|view_name);
Common DML Operations#
Similar to relational databases, refer to: LanguageManual DML
Sorting Keywords#
-
sort by
: Not a global sort; it completes sorting before data enters the reducer. -
order by
: Will perform a global sort on the input, thus only one reducer (multiple reducers cannot guarantee global order). Only one reducer will lead to longer computation time when the input scale is large. -
distribute by
: Divides the data according to specified fields and outputs to different reduces. -
cluster by
: When the fields of distribute by and sort by are the same, it is equivalent to cluster by. It can be seen as a special distribute + sort.
Append Data Import Methods in Hive#
-
Import from local:
load data local inpath ‘/home/1.txt’ (overwrite)into table student;
-
Import from HDFS:
load data inpath ‘/user/hive/warehouse/1.txt’ (overwrite)into table student;
-
Query import:
create table student1 as select * from student; (can also query specific data)
-
Query result import:
insert (overwrite)into table staff select * from track_log;
Data Export Methods in Hive#
-
Use insert overwrite export method
- Export to local:
insert overwrite local directory ‘/home/robot/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
- Export to HDFS
insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
-
Bash shell overwrite append export
$ bin/hive -e “select * from staff;” > /home/z/backup.log
- Sqoop to export Hive data to external sources.