Yige

Yige

Build

Hiveの概要

Hive の概要#

Hive の基本アーキテクチャ#

image.png

  • ユーザーインターフェース: CLI、JDBC/ODBC、Web UI 層

  • Thriftサーバー: Hive を操作するための多言語プログラムをサポート

  • ドライバー: ドライバー、コンパイラー、オプティマイザー、エグゼキューター

    Hive のコアはドライバーエンジンであり、ドライバーエンジンは 4 つの部分で構成されています:

    1. インタープリター: HiveSQL 文を抽象構文木(AST)に変換
    2. コンパイラー:抽象構文木を論理実行計画に変換
    3. オプティマイザー:論理実行計画を最適化
    4. エグゼキューター:低レベルの実行フレームワークを呼び出して論理計画を実行
  • メタデータストレージシステム: Hive のメタデータには通常、テーブル名、テーブルの列とパーティションおよびその属性、テーブルの属性(内部テーブルと外部テーブル)、テーブルのデータが格納されているディレクトリが含まれます。

    Metastore はデフォルトで自動的にDerbyデータベースに存在しますが、欠点は多ユーザー操作には適しておらず、データストレージディレクトリが固定されていないことです。データベースは Hive に従い、管理が非常に不便です。
    解決策: 通常は自分で作成した MySQL データベース(ローカルまたはリモート)を使用し、Hive と MySQL は MetaStore サービスを介して相互作用します。

Hive のテーブル#

Hive のテーブルは HDFS 上の指定されたディレクトリに対応し、データをクエリする際にはデフォルトで全テーブルをスキャンします。これにより、時間とパフォーマンスの消耗が非常に大きくなります。

パーティションテーブルとバケットテーブル#

パーティション#

データテーブルの特定の列または列に基づいて複数の区分に分け、パーティションは HDFS 上のテーブルディレクトリのサブディレクトリです。データはパーティションに基づいてサブディレクトリに格納されます。クエリの where 句にパーティション条件が含まれている場合、全テーブルディレクトリをスキャンするのではなく、そのパーティションから直接検索します。合理的なパーティション設計は、クエリ速度とパフォーマンスを大幅に向上させることができます。

パーティションテーブルの作成
Hive ではPARTITIONED BY句を使用してパーティションテーブルを作成できます。

CREATE EXTERNAL TABLE emp_partition(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2)
    )
    PARTITIONED BY (deptno INT)   -- 部門番号に基づいてパーティションを作成
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_partition';

パーティションテーブルへのデータのロード
パーティションテーブルにデータをロードする際は、データが存在するパーティションを指定する必要があります。

# 部門番号20のデータをテーブルにロード
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# 部門番号30のデータをテーブルにロード
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)

バケット#

すべてのデータセットが合理的なパーティションを形成できるわけではなく、パーティションの数が多いほど良いわけでもありません。過剰なパーティション条件は、多くのパーティションにデータが存在しない可能性があります。バケットはパーティションに対してより細かい粒度の分割を行い、全データ内容を特定の列属性値のハッシュ値に基づいて区別します。

バケットテーブルの作成
Hive では、CLUSTERED BYを使用してバケット列を指定し、SORTED BYを使用してバケット内のデータのソート基準列を指定できます。

CREATE EXTERNAL TABLE emp_bucket(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2),
    deptno INT)
    CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS  -- 従業員番号に基づいて4つのバケットにハッシュ化
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_bucket';

バケットテーブルへのデータのロード

--1. バケットを強制する設定、Hive 2.xではこのステップは不要
set hive.enforce.bucketing = true;

--2. データをインポート
INSERT INTO TABLE emp_bucket SELECT *  FROM emp;  -- ここでのempテーブルは通常の従業員テーブルです

内部テーブルと外部テーブル#

違い

  • テーブル作成時:内部テーブルを作成すると、データはデータウェアハウスが指すパスに移動します。外部テーブルを作成する場合は、データの存在するパスを記録するだけで、データの位置は変更されません。
  • テーブル削除時:テーブルを削除すると、内部テーブルのメタデータとデータは一緒に削除されますが、外部テーブルはメタデータのみを削除し、データは削除しません。このため、外部テーブルは相対的に安全であり、データの組織も柔軟で、ソースデータの共有が容易です。
  • テーブル作成時に外部テーブルにはexternalキーワードを追加します。

使用の選択

  • データのすべての処理が Hive 内で行われる場合、内部テーブルを選択する傾向があります。
  • 外部テーブルの使用シーンは主にデータソースを共有する場合であり、外部テーブルを使用して HDFS 上に保存された初期データにアクセスし、その後 Hive でデータを変換して内部テーブルに保存します。

カスタム UDF#

参考リンク: Hive UDF

UDF(ユーザー定義関数)は、主に以下を含みます:

  • UDF(ユーザー定義関数): 一つの入力、一つの出力、与えられたパラメータに基づいて処理されたデータを出力します。
  • UDAF(ユーザー定義集約関数): 複数の入力、一つの出力、集約関数に属し、count、sum などの関数に似ています。
  • UDTF(ユーザー定義テーブル関数): 一つの入力、複数の出力、一つのパラメータに属し、結果としてリストを返します。

最適化#

(1) データの偏り#

原因

  • キーの分布が不均一
  • ビジネスデータ自体の特性
  • SQL 文によるデータの偏り

解決方法

  • hive 設定hive.map.aggr=truehive.groupby.skewindata=true

  • データの偏りがある場合は負荷分散を行い、hive.groupby.skewindata=trueを設定すると、生成されるクエリ計画には 2 つの MR ジョブが含まれます。最初の MR ジョブでは、Map の出力結果集合がランダムに Reduce に分配され、各 Reduce が部分的な集約操作を行い、結果を出力します。このように処理された結果は、同じ Group By Key が異なる Reduce に分配される可能性があるため、負荷分散の目的を達成します。2 つ目の MR ジョブは、事前処理されたデータ結果に基づいて Group By Key に従って Reduce に分配され(このプロセスでは、同じ Group By Key が同じ Reduce に分配されることが保証されます)、最終的な集約操作を完了します。

  • SQL 文の調整:

    1. 最も均等に分布したテーブルをドライバーテーブルとして選択。列の裁断とフィルター操作を行い、2 つのテーブルを結合する際にデータ量が相対的に小さくなる効果を得ます。
    2. 小さなテーブルと大きなテーブルの結合: 小さな次元テーブル(1000 行未満のレコード数)をメモリに先に読み込み、Map 側で Reduce を完了させるために map join を使用します。
    3. 大きなテーブルと大きなテーブルの結合: 空のキーを文字列とランダムな数値に変換し、偏ったデータを異なる Reduce に分配します。null 値は関連付けられないため、処理後も最終結果には影響しません。
    4. count distinctの大量の同じ特殊値: count distinct 時に、値が空のケースを個別に処理します。count distinct を計算する場合は処理する必要はなく、直接フィルタリングし、後の結果に 1 を加えます。他の計算が必要な場合は、group by を行う前に値が空のレコードを個別に処理し、他の計算結果と union します。

(2) 一般設定#

  • hive.optimize.cp=true: 列裁断
  • hive.optimize.prunner: パーティション裁断
  • hive.limit.optimize.enable=true: LIMIT n 文の最適化
  • hive.limit.row.max.size=1000000:
  • hive.limit.optimize.limit.file=10:最大ファイル数

(3) ローカルモード(小タスク)#

ローカルモードを有効にする hive> set hive.exec.mode.local.auto=true

  • ジョブの入力データサイズはパラメータhive.exec.mode.local.auto.inputbytes.max(デフォルト128MB)未満でなければなりません。

  • ジョブの map 数はパラメータhive.exec.mode.local.auto.tasks.max(デフォルト4)未満でなければなりません。

  • ジョブの reduce 数は 0 または 1 でなければなりません。

(4) 同時実行#

並列計算を有効にする hive> set hive.exec.parallel=true
関連パラメータ hive.exec.parallel.thread.number: 一度の SQL 計算で許可されるジョブの数

(5) 厳格モード#

主に一群の SQL クエリがクラスタの負荷を大幅に増加させるのを防ぐためです。

厳格モードを有効にする: hive> set hive.mapred.mode = strict

いくつかの制限:

  • パーティションテーブルの場合、パーティションフィールドに対する where 条件フィルタを追加する必要があります。
  • orderby 文には limit 出力制限を含める必要があります。
  • デカルト積クエリの実行を制限します。

(6) 推測実行#

mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;

(7) グループ化#

  • 2 つの集約関数は異なる DISTINCT 列を持つことができず、以下の式は誤りです:

    INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;
    
  • SELECT 文には GROUP BY の列または集約関数のみを含めることができます。

  • hive.multigroupby.singlemar=true: 複数の GROUP BY 文が同じグループ列を持つ場合、1 つの MR タスクに最適化されます。

(8) 集約#

map 集約を有効にする hive> set hive.map.aggr=true

関連パラメータ

  • hive.groupby.mapaggr.checkinterval: map 側で group by を実行する際に処理する行数(デフォルト:100000)

  • hive.map.aggr.hash.min.reduction: 集約の最小比率(100000 行のデータを事前に集約し、集約後のデータ量 /100000 の値がこの設定 0.5 を超える場合、集約は行われません)

  • hive.map.aggr.hash.percentmemory: map 側集約に使用されるメモリの最大値

  • hive.map.aggr.hash.force.flush.memory.threshold: map 側で集約操作を行う際のハッシュテーブルの最大可用内容。この値を超えると flush がトリガーされます。

  • hive.groupby.skewindata: GroupBy によって生成されたデータの偏りを最適化するかどうか、デフォルトは false です。

(9) 小ファイルの統合#

  • hive.merg.mapfiles=true: map 出力を統合
  • hive.merge.mapredfiles=false: reduce 出力を統合
  • hive.merge.size.per.task=256*1000*1000: 統合ファイルのサイズ
  • hive.mergejob.maponly=true: CombineHiveInputFormat をサポートする場合、Map のみのタスクを生成して merge を実行します。
  • hive.merge.smallfiles.avgsize=16000000: ファイルの平均サイズがこの値未満の場合、MR タスクが merge を実行します。

(10) カスタム map/reduce 数#

Map 数に関連するパラメータ

  • mapred.max.split.size: 一つの split の最大値、すなわち各 map が処理するファイルの最大値

  • mapred.min.split.size.per.node: 一つのノード上の split の最小値

  • mapred.min.split.size.per.rack: 一つのラック上の split の最小値

Reduce 数に関連するパラメータ

  • mapred.reduce.tasks: reduce タスクの数を強制的に指定

  • hive.exec.reducers.bytes.per.reducer: 各 reduce タスクが処理するデータ量

  • hive.exec.reducers.max: 各タスクの最大 reduce 数 [Map 数>= Reduce 数]

(11) インデックスの使用:#

  • hive.optimize.index.filter: インデックスを自動的に使用
  • hive.optimize.index.groupby: 集約インデックスを使用して GROUP BY 操作を最適化

サポートされているストレージ形式#

ORC と Parquet は総合的なパフォーマンスが優れており、広く使用されているため、推奨されます。

  • TextFile: 純粋なテキストファイルとして保存されます。これは Hive のデフォルトのファイルストレージ形式です。このストレージ方式ではデータは圧縮されず、ディスクのオーバーヘッドが大きく、データ解析のオーバーヘッドも大きくなります。

  • SequenceFile: SequenceFile は Hadoop API が提供するバイナリファイルで、データを <key,value> の形式でファイルにシリアライズします。このバイナリファイルは内部で Hadoop の標準 Writable インターフェースを使用してシリアライズとデシリアライズを実現しています。Hadoop API の MapFile と互換性があります。Hive の SequenceFile は Hadoop API の SequenceFile を継承していますが、キーは空で、実際の値を格納するために value を使用します。これは MR が map 段階で追加のソート操作を行わないようにするためです。

  • RCFile: RCFile ファイル形式は Facebook がオープンソースにした Hive のファイルストレージ形式で、まずテーブルをいくつかの行グループに分け、各行グループ内のデータを列ごとに保存します。各列のデータは別々に保存されます。

  • ORC Files: ORC はある程度 RCFile を拡張したもので、RCFile の最適化です。

  • Avro Files: Avro はデータシリアライズシステムで、大量データ交換アプリケーションをサポートするために設計されています。主な特徴は、バイナリシリアライズ方式をサポートし、大量データを迅速に処理できることです。動的言語に優しく、Avro が提供するメカニズムにより、動的言語は Avro データを簡単に処理できます。

  • Parquet: Parquet は Dremel に基づくデータモデルとアルゴリズムを実装した、分析ビジネス向けの列指向ストレージ形式です。列ごとに効率的に圧縮し、特殊なエンコーディング技術を使用することで、ストレージスペースを削減しつつ IO 効率を向上させます。

よく使う操作コマンド#

よく使う DDL 操作#

参考: LanguageManual DDL

データベースのリストを表示: show databases;

データベースを使用: USE database_name;

新しいデータベースを作成:

CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name   --DATABASE|SCHEMAは同等です
  [COMMENT database_comment] --データベースのコメント
  [LOCATION hdfs_path] --HDFS上の保存場所
  [WITH DBPROPERTIES (property_name=property_value, ...)]; --追加の属性を指定

データベース情報を表示:
DESC DATABASE [EXTENDED] db_name; --EXTENDEDは追加属性を表示するかどうかを示します

データベースを削除:

-- デフォルトの動作はRESTRICTであり、データベースにテーブルが存在する場合は削除に失敗します。
-- データベースとその中のテーブルを削除するには、CASCADEを使用して削除できます。
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT | CASCADE];

テーブルの作成#

CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name     --テーブル名
  [(col_name data_type [COMMENT col_comment],
    ... [constraint_specification])]  --列名 列データ型
  [COMMENT table_comment]   --テーブルの説明
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]  --パーティションテーブルのパーティション規則
  [
    CLUSTERED BY (col_name, col_name, ...)
   [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
  ]  --バケットテーブルのバケット規則
  [SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
   [STORED AS DIRECTORIES]
  ]  --傾斜列と値を指定
  [
   [ROW FORMAT row_format]
   [STORED AS file_format]
     | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
  ]  -- 行区切り文字、ストレージファイル形式、またはカスタムストレージ形式を指定
  [LOCATION hdfs_path]  -- テーブルの保存場所を指定
  [TBLPROPERTIES (property_name=property_value, ...)]  --テーブルの属性を指定
  [AS select_statement];   --クエリ結果からテーブルを作成

クエリ文の結果からテーブルを作成することをサポート:

CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';

テーブルの変更#

テーブル名の変更:

ALTER TABLE table_name RENAME TO new_table_name;

列の変更:

ALTER TABLE table_name [PARTITION partition_spec]
CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST | AFTER column_name] [CASCADE | RESTRICT];

--例
-- フィールド名と型を変更
ALTER TABLE emp_temp CHANGE empno empno_new INT;

-- フィールドsalの名前を変更し、empnoフィールドの後に配置
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2)  AFTER ename;

-- フィールドにコメントを追加
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'これはmgr列です';

列の追加:

ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT '自宅住所');

テーブルのクリア / 削除#

テーブルのクリア:

-- テーブル全体または指定されたパーティションのデータをクリア
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value,  ...)];

現在、TRUNCATE操作は内部テーブルのみで実行でき、外部テーブルで実行すると例外Cannot truncate non-managed table XXXXがスローされます。

テーブルの削除:

DROP TABLE [IF EXISTS] table_name [PURGE];
  • 内部テーブル:テーブルのメタデータだけでなく、HDFS 上のデータも削除されます。
  • 外部テーブル:テーブルのメタデータのみが削除され、HDFS 上のデータは削除されません。
  • ビューが参照するテーブルを削除する際には警告が表示されません(ただし、ビューは無効になり、ユーザーが削除または再作成する必要があります)。

その他#

ビューのリストを表示:

SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards'];   --Hive 2.2.0以降のみサポート

テーブルのパーティションリストを表示:

SHOW PARTITIONS table_name;

テーブル / ビューの作成文を表示:

SHOW CREATE TABLE ([db_name.]table_name|view_name);

よく使う DML 操作#

リレーショナルデータベースに似ており、具体的には参考: LanguageManual DML

ソートキーワード#

  • sort by: グローバルソートではなく、データが reducer に入る前にソートされます。

  • order by: 入力をグローバルにソートするため、1 つの reducer のみ(複数の reducer ではグローバルに順序を保証できません)。1 つの reducer のみであるため、入力規模が大きい場合、計算時間が長くなる可能性があります。

  • distribute by: 指定されたフィールドに基づいてデータを異なる reduce に分配します。

  • cluster by: distribute by と sort by のフィールドが同じ場合、cluster by と等しいです。特別な distribute + sort として見ることができます。

Hive でのデータの追加インポート方法#

  • ローカルからインポート: load data local inpath ‘/home/1.txt’ (overwrite)into table student;

  • HDFS からインポート: load data inpath ‘/user/hive/warehouse/1.txt’ (overwrite)into table student;

  • クエリインポート: create table student1 as select * from student;(特定のデータをクエリすることも可能)

  • クエリ結果のインポート: insert (overwrite)into table staff select * from track_log;

Hive でのデータのエクスポート方法#

  • insert overwrite エクスポート方式

    1. ローカルにエクスポート:
    insert overwrite local directory ‘/home/robot/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
    
    1. HDFS にエクスポート
    insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
    
  • Bash シェルでの上書き追加エクスポート

$ bin/hive -e “select * from staff; > /home/z/backup.log
  • sqoop を使用して hive データを外部にエクスポート

拡張#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。