云上的對象存儲,具有低成本、高可靠、易運維等諸多優(yōu)點。將其對接到 PostgreSQL 后,可用于自動歸檔歷史數(shù)據(jù),還可用于數(shù)據(jù)分析。
介紹
pg_parquet
是一個 PostgreSQL 擴展,允許您通過COPY TO/FROM
命令,從 PostgreSQL 讀取和寫入位于S3
或文件系統(tǒng)中的 Parquet 文件。它依賴于 Apache Arrow 項目來讀寫 Parquet 文件,依賴于 pgrx 項目來擴展 PostgreSQL 的COPY
命令。
-- Copy a query result into Parquet in S3
COPY (SELECT * FROM table) TO 's3://mybucket/data.parquet' WITH (format 'parquet');
-- Load data from Parquet in S3
COPY table FROM 's3://mybucket/data.parquet' WITH (format 'parquet');
從源代碼安裝
在安裝 PostgreSQL(14 或更高版本)后,您需要設(shè)置rustup
,cargo-pgrx
以構(gòu)建擴展。
# install rustup
> curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# install cargo-pgrx
> cargo install cargo-pgrx
# configure pgrx
> cargo pgrx init --pg17 $(which pg_config)
# append the extension to shared_preload_libraries in ~/.pgrx/data-17/postgresql.conf
> echo "shared_preload_libraries = 'pg_parquet'" >> ~/.pgrx/data-17/postgresql.conf
# run cargo-pgrx to build and install the extension
> cargo pgrx run
# create the extension in the database
psql> "CREATE EXTENSION pg_parquet;"
用法
您可以使用pg_parquet
主要做 3 件事:
1. 您可以將 PostgreSQL 表/查詢結(jié)果導出到 Parquet 文件,
2. 您可以將數(shù)據(jù)從 Parquet 文件導入到 PostgreSQL 表。
3. 您可以檢查 Parquet 文件的模式和元數(shù)據(jù)。
在表和 Parquet 文件之間進行導入/導出
您可以使用 PostgreSQL 的COPY
命令,來讀取和寫入 Parquet 文件。以下示例說明了,如何將具有復雜類型的 PostgreSQL 表寫入 Parquet 文件,然后將 Parquet 文件內(nèi)容讀回同一表中。
-- create composite types
CREATE TYPE product_item AS (id INT, name TEXT, price float4);
CREATE TYPE product AS (id INT, name TEXT, items product_item[]);
-- create a table with complex types
CREATE TABLE product_example (
id int,
product product,
products product[],
created_at TIMESTAMP,
updated_at TIMESTAMPTZ
);
-- insert some rows into the table
insert into product_example values (
1,
ROW(1, 'product 1', ARRAY[ROW(1, 'item 1', 1.0), ROW(2, 'item 2', 2.0), NULL]::product_item[])::product,
ARRAY[ROW(1, NULL, NULL)::product, NULL],
now(),
'2022-05-01 12:00:00-04'
);
-- copy the table to a parquet file
COPY product_example TO '/tmp/product_example.parquet' (format 'parquet', compression 'gzip');
-- show table
SELECT * FROM product_example;
-- copy the parquet file to the table
COPY product_example FROM '/tmp/product_example.parquet';
-- show table
SELECT * FROM product_example;
檢查 Parquet 模式
您可以調(diào)用SELECT * FROM parquet.schema(<uri>)
,以發(fā)現(xiàn)給定 uri 處的 Parquet 文件的模式。
SELECT * FROM parquet.schema('/tmp/product_example.parquet') LIMIT 10;
uri | name | type_name | type_length | repetition_type | num_children | converted_type | scale | precision | field_id | logical_type
------------------------------+--------------+------------+-------------+-----------------+--------------+----------------+-------+-----------+----------+--------------
/tmp/product_example.parquet | arrow_schema | | | | 5 | | | | |
/tmp/product_example.parquet | id | INT32 | | OPTIONAL | | | | | 0 |
/tmp/product_example.parquet | product | | | OPTIONAL | 3 | | | | 1 |
/tmp/product_example.parquet | id | INT32 | | OPTIONAL | | | | | 2 |
/tmp/product_example.parquet | name | BYTE_ARRAY | | OPTIONAL | | UTF8 | | | 3 | STRING
/tmp/product_example.parquet | items | | | OPTIONAL | 1 | LIST | | | 4 | LIST
/tmp/product_example.parquet | list | | | REPEATED | 1 | | | | |
/tmp/product_example.parquet | items | | | OPTIONAL | 3 | | | | 5 |
/tmp/product_example.parquet | id | INT32 | | OPTIONAL | | | | | 6 |
/tmp/product_example.parquet | name | BYTE_ARRAY | | OPTIONAL | | UTF8 | | | 7 | STRING
(10 rows)
檢查 Parquet 元數(shù)據(jù)
您可以調(diào)用SELECT * FROM parquet.metadata(<uri>)
,以在給定的 uri 中發(fā)現(xiàn) Parquet 文件的詳細元數(shù)據(jù),例如列統(tǒng)計信息。
SELECT uri, row_group_id, row_group_num_rows, row_group_num_columns, row_group_bytes, column_id, file_offset, num_values, path_in_schema, type_name FROM parquet.metadata('/tmp/product_example.parquet') LIMIT 1;
uri | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type_name
------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-----------
/tmp/product_example.parquet | 0 | 1 | 13 | 842 | 0 | 0 | 1 | id | INT32
(1 row)
SELECT stats_null_count, stats_distinct_count, stats_min, stats_max, compression, encodings, index_page_offset, dictionary_page_offset, data_page_offset, total_compressed_size, total_uncompressed_size FROM parquet.metadata('/tmp/product_example.parquet') LIMIT 1;
stats_null_count | stats_distinct_count | stats_min | stats_max | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size
------------------+----------------------+-----------+-----------+--------------------+--------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------
0 | | 1 | 1 | GZIP(GzipLevel(6)) | PLAIN,RLE,RLE_DICTIONARY | | 4 | 42 | 101 | 61
(1 row)
您可以調(diào)用SELECT * FROM parquet.file_metadata(<uri>)
,以在給定 uri 處發(fā)現(xiàn) Parquet 文件的文件級元數(shù)據(jù),例如格式版本。
SELECT * FROM parquet.file_metadata('/tmp/product_example.parquet')
uri | created_by | num_rows | num_row_groups | format_version
------------------------------+------------+----------+----------------+----------------
/tmp/product_example.parquet | pg_parquet | 1 | 1 | 1
(1 row)
您可以調(diào)用SELECT * FROM parquet.kv_metadata(<uri>)
,以查詢給定 uri 處的 Parquet 文件的自定義鍵值元數(shù)據(jù)。
SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM parquet.kv_metadata('/tmp/product_example.parquet');
uri | key | value
------------------------------+--------------+---------------------
/tmp/product_example.parquet | ARROW:schema | /////5gIAAAQAAAA ...
(1 row)
對象存儲支持
pg_parquet
支持從S3
對象存儲讀取和寫入 Parquet 文件。僅支持帶s3://
的 URI 格式。
配置對象存儲的最簡單方法是,創(chuàng)建標準的~/.aws/credentials
和~/.aws/config
文件:
$ cat ~/.aws/credentials
[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
$ cat ~/.aws/config
[default]
region = eu-central-1
或者,您可以在啟動 PostgreSQL 時,使用以下環(huán)境變量來配置 S3 客戶端:
? AWS_ACCESS_KEY_ID
:AWS 賬戶的訪問密鑰 ID
? AWS_SECRET_ACCESS_KEY
:AWS 賬戶的保密訪問密鑰
? AWS_REGION
:AWS 賬戶的默認區(qū)域
? AWS_SHARED_CREDENTIALS_FILE
:憑證文件的替代位置
? AWS_CONFIG_FILE
:配置文件的替代位置
? AWS_PROFILE
:憑證和配置文件中的概要文件名稱(默認概要文件名稱為default
)
注意:為了能夠?qū)懭雽ο蟠鎯ξ恢茫枰虍斍?postgres 用戶授予parquet_object_store_write
角色。同樣,要從對象存儲位置讀取數(shù)據(jù),您需要向當前 postgres 用戶授予parquet_object_store_read
角色。
COPY 選項
pg_parquet
在COPY TO
命令中支持以下選項:
? format parquet
:您需要指定此選項,來讀取或?qū)懭氩灰?code style="-webkit-tap-highlight-color: transparent; margin: 0px; padding: 3px 5px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; line-height: 1.75; font-size: 14.4px; color: rgb(221, 17, 68); background: rgba(27, 31, 35, 0.05); border-radius: 4px;">.parquet[.<compression>]擴展名結(jié)尾的 Parquet 文件。(這是COPY FROM
命令支持的唯一選項。)
? row_group_size <int>
:寫入 Parquet 文件時每個行組中的行數(shù)。默認行組的行數(shù)為122880
,
? row_group_size_bytes <int>
:寫入 Parquet 文件時每個行組中行的總字節(jié)數(shù)。默認行組的字節(jié)數(shù)為row_group_size * 1024
,
? compression <string>
:寫入 Parquet 文件時要使用的壓縮格式。支持的壓縮格式包括uncompressed
、snappy
、gzip
、brotli
、lz4
、lz4raw
和zstd
。默認壓縮格式為snappy
。如果未指定,則壓縮格式由文件擴展名決定。
? compression_level <int>
:寫入 Parquet 文件時要使用的壓縮級別。支持的壓縮級別,僅支持用于gzip
、zstd
和brotli
壓縮格式。默認壓縮級別,對于gzip (0-10)
為6
、對于zstd (1-22)
為1
,對于brotli (0-11)
為1
。
配置
目前只有一個 GUC 參數(shù),可用于啟用/禁用pg_parquet
:
支持的類型
pg_parquet
具有豐富的類型支持,包括 PostgreSQL 的基礎(chǔ)類型、數(shù)組和復合類型。下表列出了 PostgreSQL 中支持的類型,及其相應的 Parquet 類型。
PostgreSQL 類型 | Parquet 物理類型 | 邏輯類型 |
bool | BOOLEAN |
|
smallint | INT16 |
|
integer | INT32 |
|
bigint | INT64 |
|
real | FLOAT |
|
oid | INT32 |
|
double | DOUBLE |
|
numeric [1] | FIXED_LEN_BYTE_ARRAY(16) | DECIMAL(128) |
text | BYTE_ARRAY | STRING |
json | BYTE_ARRAY | STRING |
bytea | BYTE_ARRAY |
|
date [2] | INT32 | DATE |
timestamp | INT64 | TIMESTAMP_MICROS |
timestamptz [3] | INT64 | TIMESTAMP_MICROS |
time | INT64 | TIME_MICROS |
timetz [3] | INT64 | TIME_MICROS |
geometry [4] | BYTE_ARRAY |
|
composite | GROUP | STRUCT |
array | 元素的物理類型 | LIST |
警告:
(1) 精度 <= 38
的numeric
類型表示為FIXED_LEN_BYTE_ARRAY(16)
,邏輯類型為DECIMAL(128)
。精度 > 38
的numeric
類型表示為BYTE_ARRAY
,邏輯類型為STRING
。
(2) date
類型在寫入 Parquet 文件時,根據(jù)Unix epoch
來表示。從 Parquet 文件讀取時,根據(jù)PostgreSQL epoch
轉(zhuǎn)換回來。
(3) 在寫入 Parquet 文件時,timestamptz
和timetz
類型會調(diào)整成UTC
形式。從 Parquet 文件讀取時,它們會按UTC
時區(qū)轉(zhuǎn)換回來。
(4) 在設(shè)置了postgis
擴展的情況下,geometry
類型表示為BYTE_ARRAY
,編碼為WKB
。否則,它將表示為BYTE_ARRAY
,邏輯類型為STRING
。
(5) 作為一種回退機制,任何沒有相應 Parquet 類型的類型,都將表示為BYTE_ARRAY
,邏輯類型為STRING
。例如enum
。
該文章在 2024/11/12 16:49:17 編輯過