Hive の概要#
Hive の基本アーキテクチャ#
-
ユーザーインターフェース
: CLI、JDBC/ODBC、Web UI 層 -
Thriftサーバー
: Hive を操作するための多言語プログラムをサポート -
ドライバー
: ドライバー、コンパイラー、オプティマイザー、エグゼキューターHive のコアはドライバーエンジンであり、ドライバーエンジンは 4 つの部分で構成されています:
- インタープリター: HiveSQL 文を抽象構文木(AST)に変換
- コンパイラー:抽象構文木を論理実行計画に変換
- オプティマイザー:論理実行計画を最適化
- エグゼキューター:低レベルの実行フレームワークを呼び出して論理計画を実行
-
メタデータストレージシステム
: Hive のメタデータには通常、テーブル名、テーブルの列とパーティションおよびその属性、テーブルの属性(内部テーブルと外部テーブル)、テーブルのデータが格納されているディレクトリが含まれます。Metastore はデフォルトで自動的に
Derby
データベースに存在しますが、欠点は多ユーザー操作には適しておらず、データストレージディレクトリが固定されていないことです。データベースは Hive に従い、管理が非常に不便です。
解決策: 通常は自分で作成した MySQL データベース(ローカルまたはリモート)を使用し、Hive と MySQL は MetaStore サービスを介して相互作用します。
Hive のテーブル#
Hive のテーブルは HDFS 上の指定されたディレクトリに対応し、データをクエリする際にはデフォルトで全テーブルをスキャンします。これにより、時間とパフォーマンスの消耗が非常に大きくなります。
パーティションテーブルとバケットテーブル#
パーティション#
データテーブルの特定の列または列に基づいて複数の区分に分け、パーティションは HDFS 上のテーブルディレクトリのサブディレクトリです。データはパーティションに基づいてサブディレクトリに格納されます。クエリの where 句にパーティション条件が含まれている場合、全テーブルディレクトリをスキャンするのではなく、そのパーティションから直接検索します。合理的なパーティション設計は、クエリ速度とパフォーマンスを大幅に向上させることができます。
パーティションテーブルの作成
Hive ではPARTITIONED BY
句を使用してパーティションテーブルを作成できます。
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 部門番号に基づいてパーティションを作成
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';
パーティションテーブルへのデータのロード
パーティションテーブルにデータをロードする際は、データが存在するパーティションを指定する必要があります。
# 部門番号20のデータをテーブルにロード
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# 部門番号30のデータをテーブルにロード
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)
バケット#
すべてのデータセットが合理的なパーティションを形成できるわけではなく、パーティションの数が多いほど良いわけでもありません。過剰なパーティション条件は、多くのパーティションにデータが存在しない可能性があります。バケットはパーティションに対してより細かい粒度の分割を行い、全データ内容を特定の列属性値のハッシュ値に基づいて区別します。
バケットテーブルの作成
Hive では、CLUSTERED BY
を使用してバケット列を指定し、SORTED BY
を使用してバケット内のデータのソート基準列を指定できます。
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS -- 従業員番号に基づいて4つのバケットにハッシュ化
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';
バケットテーブルへのデータのロード
--1. バケットを強制する設定、Hive 2.xではこのステップは不要
set hive.enforce.bucketing = true;
--2. データをインポート
INSERT INTO TABLE emp_bucket SELECT * FROM emp; -- ここでのempテーブルは通常の従業員テーブルです
内部テーブルと外部テーブル#
違い
- テーブル作成時:内部テーブルを作成すると、データはデータウェアハウスが指すパスに移動します。外部テーブルを作成する場合は、データの存在するパスを記録するだけで、データの位置は変更されません。
- テーブル削除時:テーブルを削除すると、内部テーブルのメタデータとデータは一緒に削除されますが、外部テーブルはメタデータのみを削除し、データは削除しません。このため、外部テーブルは相対的に安全であり、データの組織も柔軟で、ソースデータの共有が容易です。
- テーブル作成時に外部テーブルには
external
キーワードを追加します。
使用の選択
- データのすべての処理が Hive 内で行われる場合、内部テーブルを選択する傾向があります。
- 外部テーブルの使用シーンは主にデータソースを共有する場合であり、外部テーブルを使用して HDFS 上に保存された初期データにアクセスし、その後 Hive でデータを変換して内部テーブルに保存します。
カスタム UDF#
参考リンク: Hive UDF
UDF(ユーザー定義関数)は、主に以下を含みます:
UDF(ユーザー定義関数)
: 一つの入力、一つの出力、与えられたパラメータに基づいて処理されたデータを出力します。UDAF(ユーザー定義集約関数)
: 複数の入力、一つの出力、集約関数に属し、count、sum などの関数に似ています。UDTF(ユーザー定義テーブル関数)
: 一つの入力、複数の出力、一つのパラメータに属し、結果としてリストを返します。
最適化#
(1) データの偏り#
原因
- キーの分布が不均一
- ビジネスデータ自体の特性
- SQL 文によるデータの偏り
解決方法
-
hive 設定
hive.map.aggr=true
とhive.groupby.skewindata=true
-
データの偏りがある場合は負荷分散を行い、
hive.groupby.skewindata=true
を設定すると、生成されるクエリ計画には 2 つの MR ジョブが含まれます。最初の MR ジョブでは、Map の出力結果集合がランダムに Reduce に分配され、各 Reduce が部分的な集約操作を行い、結果を出力します。このように処理された結果は、同じ Group By Key が異なる Reduce に分配される可能性があるため、負荷分散の目的を達成します。2 つ目の MR ジョブは、事前処理されたデータ結果に基づいて Group By Key に従って Reduce に分配され(このプロセスでは、同じ Group By Key が同じ Reduce に分配されることが保証されます)、最終的な集約操作を完了します。 -
SQL 文の調整:
最も均等に分布したテーブルをドライバーテーブルとして選択
。列の裁断とフィルター操作を行い、2 つのテーブルを結合する際にデータ量が相対的に小さくなる効果を得ます。小さなテーブルと大きなテーブルの結合
: 小さな次元テーブル(1000 行未満のレコード数)をメモリに先に読み込み、Map 側で Reduce を完了させるために map join を使用します。大きなテーブルと大きなテーブルの結合
: 空のキーを文字列とランダムな数値に変換し、偏ったデータを異なる Reduce に分配します。null 値は関連付けられないため、処理後も最終結果には影響しません。count distinctの大量の同じ特殊値
: count distinct 時に、値が空のケースを個別に処理します。count distinct を計算する場合は処理する必要はなく、直接フィルタリングし、後の結果に 1 を加えます。他の計算が必要な場合は、group by を行う前に値が空のレコードを個別に処理し、他の計算結果と union します。
(2) 一般設定#
hive.optimize.cp=true
: 列裁断hive.optimize.prunner
: パーティション裁断hive.limit.optimize.enable=true
: LIMIT n 文の最適化hive.limit.row.max.size=1000000:
hive.limit.optimize.limit.file=10:最大ファイル数
(3) ローカルモード(小タスク)#
ローカルモードを有効にする hive> set hive.exec.mode.local.auto=true
-
ジョブの入力データサイズはパラメータ
hive.exec.mode.local.auto.inputbytes.max(デフォルト128MB)
未満でなければなりません。 -
ジョブの map 数はパラメータ
hive.exec.mode.local.auto.tasks.max(デフォルト4)
未満でなければなりません。 -
ジョブの reduce 数は 0 または 1 でなければなりません。
(4) 同時実行#
並列計算を有効にする hive> set hive.exec.parallel=true
関連パラメータ hive.exec.parallel.thread.number
: 一度の SQL 計算で許可されるジョブの数
(5) 厳格モード#
主に一群の SQL クエリがクラスタの負荷を大幅に増加させるのを防ぐためです。
厳格モードを有効にする: hive> set hive.mapred.mode = strict
いくつかの制限:
- パーティションテーブルの場合、パーティションフィールドに対する where 条件フィルタを追加する必要があります。
- orderby 文には limit 出力制限を含める必要があります。
- デカルト積クエリの実行を制限します。
(6) 推測実行#
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;
(7) グループ化#
-
2 つの集約関数は異なる DISTINCT 列を持つことができず、以下の式は誤りです:
INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;
-
SELECT 文には GROUP BY の列または集約関数のみを含めることができます。
-
hive.multigroupby.singlemar=true
: 複数の GROUP BY 文が同じグループ列を持つ場合、1 つの MR タスクに最適化されます。
(8) 集約#
map 集約を有効にする hive> set hive.map.aggr=true
関連パラメータ
-
hive.groupby.mapaggr.checkinterval
: map 側で group by を実行する際に処理する行数(デフォルト:100000) -
hive.map.aggr.hash.min.reduction
: 集約の最小比率(100000 行のデータを事前に集約し、集約後のデータ量 /100000 の値がこの設定 0.5 を超える場合、集約は行われません) -
hive.map.aggr.hash.percentmemory
: map 側集約に使用されるメモリの最大値 -
hive.map.aggr.hash.force.flush.memory.threshold
: map 側で集約操作を行う際のハッシュテーブルの最大可用内容。この値を超えると flush がトリガーされます。 -
hive.groupby.skewindata
: GroupBy によって生成されたデータの偏りを最適化するかどうか、デフォルトは false です。
(9) 小ファイルの統合#
hive.merg.mapfiles=true
: map 出力を統合hive.merge.mapredfiles=false
: reduce 出力を統合hive.merge.size.per.task=256*1000*1000
: 統合ファイルのサイズhive.mergejob.maponly=true
: CombineHiveInputFormat をサポートする場合、Map のみのタスクを生成して merge を実行します。hive.merge.smallfiles.avgsize=16000000
: ファイルの平均サイズがこの値未満の場合、MR タスクが merge を実行します。
(10) カスタム map/reduce 数#
Map 数に関連するパラメータ
-
mapred.max.split.size
: 一つの split の最大値、すなわち各 map が処理するファイルの最大値 -
mapred.min.split.size.per.node
: 一つのノード上の split の最小値 -
mapred.min.split.size.per.rack
: 一つのラック上の split の最小値
Reduce 数に関連するパラメータ
-
mapred.reduce.tasks
: reduce タスクの数を強制的に指定 -
hive.exec.reducers.bytes.per.reducer
: 各 reduce タスクが処理するデータ量 -
hive.exec.reducers.max
: 各タスクの最大 reduce 数 [Map 数>= Reduce 数]
(11) インデックスの使用:#
hive.optimize.index.filter
: インデックスを自動的に使用hive.optimize.index.groupby
: 集約インデックスを使用して GROUP BY 操作を最適化
サポートされているストレージ形式#
ORC と Parquet は総合的なパフォーマンスが優れており、広く使用されているため、推奨されます。
-
TextFile
: 純粋なテキストファイルとして保存されます。これは Hive のデフォルトのファイルストレージ形式です。このストレージ方式ではデータは圧縮されず、ディスクのオーバーヘッドが大きく、データ解析のオーバーヘッドも大きくなります。 -
SequenceFile
: SequenceFile は Hadoop API が提供するバイナリファイルで、データを <key,value> の形式でファイルにシリアライズします。このバイナリファイルは内部で Hadoop の標準 Writable インターフェースを使用してシリアライズとデシリアライズを実現しています。Hadoop API の MapFile と互換性があります。Hive の SequenceFile は Hadoop API の SequenceFile を継承していますが、キーは空で、実際の値を格納するために value を使用します。これは MR が map 段階で追加のソート操作を行わないようにするためです。 -
RCFile
: RCFile ファイル形式は Facebook がオープンソースにした Hive のファイルストレージ形式で、まずテーブルをいくつかの行グループに分け、各行グループ内のデータを列ごとに保存します。各列のデータは別々に保存されます。 -
ORC Files
: ORC はある程度 RCFile を拡張したもので、RCFile の最適化です。 -
Avro Files
: Avro はデータシリアライズシステムで、大量データ交換アプリケーションをサポートするために設計されています。主な特徴は、バイナリシリアライズ方式をサポートし、大量データを迅速に処理できることです。動的言語に優しく、Avro が提供するメカニズムにより、動的言語は Avro データを簡単に処理できます。 -
Parquet
: Parquet は Dremel に基づくデータモデルとアルゴリズムを実装した、分析ビジネス向けの列指向ストレージ形式です。列ごとに効率的に圧縮し、特殊なエンコーディング技術を使用することで、ストレージスペースを削減しつつ IO 効率を向上させます。
よく使う操作コマンド#
よく使う DDL 操作#
データベースのリストを表示: show databases;
データベースを使用: USE database_name;
新しいデータベースを作成:
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name --DATABASE|SCHEMAは同等です
[COMMENT database_comment] --データベースのコメント
[LOCATION hdfs_path] --HDFS上の保存場所
[WITH DBPROPERTIES (property_name=property_value, ...)]; --追加の属性を指定
データベース情報を表示:
DESC DATABASE [EXTENDED] db_name; --EXTENDEDは追加属性を表示するかどうかを示します
データベースを削除:
-- デフォルトの動作はRESTRICTであり、データベースにテーブルが存在する場合は削除に失敗します。
-- データベースとその中のテーブルを削除するには、CASCADEを使用して削除できます。
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT | CASCADE];
テーブルの作成#
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name --テーブル名
[(col_name data_type [COMMENT col_comment],
... [constraint_specification])] --列名 列データ型
[COMMENT table_comment] --テーブルの説明
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] --パーティションテーブルのパーティション規則
[
CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
] --バケットテーブルのバケット規則
[SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
[STORED AS DIRECTORIES]
] --傾斜列と値を指定
[
[ROW FORMAT row_format]
[STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
] -- 行区切り文字、ストレージファイル形式、またはカスタムストレージ形式を指定
[LOCATION hdfs_path] -- テーブルの保存場所を指定
[TBLPROPERTIES (property_name=property_value, ...)] --テーブルの属性を指定
[AS select_statement]; --クエリ結果からテーブルを作成
クエリ文の結果からテーブルを作成することをサポート:
CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';
テーブルの変更#
テーブル名の変更:
ALTER TABLE table_name RENAME TO new_table_name;
列の変更:
ALTER TABLE table_name [PARTITION partition_spec]
CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST | AFTER column_name] [CASCADE | RESTRICT];
--例
-- フィールド名と型を変更
ALTER TABLE emp_temp CHANGE empno empno_new INT;
-- フィールドsalの名前を変更し、empnoフィールドの後に配置
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2) AFTER ename;
-- フィールドにコメントを追加
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'これはmgr列です';
列の追加:
ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT '自宅住所');
テーブルのクリア / 削除#
テーブルのクリア:
-- テーブル全体または指定されたパーティションのデータをクリア
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value, ...)];
現在、TRUNCATE
操作は内部テーブルのみで実行でき、外部テーブルで実行すると例外Cannot truncate non-managed table XXXX
がスローされます。
テーブルの削除:
DROP TABLE [IF EXISTS] table_name [PURGE];
- 内部テーブル:テーブルのメタデータだけでなく、HDFS 上のデータも削除されます。
- 外部テーブル:テーブルのメタデータのみが削除され、HDFS 上のデータは削除されません。
- ビューが参照するテーブルを削除する際には警告が表示されません(ただし、ビューは無効になり、ユーザーが削除または再作成する必要があります)。
その他#
ビューのリストを表示:
SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards']; --Hive 2.2.0以降のみサポート
テーブルのパーティションリストを表示:
SHOW PARTITIONS table_name;
テーブル / ビューの作成文を表示:
SHOW CREATE TABLE ([db_name.]table_name|view_name);
よく使う DML 操作#
リレーショナルデータベースに似ており、具体的には参考: LanguageManual DML
ソートキーワード#
-
sort by
: グローバルソートではなく、データが reducer に入る前にソートされます。 -
order by
: 入力をグローバルにソートするため、1 つの reducer のみ(複数の reducer ではグローバルに順序を保証できません)。1 つの reducer のみであるため、入力規模が大きい場合、計算時間が長くなる可能性があります。 -
distribute by
: 指定されたフィールドに基づいてデータを異なる reduce に分配します。 -
cluster by
: distribute by と sort by のフィールドが同じ場合、cluster by と等しいです。特別な distribute + sort として見ることができます。
Hive でのデータの追加インポート方法#
-
ローカルからインポート:
load data local inpath ‘/home/1.txt’ (overwrite)into table student;
-
HDFS からインポート:
load data inpath ‘/user/hive/warehouse/1.txt’ (overwrite)into table student;
-
クエリインポート:
create table student1 as select * from student;(特定のデータをクエリすることも可能)
-
クエリ結果のインポート:
insert (overwrite)into table staff select * from track_log;
Hive でのデータのエクスポート方法#
-
insert overwrite エクスポート方式
- ローカルにエクスポート:
insert overwrite local directory ‘/home/robot/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
- HDFS にエクスポート
insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
-
Bash シェルでの上書き追加エクスポート
$ bin/hive -e “select * from staff;” > /home/z/backup.log
- sqoop を使用して hive データを外部にエクスポート