PRQL, pronounced as "Prequel", is a query language that stands alongside SQL. Its unique feature is the pipeline syntax, which makes querying relational databases more intuitive and efficient.

Databend Embraces PRQL

In the v1.2.380-nightly release, thanks to a significant PR by community contributor @ncuwaln, Databend has successfully integrated support for the PRQL language. This new feature further enhances the flexibility and user-friendliness of Databend queries.
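
To give a feel for the pipeline syntax, here is a small, illustrative PRQL query (a hedged sketch written against recent PRQL syntax: the table and column names are hypothetical, and how to switch the session dialect to PRQL is described in the Databend docs):

from employees
filter country == "US"
group {position} (
  aggregate {avg_salary = average salary}
)
sort {-avg_salary}

It reads top to bottom as a pipeline and compiles to an equivalent SQL SELECT with a WHERE, GROUP BY, and ORDER BY.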

Introduction

Hugging Face is currently the most popular AI community globally, fostering innovation and collaboration among data scientists and businesses in various aspects such as models, datasets, and applications. Hugging Face hosts a diverse range of datasets that serve not only as samples for learning and practice but also as foundational sources for enterprise model data.

In order to maximize the utility of these datasets, it is often necessary to cleanse the raw data within them and archive it in a data lake to provide a unified access point. Databend is designed with ETL/ELT-centric workflows in mind, allowing for immediate cleansing, transformation, and merging of data upon loading. It supports a wealth of structured and semi-structured data types, enabling direct querying and analysis of raw data in multiple formats. The enterprise edition further supports advanced features such as virtual columns and computed columns, ensuring that the data is always ready for use.

In this article, we will demonstrate how to effortlessly access datasets hosted on Hugging Face using Databend and perform simple yet efficient analysis and processing with SQL. Additionally, the examples in this post include a white-box model implemented in SQL, showcasing how to conduct category predictions within the data warehouse and validate the model's accuracy.
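
As a taste of what that looks like, the raw Parquet files can be queried in place through a stage, in the same way as the object-storage examples later in this post (a hedged sketch: the bucket and file paths are hypothetical mirrors of a Hugging Face dataset stored in object storage you control):

CREATE STAGE hf_mirror
URL = 's3://my-hf-mirror/datasets/'
CONNECTION = (ALLOW_ANONYMOUS = 'true');

-- Query the raw Parquet file directly, before loading anything into a table
SELECT count(*) FROM @hf_mirror/imdb/train.parquet;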

Gunnar Morling's challenge, initially posed to Java users, involved processing a dataset of 1 billion rows, focusing on temperature measurement values. This challenge, highlighted in his blog post, The One Billion Row Challenge, has intrigued many in the database community. While the original challenge was Java-centric, it sparked interest across various databases, examining their speed and efficiency. Efforts like the 1BRC in SQL with DuckDB and 1 billion rows challenge in PostgreSQL and ClickHouse demonstrate this widespread fascination.

In this post, let's shift the focus to cloud data warehouses, particularly Snowflake and Databend Cloud, exploring how they handle such extensive datasets. The tests are aimed at understanding how these platforms efficiently sort and group large-scale data, especially when stored in cloud-based object storage. The test involves two main tasks:

  • Directly sort and group 1 billion rows of data stored in object storage.
  • Load these rows into a cloud data warehouse, then perform sorting and grouping.

The dataset used for this test is in the Parquet format, known for its columnar storage efficiency. Hosted on AWS S3 and publicly accessible here (5.1 GB), it allows anyone to replicate these tests on Snowflake and Databend Cloud.

info

The tests use a MEDIUM(32vCPU) warehouse size in Snowflake and Databend Cloud to process a dataset stored in the AWS US East (Ohio) region. The cost is $8 per hour for Snowflake and $4 per hour for Databend Cloud.

Snowflake vs. Databend: A Brief Overview

Snowflake is a well-established cloud-based data warehouse that provides a SQL interface for querying data. It employs a columnar database that stores data in cloud-based object storage like AWS S3. Snowflake has gained popularity for its user-friendliness and scalability in data warehousing.

Databend Cloud represents the new wave of cloud data warehouses, designed to handle massive-scale analytics with reduced cost and complexity. It's built on Databend, an open-source alternative to Snowflake written from scratch in Rust, offering similar capabilities.

Preparations: Generate Parquet File

The tests follow steps similar to Robin Moffatt's for generating the raw data and exporting it to a Parquet file.
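
If you want to produce a comparable file yourself without leaving the warehouse, one option is to synthesize rows and unload them to a stage as Parquet (a hedged sketch, not the original generator: the stage name, station names, and temperature distribution are simplified placeholders):

-- Generate 1 billion synthetic rows and unload them to a stage as Parquet
COPY INTO @my_stage/1brc/
FROM (
    SELECT
        concat('station_', (number % 400)::STRING) AS station_name,
        round(rand() * 100 - 30, 1)                AS measurement
    FROM numbers(1000000000)
)
FILE_FORMAT = (TYPE = PARQUET);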

Snowflake vs. Databend: Sort & Group Data in Object Storage

Sort & Group Data in Object Storage with Snowflake

Run the following SQL commands in Snowflake:

CREATE OR REPLACE STAGE wizardbend URL='s3://wizardbend/';

CREATE OR REPLACE FILE FORMAT myformat TYPE = 'parquet';

SELECT
$1:station_name,
MIN($1:measurement) AS min_measurement,
AVG($1:measurement) AS mean_measurement,
MAX($1:measurement) AS max_measurement
FROM
@wizardbend/1brc/measurements.parquet (file_format => 'myformat')
GROUP BY
$1:station_name
ORDER BY
$1:station_name;

Result:

Time: 21m 35s
Cost: $2.87

Sort & Group Data in Object Storage with Databend Cloud

In Databend Cloud, run the following commands:

CREATE STAGE IF NOT EXISTS wizardbend
URL = 's3://wizardbend/'
CONNECTION = (ALLOW_ANONYMOUS = 'true');

SELECT station_name,
MIN(measurement) AS min_measurement,
AVG(measurement) AS mean_measurement,
MAX(measurement) AS max_measurement
FROM @wizardbend/1brc/measurements.parquet
GROUP BY station_name
ORDER BY station_name;

Result:

Time: 9.8s
Cost: $0.01

Snowflake vs. Databend: Load & Process Data in Cloud Data Warehouses

Next, we'll examine the efficiency of Snowflake and Databend Cloud by loading, sorting, and grouping 1 billion rows of data.

Load 1 Billion Rows into Snowflake

CREATE TABLE onebrc (
station_name VARCHAR NULL,
measurement DOUBLE NULL
);

COPY INTO onebrc
FROM (
SELECT $1:station_name::VARCHAR,
$1:measurement::DOUBLE
FROM @wizardbend/1brc/measurements.parquet
(FILE_FORMAT => 'myformat')
);

Result:

Time: 26m 7s
Cost: $3.48

Load 1 Billion Rows into Databend Cloud

CREATE TABLE onebrc (
station_name VARCHAR NULL,
measurement DOUBLE NULL
);

COPY INTO onebrc
FROM (
SELECT $1:station_name::VARCHAR,
$1:measurement::DOUBLE
FROM @wizardbend/1brc/measurements.parquet
(FILE_FORMAT => 'myformat')
);

Result:

Time: 19.1s
Cost: $0.02

Sort & Group 1 Billion Rows in Snowflake

With caching excluded (both the result cache and the local disk cache), the query is shown below.
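
To make the exclusion of the result cache explicit, it can be turned off at the session level before running the query (an aside on the test setup; USE_CACHED_RESULT is Snowflake's session parameter for the result cache, while the local disk cache is avoided by using a freshly resumed warehouse):

ALTER SESSION SET USE_CACHED_RESULT = FALSE;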

SELECT station_name,
MIN(measurement) AS min_measurement,
AVG(measurement) AS mean_measurement,
MAX(measurement) AS max_measurement
FROM onebrc
GROUP BY station_name
ORDER BY station_name;

Result:

Time: 3.1s
Cost: $0.007

Sort & Group 1 Billion Rows in Databend Cloud

Again, without result caching:

SELECT station_name,
MIN(measurement) AS min_measurement,
AVG(measurement) AS mean_measurement,
MAX(measurement) AS max_measurement
FROM onebrc
GROUP BY station_name
ORDER BY station_name;

Result:

Time: 3.6s
Cost: $0.004

Conclusion

All the comparisons presented can be independently run and verified on both Snowflake and Databend Cloud platforms:

| Task | Snowflake (Cost & Time) | Databend Cloud (Cost & Time) |
|------|-------------------------|------------------------------|
| Read Parquet File | $2.87 (21m 35s) | $0.01 (9.8s) |
| Load 1 Billion Rows | $3.48 (26m 7s) | $0.02 (19.1s) |
| Read Native Format | $0.007 (3.1s) | $0.004 (3.6s) |
| Total | $6.357 | $0.034 |

In this benchmark comparison between Snowflake and Databend, it's evident that cloud data warehouses, particularly Databend Cloud, excel in efficiently processing large datasets stored in object storage like AWS S3. They offer not only superior speed but also remarkable cost-effectiveness, making them the go-to choice for handling complex data challenges.

LF Edge eKuiper is a lightweight IoT data analytics and stream processing engine that runs on resource-constrained edge devices. The major goal of eKuiper is to provide a streaming software framework (similar to Apache Flink) on the edge side. eKuiper's rule engine allows users to write either SQL-based or graph-based (similar to Node-RED) rules to create IoT edge analytics applications within a few minutes.

eKuiper supports extensions in three aspects: Source, SQL functions, and Sink, through both Golang and Python. By supporting different Sinks, it allows users to send analysis results to various external systems. Databend has also been integrated into the eKuiper plugin as a Sink. Below is an example demonstrating how to use eKuiper to write IoT stream processing data into Databend.

Building eKuiper and Databend SQL Plugin

eKuiper

git clone https://github.com/lf-edge/ekuiper && cd ekuiper
make

Databend SQL Plugin

Build the sink plugin:

go build -trimpath --buildmode=plugin -tags databend -o plugins/sinks/Sql.so extensions/sinks/sql/sql.go

Copy the built sink plugin to the build directory:

cp plugins/sinks/Sql.so _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64/plugins/sinks

Creating Table in Databend

Create the target table "ekuiper_test" in Databend:

create table ekuiper_test (name string, size bigint, id bigint);

Starting eKuiper

cd _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64 
./bin/kuiperd


Creating Streams and Rules

eKuiper offers two methods for managing streams, rules, and target destinations. One approach is to launch a visual management interface through the ekuiper-manager Docker image (https://hub.docker.com/r/lfedge/ekuiper), while the other is to manage them using the CLI tool.

Creating Stream

A stream is the operational form of data source connectors in eKuiper. It must specify a source type to define how it connects to external resources. Here, we create a stream to retrieve data from a JSON file data source and send it to eKuiper. First, configure the file data source; the connector's configuration file is located at /etc/sources/file.yaml.

default:
  # File type, supports json, csv, and lines
  fileType: json
  # Absolute path to the directory or file under eKuiper's root directory.
  # Do not include the filename here. The filename should be defined in the stream data source.
  path: data
  # Time interval for reading files, in milliseconds. Set to 0 if reading only once.
  interval: 0
  # Interval between sending two pieces of data after reading.
  sendInterval: 0
  # Whether to read files in parallel from the directory.
  parallel: false
  # Action after reading the file
  # 0: Keep the file unchanged
  # 1: Delete the file
  # 2: Move the file to the location defined in moveTo
  actionAfterRead: 0
  # Location to move the file, only used for actionAfterRead = 2
  moveTo: /tmp/kuiper/moved
  # Whether to include a file header, mainly used for CSV. If true, the first line is parsed as the file header.
  hasHeader: false
  # Define the columns of the file. If a file header is defined, this option will be overridden.
  # columns: [id, name]
  # Ignore the content of how many lines at the beginning.
  ignoreStartLines: 0
  # Ignore the content of how many lines at the end. The last empty line is not counted.
  ignoreEndLines: 0
  # Use the specified compression method to decompress the file. Supported methods now include 'gzip' and 'zstd'.
  decompression: ""

Create a stream named "stream1" in the terminal:

./bin/kuiper create stream stream1 '(id BIGINT, name STRING,size BIGINT) WITH (DATASOURCE="test.json", FORMAT="json", TYPE="file");'


Content in the JSON file:

[
  {"id": 1, "size": 100, "name": "John Doe"},
  {"id": 2, "size": 200, "name": "Jane Smith"},
  {"id": 3, "size": 300, "name": "Kobe Brant"},
  {"id": 4, "size": 400, "name": "Alen Iverson"}
]

Creating Databend Sink Rule

A rule represents a stream processing workflow, outlining the journey of data from its input source through various processing logic, and ultimately to actions that dispatch the data to external systems. eKuiper offers two methods to define the business logic of rules: either through SQL/action combinations or through the newly introduced graph API. Here, we define the business logic of a rule declaratively by specifying the sql and actions attributes. The sql attribute defines SQL queries to run against the predefined streams, thereby transforming the data, and the resulting data can then be directed to multiple destinations through the actions.

Rules are defined using JSON. Below is the rule that we are preparing to create, named "myRule.json":

{
  "id": "myRule",
  "sql": "SELECT id, name from stream1",
  "actions": [
    {
      "log": {},
      "sql": {
        "url": "databend://databend:databend@localhost:8000/default?sslmode=disable",
        "table": "ekuiper_test",
        "fields": ["id", "name"]
      }
    }
  ]
}

Create a rule in the terminal:

./bin/kuiper create rule myRule -f myRule.json


View the status of the created rule:

./bin/kuiper getstatus rule myRule


Once the rule is created, data that satisfies the rule's criteria is immediately sent to the destination. At this juncture, when we inspect the 'ekuiper_test' table in Databend, we can observe that the data from the file data source has been successfully ingested into Databend:


It can be observed that, because our SQL rule only specified the id and name fields, only these two fields have been included in the output.
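
To double-check from any SQL client connected to Databend, a simple query is enough (a quick verification sketch; size comes back empty because the rule did not select it):

select id, name, size from ekuiper_test;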

Summary

eKuiper, a stream processing software under EMQ, is known for its compact size and robust functionality. It is widely utilized in various applications such as industrial IoT, vehicular networks, and public data analysis. This article has provided insights into how to use eKuiper for writing IoT stream processing data into Databend.

A prominent pharmaceutical retail chain was founded in 1999 and gradually evolved into a significant player. Accumulating substantial data volumes through expansive supply chain operations, the business managed extensive datasets, including tables with billions of records. As the enterprise embraced digital evolution, the imperative for efficient data utilization and intelligent technology solutions grew. However, a notable challenge emerged: their existing CDH (Cloudera Distribution for Hadoop) deployment needed an upgrade for architectural reasons.

Challenge Faced:

With a growing data platform exceeding 30 terabytes, the CDH faced hardware limitations and escalating costs. The business needed a solution that could retain historical data for audit and tracing purposes, while also facilitating efficient data analysis.

Technical Implementation:

Historical data from CDH was exported as Parquet files using Tencent Cloud's COS migration tool. These backups were then transferred to Tencent Cloud COS. The data seamlessly transitioned into Databend through the establishment of a storage stage connected to a cloud storage bucket. Following this, data files were extracted based on specific patterns, enabling the creation of new tables that use the files' structure. This data was then loaded into the tables, making use of parallel processing for enhanced efficiency.
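
The flow can be sketched in a few SQL statements (a hedged illustration only: the stage name, bucket, and table definition are hypothetical, the COS connection parameters are omitted, and the pattern mirrors the stage and COPY examples elsewhere in this post):

-- Stage pointing at the COS bucket that holds the exported Parquet backups
-- (connection credentials omitted)
CREATE STAGE cdh_backup URL = 'cos://my-backup-bucket/cdh-export/' CONNECTION = (...);

-- Target table matching the exported schema (columns hypothetical)
CREATE TABLE sales_history (order_id BIGINT, store_id INT, amount DOUBLE, created_at TIMESTAMP);

-- Load every Parquet file that matches the pattern; files are ingested in parallel
COPY INTO sales_history
FROM @cdh_backup
PATTERN = '.*parquet'
FILE_FORMAT = (TYPE = PARQUET);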


Achieved Outcomes:

Databend proved transformative for the pharmaceutical group. The migration resulted in a 2x increase in query and loading speed for large table data, and storage costs on Tencent Cloud COS dropped to roughly one fifteenth of CDH's local storage and replication costs.

In today's fast-paced world, businesses require real-time data synchronization and event-driven architectures to stay competitive. In this blog, we will explore Debezium and its lightweight counterpart, debezium-server-databend, developed by Databend. We will discuss how to install and use debezium-server-databend to effortlessly monitor and capture database changes and integrate them into Databend.

Introducing Debezium & debezium-server-databend

Debezium is a robust set of distributed services designed to capture changes within databases, enabling applications to respond promptly to those changes. This is achieved through recording all row-level changes within each database table and streaming these change events to applications in chronological order.

Debezium's potential reaches new heights with debezium-server-databend, a lightweight CDC tool developed by Databend. It harnesses the capabilities of Debezium Engine to capture real-time changes in relational databases and stream them as events into Databend. What sets this tool apart is its simplicity, as it requires no large data infrastructures like Flink, Kafka, or Spark.

Installing debezium-server-databend

debezium-server-databend can be installed independently without the need for installing Debezium beforehand. Once you have decided to install debezium-server-databend, you have two options available. The first one is to install it from source by downloading the source code and building it yourself. Alternatively, you can opt for a more straightforward installation process using Docker.

For step-by-step instructions on how to install the tool, see the Databend documentation at https://docs.databend.com/doc/load-data/load-db/debezium#installing-debezium-server-databend

Loading Data with debezium-server-databend

The key to importing data with debezium-server-databend is the configuration file "application.properties", which drives the entire data import process and needs to be tailored to your specific requirements. Below is an example "application.properties" file for importing data from MySQL to Databend:

debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://<your-databend-host>:<port>
debezium.sink.databend.database.username=<your-username>
debezium.sink.databend.database.password=<your-password>
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

Databend provides a sample "application.properties" file, which can be found at the following link: https://github.com/databendcloud/debezium-server-databend/blob/main/debezium-server-databend-dist/src/main/resources/distro/conf/application.properties.example. This file serves as a useful starting point for configuring your data import process using debezium-server-databend. For explanations of the parameters in the configuration file, please refer to https://github.com/databendcloud/debezium-server-databend/blob/main/docs/docs.md.

The Databend documentation also offers an example of importing data from MySQL, available at https://docs.databend.com/doc/load-data/load-db/debezium#usage-example.

A few weeks ago, during the yearly conferences of Databricks and Snowflake, AI was getting a lot of attention, but the progress in data lakes and data warehouses was also significant because data is fundamental. Apache Iceberg emerged as a prominent solution for data lakes, and Databricks unveiled UniForm to better handle Apache Iceberg and Hudi table formats from Delta data. Meanwhile, Snowflake made timely updates to Iceberg Tables, aiming to eliminate data silos.

One of the significant new features that Databend has been working on in recent months is support for reading data in the Apache Iceberg table format. Though it is still a work in progress, the team has made solid headway so far.

This article is intended to give you a preview of this new capability by demonstrating how to use Databend to mount and query an Iceberg Catalog. We will cover the core concepts of Iceberg and table formats while also introducing Databend's solutions, including its ability to handle multiple data catalogs and the implementation of IceLake in Rust from scratch. As part of the demonstration, a comprehensive workshop will be provided, so you can try it out yourself.

What is Apache Iceberg?

An increasing amount of data is now moving to the cloud and being stored in object storage. However, this setup may not fully meet the demands of modern analytics. There are two key issues to address: First, how to organize data in a more structured manner, achieving a more organized data storage approach. Second, how to provide users with broader consistency guarantees, necessary schema information, and advanced features that cater to the requirements of modern analytics workloads.

Data lakes often focus on addressing and resolving the first issue, while table formats are dedicated to providing solutions for the second one.

Apache Iceberg is a high-performance open table format designed for large-scale analytics workloads. It is known for its simplicity and reliability, and it supports query engines such as Spark, Trino, Flink, Presto, Hive, and Impala. Its killer features include full schema evolution, time travel, and rollback. Additionally, Apache Iceberg's data partitioning and well-defined data structures make concurrent access to data sources more secure, reliable, and convenient.

If you're interested in Iceberg, we recommend reading Docker Spark And Iceberg: The Fastest Way To Try Iceberg!

Table Format

Table Format is a specification for storing data using a collection of files. It consists of definitions for the following three parts:

  • How to store data in files
  • How to store metadata for related files
  • How to store metadata for the table itself

Table format files are usually stored in underlying storage services such as HDFS, S3, or GCS, and are accessed by upper-level data warehouses such as Databend and Snowflake. Compared to CSV or Parquet, a table format offers a standardized, structured data definition in tabular form, enabling the data to be used without first loading it into a data warehouse.

Although there are strong competitors like Delta Lake and Apache Hudi in the field of table formats, this article focuses on Apache Iceberg. Let's take a look at its underlying file organization structure together.

(Figure: the Apache Iceberg file organization stack)

The figure above shows "s0" and "s1", which represent snapshots of the table. A snapshot captures the table's state at a specific point in time. Each commit produces a snapshot, and each snapshot is associated with a manifest list. The manifest list holds the addresses of one or more manifest files, along with statistics such as paths and partition ranges. Each manifest file in turn records the addresses and statistics of the data files generated by the corresponding operation, such as per-column maximum/minimum values and row counts.

Multiple Catalog

To integrate Databend with Iceberg, the first step is to add the Multiple Catalog capability to Databend. The Multiple Catalog enables the data that was previously managed by other data analysis systems to be mounted onto Databend.

From the very beginning, Databend's objective has been to function as a cloud-native OLAP data warehouse, with a focus on addressing the complexities of handling multiple data sources. In Databend, data is structured into three layers: catalog -> database -> table. The catalog represents the highest level and encompasses all databases and tables.

Based on this foundation, our team designed and implemented support for Hive and Iceberg data catalogs, providing various mounting forms such as configuration files and CREATE CATALOG statements.

To mount an Iceberg catalog located in S3, simply execute the following SQL statement:

CREATE CATALOG iceberg_ctl
TYPE=ICEBERG
CONNECTION=(
URL='s3://warehouse/path/to/db'
AWS_KEY_ID='admin'
AWS_SECRET_KEY='password'
ENDPOINT_URL='your-endpoint-url'
);

IceLake - A Pure Rust Implementation of Apache Iceberg

Although the Rust ecosystem has seen the rise of many new projects related to databases and big data analysis in recent years, there is still a notable absence of mature Apache Iceberg bindings in Rust. This has presented significant challenges for Databend when it comes to integrating with Iceberg.

IceLake, supported and initiated by Databend Labs, aims to overcome the challenges and establish an open ecosystem where:

  • Users can read/write iceberg tables from ANY storage services like s3, gcs, azblob, hdfs and so on.
  • ANY database can integrate with icelake to facilitate reading and writing of iceberg tables.
  • Provides NATIVE support for transmuting between Arrow data structures.
  • Provides bindings so that other languages can work with iceberg tables powered by Rust core.

Currently, IceLake only supports reading data (in Parquet format) from Apache Iceberg storage services. Databend's Iceberg catalog is designed and implemented on top of IceLake, and the integration has been validated in Databend.

In addition, we have also collaborated with the Iceberg community to initiate and participate in the iceberg-rust project. The project aims to contribute IceLake's Iceberg-related implementations upstream, and the first version is currently under intense development.

Workshop: Experience Iceberg with Databend

In this workshop, we will demonstrate how to prepare data in Iceberg table format and mount it onto Databend as a Catalog, and perform some basic queries. Relevant files and configurations can be found at PsiACE/databend-workshop.

If you already have data that conforms to the Iceberg table format stored in a storage service supported by OpenDAL, we recommend using Databend Cloud so that you can skip the tedious process of service deployment and data preparation, and easily get started with the Iceberg Catalog.

Starting Services

To simplify the service deployment and data preparation issues of Iceberg, we will be using Docker and Docker Compose. You need to install these components first, and then write the docker-compose.yml file.

version: "3"

services:
spark-iceberg:
image: tabulario/spark-iceberg
container_name: spark-iceberg
build: spark/
networks:
iceberg_net:
depends_on:
- rest
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
- ./notebooks:/home/iceberg/notebooks/notebooks
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8888:8888
- 8080:8080
- 10000:10000
- 10001:10001
rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
networks:
iceberg_net:
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
iceberg_net:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
networks:
iceberg_net:
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
networks:
iceberg_net:

In the above configuration file, MinIO provides the underlying storage, the Iceberg REST service provides the catalog and table format capabilities, and spark-iceberg helps us prepare some preset data and perform the conversion operations.

Next, we start all services in the directory corresponding to the docker-compose.yml file.

docker-compose up -d

Data Preparation

In this workshop, we plan to use the NYC Taxis dataset (data on taxi rides in New York City), which is already built into spark-iceberg in Parquet format. We just need to convert it to Iceberg format.

First, enable pyspark-notebook:

docker exec -it spark-iceberg pyspark-notebook

Next, we can start Jupyter Notebook at http://localhost:8888:

Here we need to run the following code to implement the data conversion operation:

df = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2021-04.parquet")
df.write.saveAsTable("nyc.taxis", format="iceberg")

The first line will read the Parquet data and the second line will convert it into Iceberg format.

To verify that the data has been successfully converted, we can access the MinIO instance located at http://localhost:9001 and notice that the data is managed according to the Iceberg underlying file organization described earlier.

Deploying Databend

Here we will manually deploy a single-node Databend service. For the overall deployment process, refer to Docs | Deploying a Standalone Databend. Some details that need attention are as follows:

  • First, prepare the relevant directories for logs and meta data.

    sudo mkdir /var/log/databend
    sudo mkdir /var/lib/databend
    sudo chown -R $USER /var/log/databend
    sudo chown -R $USER /var/lib/databend
  • Secondly, because the default admin_api_address is already occupied by spark-iceberg, edit databend-query.toml to avoid the conflict:

    admin_api_address = "0.0.0.0:8088"
  • In addition, according to Docs | Configuring Admin Users, we also need to configure an administrator user. Since this is just a workshop, we choose the simplest way: uncomment the [[query.users]] section and the root user:

    [[query.users]]
    name = "root"
    auth_type = "no_password"
  • Because we deployed MinIO locally without TLS, we need to use the insecure HTTP protocol to load data. Therefore, databend-query.toml needs to be changed to allow this behavior. Please avoid enabling it in production services:

    ...

    [storage]

    ...

    allow_insecure = true

    ...

The next step is to start up Databend:

./scripts/start.sh

We strongly recommend using BendSQL as a client tool for accessing Databend. Of course, we also support various access methods such as the MySQL client and the HTTP API.

Mounting Iceberg Catalog

According to the previous configuration file, you only need to execute the following SQL statement to mount the Iceberg Catalog.

CREATE CATALOG iceberg_ctl
TYPE=ICEBERG
CONNECTION=(
URL='s3://warehouse/'
AWS_KEY_ID='admin'
AWS_SECRET_KEY='password'
ENDPOINT_URL='http://localhost:9000'
);

To verify the mounting, we can execute SHOW CATALOGS:

Of course, Databend also supports the SHOW DATABASES and SHOW TABLES statements. The nyc.taxis table created during the data conversion corresponds to a second-level directory in MinIO and is mapped to a database and table in Databend.

Running Queries

Now that the data has been mounted, let's try some simple queries:

Firstly, let's count the number of rows in the data. We can see that a total of 2 million rows have been mounted to Databend:

SELECT count(*) FROM iceberg_ctl.nyc.taxis;

Let's try to retrieve some data from a few columns:

SELECT tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count FROM iceberg_ctl.nyc.taxis LIMIT 5;

The following query can help us explore the correlation between passenger count and travel distance. Here we only take 10 results:

SELECT
passenger_count,
to_year(tpep_pickup_datetime) AS year,
round(trip_distance) AS distance,
count(*)
FROM
iceberg_ctl.nyc.taxis
GROUP BY
passenger_count,
year,
distance
ORDER BY
year,
count(*) DESC
LIMIT
10;

Summary

In this article, we introduced the Apache Iceberg table format and Databend's solution for it, and provided a workshop so you can gain some hands-on experience.

Currently, Databend only provides catalog mounting for the Iceberg integration, but it can already handle basic query processing tasks. We welcome everyone to try it out on data they are interested in and give us feedback.

Databend v1.2.0 was officially released on June 29, 2023! In comparison to v1.1.0, we have made 600 optimizations and fixes, 3,083 changed files with 115,847 additions and 55,836 deletions in v1.2.0. We would like to thank all of our community partners who participated and everyone who helped make Databend better!

In version v1.2.0, Databend has added features such as the BITMAP data type, direct query of CSV/TSV/NDJSON files using column position, and AI Functions. It has also designed and implemented a new hash table to significantly improve Join performance. This release brings Databend closer to realizing the vision of LakeHouse, allowing it to directly read and analyze CSV/TSV/NDJSON/Parquet files stored on object storage. You can also perform ETL operations on these files within Databend for higher-performance OLAP analysis.

In addition, we have designed and implemented enterprise features such as Computed Columns, VACUUM TABLE, and Serverless Background Service. If you are interested in these features, you can contact Databend Support for upgrade information or visit Databend Cloud for an instant experience.

New Features and Enhancements

Discover Databend's new features and find your perfect fit with a quick overview.

New Data Type: BITMAP

Databend has added support for the BITMAP datatype and implemented a series of related functions.

BITMAP is a type of compressed data structure that can be used to efficiently store and manipulate sets of boolean values. It has wide applications in data analysis and querying, providing fast set operations and aggregation capabilities. Common use cases include Distinct Count, Filtering & Selection, and Compressed Storage.

Our implementation of the BITMAP data type utilizes RoaringTreemap, which brings us improved performance and decreased memory usage in comparison to other bitmap implementations.

SELECT user_id, bitmap_count(page_visits) AS total_visits
FROM user_visits;

+---------+--------------+
| user_id | total_visits |
+---------+--------------+
|       1 |            4 |
|       2 |            3 |
|       3 |            4 |
+---------+--------------+
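
For completeness, the table and data behind a query like the one above could be set up along these lines (a hedged sketch: the DDL and the use of BUILD_BITMAP here are assumptions for illustration; check the BITMAP function docs for the exact set of constructors):

-- Hypothetical table: one bitmap of visited page IDs per user
CREATE TABLE user_visits (user_id INT, page_visits BITMAP);

INSERT INTO user_visits
VALUES (1, build_bitmap([2, 5, 8, 10])),
       (2, build_bitmap([3, 7, 9])),
       (3, build_bitmap([1, 4, 6, 10]));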

If you are interested in learning more, please check out the resources listed below:

Direct Query of CSV/TSV/NDJSON Files Using Column Position

Files such as CSV/TSV/NDJSON do not carry a schema, so they previously had to be loaded into a table before querying. However, sometimes you do not know the specific details of a file in advance (such as how many columns a CSV file has), or you just want to run an ad-hoc query.

To make this easier, Databend introduces column position and uses the $N syntax to represent the Nth column. All columns in CSV/TSV files are treated as the String type; if a row has fewer columns than the column number used, it is padded with an empty string. NDJSON files have only one column, $1, which is of type Variant.

This capability allows you to selectively load specific columns using the COPY INTO command. Additionally, you can perform data conversion using functions during the loading process.

SELECT $1 FROM @my_stage (FILE_FORMAT=>'ndjson');

COPY INTO my_table FROM (SELECT TRIM($2) FROM @my_stage t) FILE_FORMAT = (type = CSV);

If you are interested in learning more, please check out the resources listed below:

New Hash Table: Improved Hash Join Performance

Initially, Databend's Hash table was specifically designed for aggregation operators. To further improve the Hash Join performance, we have designed and implemented a new Hash table optimized for Hash Join. Through parallelized design, Databend can now fully utilize computing resources while also becoming more precise in memory control, avoiding unnecessary memory overhead and significantly improving Hash Join performance.

Business intelligence analyst Mimoune Djouallah commented that Databend performs well, running TPCH-SF10 in just 25 seconds with 8 cores and 32 GB of memory. He also wrote a blog post titled Databend and the rise of Data warehouse as a code.

If you are interested in learning more, please check out the resources listed below:

AI Functions

Databend introduced powerful AI capabilities in version v1.2.0, achieving seamless integration of Data and AI. We can use SQL to achieve:

  1. Generating SQL with Natural Language
  2. Embedding Vectorization
  3. Similarity Calculation
  4. Text Generation

Generating SQL from Natural Language

For example, if you ask a question against an nginx log database, such as "What are the top 5 IP addresses making the most requests", Databend's AI_TO_SQL function returns the corresponding SQL statement directly, which is very convenient.

Embedding Vectorization

With Databend's AI_EMBEDDING_VECTOR function, we can vectorize data and store it in Databend's ARRAY type. In this way, Databend actually becomes a vector database.

Similarity Calculation

Under a vectorized representation, the similarity between two words, sentences, or documents can be calculated. For example, given the two words "dog" and "puppy" (or two sentences), we first convert them into vectors v1 and v2, and then compute their cosine similarity:

cos_sim = dot(v1, v2) / (norm(v1) * norm(v2))

The COSINE_DISTANCE function in Databend is an implementation of this formula.
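
As a toy illustration (a hedged sketch: the literal array arguments are an assumption, and real embeddings would come from AI_EMBEDDING_VECTOR rather than hand-written vectors):

SELECT cosine_distance([1.0, 2.0, 3.0], [1.0, 2.0, 3.1]) AS distance;

Since COSINE_DISTANCE is a distance rather than a similarity, a value close to 0 indicates that the two vectors, and hence the texts they represent, are very similar.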

Text Generation

Text Generation is very useful in many scenarios. Now you can use the AI_TEXT_COMPLETION function in SQL to complete it.

Currently, we have used the above Data + AI capabilities to run embedding processing on all documents of https://docs.databend.com/ and stored the results in Databend, building an intelligent Q&A website: https://ask.databend.rs. On this website, you can ask any questions about Databend.

Enterprise Features

New enterprise features are now available! Learn how Databend is driving more valuable data analysis services.

Computed Columns

Computed columns are generated from other columns by a scalar expression. They can store the result of the expression to speed up queries and simplify complex query expressions. There are two types of computed columns: stored and virtual.

  • Stored Computed Columns generate data and store it on disk every time an insert or update occurs. The data does not need to be recalculated during queries, so reads are faster.
  • Virtual Computed Columns do not store data and do not occupy additional space. They perform calculations in real-time during each query.

Computed Columns are particularly useful for reading JSON internal fields. By defining commonly used internal fields as computed columns, the time-consuming operation of extracting JSON data during each query can be greatly reduced. For example:

CREATE TABLE student (
profile variant,
id int64 null as (profile['id']::int64) stored,
name string null as (profile['name']::string) stored
);

INSERT INTO student VALUES ('{"id":1, "name":"Jim", "age":20}'),('{"id":2, "name":"David", "age": 21}');

SELECT id, name FROM student;
+------+-------+
| id   | name  |
+------+-------+
| 1    | Jim   |
| 2    | David |
+------+-------+

If you are interested in learning more, please check out the resources listed below:

VACUUM TABLE

The VACUUM TABLE command helps to optimize the system performance by freeing up storage space through the permanent removal of historical data files from a table. This includes:

  • Snapshots associated with the table, as well as their relevant segments and blocks.
  • Orphan files. Orphan files in Databend refer to snapshots, segments, and blocks that are no longer associated with the table. Orphan files might be generated from various operations and errors, such as during data backups and restores, and can take up valuable disk space and degrade the system performance over time.
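
A minimal sketch of the command itself (the table name is hypothetical; optional clauses such as a retention window exist, so check the VACUUM TABLE docs for the exact options available in your edition):

VACUUM TABLE my_table;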

If you are interested in learning more, please check out the resources listed below:

Serverless Background Service

Databend's internal storage FuseTable is similar to Apache Iceberg, a log-structured table that requires regular table compaction, re-clustering, and vacuuming to merge small data chunks. The process involves sorting the data by the cluster key or vacuuming unneeded branches.

To automate this process, different drivers are required, which increases the complexity of the infrastructure, and other services must be deployed and maintained to trigger driver events. To simplify this, Databend has designed and implemented the Serverless Background Service, which automatically discovers tables that need to be compacted, re-clustered, or vacuumed after data is written and triggers the corresponding maintenance work, without extra services or manual user operations. This reduces the maintenance burden on users, improves table query performance, and lowers the cost of data in object storage.

Growing Ecosystem

Databend's ecosystem has been further improved. It's time to introduce Databend into your data insight workflow!

Binding Databend into Python

Databend now offers a Python binding that allows users to execute SQL queries against Databend using Python even without deploying a Databend instance.

To use this functionality, simply import SessionContext from the databend module and create an instance of it:

from databend import SessionContext

ctx = SessionContext()
df = ctx.sql("select number, number + 1, number::String as number_p_1 from numbers(8)")

The resulting DataFrame can be converted to PyArrow or Pandas format using the to_py_arrow() or to_pandas() methods respectively:

df.to_pandas() # Or, df.to_py_arrow()

Feel free to integrate it with your data science workflow.

BendSQL - Databend Native Command Line Tool

BendSQL is a command-line tool specifically designed for Databend. It has been rewritten using Rust, which not only adds support for the REST API but also introduces Flight SQL functionality.

With BendSQL, you can easily and efficiently manage your databases, tables, and data, and perform a wide range of queries and operations with ease.

bendsql> select avg(number) from numbers(10);

SELECT
avg(number)
FROM
numbers(10);

┌───────────────────┐
│    avg(number)    │
│ Nullable(Float64) │
├───────────────────┤
│        4.5        │
└───────────────────┘

1 row in 0.259 sec. Processed 10 rows, 10B (38.59 rows/s, 308B/s)

We are excited about the progress of BendSQL and look forward to sharing more updates with you! Feel free to try it out and give us feedback.

Data Integration & Business Intelligence

Apache DolphinScheduler

Apache DolphinScheduler is a distributed and extensible open-source workflow orchestration platform with powerful DAG visual interfaces. It supports 30+ task types, including Flink SQL, DataX, HiveCli, etc. It can execute millions of tasks with high concurrency, high throughput, low latency and stability. It can execute tasks in batches according to the planned time (special date range or special date list), and the workflow instance supports modification, rollback and re-execution without affecting the workflow template.

Apache DolphinScheduler has now added support for Databend as a data source. This enhancement enables you to leverage DolphinScheduler for managing DataX tasks and effortlessly load data from MySQL to Databend.

Apache Flink CDC

Databend now supports Apache Flink CDC (Change Data Capture), which allows you to capture and process real-time data changes from various sources using SQL-based queries. With Flink CDC, you can monitor and capture data modifications (inserts, updates, and deletes) happening in a database or streaming system and react to those changes in real time.

Databend's Flink SQL connector offers a connector that integrates Flink's stream processing capabilities with Databend. By configuring this connector, you can capture data changes from various databases as streams and load them into Databend for processing and analysis in real-time.

If you are interested in learning more, please check out the resources listed below:

Tableau

Tableau is a popular data visualization and business intelligence tool. It provides an intuitive, interactive way to explore, analyze, and present data, helping users better understand the meaning and insights of their data.

Refer to Other Databases (JDBC) and add databend-jdbc to the Tableau driver path in order to analyze data from Databend using Tableau.

If you are interested in learning more, please check out the resources listed below:

Download Databend v1.2.0

If you are interested in Databend v1.2.0, go to https://github.com/datafuselabs/databend/releases/tag/v1.2.0-nightly to view all changelogs or download the release.

If you are using an old version of Databend, we recommend you upgrade to the latest version. For the upgrade process, please refer to: https://docs.databend.com/doc/operations/upgrade .

Feedback

If you need help when working with the new release, submit an issue on GitHub Issues.

GitHub: https://github.com/datafuselabs/databend/

Tableau is a popular data visualization and business intelligence tool. It provides an intuitive, interactive way to explore, analyze, and present data, helping users better understand the meaning and insights of their data.

This tutorial helps users connect Databend for BI data analysis in Tableau.

Databend JDBC

Tableau supports all data sources that implement the JDBC protocol, so you need to prepare the databend-jdbc JAR file first. You can download it from the Maven repository, or compile it yourself as follows:

git clone https://github.com/databendcloud/databend-jdbc
cd databend-jdbc
mvn clean install -DskipTests

Then copy the compiled databend-jdbc.jar to Tableau's Driver directory:

  • Windows: C:\Program Files\Tableau\Drivers
  • Mac: ~/Library/Tableau/Drivers
  • Linux: /opt/tableau/tableau_driver/jdbc

Connect to Tableau

Click "Other Databases (JDBC)" on the Tableau homepage.

JDBC URL: jdbc:databend://{user}:{password}@{host}:{port}/database?ssl={status}
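
For example, with hypothetical credentials and a locally deployed Databend listening on port 8000, the URL would look like this:

jdbc:databend://user1:abc123@127.0.0.1:8000/default?ssl=false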

Select MySQL as the dialect.

Data Analysis

Select the database and table you want to operate on, and drag the table into the area.

This way, you can analyze and chart data in Databend.

Please use version 2023.1.0 or above of Tableau, as other versions may have compatibility issues.

Conclusion

The above is the basic process of using Tableau to analyze Databend data. For more information about connecting with JDBC in Tableau, please refer to Other Databases (JDBC).

Background

Compiling a medium-to-large Rust program is no breeze, due to the accumulation of complex project dependencies and boilerplate code. As noted in an article by Brian Anderson, "But Rust compile times are so, so bad." Maintaining a stable build pipeline requires introducing some techniques, but there is no one-size-fits-all solution, and as the complexity of the workflow increases, keeping builds fast becomes an ongoing, iterative effort.

The Databend team encountered similar challenges in compiling the product from hundreds of thousands of lines of code and in developing Docker-based build tools to enhance the developer/CI workflow. This article outlines the measures the team has taken to address these compilation challenges. If you're interested, check out these earlier posts to get a general idea of how we compile Databend:

Observability

While observability may not directly optimize compilation, it can aid in identifying where the bottleneck in the compilation process lies. This knowledge can help us determine the appropriate remedy to address the issue.

cargo build --timings

This command visualizes the compilation process of Databend.

In Rust version 1.59 or earlier, you can use cargo +nightly build -Ztimings.

When opened in a web browser, the resulting HTML file shows a Gantt chart displaying the dependency relationships between crates in the program, the degree of parallelism in compilation, and the order of magnitude of code generation.

Based on the chart, we can decide whether to increase the number of code generation units for a particular module, or whether to further decompose to optimize the overall build process.

cargo-depgraph

Although not commonly utilized, cargo-depgraph can be employed to analyze dependency relationships. It helps to find potential optimization points, especially when you need to replace similar dependencies or improve how crates are organized.
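
Typical usage pipes its output into Graphviz (a sketch; it assumes cargo-depgraph and Graphviz's dot are both installed):

cargo install cargo-depgraph
cargo depgraph --all-deps | dot -Tpng > dependency-graph.png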

Painless Optimization with Configuration Adjustments

The first step to improving the compilation experience does not involve directly altering the code. In many cases, only a few configuration adjustments are necessary to achieve significant improvement.

Always Bump & Upstream First

As mentioned earlier, members of the Rust team have long been aware that compile times are suboptimal and keep working to improve them; the Databend team, in turn, plans to keep optimizing for this issue. Improvements to compilation can often be found listed in the Rust release notes.

[toolchain]  
channel = "nightly-2023-03-10"
components = ["rustfmt", "clippy", "rust-src", "miri"]

In addition, upstream projects may also improve unreasonable designs over time, and many of these improvements ultimately show up as better compile times.

One of the simplest ways to improve compile time is to always keep up with upstream changes and participate in ecosystem building with the philosophy of "upstream first". Databend has been a loyal follower of Rust nightly from the very beginning and provided concise guidance for updating the toolchain and dependency relationships.

Caching

Caching is a common compilation optimization technique. The idea is simple: store pre-built artifacts and reuse them the next time you build.

Initially, Databend employed the rust-cache action in CI to improve caching and achieved promising results. However, we had to update the cache key manually and frequently in order to clear the cache and avoid incorrect cache hits during builds.

Moreover, Rust's early support for incremental builds was terrible. For a while, we had to consider how to configure the pipeline to make some trade-offs.

Things have now changed.

Sccache was revitalized and OpenDAL was successfully integrated into it, becoming a crucial component that supports the Rust compilation cache ecosystem. Although it may not fully showcase its potential when building locally, it can still deliver great results in CI.

Another important change is that the Rust community realized that incremental compilation did not work well for CI.

CI builds often are closer to from-scratch builds, as changes are typically much bigger than from a local edit-compile cycle. For from-scratch builds, incremental adds an extra dependency-tracking overhead. It also significantly increases the amount of IO and the size of ./target, which make caching less effective. (Fast Rust Builds)

Remove Unused Dependencies

There is an interesting project in the Rust ecosystem known as mTvare6/hello-world.rs, which demonstrates how to create a Rust project that is as poorly written as possible.

In particular:

in a few lines of code with few (1092) dependencies

Rust itself is not very good at automatically handling dependencies. It always downloads and compiles all dependencies in one go. Therefore, avoiding unnecessary introduction of dependencies becomes essential.

At first, Databend introduced cargo-udeps to check for unused dependencies. Most of the time it worked well. However, the major drawback was that every time dependencies were checked, it was equivalent to recompiling, which was undoubtedly inefficient in a CI environment.

sundy-li found another fast and easy-to-use tool called cargo-machete.

One significant benefit is that machete is fast, since it relies on simple regular expressions to do everything. It also supports automatic fixes, eliminating the need to search through files one by one and edit them manually.

However, machete is not a flawless tool. Because it relies on simple regular-expression matching, it may misjudge some situations, but these instances are acceptable to ignore.
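
For reference, usage is a one-liner locally or in CI (a sketch; the --fix flag applies the removals automatically):

cargo install cargo-machete
cargo machete
cargo machete --fix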

Sparse Index

In order to determine which crates exist on crates.io, Cargo needs to download and read the crates.io-index, which is located in a git repository hosted on GitHub and lists all versions of all crates.

However, as the index has grown significantly over time, the initial acquisition and updates have become painfully slow.

RFC 2789 introduced a sparse index to improve Cargo's access to the index and is hosted at https://index.crates.io/.

[registries.crates-io]
protocol = "sparse"

Linker

If a project is relatively large and has many dependencies, it may waste a lot of time on linking, and even a small code change can lead to a long compile time.

The simplest solution is to choose a faster linker than the default one.

Both lld and mold can improve link time. Databend eventually chose to use mold. In fact, the difference between the two linkers is not obvious for Databend. However, using mold has a potential benefit of saving some memory consumption during compilation.

[target.x86_64-unknown-linux-gnu] 
linker = "clang"
rustflags = ["-C", "link-arg=-fuse-ld=/path/to/mold"]

Let's first look at a common setting: split-debuginfo.

On macOS, rustc runs a tool called dsymutil which analyzes the binary and then builds a debug information directory. Configuring split-debuginfo skips dsymutil and speeds up the build.

split-debuginfo = "unpacked"

Another example is codegen-units.

Databend uses codegen-units = 1 during compilation to enhance optimization and restrain the size of binaries. However, considering that some dependencies have particularly long code generation time during compilation (due to heavy macro dependencies), it is necessary to loosen some restrictions specifically.

[profile.release.package]
arrow2 = { codegen-units = 4 }
common-functions = { codegen-units = 16 }
databend-query = { codegen-units = 4 }
databend-binaries = { codegen-units = 4 }

More Reasonable Code Structures

The above are some configuration adjustments. Next, we will explore the impact of refactoring on compile time.

Split into More Reasonable Crate Sizes

Refactoring a large all-in-one crate into smaller ones can be a highly beneficial strategy. It can not only improve parallelism, but also help Rust process code compilation faster by decoupling cross dependencies and circular dependencies.

Splitting crates also makes the boundaries of the code more apparent, which can result in easier maintenance.

The Boundary between Unit Testing and Integration Testing

Common forms of unit test organization include maintaining a tests mod alongside the code in src, or keeping the corresponding test code in the tests directory.

Following the recommendation of Delete Cargo Integration Tests, Databend has stripped all unit tests from the code very early and organized them in a similar form:

tests/
  it/
    main.rs
    foo.rs
    bar.rs

This form avoids compiling each file under tests/ into a separate binary, thereby reducing the impact on compile time.

In addition, Rust spends a lot of time processing tests mods and doc tests during compilation, especially doc tests, which require building additional targets. After adopting the organization above, they can be turned off in the configuration.
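
A sketch of what that can look like in a crate's Cargo.toml (a hedged example; adjust per crate):

[lib]
# Unit tests live in tests/it, so skip the per-crate lib test and doc-test targets
test = false
doctest = false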

However, this form is not entirely elegant for us: everything that needs to be tested has to be made public, which easily breaks the modular organization of the code. We recommend an in-depth evaluation before adopting it.

More Elegant Testing Methods

We all know that the more code that needs to be compiled for unit tests, the slower the compilation time will be.

In addition, for Databend, a considerable part of the tests are end-to-end tests of input and output. If these tests are hardcoded in unit tests, much more format-related work needs to be added, which also requires substantially more effort to maintain.

The use of golden file testing and SQL logic testing in Databend replaces a large number of SQL query tests and output result checks embedded in unit tests, which further improves compile time.

Cargo Snubs

cargo-nextest

cargo nextest makes testing as fast as lightning and provides finer-grained statistics and elegant reports. Many projects in the Rust community have greatly reduced test pipeline time by switching to cargo nextest.

However, Databend is currently unable to switch to this tool, for two reasons. First, configuration-related tests are not yet supported, so cargo test still has to be run separately, which requires recompiling. Second, some timeout-related tests are pinned to a specific execution time and must run to completion.

cargo-hakari

A typical example of improving dependency compilation is workspace-hack, which places important common dependencies in one crate so they do not need to be recompiled repeatedly. cargo-hakari can be used to manage workspace-hack automatically.

Databend has a large number of common components, and the main binaries are built on top of these common components, which implicitly follows this optimization idea. In addition, with workspace dependency inheritance, the maintenance burden has also been reduced.

Databend v1.1.0 was officially released on April 14, 2023! This release marks the first major update of Databend since version 1.0. Compared to v1.0.0, v1.1.0 includes 1,616 commits, 505 optimizations and fixes, and 2,069 changed files with 118,455 additions and 42,100 deletions. We would like to thank all of our community partners who participated and everyone who helped make Databend better!

In version 1.1.0, we introduced new data transformation capabilities to the COPY INTO command, allowing users to easily manipulate data during the loading process. Additionally, we have made significant enhancements to Databend, enabling it to run all TPC-DS queries successfully, along with other major performance optimizations and feature enhancements.

New Features and Enhancements

Discover Databend's new features and find your perfect fit with a quick overview.

COPY INTO now has ETL capabilities

COPY INTO is an important path for Databend to import data across multiple clouds. Now it also has basic data transformation capabilities, eliminating the need for temporary tables to store pre-transformed data. This includes support for column reordering, column omission, and transformation queries based on SELECT, enhancing its data manipulation capabilities.

CREATE TABLE my_table(id int, name string, time date);

COPY INTO my_table
FROM (SELECT t.id, t.name, to_date(t.timestamp) FROM @mystage t)
FILE_FORMAT = (type = parquet) PATTERN='.*parquet';

Full Support for TPC-DS Queries

Databend now supports all 99 TPC-DS queries!

TPC-DS is a decision support benchmark that includes multidimensional general application models for decision support systems. It models several universally applicable aspects of decision support systems, including queries and data maintenance. TPC-DS is widely used to measure the performance of decision support and analysis systems.

REPLACE INTO

Databend now supports using REPLACE INTO statements to insert or update data.

This statement allows you to specify a conflict key to determine whether to insert a new row of data or update an existing row of data. If a row with the same conflict key already exists in the table, Databend will update this row with new data. Otherwise, new data will be added as a new record to the table.

You can use this statement to easily synchronize data from different sources or process duplicate records.

#> CREATE TABLE employees(id INT, name VARCHAR, salary INT);
#> REPLACE INTO employees (id, name, salary) ON (id) VALUES (1, 'John Doe', 50000);
#> SELECT * FROM Employees;
+------+----------+--------+
| id   | name     | salary |
+------+----------+--------+
|    1 | John Doe |  50000 |
+------+----------+--------+
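To illustrate the update path described above, running another REPLACE with the same conflict key would overwrite the existing row instead of adding a new one (a sketch continuing the example):

#> REPLACE INTO employees (id, name, salary) ON (id) VALUES (1, 'John Doe', 60000);
#> SELECT * FROM employees;
+------+----------+--------+
| id   | name     | salary |
+------+----------+--------+
|    1 | John Doe |  60000 |
+------+----------+--------+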

Window Functions

For each input row, a window function returns one output row that depends on the specific row passed to the function and the values of the other rows in the window. In analytical queries such as reports, window functions can elegantly express certain requirements and play an irreplaceable role.

-- use aggregate window function
SELECT date, AVG(amount) over (partition by date)
FROM BookSold

June 21|544.0
June 21|544.0
June 22|454.5
June 22|454.5
June 23|643.0
June 23|643.0

Aggregate window functions can apply aggregate operations to each row of data in the window. All aggregate functions supported by Databend can be used as aggregate window functions.

Growing Ecosystem

Databend's ecosystem has been further improved. It's time to introduce Databend into your data insight workflow!

Integration with Visualization Tools

Metabase, Redash and Grafana are all open source visualization tools that can query data from multiple sources and visualize it.

Databend now provides support for the above three tools. You can use Databend as a data source to easily build visualization dashboards and better understand and analyze your data.

Grafana Dashboard

Support for Rust

In addition to supporting Python, Go, and Java, Databend now also has its own Rust driver. This means you can easily connect to Databend and execute SQL queries using Rust.

use databend_driver::new_connection;

let dsn = "databend://root:@localhost:8000/default?sslmode=disable";
let conn = new_connection(dsn).unwrap();

let sql_create = "CREATE TABLE books (
title VARCHAR,
author VARCHAR,
date Date
);";
conn.exec(sql_create).await.unwrap();

Databend x AI

When a cloud data warehouse intersects with cutting-edge AI models, what can you anticipate? Databend and OpenAI now work together to make you more productive.

AI Functions

Databend now has a built-in practical AI function ai_to_sql, which supports converting natural language into SQL statements, making it easy to write high-quality SQL for complex analysis tasks.

SELECT * FROM ai_to_sql(
'List the total amount spent by users from the USA who are older than 30 years, grouped by their names, along with the number of orders they made in 2022');

In addition, Databend also supports text embedding generation, similarity retrieval, text completion, and other capabilities, making it easy to build SQL-based, AI-enabled productivity tools in one place.

SELECT doc_id, text_content, cosine_distance(embedding, ai_embedding_vector('What is a subfield of artificial intelligence?')) AS distance
FROM embeddings
ORDER BY distance ASC
LIMIT 5;

SELECT ai_text_completion('Artificial intelligence is a fascinating field. What is a subfield of artificial intelligence?') AS completion;

AskBend

Got a question? AskBend! Databend now has an AskBend knowledge base Q&A system that can answer questions about Databend.

AskBend

P.S. AskBend is open source and powered by Databend Cloud and AI Functions. You can also create and deploy an intelligent assistant like AskBend with your own markdown files.

Download Databend v1.1.0

If you are interested in Databend v1.1.0, go to https://github.com/datafuselabs/databend/releases/tag/v1.1.0-nightly to view all changelogs or download the release.

If you are using an old version of Databend, we recommend you upgrade to the latest version. For the upgrade process, please refer to: https://docs.databend.com/doc/operations/upgrade .

Feedback

If you need any help when working with the new release, submit an issue on GitHub.

GitHub: https://github.com/datafuselabs/databend/

Diving into the Future

Databend is a modern, open-source, cloud-native data warehouse. It leverages cost-effective cloud storage as its primary storage solution and delivers fast and efficient analytical performance. Databend has received widespread acclaim for its ability to help numerous customers achieve cost savings and increased efficiency in scenarios such as data warehousing and user behavior logging.

Databend is also available on the cloud! With Databend Cloud, you can host your Databend instances and benefit from a serverless deployment where you are billed based on compute duration rather than hardware resources. A serverless deployment not only helps lower costs but also enhances system elasticity and reliability. By utilizing Databend Cloud, you can easily build cost-effective and high-performance data warehouses, allowing you to focus on data analysis rather than maintenance work.

Databend Cloud and Databend began development almost simultaneously. Building Databend Cloud imposed requirements on infrastructure scalability, high availability, and more, which align with Databend's cloud-centric positioning. The continuous refinement of both Databend Cloud and Databend has made Databend Cloud more user-friendly while also making Databend easier to deploy, manage, and access.

Efficiently, securely, and cost-effectively utilizing cloud resources has become a common focus in the industry. This article will introduce the design and trade-offs of the Databend Cloud architecture, as well as the work mechanism of its various components. We hope that this article will help you better understand the advantages of the cloud, making your cloud-based data warehouse more cost-effective, efficient, secure, and user-friendly.

Design Principles

The following principles are integrated throughout the development process of Databend Cloud:

  • Pay-as-you-go: Minimizing user expenses in computing, storage, and data transfer resources.
  • Serverless: Leveraging Kubernetes and Infrastructure as Code (IaC) for automated operations, always using the latest version.
  • Security: Following SOC2 standards to design a zero-trust architecture with strong isolation between tenants and default encryption.
  • Data ecosystem: Incorporating cloud-native and Rust elements into the big data ecosystem.

The design principles above are interconnected. To minimize resource expenses, we employed a serverless architecture that promptly releases unused resources to avoid incurring unnecessary costs. With Kubernetes and Infrastructure as Code (IaC), we established a standardized infrastructure, enabling built-in security mechanisms across different cloud providers and regions. Additionally, external data systems can seamlessly integrate with Databend Cloud without needing to care about infrastructure details.

Next, we will provide a detailed overview of the Databend Cloud architecture and how the components work together.

Architecture

The following is an architectural overview of Databend Cloud within a single region:

Architecture

In general, the architecture of Databend Cloud follows a design that separates the control plane from the data plane, with a multi-tenant storage layer and a serverless computing layer:

  • Object storage is used as the primary storage, with tenant-level isolation.
  • Kubernetes is used to manage computing resources, with Operators serving as the central component of the control plane. Each region exposes an internal Managed gRPC service for cross-cloud management.
  • Meta cluster is used for storing table schemas, identity authentication, and other metadata.
  • A custom Query Gateway is used as the entry point for the data plane.
  • A unified cross-region Cloud Console serves as the management interface for managing accounts, organization information, usage metrics, and more.

Object Storage

Databend utilizes object storage as the primary storage, enabling a completely decoupled architecture of storage and computation from day one. This greatly simplifies the development of cloud products, as it only requires managing and scheduling stateless compute nodes without the need for complex mechanisms such as node state migration or master-slave switching.

However, leveraging object storage also presents new challenges for our query engine:

  • Unstable access latency of object storage.
  • Potential access limitations imposed by cloud providers.

To address these challenges, we have implemented these optimizations:

  • Parallel scanning of object files at a large scale to maximize I/O bandwidth.
  • Dynamic scheduling framework that adapts to varying request latencies.
  • Dynamic backoff and retry for handling errors related to cloud storage limitations, ensuring query success.
  • Addition of local SSD cache to accelerate access to hot data.

Managing Compute Resources with Kubernetes

In Databend Cloud, we utilize Kubernetes to manage the compute layer. Kubernetes allows us to abstract the differences between different cloud providers and use unified APIs to coordinate the scaling of compute resources. The widely adopted Operator development pattern in the Kubernetes community enables us to flexibly extend control logic for operations such as usage statistics and auto-scaling.

Operators are at the core of the control plane. They define two Custom Resource Definitions (CRDs) for managing tenants and warehouses. When a warehouse is created in Databend Cloud, it corresponds to a Warehouse CRD, and the Operator creates a StatefulSet to start the Databend cluster for that warehouse.

Although Databend Query itself is stateless, we choose to use StatefulSet instead of Deployment for managing pods. Firstly, pods in StatefulSet are assigned a fixed ordinal number, and we designate Pod 0 as the coordinator to handle query planning and coordination. Secondly, we bind an SSD cache to the Databend cluster for accelerating data access.

In addition, to improve overall resource utilization, we use Karpenter as a scaler. It dynamically requests more physical resources from cloud providers and automatically adds them to the cluster when physical resources are insufficient, and releases them when usage is low. This enables the Kubernetes cluster to form an elastic resource pool that can adapt to changes in our physical resource requirements.

Multi-Tenant Metadata Center

Databend caches metadata from Metasrv as much as possible to reduce the access pressure on Metasrv. As a result, we choose to deploy Metasrv as a multi-tenant shared cluster, using key prefixes to differentiate metadata for different tenants.

Metadata Center

In Databend Cloud, Metasrv serves not only as a repository for storing metadata such as table structures and user authentication information, but also provides transactional support for table writes. Object storage typically does not provide strong consistency semantics for writes, so we address this limitation by using a Raft-based Metasrv cluster to ensure consistency. In Databend, each write operation on a table generates a new Snapshot file, and the write is considered successful only when the corresponding Snapshot Key is written to Metasrv, making Metasrv the foundation for implementing ACID in Databend.

As the sole stateful component in Databend Cloud's infrastructure, the reliability of Metasrv is crucial. We have implemented automated backup mechanisms and continuously conduct reliability drills on Metasrv. The openraft framework, incubated from Metasrv, is a solid foundation for ensuring correctness. It is currently the most highly acclaimed Raft framework in the Rust ecosystem and is being used in production by companies such as Microsoft and Huobi.

Data Plane

The data plane represents the path in Databend Cloud from data import, computation, to presentation. It uses the public internet's Query Gateway as the access entry point, forwarding incoming HTTPS requests from external sources to Databend instances running in Kubernetes for computation, and ultimately returning the results to the users.

Our query protocol uses HTTP as its communication and transport layer. In a cloud-native environment, components including the Query Gateway may restart at any time due to version upgrades or machine maintenance. In such cases, the stateless HTTP protocol has significant advantages over stateful, connection-oriented protocols like MySQL or PostgreSQL: it does not require keeping connections alive, and as long as the application handles Kubernetes' termination signals gracefully, user requests remain uninterrupted during version upgrades and system maintenance.

Data Plane

However, integrating our proprietary query protocol with the big data ecosystem has posed some challenges. We need to implement SDKs in different languages and establish one-to-one integrations with various systems, which can be a significant amount of work for a new product. Currently, we have provided SDKs in Go, Java, Python, and Rust, as well as integrations with data systems such as Metabase, Grafana, Quick BI, Deepnote, Airbyte, Kafka, DataX, and Flink CDC. We are also exploring the implementation of a protocol based on Flight SQL, leveraging the SQL protocol standards from the Arrow community, to make Databend Cloud more easily integrated with other systems and seamlessly blend into a larger ecosystem.

Auto Suspend and Auto Resume

To minimize compute resource costs, we periodically collect metrics from active warehouses. If a warehouse exceeds the suspend time, it will be put into a suspend state to release computational resources and stop incurring costs. By default, we set the auto-suspend duration for warehouses to 5 minutes, but you can adjust it to a minimum of 1 minute based on your own needs.

Auto suspend

The SuspendController in the Operator continuously monitors the Pods of active warehouses by querying the /v1/status API to retrieve the running status of Databend instances. If all Pods of a warehouse do not have active queries and the time since the last activity exceeds the suspend duration, the StatefulSet of that warehouse is deleted to release compute resources.

When a request arrives at the Query Gateway for a warehouse that is not yet started, it notifies the Operator to attempt to wake up the warehouse. Once the StatefulSet is ready, the query is then forwarded for execution. Thanks to the fast startup speed of Databend instances (around 1 second), the waiting time is usually just a few seconds. With this automatic wake-up mechanism, users no longer need to worry about the status of warehouses, as each warehouse operates as a serverless service.

We did not choose serverless frameworks like Knative because we wanted to keep the control flow simple with minimal dependencies and CRD definitions to reduce maintenance costs and allow for more flexible adjustments to serverless strategies.

Multi-Cloud and Multi-Region

In the context of Databend Cloud architecture, as mentioned earlier, there are two main components: the data plane and the control plane. In the case of multi-cloud and multi-region deployment, the Cloud Console treats each region as a black box that exposes the entry point for the control plane.

Cloud Console

In Databend Cloud, all supported regions are managed through Pulumi for Infrastructure as Code (IaC) management. Managing all cloud-based infrastructure resources through IaC ensures that our infrastructure environments in different regions are standardized and consistent. This allows us to create test environments that are consistent with production environments and enables us to quickly onboard new regions.

When a new region is added, the Cloud Console connects to it through Managed gRPC over the internal network. When a new user registers, SetupTenant is called to initialize the tenant, which includes initializing an IAM Role and binding object storage prefix access permissions for that tenant. The behavior of SetupTenant varies by cloud provider, and we have adapted the initialization logic for each one.

In addition to control plane traffic, there is also additional communication for usage information synchronization. The Operators in each region asynchronously push usage and storage information of the Warehouses to the Cloud Console after collecting them. The Cloud Console centrally stores the usage information in its own database, aggregates it to generate usage statistics data, and generates monthly bills.

Data Security

Data security is the top priority of a cloud data warehouse platform. Throughout the development of Databend Cloud, we aim to apply state-of-the-art security practices to maximize the protection of user data.

We want strict data access isolation among tenants. At the same time, we want to minimize the use of long-lived AccessKey/SecretKey credentials for accessing cloud resources, because a leaked long-lived key poses a serious security risk. Fortunately, most cloud vendors provide an identity authentication mechanism based on Kubernetes' Bound Service Account Token, which solves this problem: it provides a service with an automatically rotated token (for example, one that expires in half an hour), associates that token with an IAM Role through the cloud vendor's STS service, and then restricts the service's access rights through IAM policies to achieve fine-grained access control.

Data Security

In Databend Cloud, each tenant is assigned a dedicated IAM Role during the SetupTenant process, and IAM policies are configured to restrict each tenant's access to specific prefixes within the files. This eliminates the need for long-lived access keys, making access to cloud resources such as S3 more secure and reliable.

All data stored on Databend Cloud is encrypted by default. Additionally, users can also leverage the Assume Role mechanism in Databend Cloud to mount their own S3 buckets from their AWS accounts for analysis purposes, ensuring data security while enabling data analysis capabilities.

RBAC

In Databend, users can be assigned multiple roles, and roles can have dependencies among them. However, at any given time, only one role can be active. Databend Cloud currently includes two special built-in roles: AccountAdmin and Public, which are used as administrator and regular user roles, respectively. The difference between these roles is that AccountAdmin serves as the parent role for all other roles, while Public is considered the child role of all other roles. The hierarchical relationship between these roles is approximately as follows:

RBAC

By assigning different roles and permissions to users with different responsibilities, it ensures that only authorized personnel can access sensitive data. RBAC is a significant safeguard for data security.
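As a rough SQL sketch of how roles are typically assigned (the role, user, and database names here are hypothetical, and the exact privilege syntax may differ between Databend versions):

-- Create a role and grant it privileges on a database
CREATE ROLE analyst;
GRANT SELECT ON sales_db.* TO ROLE analyst;

-- Create a user and attach the role to it
CREATE USER alice IDENTIFIED BY 'strong_password';
GRANT ROLE analyst TO USER alice;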

More Features

In addition to making Databend more user-friendly and secure as infrastructure, we also aim to provide more possibilities through cloud services. In a cloud data warehouse, advanced features are often implemented as microservices, which allows us to continuously iterate and enhance the capabilities of the Data Cloud; this is a key difference between cloud data warehousing and traditional data warehousing. We will continue to develop more microservices in Databend Cloud to contribute to the Data Cloud architecture. For example:

  • Automatic data import pipeline: Allows automatic synchronization of data from object storage to tables in Databend Cloud, automatically detects new files and initiates imports, and users can also push new files by calling the API endpoint in real-time. Currently, only AWS is supported, with integration with other cloud providers under development.
  • Automatic tiering: Allows automatically downgrading data from object storage to cheaper storage tiers, further reducing storage costs.
  • Automatic compaction and optimization: Automatically initiates optimization based on metadata usage, improving query performance for free.
  • Data Masking: Allows setting rules to mask specific rows and columns for specific roles, ensuring data privacy.
  • Data Market: Automatically subscribes to data shared by other tenants through sharing mechanisms, creating a data market.

Summary

Now that you have gained an understanding of the design and architecture of Databend Cloud, feel free to visit https://app.databend.com to sign up and try it out! Databend Cloud is committed to providing an affordable, efficient, user-friendly, and secure Cloud Data Warehouse solution. If you have large amounts of data and are looking for a cost-effective solution to meet your analytical needs, we're here to help.

The TPC-DS benchmark is widely used for measuring the performance of decision support and analytical systems. Databend is a data warehouse that supports the TPC-DS queries. In this blog, we will walk you through the process of benchmarking TPC-DS with Databend, covering key aspects such as generating TPC-DS data, preparing the table schemas for Databend, and executing the benchmark queries.

What's TPC-DS?

TPC-DS is a decision support benchmark that models several generally applicable aspects of a decision support system, including queries and data maintenance. The benchmark provides a representative evaluation of performance as a general purpose decision support system.

It includes 7 fact tables, 17 dimension tables, with an average of 18 columns per table and 99 test queries.

You can find more information about TPC-DS at https://www.tpc.org/tpcds/.

Running TPC-DS Benchmark on Databend

This section describes the steps to run the TPC-DS benchmark on Databend and provides the related scripts. You can find more detailed information at: https://github.com/datafuselabs/databend/tree/main/benchmark/tpcds.

Step 1: Generate TPC-DS test data

Leverage duckdb to generate TPC-DS data:

INSTALL tpcds;
LOAD tpcds;
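-- sf is the TPC-DS scale factor; sf=1 produces roughly 1 GB of raw data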
SELECT * FROM dsdgen(sf=1);
EXPORT DATABASE 'TARGET_DIR' (FORMAT CSV, DELIMITER '|');

Step 2: Load TPC-DS data into Databend

./load_data.sh

Step 3: Run TPC-DS queries

databend-sqllogictests --handlers mysql --database tpcds --run_dir tpcds --bench 

Databend is an open-source elastic and workload-aware modern cloud data warehouse that allows you to do blazing-fast data analytics on a variety of storage services.

This post shows you how to configure WebHDFS as a storage backend for Databend.

WebHDFS is a REST API that provides HTTP access to HDFS, a popular distributed file system in the big data ecosystem. By using WebHDFS, you can avoid the dependency on a Java environment and the specific JAR packages required by the native HDFS client.

Step 1: Prepare HDFS Environment

Skip this step if you already have a deployed HDFS environment. Ensure that WebHDFS is enabled and accessible. Please note that in some public cloud platforms, managed HDFS services may not support WebHDFS.

If you don't have an HDFS environment, set up a local one for testing:

git clone https://github.com/PsiACE/databend-workshop.git
cd databend-workshop/webhdfs
docker-compose up

You can now access http://127.0.0.1:9870:

Step 2: Deploy Databend

Before starting Databend, configure the settings endpoint_url and root in the file databend-query.toml as shown in the example below. Please note that you can also configure a delegation token for authentication.

[storage]
type = "webhdfs"
[storage.webhdfs]
endpoint_url = "http://127.0.0.1:9870"
# set your root
root = "/analyses/databend/storage"
# if your webhdfs needs authentication, uncomment and set with your value
# delegation = "<delegation-token>"

For more information about how to deploy Databend in standalone mode with WebHDFS, see Deploying a Standalone Databend.

Step 3: Test Functionality

Upload the books.csv file from your local directory to the specified path in HDFS:

curl -L -X PUT -T ../data/books.csv 'http://127.0.0.1:9870/webhdfs/v1/data-files/books.csv?op=CREATE&overwrite=true'

Then connect to Databend with a MySQL client and create the target database and table:

$> mysql -uroot -h0.0.0.0 -P3307

mysql> DROP DATABASE IF EXISTS book_db;
Query OK, 0 rows affected (0.02 sec)

mysql> CREATE DATABASE book_db;
Query OK, 0 rows affected (0.02 sec)

mysql> use book_db;
Database changed

mysql> CREATE TABLE IF NOT EXISTS books ( title VARCHAR, author VARCHAR, date VARCHAR );
Query OK, 0 rows affected (0.02 sec)

Create a stage with your WebHDFS first, and then load data from the data file using the COPY INTO command:

mysql> CREATE STAGE IF NOT EXISTS whdfs URL='webhdfs://127.0.0.1:9870/data-files/' CONNECTION=(HTTPS='false');
Query OK, 0 rows affected (0.01 sec)

mysql> DESC STAGE whdfs;
+-------+------------+----------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------------+---------+
| name | stage_type | stage_params | copy_options | file_format_options | number_of_files | creator | comment |
+-------+------------+----------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------------+---------+
| whdfs | External | StageParams { storage: Webhdfs(StorageWebhdfsConfig { endpoint_url: "http://127.0.0.1:9870", root: "/data-files/", delegation: "" }) } | CopyOptions { on_error: AbortNum(1), size_limit: 0, split_size: 0, purge: false, single: false, max_file_size: 0 } | FileFormatOptions { format: Parquet, skip_header: 0, field_delimiter: ",", record_delimiter: "\n", nan_display: "NaN", escape: "", compression: None, row_tag: "row", quote: "", name: None } | NULL | 'root'@'127.0.0.1' | |
+-------+------------+----------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------------+---------+
1 row in set (0.01 sec)
Read 1 rows, 590.00 B in 0.002 sec., 414.67 rows/sec., 238.92 KiB/sec.

mysql> COPY INTO books FROM @whdfs FILES=('books.csv') file_format=(type=CSV field_delimiter=',' record_delimiter='\n' skip_header=0);
Query OK, 2 rows affected (1.83 sec)

After loading the data into the table, you can run some SQL queries to check it. For example:

mysql> SELECT * FROM books;
+------------------------------+---------------------+------+
| title                        | author              | date |
+------------------------------+---------------------+------+
| Transaction Processing       | Jim Gray            | 1992 |
| Readings in Database Systems | Michael Stonebraker | 2004 |
+------------------------------+---------------------+------+
2 rows in set (0.02 sec)
Read 2 rows, 157.00 B in 0.015 sec., 137.21 rows/sec., 10.52 KiB/sec.

If you go to 127.0.0.1:9870 now, you can see the corresponding storage under /analyses/databend/storage/:

Congrats! You're all set.

Dear Databenders,

We're excited to share with you that on the occasion of the second anniversary of the establishment of Databend Labs, we're officially releasing Databend 1.0! This is a significant milestone for our team, and we want to express our gratitude for your support and contributions along the way.

Driven by top-tier use cases and requirements, the Databend community has been committed to addressing the challenges of cost and complexity in big data analytics. Statistics show that around 700 TB of data is written to cloud object storage and analyzed with Databend every day by users from Europe, North America, Southeast Asia, Africa, China, and other regions, saving them millions of dollars in costs every month.

Databend 1.0 is a significant release, and we believe it will drive further advancements in cloud-based big data analytics. Now let's dive in to see what's new in Databend 1.0 compared to version 0.9, and explore our team's vision and future prospects in depth.

What's New in Databend 1.0

Databend 1.0 has demonstrated a noteworthy enhancement in performance, with query performance roughly doubling compared to version 0.9; please check the ClickBench website for further details. Moreover, Databend 1.0 has also introduced a variety of new features:

UPDATE

Databend now allows users to modify data using the UPDATE statement with the syntax below. This UPDATE support marks the completion of Databend's full support for CRUD operations.

-- Update a book (Id: 103)
UPDATE bookstore SET book_name = 'The long answer (2nd)' WHERE book_id = 103;

ALTER TABLE

The ALTER TABLE statement is now available to modify the structure of tables:

-- Add a column
ALTER TABLE t ADD COLUMN c Int DEFAULT 10;

DECIMAL Data Type

Databend has successfully integrated support for the DECIMAL data type after an extensive refactoring of the type system, laying a robust foundation for this feature.

-- Create a table with decimal data type.
create table tb_decimal(c1 decimal(36, 18));

-- Insert two values.
insert into tb_decimal values(0.152587668674722117), (0.017820781941443176);

select * from tb_decimal;
+----------------------+
| c1                   |
+----------------------+
| 0.152587668674722117 |
| 0.017820781941443176 |
+----------------------+

Native Format

Databend 1.0 introduces several significant enhancements to the Native Format strawboat. In addition to support for semi-structured data, the Databend community has implemented a range of performance optimizations, leading to substantial improvements in performance on the HITS dataset.

Cost-Based Optimization (CBO)

The histogram framework has been introduced to enable more accurate cost estimation using statistics. The join reorder algorithm has been further improved and strengthened, resulting in a significant improvement in the performance of multi-table joins and helping Databend achieve remarkable performance improvements on the TPC-H dataset.

SELECT FROM STAGE

STAGE serves as the central hub for data flow in Databend. While we've already supported loading data from STAGE and exporting data to it, we've now taken things a step further by enabling data querying directly within STAGE.

Users can now create a STAGE that contains data files and easily perform data querying without having to write complex CREATE TABLE statements or go through tedious data import processes.

select min(number), max(number) 
from @lake (pattern => '.*parquet');

+-------------+-------------+
| min(number) | max(number) |
+-------------+-------------+
|           0 |           9 |
+-------------+-------------+

For one-time queries, users can use a short URI in the statement for even greater convenience:

select count(*), author 
from 'https://datafuse-1253727613.cos.ap-hongkong.myqcloud.com/data/books.parquet'
(file_format => 'parquet')
group by author;

+----------+---------------------+
| count(*) | author              |
+----------+---------------------+
|        1 | Jim Gray            |
|        1 | Michael Stonebraker |
+----------+---------------------+

Query Result Cache

The Databend community introduced Query Result Cache functionality in version 1.0, allowing for the caching of queries that have already been executed. This eliminates the need for repeatedly running identical queries, providing significant performance benefits when the underlying data remains unchanged.
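The cache can be toggled per session; the sketch below assumes the enable_query_result_cache setting name:

-- Enable result caching for the current session (setting name assumed)
SET enable_query_result_cache = 1;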

MySQL [(none)]> SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
+---------------------+-------------+------+----------------+----------------------+
| watchid             | clientip    | c    | sum(isrefresh) | avg(resolutionwidth) |
+---------------------+-------------+------+----------------+----------------------+
| 6655575552203051303 |  1611957945 |    2 |              0 |               1638.0 |
| 8566928176839891583 | -1402644643 |    2 |              0 |               1368.0 |
| 7904046282518428963 |  1509330109 |    2 |              0 |               1368.0 |
| 7224410078130478461 |  -776509581 |    2 |              0 |               1368.0 |
| 5957995970499767542 |  1311505962 |    1 |              0 |               1368.0 |
| 5295730445754781367 |  1398621605 |    1 |              0 |               1917.0 |
| 8635802783983293129 |   900266514 |    1 |              1 |               1638.0 |
| 5650467702003458413 |  1358200733 |    1 |              0 |               1368.0 |
| 6470882100682188891 | -1911689457 |    1 |              0 |               1996.0 |
| 6475474889432602205 |  1501294204 |    1 |              0 |               1368.0 |
+---------------------+-------------+------+----------------+----------------------+
10 rows in set (3.255 sec)

MySQL [(none)]> SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
+---------------------+-------------+------+----------------+----------------------+
| watchid             | clientip    | c    | sum(isrefresh) | avg(resolutionwidth) |
+---------------------+-------------+------+----------------+----------------------+
| 6655575552203051303 |  1611957945 |    2 |              0 |               1638.0 |
| 8566928176839891583 | -1402644643 |    2 |              0 |               1368.0 |
| 7904046282518428963 |  1509330109 |    2 |              0 |               1368.0 |
| 7224410078130478461 |  -776509581 |    2 |              0 |               1368.0 |
| 5957995970499767542 |  1311505962 |    1 |              0 |               1368.0 |
| 5295730445754781367 |  1398621605 |    1 |              0 |               1917.0 |
| 8635802783983293129 |   900266514 |    1 |              1 |               1638.0 |
| 5650467702003458413 |  1358200733 |    1 |              0 |               1368.0 |
| 6470882100682188891 | -1911689457 |    1 |              0 |               1996.0 |
| 6475474889432602205 |  1501294204 |    1 |              0 |               1368.0 |
+---------------------+-------------+------+----------------+----------------------+
10 rows in set (0.066 sec)

Table Data Cache

Cache is an important component in a storage-compute separation architecture. Databend 1.0 introduced Table Data Cache, which caches data blocks based on how frequently they are accessed: Databend decides whether to cache a block according to the hotness of the data, improving query performance on subsequent accesses.


Aggregate Spill

Databend 1.0 introduced aggregate spill, which temporarily persists intermediate aggregation data to object storage based on current memory usage. This prevents excessive memory consumption when executing aggregation queries in Databend.

What's Next

After multiple iterations, Databend has finally taken shape. Now, let's take a fresh look at Databend:

  • Databend is a cloud-native data warehouse developed using Rust, featuring storage-compute separation, object storage-oriented design, and extreme elasticity.
  • Databend provides complete CRUD functionality and supports protocols such as MySQL, ClickHouse, and HTTP RESTful.
  • Databend natively supports complex data types such as ARRAY, MAP, and JSON, as well as high-precision types like DECIMAL.
  • Databend's columnar storage engine is similar to Git's MVCC, supporting Data Time Travel and Data Share capabilities (see the sketch after this list).
  • Databend is not restricted to specific storage vendors and can run on any storage service, allowing for direct querying of data on any storage service.
  • Databend fully supports HDFS and cloud-based object storage protocols, including Alibaba Cloud OSS, Tencent Cloud COS, as well as Amazon S3, Azure Blob, and Google Cloud Storage.
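As a sketch of the time travel capability mentioned in the list above (the AT clause is assumed here; the table name, timestamp, and snapshot ID are hypothetical):

-- Query a table as it was at a given point in time
SELECT * FROM my_table AT (TIMESTAMP => '2023-03-20 10:00:00'::TIMESTAMP);

-- Or pin the query to a specific table snapshot
SELECT * FROM my_table AT (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5');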

The journey of Databend extends far beyond this, and in the future, we hope to introduce more powerful features and foster a more open community.

More Powerful Features

We have planned several features for Databend 1.1 to better meet the needs of our users and to make breakthroughs in the CDC scenario:

  • JSON index: Improves the retrieval capability of semi-structured data.
  • Distributed ingest capability: Improves data writing speed.
  • MERGE INTO function: Enable real-time Change Data Capture (CDC) capabilities for data source updates, including additions, deletions, and modifications.
  • Window functions.

More Open Community

Databend Labs is composed of a group of open source enthusiasts, and the Databend project has been an open source project licensed under the Apache 2.0 license since its inception. While drawing inspiration from and incorporating excellent ideas from other open source projects such as ClickHouse and CockroachDB, we also give back to the communities in our own way:

  • We have open-sourced Openraft, the consensus engine for the Databend metadata service cluster.
  • We have donated our underlying data access engine, OpenDAL, to the Apache Software Foundation and it has successfully entered incubation.
  • We have contributed to various dependent projects such as the arrow2 vector computation library.
  • We have kept up with and adopted Rust Nightly to help the Rust community reproduce and verify issues.

We're deeply grateful to the 144 contributors who have made Databend what it is today. Without the open source community, we wouldn't be where we are now. Moving forward, we aim to collaborate more openly with other open source communities to support reading and writing in formats such as Iceberg and Delta Lake. We strive to break down barriers between data and enable it to flow more freely and flexibly.

Recently someone in the community suggested that we try profile-guided optimization (#9387). Let's see how we can use Rust to build a PGO-optimized Databend!

Background

Profile-guided optimization (PGO) is a compiler optimization technique: it collects data about typical execution paths (for example, which branches are taken) while the program runs, and then uses that data to optimize inlining, conditional branches, machine code layout, register allocation, and so on.

The motivation is that static analysis can only estimate performance improvements without actually executing the program, so its optimizations may not be fully effective: without runtime information, the compiler cannot take the program's actual behavior into account.

PGO allows data to be collected based on application scenarios in a production environment, so the optimizer can optimize the speed for hot code paths and size for cold code paths and produce faster and smaller code for applications.

rustc supports PGO by building data collection into the binaries, then collecting perf data during runtime to prepare for the final compilation optimization. The implementation relies entirely on LLVM.

Workflow

Follow the workflow below to generate a PGO-optimized program:

  • Compile the program with instrumentation enabled.
  • Run the instrumented program to generate a profraw file.
  • Convert the .profraw files into a .profdata file using LLVM's llvm-profdata tool.
  • Compile the program again with the profiling data.

Preparations

The data collected during the run will be eventually converted with llvm-profdata. To do so, install the llvm-tools-preview component via rustup, or consider using the program provided by a recent LLVM or Clang version.

rustup component add llvm-tools-preview

After the installation, you may need to add the directory containing llvm-profdata to your PATH:

~/.rustup/toolchains/<toolchain>/lib/rustlib/<target-triple>/bin/

Step-By-Step

The following procedure uses Databend's SQL logic tests for demonstration purposes only, to show how the workflow fits together, so you may not see a performance gain. For real results, use a workload typical of your production environment.

The caveat, however, is that the sample of data fed to the program during the profiling stage must be statistically representative of the typical usage scenarios; otherwise, profile-guided feedback has the potential to harm the overall performance of the final build instead of improving it.

  1. Make sure there is no left-over profiling data from previous runs.

    rm -rf /tmp/pgo-data
  2. Build the instrumented binaries (with release profile), using the RUSTFLAGS environment variable in order to pass the PGO compiler flags to the compilation of all crates in the program.

    RUSTFLAGS="-Cprofile-generate=/tmp/pgo-data" \
    cargo build --release --target=x86_64-unknown-linux-gnu
  3. Run the instrumented binaries with some typical workload; we strongly recommend using a workload that is statistically representative of the real scenario. This example runs SQL logic tests for reference only.

  • Start a stand-alone Databend via a script, or a Databend cluster. Note that a production environment is more likely to run in cluster mode.

  • Import the dataset and run a typical query workload.

    BUILD_PROFILE=release ./scripts/ci/deploy/databend-query-standalone.sh
    ulimit -n 10000;ulimit -s 16384; cargo run -p sqllogictests --release -- --enable_sandbox --parallel 16 --no-fail-fast
  4. Merge the .profraw files into a .profdata file with llvm-profdata.

    llvm-profdata merge -o /tmp/pgo-data/merged.profdata /tmp/pgo-data
  5. Use the .profdata file to guide optimizations. Note that both builds use the --release flag, because in real use we always run the release binary.

    RUSTFLAGS="-Cprofile-use=/tmp/pgo-data/merged.profdata -Cllvm-args=-pgo-warn-missing-function" \
    cargo build --release --target=x86_64-unknown-linux-gnu
  6. Run the compiled program again with the previous workload and check the performance:

    BUILD_PROFILE=release ./scripts/ci/deploy/databend-query-standalone.sh
    ulimit -n 10000;ulimit -s 16384; cargo run -p sqllogictests --release -- --enable_sandbox --parallel 16 --no-fail-fast


Sccache is a ccache-like project started by the Mozilla team, supporting C/C++, Rust and other languages, and storing caches locally or in a cloud storage backend. In v0.3.3, sccache added native support for the GitHub Action Cache Service; in the subsequent v0.4.0-pre.6, the community has continued to improve this functionality, and it is now ready for use in production CI.

I recently added sccache to the PyO3/maturin CI for testing, and found it to have the following advantages:

  • Easy deployment configuration: No need to specify shared key, no need to worry about GHA's internal cache-from/cache-to logic, just configure SCCACHE_GHA_ENABLED: "true".
  • Multi-language support: sccache supports caching different compilers for C/CPP, Rust and nvcc at the same time.
  • Faster in most scenarios: sccache caches the compilation product, no need to load the entire cache in advance, and no need to upload the cached content after the build is complete.
  • Concurrent job friendly: sccache can share caches between multiple concurrent jobs/workflows, no need to wait until the end of the build.
  • No cache conflicts: sccache performs hash calculations on each build product input (parameters, environment variables, files, etc.) to build a global conflict-free cache without cache conflicts and without the need to specify additional different cache keys.
  • No vendor lock-in: sccache is built on opendal (https://github.com/datafuselabs/opendal) and naturally supports a variety of different storage services, allowing seamless migration to s3/gcs/azblob etc. in future CI evolutions without relying on GHA cache services.
  • Actively maintained: sccache is currently actively maintained by me, you can submit feedback directly if you encounter problems using it.

The following are the results of the maturin project tests.

Cases | Sccache vs rust-cache (2nd) | Sccache vs rust-cache (3rd)
Test (ubuntu-latest, 3.7) | 59.72% | -1.68%
Test (ubuntu-latest, 3.8) | -4.70% | -8.22%
Test (ubuntu-latest, 3.9) | 30.72% | 10.81%
Test (ubuntu-latest, 3.10) | 1.03% | 12.15%
Test (ubuntu-latest, 3.11) | -10.16% | -29.35%
Test (ubuntu-latest, pypy3.8) | 18.34% | -3.84%
Test (ubuntu-latest, pypy3.9) | 5.13% | 22.90%
Test (macos-latest, 3.7) | 11.87% | 5.65%
Test (macos-latest, 3.8) | -7.82% | -13.65%
Test (macos-latest, 3.9) | -17.98% | -45.20%
Test (macos-latest, 3.10) | -13.20% | -15.38%
Test (macos-latest, 3.11) | -17.44% | -29.55%
Test (macos-latest, pypy3.8) | 14.83% | -23.32%
Test (macos-latest, pypy3.9) | -28.03% | -38.56%
Test (windows-latest, 3.7) | 30.08% | 24.22%
Test (windows-latest, 3.8) | 35.11% | 41.14%
Test (windows-latest, 3.9) | 9.24% | -5.28%
Test (windows-latest, 3.10) | -8.56% | -15.81%
Test (windows-latest, 3.11) | -1.39% | -36.49%
Test (windows-latest, 3.8) | -19.99% | -35.54%
Test (windows-latest, 3.9) | 18.95% | -8.55%

The table compares the difference between the second/third sccache runs and runs with rust-cache; negative entries indicate that sccache is faster than rust-cache. As you can see, as the cache hit rate increases, sccache achieves improvements of up to almost 50% over rust-cache.

Give sccache a try, and if you don't like it, go to issues for feedback, and feel free to contribute!


In the next section I will first describe the internal API of the GitHub Action Cache Service and how it works, and then compare the differences between the rust-cache / sccache implementations to show why sccache is better / faster.

GitHub Action Cache Service Principle

The GitHub Action Cache Service is essentially an immutable storage service that supports prefix queries and provides the following non-public API.

Query Cache

GET /cache?keys=abc,ab,a&version=v1

  • keys: Specifies a comma-separated set of query keys; the result is the most recently created cache whose key prefix-matches one of them and whose version matches
  • version: Specifies the namespace used by the cache key

Reserve Cache

POST /caches

  • inputs: {key: <cache_key>, version: <cache_version>}
  • outputs: {cache_id: <cache_id>}

After each set of (key, version) has been reserved, all subsequent requests with the same key & version will return an already exists error, meaning that the cache cannot be overwritten. A successful request will return a numeric cache_id, which will be used for subsequent uploads and cache creation.

Upload Cache

PATCH /caches/[cache_id]

Upload specific cache content, using Content-Range to mark the cache location for this upload.

Create Cache

POST /caches/[cache_id]

This API is called to create the cache after all the cached content has been uploaded. The cache only becomes queryable after this API responds successfully.

rust-cache implementation

On top of this internal API (which is reasonably suspected to be provided by Azure DevOps services), GitHub provides actions/cache for users to use, and rust-cache is based on the @actions/cache implementation.

rust-cache calculates a cache key based on the GitHub job_id, rustc version, environment variables, Cargo.lock, etc., then packages ~/.cargo and ./target into one archive and uploads it. If the cache hits, it is downloaded and unpacked locally; if the cache misses, the cache is uploaded in the post action.

The advantage of rust-cache is that the GHA Cache API is only called once for the whole process and the rate limit is rarely triggered.

sccache implementation

The GHA implementation of sccache is completely file based.

Alt text

sccache calculates a hash as the cache key based on the environment variables, compiler binary, compilation parameters, input files, etc. passed in each rustc call, and loads the result directly from the storage service if it exists, skipping that compilation; otherwise it compiles and writes the result to the storage service. This means that:

  • sccache does not have to deal with cache conflicts caused by different inputs, and can use an always unique hash as the cache key.
  • sccache can download the required cache at rustc compile time, without having to load the entire contents beforehand, or upload them all after the job has finished.
  • sccache's cache loading logic does not depend heavily on Cargo.lock itself, so the cache can be reused even when a large number of dependencies change.
  • sccache can reuse the cache between concurrent jobs, as they all share the same conflict-free cache space.

In addition, sccache can be used for compilation caching of languages such as c/cpp, and if compilers such as gcc/clang are also used in the project, the cache can be shared with a simple configuration.

To help users make better use of sccache in GitHub Actions, we have developed sccache-action. However, my PR has not been merged yet, so for now you can use my fork:

- name: Sccache Setup
  # Just for test, come back to upstream after released
  uses: Xuanwo/sccache-action@c94e27bef21ab3fb4a5152c8a878c53262b4abb0
  with:
    version: "v0.4.0-pre.6"

Next, only two environment variables need to be configured.

env:
  SCCACHE_GHA_ENABLED: "true"
  RUSTC_WRAPPER: "sccache"

At the end of each Job, sccache-action outputs the usage of this Cache.

/opt/hostedtoolcache/sccache/0.4.0-pre.6/x64/sccache --show-stats
Compile requests 1887
Compile requests executed 1035
Cache hits 836
Cache hits (C/C++) 22
Cache hits (Rust) 814
Cache misses 189
Cache misses (Rust) 189
Cache timeouts 0
Cache read errors 0
Forced recaches 0
Cache write errors 0
Compilation failures 10
Cache errors 0
Non-cacheable compilations 0
Non-cacheable calls 852
Non-compilation calls 0
Unsupported compiler calls 0
Average cache write 0.051 s
Average compiler 1.132 s
Average cache read hit 0.000 s
Failed distributed compilations 0

Non-cacheable reasons:
crate-type 521
- 320
unknown source language 11

Cache location ghac, name: sccache-v0.4.0-pre.6, prefix: /sccache/

Users can use this information to adjust their strategy for using sccache.


Conclusion

Sccache uses the GHA Cache in a new way to accelerate the compilation of Rust projects, with the following advantages over existing solutions:

  • Easy deployment and configuration
  • Multiple language support
  • Faster in most scenarios
  • Concurrent task friendly
  • No cache conflicts
  • No vendor lock-in
  • Active maintenance

Feel free to try and use Sccache in your own projects!

Databend v0.9.0 was officially released on January 13, 2023! This release is the last major release of Databend before version 1.0, and it is also the version in which we have refactored the core code the most so far. Compared to v0.8.0, we have made more than 5,000 commits, 700+ optimizations and fixes, 4,347 file changes, and 340,000 lines of code changes in v0.9.0. Thanks to all the community partners who participated and to everyone who made Databend better!

In v0.9.0, we introduced a new type system, a new expression calculation framework, JSONB support, complete JOIN support and optimization, CBO support, a native storage format and other major feature optimizations. A lot of optimization and enhancements have also been made in terms of performance, stability, and usability.

Performance comparison

In the new version, we have made a lot of optimizations in the execution engine, optimizer, and storage layer. Performance has roughly doubled in most scenarios. The following is a performance comparison between v0.8 and v0.9, benchmarking the hits dataset with the default FUSE engine on S3:


Brand new type system

To give Databend an easy-to-understand yet powerful type inference system, we studied the compiler internals of a number of well-designed programming languages and refined a subset of their ideas for SQL. On top of a purely static type system, Databend now has a sophisticated type inference mechanism that resolves as much of an expression's evaluation as possible at SQL compile time, a minimalist function registration API, and generic derivation at the database type level.

On top of the new type system, modules such as constant folding, type inference, function registration, and query data pruning all benefit from it.

Support for JSONB

In the new version, we implemented a Rust version of JSONB. JSON data is now stored in JSONB format by default, while remaining compatible with the old JsonText format. Thanks to the binary JSON encoding, both storage space and query performance have been significantly optimized.


More info: https://docs.databend.com/doc/contributing/rfcs/json-optimization
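A minimal sketch of working with JSON data, assuming the VARIANT type and the parse_json function (table and field names are hypothetical):

CREATE TABLE events(id INT, payload VARIANT);

INSERT INTO events VALUES (1, parse_json('{"user": "alice", "tags": ["a", "b"]}'));

-- Fields are extracted from the JSONB-encoded value without re-parsing text
SELECT payload['user'], payload['tags'][0] FROM events;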

Full support for JOINs

Full Join types are supported: inner/natural/cross/outer/semi/anti joins. In the past few months, according to the feedback from the community and online users, hash join has been deeply optimized to meet the performance requirements in most scenarios.

Support for CBO

We have added NDV (number of distinct values) calculation to the statistics, and users can now generate statistics with an ANALYZE command similar to Presto's. Joins can use these statistics to optimize the logical plan based on cost. As CBO support matures, we will publish updated query performance comparisons on the 100 GB TPC-H dataset.
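A minimal sketch, assuming the ANALYZE TABLE statement described above (the table name is hypothetical):

-- Collect statistics (including NDV) so the optimizer can estimate join costs
ANALYZE TABLE lineitem;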

Native Storage Format

Databend supports Git-Like's Fuse engine. Based on this engine, we can quickly go back to a certain historical point in time to query, and realize "time travel" inside the database. Inside the Fuse engine, we also support a new Storage Format besides Parquet --- strawboat: https://github.com/sundy-li/strawboat.

Strawboat is based on Arrow's native format, which lets us read data more efficiently than Parquet. On the Hits dataset, a full table scan with the native format can be 2-3 times faster, and local deployments show a very considerable improvement. We will publish an updated ClickBench comparison later.

Efficient bloom filter filtering

In the new version, we introduced the xor filter to compute and store the bloom filter for each column. Compared with the previous version, the new filter greatly improves both load/query performance and the space it occupies. See https://www.databend.com/blog/xor-filter

Designing and open-sourcing serverless DataSharing protocol

In the new version, we created a zero-trust data sharing solution among tenants based on short-term presigned access tokens for object storage. With comparable baseline performance, AWS Lambda will be used to implement data sharing in a serverless manner.

Stage

We implemented UserStage in the new version, similar to the home directory on Linux: COPY INTO my_table FROM @~; (see the sketch after the list below).

  • Data import from a stage keeps metadata about what has been loaded, so we can keep saving files to a stage and import only the new ones.
  • Support exporting multiple files in different formats from Stage.
  • Importing tables from Stage supports parallelization.
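A hypothetical end-to-end use of the user stage might look like the sketch below (the table name is made up, and exact options may differ by version):

LIST @~;                                                    -- list files saved in your user stage
COPY INTO my_table FROM @~ FILE_FORMAT = (TYPE = CSV);      -- import files from the stage
COPY INTO @~ FROM my_table FILE_FORMAT = (TYPE = PARQUET);  -- export table data back to the stage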

Other improvements

The new release also includes these improvements:

  • read_parquet (similar to DuckDB's) supports reading local Parquet files directly without importing them
  • Performance optimization of commonly used functions, and support for common GEO functions
  • Distinct performance optimization
  • Adaptive String HashTable
  • SQLancer integration
  • Parquet reading acceleration
  • The previous Python-based sqllogictest was rewritten in Rust
  • NDJSON and JSON output format support
  • ALTER TABLE supports recluster
  • Support hyperloglog update and delete: https://db.in.tum.de/~freitag/papers/p23-freitag-cidr19.pdf

Download v0.9.0

If you are interested in Databend v0.9.0, go to https://github.com/datafuselabs/databend/releases/tag/v0.9.0-nightly to view all changelogs or download the release.

If you are using an old version of Databend, you can directly upgrade to the new version. For the upgrade process, please refer to: https://docs.databend.com/doc/operations/upgrade

Feedback

If you need any help when working with the new release, submit an issue on GitHub Issue. GitHub: https://github.com/datafuselabs/databend/

Rewriting sqllogictest Framework with Rust

This post is about a big move we've made for Databend. We successfully switched the sqllogictest framework from Python to Rust using sqllogictest-rs, a robust implementation of the sqllogictest framework for the Rust ecosystem. Sqllogictest was designed with SQLite in mind. Benefiting from its neutrality towards database engines, we can use Sqllogictest to verify the accuracy of a SQL database engine as well. This is done by comparing query results from multiple SQL engines running the same query.

Why sqllogictest-rs?

The original sqllogictest framework (RFC for sqllogictest) was written in Python. We planned a switch to sqllogictest-rs for the following reasons:

  • The entire Databend team is proficient in Rust. Working with a unified codebase written in Rust would boost our productivity over the long term.
  • The previous framework lacked a strict parser at the front end and resulted in errors going undetected.
  • As the sqllogictest-rs crate is maturing, building a new sqllogictest framework based on it would save us a lot of effort in the long run.
  • We expected a 10x performance boost from the switch to Rust. The Python sqllogictest had been experiencing suboptimal runtime that resulted in a slower CI.

How We Nailed It

Our first version of sqllogictest didn't strictly follow the sqllogictest wiki. This was a little bit frustrating because we had to manually adjust the format of the test files, for example, in cases like these:

  • Extra blank lines between the query and ----.
  • Non-identical comment formats.
  • Confusing empty strings: results from queries like select ' ' were displayed as a literal blank rather than (empty).

Databend supports three types of client handlers: MySQL, HTTP, and ClickHouse. Each type of them returns content in a different format. The HTTP handler returns content in JSON format and the ClickHouse handler returns it in TSV, both of which require the following substitutions:

  • inf -> Infinity
  • nan -> NaN
  • \N -> NULL

We introduced sandbox tenants to increase parallelism. Each test file now runs in parallel in its own sandbox environment, separate from the others. The benefits include preventing databases or tables from being dropped by mistake and significantly reducing test time.

Unsolved Issues

We're still figuring out the most effective way to test a query that returns dynamic results. For example, the Create_time in the result returned from SHOW TABLE STATUS.

After the Switch

We're glad to see an efficiency improvement after switching to sqllogictest-rs, and this will benefit the entire Databend community. Our special thanks go to the sqllogictest-rs project for the great support, and to everyone who has been involved. If you're also a fan of sqllogictest, stay tuned for more exciting news.

Profiling CPU and memory for Go applications is easy and can be of great help in performance troubleshooting, for example, with flamegraphs. For Rust applications, however, the profiling requires extra work. This post explains how to use flamegraphs to visualize performance data of your CPU and memory for Databend.

To support CPU and memory profiling, some APIs need to be included in the application; Databend already wires these profiling endpoints into its code.

CPU Profiling

To do a CPU profiling, simply run the following command on the Databend server:

go tool pprof -http="0.0.0.0:8081" http://localhost:8080/debug/pprof/profile?seconds=30
  • localhost:8080: Databend management address.
  • 0.0.0.0:8081: pprof server address.
  • seconds=30: Profiling lasts for 30 seconds.

Then open the URL <your-ip>:8081/ui/flamegraph in your browser to view the flamegraph:

Alt text

Memory Profiling

Compared to CPU profiling, memory profiling is a bit more involved, and can be done in the following steps:

1. Enable Memory Profiling

cargo build --bin databend-query --release --features memory-profiling

2. Start with MALLOC_CONF

MALLOC_CONF=prof:true,lg_prof_interval:30 ./target/release/databend-query
  • lg_prof_interval:30: Profiles are dumped into a file for each allocation of 1 GiB (2^30 bytes).

3. Replace addr2line with a Faster One

This can cut your jeprof time from 30 minutes to around 3 seconds.

git clone https://github.com/gimli-rs/addr2line
cd addr2line
cargo b --examples -r
cp ./target/release/examples/addr2line <your-addr2line-find-with-whereis-addr2line>

4. Upgrade jeprof to the Latest Version

jeprof needs an upgrade because the old version doesn't support some parameters for creating flamegraphs. jeprof is a perl script, so the way to upgrade it is a little bit rough-and-ready.

First, find out the path of your local jeprof file:

whereis jeprof

Open and copy the latest version of jeprof, then overwrite your local copy with the copied script EXCEPT for the following two parameters:

my $JEPROF_VERSION = "5.2.1-0-gea6b3e973b477b8061e0076bb257dbd7f3faa756";
my $PPROF_VERSION = "2.0";

5. Create a Flamegraph

jeprof ./databend-query-main ./jeprof.206330.563.i563.heap --collapse | flamegraph.pl --reverse --invert --minwidth 3 > heap.svg
  • flamegraph.pl: Download from GitHub.
  • databend-query-main: Path to your executable.
  • jeprof.206330.563.i563.heap: Selects a heap file.

Alt text

References

When working with Databend, you don't need to bother with maintaining indexes. Databend takes advantage of these indexing techniques to automatically build and manage indexes on the fly:

  • Min/Max index
  • Bloom index
  • Cluster key

Min/Max Index

Min/Max Index is the key indexing technique for OLAP databases. Databend Fuse Engine uses it as the main indexing method to build indexes and store them in snapshots, segments, and blocks. The following shows how the Min/Max Index works for a table in Databend.

First, use SHOW CREATE TABLE to find the initial snapshot file created for the table:

show create table ontime(
`Year` INT, -- First column
...
) ENGINE=FUSE SNAPSHOT_LOCATION='1/458/_ss/71b460c61fa943d1a391d3118ebd984c_v1.json'

Download and open the snapshot file with VSCODE:

{
"format_version": 1,
"snapshot_id": "71b460c6-1fa9-43d1-a391-d3118ebd984c",
"timestamp": "2022-11-29T03:44:03.419194Z",
"prev_snapshot_id": null,
"schema": {
"fields": [
... -- Field definitions
],
"metadata": {}
},
"summary": {
"row_count": 90673588,
"block_count": 200,
"perfect_block_count": 0,
"uncompressed_byte_size": 65821591614,
"compressed_byte_size": 2761791374,
"index_size": 1194623,
"col_stats": {
...
"0": { -- Min/Max indexes for the first column 'Year' in the table
"min": {
"Int64": 1987
},
"max": {
"Int64": 2004
},
"null_count": 0,
"in_memory_size": 362694352,
"distinct_of_values": 0
},
...
}
},
"segments": [
...
[
"1/458/_sg/ddccbb022ba74387be0b41eefd16bbbe_v1.json",
1
],
...
],
"cluster_key_meta": null
}

The file above indicates that the min value of the first column is 1987 and the max is 2004. The indexes in a snapshot file can tell you whether the data you want to retrieve exists in the table. For example, no data would be returned for the following query if Databend cannot find a matching Min/Max interval in all snapshots:

select avg(DepDelay) from ontime where Year='2003';

Databend Fuse Engine stores the most important indexes in segment files. At the end of a snapshot file, you can find information about which segments are related to the snapshot. Here's a sample segment file:

{
"format_version":1,
"blocks":[
{ -- block ...
...
"row_count": 556984,
"block_size": 405612604,
"file_size": 25302413,
"col_stats": {
...
"0": {
"min": {
"Int64": 2003
},
"max": {
"Int64": 2003
},
"null_count": 0,
"in_memory_size": 2227936,
"distinct_of_values": 1
},
...
},
"col_metas": {
-- Used to record the start position and length of each column
},
"cluster_stats": null,
"location": [
"1/458/_b/e4f3795c79004f22b80ed5ee821edf23_v0.parquet",
0
],
"bloom_filter_index_location": [
"1/458/_i_b_v2/e4f3795c79004f22b80ed5ee821edf23_v2.parquet",
2
],
"bloom_filter_index_size": 60207,
"compression": "Lz4Raw"
...
}
],
"summary": {
"row_count": 11243809,
"block_count": 25,
"perfect_block_count": 25,
"uncompressed_byte_size": 8163837349,
"compressed_byte_size": 339392734,
"index_size": 1200133,
"col_stats": {
...
"0": {
"min": {
"Int64": 1988
},
"max": {
"Int64": 2003
},
"null_count": 0,
"in_memory_size": 44975236,
"distinct_of_values": 0
},
...
}
}
}

From the sample above, we can see that a segment file contains its own Min/Max index information. So does a block file. The Min/Max indexes are layered and distributed among snapshots, segments, and blocks like this:

Alt text

When retrieving data for a query, Databend starts from the snapshot indexes and locates the corresponding segments by matching the Min/Max intervals. Then, it looks up the indexes in the segment file to find the blocks where the required data is stored and reads data from the block files using the start positions and lengths recorded in col_metas. So Databend processes a query essentially by finding the right segments and blocks with the Min/Max Index.

Bloom Index

For queries requiring an exact string match, Databend uses the Min/Max Index to find the right block first, and then locates the offsets with the bloom index information in bloom_filter_index_location to retrieve data from the block.
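For instance, a point query like the hedged example below (the column value is made up) is where the two indexes work together: the Min/Max index narrows down the candidate blocks, and the bloom index skips blocks that cannot contain the value:

-- Exact string match: the bloom index prunes blocks that cannot contain 'N12345'.
SELECT * FROM ontime WHERE TailNum = 'N12345';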

For more information about the Bloom Index, see https://www.databend.com/blog/xor-filter.

Cluster Key

The Min/Max Index seems to work perfectly for Databend, but in fact, data is usually written into a table out of order. As a result, segments and blocks might be created with overlapped Min/Max intervals.

For example, a query with a condition like Age = 20 & Age = 35 might need to access up to three Parquet files. If Age is set as the cluster key, Databend will sort the data by the Age column and combine as many small Parquet files as possible.

Alt text

For more information about the cluster key, see https://docs.databend.com/sql/sql-commands/ddl/clusterkey/.
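A hedged sketch of how the Age example above could be set up (table and column names are illustrative; check the linked docs for the exact syntax in your version):

-- Define the cluster key at table creation time ...
CREATE TABLE people (Age INT, Name VARCHAR) CLUSTER BY (Age);
-- ... or add it to an existing table so Databend sorts data blocks by Age.
ALTER TABLE people CLUSTER BY (Age);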

Background

Databend now supports a Hive catalog for running Hive queries. This post shows how to set up a Databend-Hive environment and run Hive SQL.

How to Set Up a Databend-Hive Cluster

HiveServer, Hive Metastore, and HDFS are assumed to be pre-installed.

  1. Download a Databend release with Hive support, or build from source:
## make sure JAVA_HOME is set
export JAVA_HOME=/path/to/java
export LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH}
cargo build --features hive,storage-hdfs
  2. Set up a Databend cluster; refer to deploying-databend.
  3. Add the Hive catalog and HDFS storage to databend-query.toml:
[storage]
type = "hdfs"

[storage.hdfs]
# hdfs namenode address,such as 127.0.0.1:8020
name_node = "xx"
root = ""

[catalogs.hive]
type = "hive"
# hive metastore address, such as 127.0.0.1:9083
address = "xx"

  4. Run databend-query with the Java and Hadoop environment set up:
export HADOOP_HOME=xxx
export JAVA_HOME=xxx   # e.g. /Library/Java/JavaVirtualMachines/openjdk-11.jdk/Contents/Home
export LD_LIBRARY_PATH=$JAVA_HOME/lib/server:$LD_LIBRARY_PATH

./bin/databend-query -c ./databend-query.toml > query.log 2>&1 &
  5. Configure Hive-related settings with a MySQL client:
set global sql_dialect = 'hive';

Suggested settings:

-- for chinese users
set global timezone = 'Asia/Shanghai';
set global max_execute_time_in_seconds = 180;

-- support hive nvl function
create FUNCTION nvl as (a,b) -> ifnull(a,b);
  6. Query Hive data using a MySQL client or MySQL JDBC client. Note: Hive tables must be referred to as hive.db.table.
select * from hive.$db.$table limit 10;

Limitations

  1. Only Parquet tables are supported; ORC and TXT are not.
  2. Struct, Map, and Decimal Hive data types are not supported.
  3. Only Hive SELECT queries are supported; DDL, INSERT, and other DML statements are not.
  4. Hive UDFs are not supported, and other Hive functions have limited support.

The Hive feature is now in beta. Please feel free to report bugs and suggestions in Databend issues.

Many optimizations are usually required for big data analytics to "reduce the distance to data." For example, using the Bloom Filter Index, queries can be filtered to determine whether data should be fetched from backend storage:

Alt text

Why We Replaced the Bloom Filter

Most popular databases use Bloom Filters to handle equality queries and avoid unnecessary data reads. Databend also used the classic Bloom Filter algorithm in the first version (databend#6639). However, we found that the Bloom Filter Index required plenty of storage space, sometimes even exceeding the size of the data itself (Databend automatically creates Bloom indexes for some data types to make them easier to work with). The Bloom Filter Index didn't show a significant performance improvement either, because scanning it is not much cheaper than reading the data directly from storage.

The reason is that a Bloom Filter does not know the cardinality of the data when it is generated. Take the Boolean type as an example: the algorithm allocates space for it without considering that the cardinality is only 2 (True or False).

As a result, the Databend community began exploring new solutions and settled on a feasible approach: use HyperLogLog to count distinct values before allocating space.

At the TiDB User Conference on a Saturday in September, I met XP (@drmingdrmer) and talked about the solution again with him. He came up with using Trie to solve the problem. An excellent idea, but lots of work. 

XP is a master of Tries, so I'm sure the implementation would be no big deal for him. But I thought some existing technologies might be able to help.

Alt text

Why Xor Filter?

A few explorations later, the Xor Filter algorithm, proposed by Daniel Lemire and his team in 2019 in Xor Filters: Faster and Smaller Than Bloom Filters (https://lemire.me/blog/2019/12/19/xor-filters-faster-and-smaller-than-bloom-filters/), caught my attention.

Alt text

I did a test (Xor Filter Bench) with the Rust version (xorfilter) and got a very positive result, so we replaced Bloom Filters with Xor Filters in databend#7860. Let's run a test and see how it performs with Xor Filters.

u64: 
xor bitmap encode:1230069 bytes, raw:8000000 bytes, ratio:0.15375863

bool:
xor bitmap encode:61 bytes, raw:1000000 bytes, ratio:0.000061

string:
xor bitmap encode:123067 bytes, raw:3000000 bytes, ratio:0.041022334

100000 records of the same key:
xor bitmap encode: 61 bytes, raw:3000000 bytes, ratio:0.000020333333

Test Environment

Databend: v0.8.122-nightly, single node
VM: 32 vCPU, 32 GiB (Cloud VM)
Object Storage: S3
Dataset: 10 billion records, 350G Raw Data, Xor Filter Index 700MB, all indexes and data are stored in object storage.
Table:

mysql> desc t10b;
+-------+-----------------+------+---------+-------+
| Field | Type | Null | Default | Extra |
+-------+-----------------+------+---------+-------+
| c1 | BIGINT UNSIGNED | NO | 0 | |
| c2 | VARCHAR | NO | | |
+-------+-----------------+------+---------+-------+

Deploying Databend

Step 1: Download installation package

wget https://github.com/datafuselabs/databend/releases/download/v0.8.122-nightly/databend-v0.8.122-nightly-x86_64-unknown-linux-musl.tar.gz 
tar zxvf databend-v0.8.122-nightly-x86_64-unknown-linux-musl.tar.gz

You can find the following content after extracting the package:

tree
.
├── bin
│ ├── databend-meta
│ ├── databend-metabench
│ ├── databend-metactl
│ └── databend-query
├── configs
│ ├── databend-meta.toml
│ └── databend-query.toml
├── readme.txt
└── scripts
├── start.sh
└── stop.sh

Step 2: Start Databend Meta

./bin/databend-meta -c configs/databend-meta.toml

Step 3: Configure Databend Query

For more information, refer to https://docs.databend.com/doc/deploy/deploying-databend

vim configs/databend-query.toml
... ...

[meta]
endpoints = ["127.0.0.1:9191"]
username = "root"
password = "root"
client_timeout_in_second = 60
auto_sync_interval = 60

# Storage config.
[storage]
# fs | s3 | azblob | obs
type = "s3"

# To use S3-compatible object storage, uncomment this block and set your values.
[storage.s3]
bucket = "<your-bucket-name>"
endpoint_url = "<your-s3-endpoint>"
access_key_id = "<your-key>"
secret_access_key = "<your-access-key>"

Step 4: Start Databend Query

./bin/databend-query -c configs/databend-query.toml

Step 5: Construct the test data set

mysql -uroot -h127.0.0.1 -P3307

Construct 10 billion rows of test data (time: 16 min 0.41 sec):

create table t10b as select number as c1, cast(rand() as string) as c2 from numbers(10000000000)

Run a query (no cache, all data and indexes are in object storage):

mysql> select * from t10b where  c2='0.6622377673133426';
+-----------+--------------------+
| c1 | c2 |
+-----------+--------------------+
| 937500090 | 0.6622377673133426 |
+-----------+--------------------+
1 row in set (20.57 sec)
Read 40000000 rows, 1009.75 MiB in 20.567 sec., 1.94 million rows/sec., 49.10 MiB/sec.

With the Xor Filter Index, it takes about 20 seconds for a single-node Databend to complete a point query on a scale of 10 billion rows. You can also speed up point queries by expanding a single Databend node into a cluster. Refer to https://docs.databend.com/doc/deploy/expanding-to-a-databend-cluster#deploying-a-new-query-node for details.

References

[1] Arxiv: Xor Filters: Faster and Smaller Than Bloom and Cuckoo Filters

[2] Daniel Lemire’s blog: Xor Filters: Faster and Smaller Than Bloom Filters

[3] Databend, Cloud Lakehouse: https://github.com/datafuselabs/databend

Life is a journey, sometimes you must go back to move forward. When working with databases, you need to access the data history now and then. Time Travel is one of the most valuable features that Databend has rolled out. The time travel feature acts as a data recovery utility that enables you to restore a dropped table or get back a previous version of your data in a table. For example, when you accidentally delete a table or update some rows by mistake, you will need the help from Time Travel. This post sheds some light on what you can do with Time Travel in Databend.

First things first: not all historical data can be restored, because of the Databend retention policy. The default retention period is 24 hours, which means you can restore historical data within 24 hours after it is deleted or becomes outdated.

If you run SHOW TABLES HISTORY against a database, you will find the dropped tables (if any) and their drop time. The command does not list the dropped tables that have passed their retention period.

Restore a Dropped Table

When you delete a file from your computer, the file goes to the trash bin and you can restore the file by putting it back to its original folder.

In Databend, restoring a table is as easy as you restore a file from the trash bin. The UNDROP TABLE command makes a dropped table become available again with the data of the latest version. The "latest version" means that Databend recovers a table as well as the data that the table was holding when you deleted it.
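A minimal sketch of the recovery flow (the table name is hypothetical):

SHOW TABLES HISTORY;      -- find the dropped table and its drop time
UNDROP TABLE my_table;    -- the table comes back with the latest version of its data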

Query Old Data

This is the most glorious part of the Time Travel story in Databend. When we say that you can get back a previous version of your data in a table, it does not mean that you roll the table back to an earlier point in time; instead, it shows you the table's data as it was at that point.

Databend automatically takes and saves a snapshot of your tables after each transaction that updates your table data. A version of a table's data practically refers to a snapshot that saves the data of the table when the snapshot was taken.

Databend provides a system function named FUSE_SNAPSHOT that enables you to find the saved snapshots. Each snapshot comes with a snapshot ID and a timestamp.

The saved snapshots are the behind-the-scenes heroes that make time travel possible. So when you want to get back your historical data, you need to tell Databend which version you want, by specifying the snapshot ID or the timestamp with an AT clause in the SELECT statement.
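For example (with placeholder values), both forms of the AT clause look like this:

SELECT * FROM my_table AT (SNAPSHOT => '<snapshot-id>');
SELECT * FROM my_table AT (TIMESTAMP => '2022-08-30 01:18:21'::TIMESTAMP);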

Create a New Table from Old Data

The Time Travel feature makes it possible to create an OLD table, which means you can create a table to hold and move on from a previous version of your data.

The CREATE TABLE statement can include a SNAPSHOT_LOCATION clause that allows you to specify a snapshot file that holds your old data. This command enables you to insert the data stored in the snapshot file when you create a table. Please note that the table you create must have the same column definitions as the data from the snapshot.
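A hedged sketch, with a made-up snapshot path and a schema that must match the snapshot:

CREATE TABLE my_table_history (id INT, name VARCHAR)
SNAPSHOT_LOCATION='1/23/_ss/<snapshot-file>_v1.json';   -- path as reported by FUSE_SNAPSHOT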

Go without Time Travel

Tables in Databend support Time Travel out of the box. However, you might not need it in some cases, for example, when you're running low on storage space or the data is big but unimportant. Databend currently does not provide a setting to switch it off, but you can CREATE TRANSIENT TABLE.

Transient tables are used to hold transitory data that does not require a data protection or recovery mechanism. Databend does not keep historical data for a transient table, so you will not be able to query a previous version of it with the Time Travel feature; for example, the AT clause in the SELECT statement will not work for transient tables. Please note that you can still drop and undrop a transient table.
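A minimal sketch (the table name is made up):

-- No snapshot history is kept for this table, so the AT clause won't work on it,
-- but DROP and UNDROP still do.
CREATE TRANSIENT TABLE visits_tmp (id INT, url VARCHAR);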

JSON (JavaScript Object Notation) is a commonly used semi-structured data type. With the self-describing schema structure, JSON can hold all data types, including multi-level nested data types, such as Array, Object, etc. JSON takes advantage of high flexibility and easy dynamic expansion compared with the structured data types that must strictly follow the fields in a tabular data structure.

As data volumes have grown rapidly in recent years, many platforms have started to use and get the most out of semi-structured data types (such as JSON), for example, the JSON data shared by various platforms through open interfaces, and the public datasets and application logs stored in JSON format.

Databend supports structured data types, as well as JSON. This post dives deeply into the JSON data type in Databend.

Working with JSON in Databend

Databend stores semi-structured data as the VARIANT (also called JSON) data type:

CREATE TABLE test
(
id INT32,
v1 VARIANT,
v2 JSON
);

The JSON data needs to be generated by calling the parse_json or try_parse_json function. The input string must be in the standard JSON format, including Null, Boolean, Number, String, Array, and Object. In case of parsing failure due to invalid string, the parse_json function will return an error while the try_parse_json function will return a NULL value.

INSERT INTO test VALUES
(1, parse_json('{"a":{"b":1,"c":[1,2]}}'), parse_json('[["a","b"],{"k":"a"}]')),
(2, parse_json('{"a":{"b":2,"c":[3,4]}}'), parse_json('[["c","d"],{"k":"b"}]'));

SELECT * FROM test;
+----+-------------------------+-----------------------+
| id | v1 | v2 |
+----+-------------------------+-----------------------+
| 1 | {"a":{"b":1,"c":[1,2]}} | [["a","b"],{"k":"a"}] |
| 2 | {"a":{"b":2,"c":[3,4]}} | [["c","d"],{"k":"b"}] |
+----+-------------------------+-----------------------+

JSON usually holds data of Array or Object type. Due to the nested hierarchical structure, the internal elements can be accessed through JSON PATH. The syntax supports the following delimiters:

  • :: Colon can be used to obtain the elements in an object by the key.

  • .: Dot can be used to obtain the elements in an object by the key. Do NOT use a dot as the first delimiter in a statement, or Databend would consider the dot as the delimiter to separate the table name from the column name.

  • []: Brackets can be used to obtain the elements in an object by the key or the elements in an array by the index.

You can mix the three types of delimiters above.

SELECT v1:a.c, v1:a['b'], v1['a']:c, v2[0][1], v2[1].k FROM test;

+--------+-----------+-----------+----------+---------+
| v1:a.c | v1:a['b'] | v1['a']:c | v2[0][1] | v2[1].k |
+--------+-----------+-----------+----------+---------+
| [1,2] | 1 | [1,2] | "b" | "a" |
| [3,4] | 2 | [3,4] | "d" | "b" |
+--------+-----------+-----------+----------+---------+

The internal elements extracted through JSON PATH are also of JSON type, and they can be converted to basic types through the cast function or using the conversion operator ::.

SELECT cast(v1:a.c[0], int64), v1:a.b::int32, v2[0][1]::string FROM test;

+--------------------------+---------------+------------------+
| cast(v1:a.c[0] as int64) | v1:a.b::int32 | v2[0][1]::string |
+--------------------------+---------------+------------------+
| 1 | 1 | b |
| 3 | 2 | d |
+--------------------------+---------------+------------------+

Parsing JSON from GitHub

Many public datasets are stored in JSON format. We can import these data into Databend for parsing. The following introduction uses the GitHub events dataset as an example.

The GitHub events dataset (downloaded from GH Archive) uses the following JSON format:

{
"id":"23929425917",
"type":"PushEvent",
"actor":{
"id":109853386,
"login":"teeckyar-bot",
"display_login":"teeckyar-bot",
"gravatar_id":"",
"url":"https://api.github.com/users/teeckyar-bot",
"avatar_url":"https://avatars.githubusercontent.com/u/109853386?"
},
"repo":{
"id":531248561,
"name":"teeckyar/Times",
"url":"https://api.github.com/repos/teeckyar/Times"
},
"payload":{
"push_id":10982315959,
"size":1,
"distinct_size":1,
"ref":"refs/heads/main",
"head":"670e7ca4085e5faa75c8856ece0f362e56f55f09",
"before":"0a2871cb7e61ce47a6790adaf09facb6e1ef56ba",
"commits":[
{
"sha":"670e7ca4085e5faa75c8856ece0f362e56f55f09",
"author":{
"email":"[email protected]",
"name":"teeckyar-bot"
},
"message":"1662804002 Timehash!",
"distinct":true,
"url":"https://api.github.com/repos/teeckyar/Times/commits/670e7ca4085e5faa75c8856ece0f362e56f55f09"
}
]
},
"public":true,
"created_at":"2022-09-10T10:00:00Z",
"org":{
"id":106163581,
"login":"teeckyar",
"gravatar_id":"",
"url":"https://api.github.com/orgs/teeckyar",
"avatar_url":"https://avatars.githubusercontent.com/u/106163581?"
}
}

From the data above, we can see that the actor, repo, payload, and org fields have a nested structure and can be stored as JSON. Others can be stored as basic data types. So we can create a table like this:

CREATE TABLE `github_data`
(
`id` VARCHAR,
`type` VARCHAR,
`actor` JSON,
`repo` JSON,
`payload` JSON,
`public` BOOLEAN,
`created_at` timestamp,
`org` json
);

Use the COPY INTO command to load the data:

COPY INTO github_data
FROM 'https://data.gharchive.org/2022-09-10-10.json.gz'
FILE_FORMAT = (
compression = auto
type = NDJSON
);

The following code returns the top 10 projects with the most commits:

SELECT   repo:name,
count(id)
FROM github_data
WHERE type = 'PushEvent'
GROUP BY repo:name
ORDER BY count(id) DESC
LIMIT 10;

+----------------------------------------------------------+-----------+
| repo:name | count(id) |
+----------------------------------------------------------+-----------+
| "Lombiq/Orchard" | 1384 |
| "maique/microdotblog" | 970 |
| "Vladikasik/statistic" | 738 |
| "brokjad/got_config" | 592 |
| "yanonono/booth-update" | 537 |
| "networkoperator/demo-cluster-manifests" | 433 |
| "kn469/web-clipper-bed" | 312 |
| "ufapg/jojo" | 306 |
| "bj5nj7oh/bj5nj7oh" | 291 |
| "appseed-projects2/500f32d3-8019-43ee-8f2a-a273163233fb" | 247 |
+----------------------------------------------------------+-----------+

The following code returns the top 10 users with the most forks:

SELECT   actor:login,
count(id)
FROM github_data
WHERE type='ForkEvent'
GROUP BY actor:login
ORDER BY count(id) DESC
LIMIT 10;

+-----------------------------------+-----------+
| actor:login | count(id) |
+-----------------------------------+-----------+
| "actions-marketplace-validations" | 191 |
| "alveraboquet" | 59 |
| "ajunlonglive" | 50 |
| "Shutch420" | 13 |
| "JusticeNX" | 13 |
| "RyK-eR" | 12 |
| "DroneMad" | 10 |
| "UnqualifiedEngineer" | 9 |
| "PeterZs" | 8 |
| "lgq2015" | 8 |
+-----------------------------------+-----------+

Performance Optimization

JSON data is generally stored in plain text format and needs to be parsed into a serde_json::Value enum every time it is read. Compared to other basic data types, handling JSON takes more parsing time and memory.

Databend has improved the read performance of JSON data using the following methods:

  • To speed up parsing and reduce memory usage, Databend stores JSON data as JSONB in binary format and uses the built-in j_entry structure to hold the data type and offset of each element.

  • Virtual columns are added to speed up queries. Databend extracts the frequently queried fields and the fields of the same data type and stores them as separate virtual columns. Data is read directly from the virtual columns when querying, which lets Databend achieve the same performance as querying other basic data types.

The most impressive part of the movie Spider-Man: No Way Home is "three generations coming together." In the story, when Spider-Man's friend repeats the magic words "Find Peter Parker!", we are surprised to see two old friends on the screen: the previous generations of Spider-Man. They travel from other universes to join forces and develop cures for the villains.

Did you know that you have a similar magic power in Databend? That is, you can always get back previous versions of your data in a few simple steps whenever you need them. The secret is that Databend automatically creates and saves snapshots, with timestamps, of your tables whenever data is updated, so you can track the history of a table and see what it looked like at a point in time in the past.

The following code creates a table first, and then inserts values with three separate SQL statements:

create table spiderman(gen int, nickname varchar);

insert into spiderman values(1,'Peter-1');
insert into spiderman values(2,'Peter-2');
insert into spiderman values(3,'Peter-3');

Databend creates and saves three snapshots for the code above. Each one holds a historical version of the data in the table.

To find them, use the system function FUSE_SNAPSHOT. The function returns everything you may need to know about the saved snapshots of a table, such as the snapshot IDs, timestamps, and locations.

select snapshot_id,previous_snapshot_id, timestamp from fuse_snapshot('default','spiderman');

---
+----------------------------------+----------------------------------+----------------------------+
| snapshot_id | previous_snapshot_id | timestamp |
+----------------------------------+----------------------------------+----------------------------+
| 34b8df220edc4d8cb9e3e76118788686 | 4bb479751b7144d8aa2b53e5b281453f | 2022-08-30 01:18:53.202724 |
| 4bb479751b7144d8aa2b53e5b281453f | a2801ed9656d42c9812f2921214f0795 | 2022-08-30 01:18:35.597615 |
| a2801ed9656d42c9812f2921214f0795 | NULL | 2022-08-30 01:18:21.750208 |
+----------------------------------+----------------------------------+----------------------------+

You can now query the history data with a snapshot or timestamp by including the AT clause in the SELECT statement:

select * from spiderman at(snapshot=>'a2801ed9656d42c9812f2921214f0795');

---
+------+----------+
| gen | nickname |
+------+----------+
| 1 | Peter-1 |
+------+----------+
select * from spiderman at(timestamp=>'2022-08-30 01:18:21.750208'::timestamp);

---
+------+----------+
| gen | nickname |
+------+----------+
| 1 | Peter-1 |
+------+----------+

The "magic" explained above is part of the powerful Time Travel feature of Databend that enables you to query, back up, or restore from a specified historical version of your data. That's not all about the feature. You can do more with the snapshots to make your work easier. Join the Databend community to find out more "magic tricks".

Hello, everyone! I'm Xuanwo. Today, on behalf of the Databend community, I would like to announce the official release of v0.8.

Development of Databend v0.8 started on March 28th, with 5000+ commits and 4600+ file changes. In the last 5 months, the community of 120+ contributors added 420K lines of code and removed 160K lines, equivalent to rewriting Databend once. In this release, the community made significant improvements to the SQL Planner framework and migrated all SQL statements to the new Planner, providing full JOIN and subquery support.

Click here to download Databend v0.8

Let's see what has been done in v0.8.

What's Databend?

Databend is a modern cloud data warehouse based on Rust that enables high-performance, elastic and scalable real-time data analysis and activates the data potential of users.

databend-arch

Significant improvements

New Planner: JOIN! JOIN! JOIN!

To better support complex SQL queries and improve user experience, Databend v0.8 is designed with a new Planner framework.

Databend has added JOIN and proper subquery support, driven by New Planner.

select vip_info.Client_ID, vip_info.Region
from vip_info right
join purchase_records
on vip_info.Client_ID = purchase_records.Client_ID;

New Parser: The Best Parser!

While refactoring the Planner, the Databend community implemented a new nom-based Parser that balances development efficiency with user experience.

The new Parser makes it easy for developers to design, develop, and test complex SQL syntax in an intuitive way:

COPY
~ INTO ~ #copy_unit
~ FROM ~ #copy_unit
~ ( FILES ~ "=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")")?
~ ( PATTERN ~ "=" ~ #literal_string)?
~ ( FILE_FORMAT ~ "=" ~ #options)?
~ ( VALIDATION_MODE ~ "=" ~ #literal_string)?
~ ( SIZE_LIMIT ~ "=" ~ #literal_u64)?

It also gives users specific and precise information about errors.

MySQL [(none)]> select number from numbers(10) as t inner join numbers(30) as t1 using(number);
ERROR 1105 (HY000): Code: 1065, Text = error:
--> SQL:1:8
|
1 | select number from numbers(10) as t inner join numbers(30) as t1 using(number)
| ^^^^^^ column reference is ambiguous

No more worrying about not knowing what's wrong with SQL. Visit The New Databend SQL Planner for more information.

New Features

In addition to the newly designed Planner, the Databend community has implemented a number of new features.

COPY Enhancement

COPY capabilities have been greatly enhanced, and Databend can now:

  • Copy data from any supported storage service (even https!)
COPY
INTO ontime200
FROM 'https://ci.databend.org/dataset/stateful/ontime_2006_[200-300].csv'
FILE_FORMAT = (TYPE = CSV)
  • Support for copying compressed files
COPY
INTO ontime200
FROM 's3://bucket/dataset/stateful/ontime.csv.gz'
FILE_FORMAT = (TYPE = CSV COMPRESSION=AUTO)
  • UNLOAD data to any supported storage service
COPY
INTO 'azblob://bucket/'
FROM ontime200
FILE_FORMAT = (TYPE = PARQUET)

Hive Support

Databend v0.8 designed and developed the Multi Catalog and implemented Hive Metastore support on top of it!

Databend can now interface directly with Hive and read data from HDFS.

select * from hive.default.customer_p2 order by c_nation;

Time Travel

A long time ago, the Databend community shared an implementation of the underlying FUSE Engine, From Git to Fuse Engine, where one of the most important features was the support for time travel, allowing us to query data tables at any point in time.

Starting from v0.8, this feature is officially available, and we can now:

  • Query the data table for a specified time
-- Travel to the time when the last row was inserted
select * from demo at (TIMESTAMP => '2022-06-22 08:58:54.509008'::TIMESTAMP);
+----------+
| c |
+----------+
| batch1.1 |
| batch1.2 |
| batch2.1 |
+----------+
  • Recover mistakenly deleted data tables
DROP TABLE test;

SELECT * FROM test;
ERROR 1105 (HY000): Code: 1025, Text = Unknown table 'test'.

-- un-drop table
UNDROP TABLE test;

-- check
SELECT * FROM test;
+------+------+
| a | b |
+------+------+
| 1 | a |
+------+------+

This makes business data more secure!

CTE Support

CTE (Common Table Expression) is a frequently used feature in OLAP workloads. It defines a temporary result set within the execution of a single statement, valid only for the duration of the query, which enables the reuse of code segments, improves readability, and makes complex queries easier to implement.

Databend v0.8 re-implements CTEs based on the New Planner, and users can now happily use WITH to declare a CTE.

WITH customers_in_quebec
AS (SELECT customername,
city
FROM customers
WHERE province = 'Québec')
SELECT customername
FROM customers_in_quebec
WHERE city = 'Montréal'
ORDER BY customername;

In addition to these features mentioned above, Databend v0.8 also supports UDFs, adds DELETE statements, further enhances support for semi-structured data types, not to mention the numerous SQL statement improvements and new methods added. Thanks to all the contributors to the Databend community, without you all the new features mentioned here would not have been possible!

Quality Enhancement

Feature implementation is just the first part of product delivery. In Databend v0.8, the community introduced the concept of engineering quality, which evaluates the quality of Databend development in three dimensions: users, contributors, and community.

Reassuring users

In order for users to use Databend with confidence, the community has added a lot of tests over the last three months, fetching stateless test sets from YDB and others, adding stateful tests for ontime, hits and other datasets, putting SQL Logic Test online to cover all interfaces, and enabling SQL Fuzz testing to cover boundary cases.

Furthermore, the community has also gone live with Databend Perf to do continuous performance testing of Databend in production environments to catch unexpected performance regressions in time.

Make contributors comfortable

Databend is a large Rust project that has been criticized by the community for its build time.

To improve this issue and make contributors feel comfortable, the community went live with a highly configurable, specially tuned Self-hosted Runner to perform integration tests for PR and enabled several services or tools such as Mergify, mold, dev-tools, etc. to optimize the CI process.

We also initiated a new plan to restructure the Databend project, splitting the original huge query crate into multiple sub-crates to avoid, as much as possible, the situation where changing one line of code means waiting five minutes for checks to run.

Keeping the community happy

Databend is a contributor and participant in the open source community. During the development of v0.8, the Databend community established the principle of Upstream First, actively following and adopting the latest upstream releases, giving feedback on known bugs, contributing their own patches, and starting Tracking issues of upstream first violation to keep up with the latest developments.

The Databend community is actively exploring integration with other open source projects and has already implemented integration and support for third-party drivers such as Vector, sqlalchemy, clickhouse-driver, etc.

Next Steps

Databend v0.8 is a solid foundation release with a new Planner that makes it easier to implement features and make optimizations. In version 0.9, we expect improvements in the following areas.

  • Query Result Cache
  • JSON Optimization
  • Table Share
  • Processor Profiling
  • Resource Quota
  • Data Caching

Please check the Release proposal: Nightly v0.9 for the latest news~

Get going now!

Visit the release log and download the latest version to learn more, and feel free to submit feedback using GitHub Issues if you encounter problems!

This post gives you a general idea about the TPC-H benchmark and explains how to run a TPC-H benchmark on Databend.

What's TPC-H?

TPC-H is a decision support benchmark. It consists of a suite of business-oriented ad hoc queries and concurrent data modifications. The queries and the data populating the database have been chosen to have broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, execute queries with a high degree of complexity, and give answers to critical business questions.

The TPC-H benchmark simulates a system for online sales of parts and components and defines eight tables in total. The structure, data volume, and mutual relationship of each table are shown in the figure below:

The benchmark workload consists of twenty-two decision support queries that must be executed as part of the TPC-H benchmark. Each TPC-H query asks a business question and includes the corresponding query to answer the question. More information about TPC-H can be found at https://www.tpc.org/tpch/.

Running TPC-H Benchmark on Databend

This section describes the steps to run the TPC-H benchmark on Databend and provides the related scripts.

Step 1: Generate test data with TPC-H Docker

The following code pulls a docker image and allocates the data in the path where you are running the TPC-H benchmark.

docker pull ghcr.io/databloom-ai/tpch-docker:main
docker run -it -v "$(pwd)":/data ghcr.io/databloom-ai/tpch-docker:main dbgen -vf -s 1

TPC-H comes with various data set sizes to test different scale factors. You can use the -s option to set scale factor in the command (for example, the code above sets the scale factor to 1). For more information about the command, see https://github.com/databloom-ai/TPCH-Docker.

SF (Gigabytes)   Size
1                Consists of the base row size (several million elements).
10               Consists of the base row size x 10.
100              Consists of the base row size x 100 (several hundred million elements).
1000             Consists of the base row size x 1000 (several billion elements).

Step 2: Create database and tables

CREATE DATABASE IF NOT EXISTS tpch;

USE tpch;

CREATE TABLE IF NOT EXISTS nation
(
n_nationkey INTEGER NOT NULL,
n_name VARCHAR NOT NULL,
n_regionkey INT NOT NULL,
n_comment VARCHAR
);

CREATE TABLE IF NOT EXISTS region
(
r_regionkey INT NOT NULL,
r_name VARCHAR NOT NULL,
r_comment VARCHAR
);

CREATE TABLE IF NOT EXISTS part
(
p_partkey INT NOT NULL,
p_name VARCHAR NOT NULL,
p_mfgr VARCHAR NOT NULL,
p_brand VARCHAR NOT NULL,
p_type VARCHAR NOT NULL,
p_size INT NOT NULL,
p_container VARCHAR NOT NULL,
p_retailprice FLOAT NOT NULL,
p_comment VARCHAR NOT NULL
);

CREATE TABLE IF NOT EXISTS supplier
(
s_suppkey INT NOT NULL,
s_name VARCHAR NOT NULL,
s_address VARCHAR NOT NULL,
s_nationkey INT NOT NULL,
s_phone VARCHAR NOT NULL,
s_acctbal FLOAT NOT NULL,
s_comment VARCHAR NOT NULL
);

CREATE TABLE IF NOT EXISTS partsupp
(
ps_partkey INT NOT NULL,
ps_suppkey INT NOT NULL,
ps_availqty INT NOT NULL,
ps_supplycost FLOAT NOT NULL,
ps_comment VARCHAR NOT NULL
);

CREATE TABLE IF NOT EXISTS customer
(
c_custkey INT NOT NULL,
c_name VARCHAR NOT NULL,
c_address VARCHAR NOT NULL,
c_nationkey INT NOT NULL,
c_phone VARCHAR NOT NULL,
c_acctbal FLOAT NOT NULL,
c_mktsegment VARCHAR NOT NULL,
c_comment VARCHAR NOT NULL
);

CREATE TABLE IF NOT EXISTS orders
(
o_orderkey INT NOT NULL,
o_custkey INT NOT NULL,
o_orderstatus VARCHAR NOT NULL,
o_totalprice FLOAT NOT NULL,
o_orderdate DATE NOT NULL,
o_orderpriority VARCHAR NOT NULL,
o_clerk VARCHAR NOT NULL,
o_shippriority INT NOT NULL,
o_comment VARCHAR NOT NULL
);

CREATE TABLE IF NOT EXISTS lineitem
(
l_orderkey INT NOT NULL,
l_partkey INT NOT NULL,
l_suppkey INT NOT NULL,
l_linenumber INT NOT NULL,
l_quantity FLOAT NOT NULL,
l_extendedprice FLOAT NOT NULL,
l_discount FLOAT NOT NULL,
l_tax FLOAT NOT NULL,
l_returnflag VARCHAR NOT NULL,
l_linestatus VARCHAR NOT NULL,
l_shipdate DATE NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct VARCHAR NOT NULL,
l_shipmode VARCHAR NOT NULL,
l_comment VARCHAR NOT NULL
);

Step 3: Load test data to Databend

This step uses the HTTP API v1/streaming_load to load the test data to Databend. More information about this API can be found at https://docs.databend.com/doc/load-data/load/local.

The code below connects to Databend using the Root user. Please note that the root user only works when you access Databend from localhost. You will need to create new users and grant proper privileges first to connect to Databend remotely.

#!/bin/bash

for t in customer lineitem nation orders partsupp part region supplier
do
echo "$t"
insert_sql="insert into tpch.$t file_format = (type = CSV skip_header = 0 field_delimiter = '|' record_delimiter = '\n')"
curl -s -u root: -XPUT "http://localhost:8000/v1/streaming_load" -H "insert_sql: ${insert_sql}" -F 'upload=@"./'$t'.tbl"' > /dev/null 2>&1
done

Step 4: Run TPC-H queries

All the definitions of the TPC-H queries can be found at https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v3.0.1.pdf. You can simply run them by copying and pasting the scripts to Databend.

The Databend team ran the TPC-H benchmark around two months ago and uploaded their queries and results to GitHub. You can find them at https://github.com/datafuselabs/databend/tree/main/tests/suites/0_stateless/13_tpch. Please note that Databend now uses the new planner by default, so you no longer need to enable it before running the queries.
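As a quick smoke test, here is TPC-H Q1 lightly adapted (the standard 90-day offset from 1998-12-01 is pre-computed as the literal date 1998-09-02 to avoid version-specific date arithmetic):

SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity)                                       AS sum_qty,
    SUM(l_extendedprice)                                  AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount))               AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    AVG(l_quantity)                                       AS avg_qty,
    AVG(l_extendedprice)                                  AS avg_price,
    AVG(l_discount)                                       AS avg_disc,
    COUNT(*)                                              AS count_order
FROM tpch.lineitem
WHERE l_shipdate <= '1998-09-02'
GROUP BY l_returnflag, l_linestatus
ORDER BY l_returnflag, l_linestatus;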

Data is important. If you're moving your business to the cloud to take advantage of the cloud-native features, the first thing you need to consider might be how to load your data to the cloud.

There are many ways to load data into Databend. You can use the command-line interface (CLI), API, or the mysqldump client utility, depending on where your data is stored.

image

The powerful COPY INTO commands allow you to load data from:

  • Files in an S3 bucket or a blob storage container.
  • Staged files (internal or external).
  • Files in a remote server.

If you have local data files to load, use the HTTP API v1/streaming_load to upload them to Databend.

Please note that Databend supports loading data from files in these formats:

  • CSV
  • JSON
  • Parquet

If you're coming from MySQL, Databend can also use a dump file (*.sql) created by the mysqldump client utility to load your data from MySQL.

Detailed explanations about loading data in different scenarios can be found at https://docs.databend.com/doc/load-data/load.

tip

This post was originally published by Anne-Laure Civeyrac on https://mergify.com.

image

Every day, major projects use Mergify to automate their GitHub workflow. Whether they have a core team of 3 or 50 people, the one thing they all have in common is that the project leads are willing to let their developers focus on what’s really important—code. So we decided to meet with some of them to get to know more about the challenges they face and discover how Mergify helps their teams be more efficient when it comes to pull requests. This time, we sat down (virtually) with Xuanwo, an infrastructure engineer who oversees automation, distributed systems and storage for the Databend project.

image

Xuanwo

Please could you give us a brief outline of the Databend project.

Of course! So Databend is a modern cloud data warehouse focused on elasticity and performance. It uses the latest techniques in vectorized query processing to allow people to do blazing-fast data analytics on object storage platforms like S3, Azure Blob, or MinIO. It was mainly inspired by ClickHouse and Snowflake and focuses on online analytical processing.

How many people are currently working on the project?

Right now, we have more than 100 contributors on Databend, with about 30 of them contributing continuously.

What's your GitHub workflow on this project?

Databend is a very new project and it doesn’t have a stable release yet, so our pull request [PR] workflow is quite simple. All our contributions go through GitHub PRs. For every PR, we use GitHub Actions as the CI, where we run cargo check, cargo fmt, cargo clippy, and all our test cases. If all the checks pass, we merge the PR. And once on the main branch, the PR runs production CI, which goes through all test cases with the release build. We then release a nightly version daily, uploading our release builds to GitHub releases and the Docker Hub Registry.

image

How many people need to approve PRs?

We need two approvals for a PR to be merged.

What are the main challenges with this workflow?

Mainly the PR merge speed! Our developers are coding in Rust, which is known for not being very good with compilation speed. We typically need about 30 minutes to finish all our checks, and most of this time is consumed by rustc, the compiler for Rust. To make Rust compile faster, we set up our own self-hosted GitHub Actions runners, which have a very high performance. Thanks to this change, our PR merge time was reduced from 60 to 30 minutes.

What made the project team start using Mergify in the first place?

Before coming across Mergify, we enabled the GitHub option that requires branches to be up to date before merging. But it added a lot of work for our maintainers, who had to merge the main branch into their PRs repeatedly.

image

So to make their lives easier, we implemented Mergify to update the branch automatically and merge PRs after all tests have passed.

Which Mergify features do you use the most?

The automatic merge! And with the help of the Mergify team, we enabled the merge queue feature as well. So now, we can merge multiple PRs simultaneously with only one CI check. This helped us significantly reduce the waiting time for the CI.  

What is your favorite Mergify feature and why?

Oh, I love PR actions. Although I can implement the same features with GitHub Actions, I find Mergify's PR actions more simple and exciting. For example, last week, we introduced a new requirement that every PR must be semantic—we want all PRs to contain a valid title starting with a type like "fix", "feat", or "refactor".

image

Actions 

With the help of Mergify, I only needed to create some rules, such as adding corresponding labels if the PR title starts with "fix", commenting in the PR with a help message if the title doesn't fulfill the requirements, and adding post checks so that we can mark the PR as not mergeable.

This feature is very cool, and I would love to have it on issues too!

What has been the most significant impact of using Mergify on your team's performance so far?

Our teams don't need to worry about merges anymore! We can start jobs without having to wait on the PRs.  

What would be your n°1 tip for someone new to Mergify?

Don't try to migrate all your workloads to Mergify in one go. Migrating things one by one and progressively will make your lives easier.

If you had time to contribute to the Mergify project, what would your contribution be about?

I used to contribute to Mergify regarding a small documentation typo. But I am not comfortable contributing to more complex issues because Mergify uses Python, a programming language I'm unfamiliar with. But if I could contribute to one feature, it would be about commands. Mergify only supports a small set of commands. Maybe we could define new commands in PR rules and allow users to call them with Mergify bots. This feature would be exciting to me!

To support complex SQL queries and improve user experience, a large-scale refactoring work for Databend's SQL planner was started several months ago. At present, the refactoring is coming to an end. You can now modify the Session settings of Databend as follows to enable the new planner for early access:

image

Feature Highlights

A more friendly query experience

Data analysts and developers usually get various errors when coding SQL queries, and troubleshooting can be a nightmare when the queries are complex. I hate MySQL's error prompts because I have coded a query with dozens of JOIN clauses.

The new planner now includes some passes for strict semantic checking so that most errors can be intercepted during the compilation. A new error prompt algorithm was also introduced to help users locate the errors. When there is invalid syntax in your SQL query (for example, misspelled keywords or missing clauses), you will receive an error message that is more instructive.

image

If your SQL query has a semantic error (for example, you reference a column that is ambiguous, or a column does not exist at all), Databend can help you locate it.

image

You can also get a better experience when coding complex queries:

image

Support for JOIN queries and correlated subqueries

The new SQL planner supports JOIN queries (INNER JOIN, OUTER JOIN, CROSS JOIN) and correlated subqueries, and provides a Hash Join algorithm to execute JOIN queries.

For more information about how to use JOIN in Databend, go to https://docs.databend.com/sql/sql-commands/query-syntax/query-join

JOIN is a very important part of the OLAP query. In traditional star and snowflake schemas, we join dimensional tables with fact tables through the JOIN query to generate the resulting report.

TPC-H Benchmark is a set of OLAP query benchmarks developed by the TPC committee to evaluate the OLAP capabilities of database systems. It contains the following eight tables:

  • Lineitem: Holds order line item information.

  • Orders: Holds order information.

  • Customer: Holds customer information.

  • Part: Holds parts information.

  • Supplier: Holds supplier information.

  • Partsupp: Holds part-supplier relationship information.

  • Nation: Holds nation information.

  • Region: Holds region information.

TPC-H includes 22 complex queries, corresponding to different business needs. The new SQL planner now supports the Q9 query that calculates the profit amount for a specified year and region using a large number of JOIN calculations:

image

Correlated subqueries are also an essential part of SQL for coding complex queries. The Q4 query of TPC-H shows the order delivery status of various priority levels over a period of time and uses a correlated subquery with the EXISTS clause to filter overdue orders:

image

Brand New Architecture

We redesigned the process of SQL parsing for the new SQL planner to support more complex semantic analysis and SQL optimization.

After the client sends a SQL statement to the databend-query server, the components in the new SQL planner process the SQL statement in the order shown in the flowchart below before returning the query result to the client:

image

The Parser starts to parse a SQL query after receiving it. If a syntax error is found during the parsing, the error information will be directly returned to the client; If the parsing is successful, an AST (Abstract Syntax Tree) for the query will be constructed.

Parser

To provide more powerful syntax analysis and a better development experience, we developed a DSL (Domain Specific Language) called nom-rule on top of the nom parser combinator library and rewrote the SQL Parser with this framework.

With this framework, we can easily define the syntax for a statement. Taking the CREATE TABLE statement as an example, we can use DSL to describe it as follows:

image

The elegant syntax brings more fun to the work of coding a parser. Try it out if you're interested.

Binder

After the Parser has successfully produced the AST, the Binder performs semantic analysis on it and generates an initial logical plan. During this process, several kinds of semantic analysis are carried out:

  • Name resolution: Check the validity of the variables referenced in the SQL query by querying the relevant table and column object information in the Databend Catalog and bind the valid variables to their corresponding objects for subsequent analysis.

  • Type check: Check the validity of the expression according to the information obtained in the name resolution, and find a proper return type for the expression.

  • Subquery unnesting: Extract the subquery from the expression and translate it into relational algebra.

  • Grouping check: For queries with aggregate calculations, check that columns referenced outside aggregate functions appear in the GROUP BY clause (see the example right after this list).
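
For instance, the grouping check rejects the first query below, because city is neither aggregated nor listed in the GROUP BY clause (using a hypothetical table t(region, city, amount)):

-- Invalid: city is referenced outside an aggregate and is not in GROUP BY.
SELECT city, SUM(amount) FROM t GROUP BY region;

-- Valid: every non-aggregate column appears in GROUP BY.
SELECT region, SUM(amount) FROM t GROUP BY region;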

With semantic analysis, we can catch most semantic errors and return them to the user during compilation, providing the best troubleshooting experience.

Optimizer

After getting the initial logical plan, the optimizer will rewrite and optimize it and, finally, generate an executable physical plan.

The new planner introduces a transformer-rule-based optimizer framework in the Volcano/Cascades style. Each rule can be implemented independently by defining the pattern of the relational algebra sub-tree it matches, together with the corresponding transform logic.

Take Predicate Push Down as a simple example:

image
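
Conceptually, the rule moves a filter below a join so that rows are discarded before the join runs. In a query like the following (hypothetical tables), the predicate on o.status can be evaluated while scanning orders instead of after the join:

-- Hypothetical tables: customers(id, name) and orders(customer_id, amount, status).
SELECT c.name, o.amount
FROM customers AS c
INNER JOIN orders AS o ON c.id = o.customer_id
WHERE o.status = 'shipped';  -- pushed down to the scan of orders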

We only need to define the pattern of the input plan:

image

And then implement a conversion function:

image

Interpreter

After the physical plan is generated by the Optimizer, we will translate it into an executable pipeline and hand it over to Databend's processor execution framework for calculation.

What's Next

Building a SQL planner from the ground up is a very challenging job, but the redesign and development helped us find the most suitable architecture and functionality for the system. In the future, we will continue to improve and consolidate the new SQL planner in the following areas:

  • Cost-based Optimization (CBO)

  • Distributed query optimization

  • More optimization rules

Currently, we're in the middle of migrating to the new SQL planner. We will release an announcement when the migration is complete (around July 2022). Stay tuned.

Databend, developed with Rust, is a new open-source data warehouse architected toward the cloud. It is committed to providing fast elastic expansion capabilities and a pay-as-you-go user experience. GitHub: https://github.com/datafuselabs/databend

Introduction

This post explains the foundation of Databend: Fuse Engine, a powerful columnar storage engine. The engine was designed by the Databend community with the following principles: powerful performance, simple architecture, and high reliability.

Before we start, check out a challenging task that Databend completed: with the Fuse Engine deployed on AWS S3, a single transaction wrote 22.89 TB of raw data in around one and a half hours.

mysql> INSERT INTO ontime_new SELECT * FROM ontime_new;
Query OK, 0 rows affected (1 hour 34 min 36.82 sec)
Read 31619274180 rows, 22.89 TB in 5675.207 sec., 5.57 million rows/sec., 4.03 GB/sec.

Meanwhile, the following conditions were met as well:

  • Distributed transactions: Multiple computing nodes can read and write the same data simultaneously (This is the first problem that an architecture that separates storage from compute must solve).
  • Snapshot isolation: Different versions of data do not affect each other so you can do Zero-Copy Cloning for tables.
  • Retrospective ability: You are allowed to switch to any version of the data, so you can recover with the Time Travel feature.
  • Data merging: A new version of data can be generated after merging.
  • Simple and robust: Data relationships are described using files, and you can recover the entire data system from these files.

From the above, you can see that Fuse Engine is "Git-inspired". Before introducing the design of Fuse Engine, let's take a look at how the bottom layer of Git works.

How Git Works

Git implements data version control (including branch, commit, checkout, and merge) in a distributed environment. Based on Git semantics, it is possible to build a distributed storage engine. There are also Git-like products on the market, such as Nessie - Transactional Catalog for Data Lakes and lakeFS.

To better explore the underlying working mechanism of Git, we use Git semantics to complete a series of "data" operations from the perspective of the database.

  1. Prepare a file named cloud.txt with the content:
2022/05/06, Databend, Cloud
  2. Commit the file cloud.txt to Git.
git commit -m "Add cloud.txt"
  3. Git generates a snapshot (Commit ID: 7d972c7ba9213c2a2b15422d4f31a8cbc9815f71).
git log 
commit 7d972c7ba9213c2a2b15422d4f31a8cbc9815f71 (HEAD)
Author: BohuTANG <[email protected]>
Date: Fri May 6 16:44:21 2022 +0800

Add cloud.txt
  4. Prepare another file named warehouse.txt.
2022/05/07, Databend, Warehouse
  5. Commit the file warehouse.txt to Git.
git commit -m "Add warehouse.txt"
  6. Git generates another snapshot (Commit ID: 15af34e4d16082034e1faeaddd0332b3836f1424).
commit 15af34e4d16082034e1faeaddd0332b3836f1424 (HEAD)
Author: BohuTANG <[email protected]>
Date: Fri May 6 17:41:43 2022 +0800

Add warehouse.txt

commit 7d972c7ba9213c2a2b15422d4f31a8cbc9815f71
Author: BohuTANG <[email protected]>
Date: Fri May 6 16:44:21 2022 +0800

Add cloud.txt

Git now keeps two versions of the data:

ID 15af34e4d16082034e1faeaddd0332b3836f1424, Version2
ID 7d972c7ba9213c2a2b15422d4f31a8cbc9815f71, Version1

We can switch between versions by the Commit ID, which implements the functions of Time Travel and Table Zero-Copy. How does Git make this possible at the bottom layer? It's not rocket science: Git introduces the following types of object files to describe the relationships:

  • Commit: Describes tree object information
  • Tree: Describes blob object information
  • Blob: Describes file information

image

HEAD File

First, we need to know the HEAD pointer:

cat .git/HEAD
15af34e4d16082034e1faeaddd0332b3836f1424

Commit File

The Commit file records metadata related to the commit, such as the current tree and parent, as well as the committer, etc.

File path:

.git/objects/15/af34e4d16082034e1faeaddd0332b3836f1424

File content:

git cat-file -p 15af34e4d16082034e1faeaddd0332b3836f1424

tree 576c63e580846fa6df2337c1f074c8d840e0b70a
parent 7d972c7ba9213c2a2b15422d4f31a8cbc9815f71
author BohuTANG <[email protected]> 1651830103 +0800
committer BohuTANG <[email protected]> 1651830103 +0800

Add warehouse.txt

Tree File

The Tree file records all the files of the current version.

File path:

.git/objects/57/6c63e580846fa6df2337c1f074c8d840e0b70a

File content:

git cat-file -p 576c63e580846fa6df2337c1f074c8d840e0b70a

100644 blob 688de5069f9e873c7e7bd15aa67c6c33e0594dde cloud.txt
100644 blob bdea812b9602ed3c6662a2231b3f1e7b52dc1ccb warehouse.txt

Blob File

The Blob files are raw data files. You can view the file content using git cat-file (if you use Git to manage code, blobs are the code files).

git cat-file -p 688de5069f9e873c7e7bd15aa67c6c33e0594dde
2022/05/06, Databend, Cloud

git cat-file -p bdea812b9602ed3c6662a2231b3f1e7b52dc1ccb
2022/05/07, Databend, Warehouse

Fuse Engine

Databend's Fuse Engine was designed in a way similar to Git. It introduces three description files:

  • Snapshot: Describes segment object information.
  • Segment: Describes block object information.
  • Block: Describes parquet file information.

image

Let's repeat the operations we just did with Git in Fuse Engine.

  1. Create a table.
CREATE TABLE git(file VARCHAR, content VARCHAR);
  2. Write cloud.txt to Fuse Engine.

     INSERT INTO git VALUES('cloud.txt', '2022/05/06, Databend, Cloud');
  3. Fuse Engine generates a snapshot (Snapshot ID: 6450690b09c449939a83268c49c12bb2).

    CALL system$fuse_snapshot('default', 'git');
    *************************** 1. row ***************************
    snapshot_id: 6450690b09c449939a83268c49c12bb2
    snapshot_location: 53/133/_ss/6450690b09c449939a83268c49c12bb2_v1.json
    format_version: 1
    previous_snapshot_id: NULL
    segment_count: 1
    block_count: 1
    row_count: 1
    bytes_uncompressed: 68
    bytes_compressed: 351

  4. Write warehouse.txt to Fuse Engine.

    INSERT INTO git VALUES('warehouse.txt', '2022/05/07, Databend, Warehouse');
  5. Fuse Engine generates another snapshot (Snapshot ID: efe2687fd1fc48f8b414b5df2cec1e19) that is linked to the previous one (Snapshot ID: 6450690b09c449939a83268c49c12bb2).

    CALL system$fuse_snapshot('default', 'git');
    *************************** 1. row ***************************
    snapshot_id: efe2687fd1fc48f8b414b5df2cec1e19
    snapshot_location: 53/133/_ss/efe2687fd1fc48f8b414b5df2cec1e19_v1.json
    format_version: 1
    previous_snapshot_id: 6450690b09c449939a83268c49c12bb2
    segment_count: 2
    block_count: 2
    row_count: 2
    *************************** 2. row ***************************
    snapshot_id: 6450690b09c449939a83268c49c12bb2
    snapshot_location: 53/133/_ss/6450690b09c449939a83268c49c12bb2_v1.json
    format_version: 1
    previous_snapshot_id: NULL
    segment_count: 1
    block_count: 1
    row_count: 1

Fuse Engine now keeps two versions of the data:

ID efe2687fd1fc48f8b414b5df2cec1e19, Version2
ID 6450690b09c449939a83268c49c12bb2, Version1

That's very similar to Git, right?

Git needs a HEAD as an entry point. So does Fuse Engine. Check the HEAD of Fuse Engine:

SHOW CREATE TABLE git\G;
*************************** 1. row ***************************
Table: git
Create Table: CREATE TABLE `git` (
`file` VARCHAR,
`content` VARCHAR
) ENGINE=FUSE SNAPSHOT_LOCATION='53/133/_ss/efe2687fd1fc48f8b414b5df2cec1e19_v1.json'

SNAPSHOT_LOCATION is the HEAD, which by default points to the latest snapshot efe2687fd1fc48f8b414b5df2cec1e19. So how do we switch to the snapshot whose ID is 6450690b09c449939a83268c49c12bb2? First, check the snapshot information of the current table:

CALL system$fuse_snapshot('default', 'git')\G;
*************************** 1. row ***************************
snapshot_id: efe2687fd1fc48f8b414b5df2cec1e19
snapshot_location: 53/133/_ss/efe2687fd1fc48f8b414b5df2cec1e19_v1.json
format_version: 1
previous_snapshot_id: 6450690b09c449939a83268c49c12bb2
segment_count: 2
block_count: 2
row_count: 2
*************************** 2. row ***************************
snapshot_id: 6450690b09c449939a83268c49c12bb2
snapshot_location: 53/133/_ss/6450690b09c449939a83268c49c12bb2_v1.json
format_version: 1
previous_snapshot_id: NULL
segment_count: 1
block_count: 1
row_count: 1

Then create a new table (git_v1) and point SNAPSHOT_LOCATION to the snapshot file you need:

CREATE TABLE git_v1(`file` VARCHAR, `content` VARCHAR) SNAPSHOT_LOCATION='53/133/_ss/6450690b09c449939a83268c49c12bb2_v1.json';

SELECT * FROM git_v1;
+-----------+-----------------------------+
| file | content |
+-----------+-----------------------------+
| cloud.txt | 2022/05/06, Databend, Cloud |
+-----------+-----------------------------+

Snapshot File

Stores the segment information.

File path:

53/133/_ss/efe2687fd1fc48f8b414b5df2cec1e19_v1.json

File content:

{
  "format_version": 1,
  "snapshot_id": "efe2687f-d1fc-48f8-b414-b5df2cec1e19",
  "prev_snapshot_id": [
    "6450690b-09c4-4993-9a83-268c49c12bb2",
    1
  ],
  "segments": [
    [
      "53/133/_sg/df56e911eb26446b9f8fac5acc65a580_v1.json"
    ],
    [
      "53/133/_sg/d0bff902b98846469480b23c2a8f93d7_v1.json"
    ]
  ]
  ... ...
}

Segment File

Stores block information.

File path:

 53/133/_sg/df56e911eb26446b9f8fac5acc65a580_v1.json

File content:

{
  "format_version": 1,
  "blocks": [
    {
      "row_count": 1,
      "block_size": 76,
      "file_size": 360,
      "location": [
        "53/133/_b/ba4a60013e27479e856f739aefeadfaf_v0.parquet",
        0
      ],
      "compression": "Lz4Raw"
    }
  ]
  ... ...
}

Block File

The underlying data of Fuse Engine is stored in the Parquet format and organized into blocks; each block corresponds to a Parquet file, as referenced by the location field in the segment file above.

Summary

In the early design period of Databend's Fuse Engine (October 2021), the requirements were very clear, but the solution selection didn't go smoothly. At that time, the Databend community investigated a large number of Table Format solutions (such as Iceberg) on the market. The challenge was to choose between adopting an existing solution and building a new one. Finally, we decided to develop a simple and suitable storage engine that uses the Parquet standard as its storage format. Fuse Engine stores the Parquet footer separately to reduce unnecessary seek operations, and it introduces a more flexible indexing mechanism: for example, operations such as aggregation and join can have their own indexes for acceleration.

Feel free to deploy Fuse Engine with your object storage for a different big data analysis experience: https://docs.databend.com/doc/deploy

Databend on GitHub: https://github.com/datafuselabs/databend

Deploying Databend on Your Laptop in Minutes

Does deploying a data warehouse sound like a big job to you? Definitely NOT. Databend can be deployed to your laptop, using the local file system as storage, and you can complete the deployment in a few minutes even if you're new to Databend. Now let's get started!

tip

Databend requires scalable storage (for example, object storage) to work. This blog uses the local file system to give you a hands-on experience. Never use a local file system as storage for production purposes.

STEP 1. Downloading Databend

a. Create a folder named databend in the directory /usr/local. Then create the following subfolders in the folder databend:

  • bin
  • data
  • etc
  • logs

b. Download and extract the latest Databend package for your platform from https://github.com/datafuselabs/databend/releases.

c. Move the extracted files databend-meta and databend-query to the folder /usr/local/databend/bin.

STEP 2. Deploying a Standalone databend-meta

a. Create a file named databend-meta.toml in the folder /usr/local/databend/etc with the following content:

dir = "metadata/_logs"
admin_api_address = "127.0.0.1:8101"
grpc_api_address = "127.0.0.1:9101"

[raft_config]
id = 1
single = true
raft_dir = "metadata/datas"

b. Open a terminal window and navigate to the folder /usr/local/databend/bin.

c. Run the following command to start databend-meta:

./databend-meta -c ../etc/databend-meta.toml > meta.log 2>&1 &

d. Run the following command to check if databend-meta was started successfully:

curl -I  http://127.0.0.1:8101/v1/health

STEP 3. Deploying a Standalone databend-query

a. Create a file named databend-query.toml in the folder /usr/local/databend/etc with the following content:

[log]
level = "INFO"
dir = "benddata/_logs"

[query]
# For the admin REST API.
admin_api_address = "127.0.0.1:8001"

# Metrics.
metric_api_address = "127.0.0.1:7071"

# Cluster flight RPC.
flight_api_address = "127.0.0.1:9091"

# Query MySQL Handler.
mysql_handler_host = "127.0.0.1"
mysql_handler_port = 3307


# Query HTTP Handler.
http_handler_host = "127.0.0.1"
http_handler_port = 8081

tenant_id = "tenant1"
cluster_id = "cluster1"

[meta]
address = "127.0.0.1:9101"
username = "root"
password = "root"

[storage]
# s3
type = "fs"

[storage.fs]
data_path = "benddata/datas"

b. Open a terminal window and navigate to the folder /usr/local/databend/bin.

c. Run the following command to start databend-query:

./databend-query -c ../etc/databend-query.toml > query.log 2>&1 &

d. Run the following command to check if databend-query was started successfully:

curl -I  http://127.0.0.1:8001/v1/health

There you go! You have successfully deployed Databend on your computer. If you have a SQL client installed, try the steps below to verify the deployment:

a. Create a connection to 127.0.0.1 from your SQL client. In the connection, set the port to 3307, and set the username to root.

b. Run the following commands to check if the query is successful:

CREATE TABLE t1(a int);

INSERT INTO t1 VALUES(1), (2);

SELECT * FROM t1;