JDBC driver that converts any INSERT, UPDATE and DELETE statements into append-only INSERTs. Instead of updating rows in-place it inserts the new version of the row along with version metadata.
SQL-on-Hadoop data management systems such as Apache HAWQ do not offer the same style of INSERT, UPDATE, and DELETE that users expect of traditional RDBMS. Unlike transactional systems, big-data analytical queries are dominated by SELECT over millions or billions of rows. Analytical databases are optimized for this kind of workload. The storage systems are optimized for high throughput scans, and commonly implemented as immutable (append-only) persistence stores. No in-place updates are allowed.
SQL-on-Hadoop systems naturally support append-only operations such as INSERT. UPDATE and DELETE demand an alternative approach; see HAWQ-304 and HIVE-5317.
This project emulates INSERT, UPDATE and DELETE by turning them into append-only INSERTs. Instead of updating rows in-place it inserts the new version of the row using two additional metadata columns: version_number and subsequent_version_number, of either TIMESTAMP or BIGINT type.
Note that this project can be used as a workaround. A complete solution will be provided by HAWQ-304.
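The append-only emulation described above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not the project's implementation: the class name JournalSketch, its methods, and the counter that stands in for a TIMESTAMP or BIGINT version source are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a journalled table. Illustrative only, not the project's code. */
final class JournalSketch {

    /** One journal row: key, payload, and the two metadata columns. */
    static final class Row {
        final int deptno;
        final String departmentName;
        final long versionNumber;
        final Long subsequentVersionNumber; // null unless the row marks a delete

        Row(int deptno, String departmentName, long versionNumber, Long subsequentVersionNumber) {
            this.deptno = deptno;
            this.departmentName = departmentName;
            this.versionNumber = versionNumber;
            this.subsequentVersionNumber = subsequentVersionNumber;
        }
    }

    private final List<Row> journal = new ArrayList<>(); // append-only: rows are never modified
    private long clock = 0; // assumption: stand-in for the real version source

    void insert(int deptno, String name) {
        journal.add(new Row(deptno, name, ++clock, null));
    }

    /** UPDATE appends a new version of the visible row instead of changing it. */
    void update(int deptno, String newName) {
        if (current().containsKey(deptno)) {
            journal.add(new Row(deptno, newName, ++clock, null));
        }
    }

    /** DELETE appends a marker row whose subsequent version equals its own version. */
    void delete(int deptno) {
        Row live = current().get(deptno);
        if (live != null) {
            long v = ++clock;
            journal.add(new Row(deptno, live.departmentName, v, v));
        }
    }

    /** SELECT view: keep the highest version per key, then drop delete markers. */
    Map<Integer, Row> current() {
        Map<Integer, Row> latest = new LinkedHashMap<>();
        for (Row r : journal) {
            latest.merge(r.deptno, r, (a, b) -> b.versionNumber > a.versionNumber ? b : a);
        }
        latest.values().removeIf(r -> r.subsequentVersionNumber != null);
        return latest;
    }

    int journalSize() {
        return journal.size();
    }

    public static void main(String[] args) {
        JournalSketch t = new JournalSketch();
        t.insert(666, "TEST1");
        t.insert(999, "TEST2");
        t.update(666, "NEW VALUE");
        t.delete(999);
        // prints "[666] visible, 4 journal rows"
        System.out.println(t.current().keySet() + " visible, " + t.journalSize() + " journal rows");
    }
}
```

Every statement grows the journal by at most one row, and the visible state is derived purely by reading, which is the property an immutable store needs.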
Use as a standard JDBC Connection:
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class Main {
    public static void main(String[] argv) throws Exception {
        // Register the Calcite JDBC driver
        Class.forName(org.apache.calcite.jdbc.Driver.class.getName());
        Properties info = new Properties();
        info.setProperty("lex", "JAVA"); // Enables case sensitivity
        info.setProperty("model", "path/to/myModel.json"); // See section below
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        // use connection as usual
    }
}
Consult sql-rewriter-springboot-example and journalled-sql-rewriter-example for more elaborate examples.
SqlLine offers a handy command-line tool for testing sql-rewriter.
To connect to the SQL-Rewrite JDBC driver you need to provide a model. Models can be JSON files, or built programmatically. A model comprises two groups of attributes:
- Calcite generic attributes, as explained in the Calcite model attributes documentation. Note that to use journalling on a schema, the type must be custom and the factory must be org.apache.calcite.adapter.jdbc.JournalledJdbcSchema$Factory (see example below).
- sql-rewrite specific attributes, set via the operand properties. The table below explains the specific properties.
Property | Description | Default |
---|---|---|
dataSource † | Class name to use as the underlying DataSource | none |
connection † | Path to the backend JDBC connection configuration file | none |
jdbcDriver † | See section below | none |
jdbcUrl † | See section below | none |
jdbcUser † | See section below | none |
jdbcPassword † | See section below | none |
jdbcSchema | The schema name in the database. Note that due to CALCITE-1692 this must match the schema's name attribute | none |
journalSuffix | Journal table suffix | _journal |
journalVersionField | Journal table version number column name | version_number |
journalSubsequentVersionField | Journal table delete flag column name | subsequent_version_number |
journalVersionType | The type of the version columns. Either TIMESTAMP or BIGINT | TIMESTAMP |
journalDefaultKey | List of columns to use as primary keys by default (applies when tables do not have an explicit list given in journalTables) | none |
journalTables | List of journalled tables to be managed. Expressions involving other tables will pass through unchanged. This can be a list of table names, or a map of table names to primary key columns. | none |
†: Provide one of: dataSource or connection or jdbcDriver & jdbcUrl.
For example:
{
  "version": "1.0",
  "defaultSchema": "doesntmatter",
  "schemas": [
    {
      "name": "hr",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.jdbc.JournalledJdbcSchema$Factory",
      "operand": {
        "connection": "myTestConnection.json",
        "jdbcSchema": "hr",
        "journalSuffix": "_journal",
        "journalVersionField": "version_number",
        "journalSubsequentVersionField": "subsequent_version_number",
        "journalDefaultKey": ["id"],
        "journalTables": {
          "emps": ["empid"],
          "depts": ["deptno"]
        }
      }
    }
  ]
}
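One possible way to build such a model programmatically is to assemble the JSON in code and hand it to Calcite through its inline: model prefix instead of a file path. The sketch below is an assumption about how this could be done, not something taken from this project: the class InlineModelSketch and its methods are hypothetical, the names are assumed to need no JSON escaping, and how a relative connection path is resolved for an inline model is not verified here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

/** Builds a journalled model as a JSON string. Hypothetical helper, for illustration only. */
final class InlineModelSketch {

    /** Assumes schema, file and column names contain no characters that need JSON escaping. */
    static String buildModel(String schema, String connectionFile,
                             Map<String, List<String>> journalTables) {
        String tables = journalTables.entrySet().stream()
            .map(e -> quote(e.getKey()) + ": ["
                + e.getValue().stream().map(InlineModelSketch::quote).collect(Collectors.joining(", "))
                + "]")
            .collect(Collectors.joining(", "));
        return "{"
            + "\"version\": \"1.0\", "
            + "\"defaultSchema\": " + quote(schema) + ", "
            + "\"schemas\": [{"
            + "\"name\": " + quote(schema) + ", "
            + "\"type\": \"custom\", "
            + "\"factory\": \"org.apache.calcite.adapter.jdbc.JournalledJdbcSchema$Factory\", "
            + "\"operand\": {"
            + "\"connection\": " + quote(connectionFile) + ", "
            + "\"jdbcSchema\": " + quote(schema) + ", "
            + "\"journalTables\": {" + tables + "}"
            + "}}]}";
    }

    private static String quote(String s) {
        return "\"" + s + "\"";
    }

    /** Connection properties carrying the model inline rather than as a file path. */
    static Properties connectionProperties(String modelJson) {
        Properties info = new Properties();
        info.setProperty("lex", "JAVA");
        info.setProperty("model", "inline:" + modelJson);
        return info;
    }

    public static void main(String[] args) {
        Map<String, List<String>> tables = new LinkedHashMap<>();
        tables.put("emps", List.of("empid"));
        tables.put("depts", List.of("deptno"));
        Properties info = connectionProperties(buildModel("hr", "myTestConnection.json", tables));
        System.out.println(info.getProperty("model"));
        // info can then be passed to DriverManager.getConnection("jdbc:calcite:", info)
    }
}
```

For anything beyond simple identifiers, a JSON library would be a safer way to produce the string than concatenation.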
Backend DB connection configuration can be provided inside model.json, or in a separate file referenced by model.json (via the connection operand).
The connection configuration contains the common JDBC connection properties such as driver, JDBC URL, and credentials.
Property | Description | Default |
---|---|---|
jdbcDriver | JDBC driver Class name. For example: org.postgresql.Driver | none |
jdbcUrl | JDBC URL. For example: jdbc:postgresql://localhost:5432/postgres | none |
jdbcUser | The database user on whose behalf the connection is being made. | blank |
jdbcPassword | The database user’s password. | blank |
For example:
{
  "jdbcDriver": "org.postgresql.Driver",
  "jdbcUrl": "jdbc:postgresql://localhost:5432/postgres",
  "jdbcUser": "myDatabaseUser",
  "jdbcPassword": "myDatabasePassword"
}
sql-rewrite leverages Apache Calcite to implement a JDBC adapter between the end users and the backend SQL-on-Hadoop system. It exposes a fully-fledged JDBC interface to the end users while internally converting the incoming INSERT, UPDATE and DELETE statements into append-only INSERTs and forwarding them to the backend DB (e.g. Apache HAWQ).
Consider a department table called depts, with deptno (key) and department_name columns:
CREATE TABLE hr.depts (
  deptno SERIAL NOT NULL,
  department_name TEXT NOT NULL,
  PRIMARY KEY (deptno)
);
The sql-rewrite convention requires you to create a corresponding journal table named <your-table-name>_journal, with the same schema as the original table plus two metadata columns: version_number and subsequent_version_number, of TIMESTAMP or BIGINT type. The column order does not matter.
CREATE TABLE hr.depts_journal (
  deptno SERIAL NOT NULL,
  version_number TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
  subsequent_version_number TIMESTAMP WITH TIME ZONE NULL DEFAULT NULL,
  department_name TEXT NOT NULL,
  PRIMARY KEY (deptno, version_number)
);
- version_number: the version at which the row was inserted. An increasing number; the highest value represents the current row state.
- subsequent_version_number: denotes the next version of the record. Since existing records cannot be updated, this will usually be NULL (the exception being deleted records, where this is set to match the version_number). Background archival tasks can also populate this for older records as an optimisation.
Note that the new key is composed of the original key(s) (in this example deptno) and the version_number!
Below are a few sample INSERT, UPDATE, DELETE and SELECT statements and their internal representations.
- An INSERT issued against the Calcite JDBC driver
INSERT INTO hr.depts (deptno, department_name) VALUES (666, 'Pivotal');
is translated into the following SQL statement
INSERT INTO hr.depts_journal (deptno, department_name) VALUES (666, 'Pivotal');
Note that the table name is changed from depts to depts_journal. In fact, the depts table may not even exist. Data is always stored in the depts_journal table!
- An UPDATE issued against the Calcite JDBC driver
UPDATE hr.depts SET department_name='New Name' WHERE deptno = 666;
is expanded into an INSERT / SELECT statement like this
INSERT INTO hr.depts_journal (deptno, department_name)
SELECT
  deptno,
  'New Name' AS department_name
FROM (
  SELECT *, MAX(version_number) OVER (PARTITION BY deptno) AS last_version_number
  FROM hr.depts_journal
) AS last_link
WHERE subsequent_version_number IS NULL
  AND version_number = last_version_number
  AND deptno = 666;
- A DELETE issued against the Calcite JDBC driver
DELETE FROM hr.depts WHERE deptno=666;
is expanded into an INSERT / SELECT statement like this
INSERT INTO hr.depts_journal (deptno, department_name, version_number, subsequent_version_number)
SELECT
  deptno,
  department_name,
  CURRENT_TIMESTAMP AS version_number,
  CURRENT_TIMESTAMP AS subsequent_version_number
FROM (
  SELECT *, MAX(version_number) OVER (PARTITION BY deptno) AS last_version_number
  FROM hr.depts_journal
) AS last_link
WHERE subsequent_version_number IS NULL
  AND version_number = last_version_number
  AND deptno = 666;
- A SELECT query issued against the Calcite JDBC driver
SELECT * FROM hr.depts;
is converted into a SELECT such as
SELECT
  deptno,
  department_name
FROM (
  SELECT *, MAX(version_number) OVER (PARTITION BY deptno) AS last_version_number
  FROM hr.depts_journal
) AS link_last
WHERE subsequent_version_number IS NULL AND version_number = last_version_number;
For every deptno only the row with the highest version_number is returned. The MAX(version_number) OVER (PARTITION BY deptno) window function computes the max version_number per deptno.
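The same query shape extends to other journal tables and to composite keys by listing every key column in the PARTITION BY clause. The helper below is a hypothetical string-templating sketch of that pattern, assuming the default metadata column names and trusted identifiers; it is not how the project itself produces the SQL.

```java
import java.util.List;

/** Builds the "current rows" query for a journal table. Hypothetical helper, illustration only. */
final class JournalViewSql {

    /**
     * Assumes the default version_number / subsequent_version_number column names and
     * identifiers that are safe to splice into SQL (no quoting or escaping is done).
     */
    static String currentRowsQuery(String journalTable, List<String> keyColumns, List<String> columns) {
        return "SELECT " + String.join(", ", columns)
            + " FROM (SELECT *, MAX(version_number) OVER (PARTITION BY "
            + String.join(", ", keyColumns)
            + ") AS last_version_number FROM " + journalTable + ") AS link_last"
            + " WHERE subsequent_version_number IS NULL AND version_number = last_version_number";
    }

    public static void main(String[] args) {
        // Single-column key, as in the depts example
        System.out.println(currentRowsQuery(
            "hr.depts_journal", List.of("deptno"), List.of("deptno", "department_name")));
        // Composite key: both columns go into PARTITION BY (table and columns are made up)
        System.out.println(currentRowsQuery(
            "hr.assignments_journal", List.of("empid", "deptno"), List.of("empid", "deptno", "role")));
    }
}
```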
When using this project, it is important to be aware of the following limitations:
- When using TIMESTAMP versioning, concurrent updates to the same record can lead to data loss. If users A and B both send an update to the same record simultaneously, one user's changes will be lost, even if they were updating different columns. Similarly, if one user deletes a record while another is updating it, the update may “win”, causing the record to not be deleted. For BIGINT versioning, one of the users will get a duplicate key error.
- Unique indexes cannot be defined. Similarly, “UPSERT” (ON CONFLICT UPDATE) is not supported.
- Table manipulations (DDL) are not supported.
- Only ANSI SQL syntax can be used. For example, INSERT … RETURNING is not supported.
- Performing INSERTs with explicit key values will cause strange behaviour if the key currently or previously existed (for BIGINT versioning it will be rejected if the key ever existed, and for TIMESTAMP it will be accepted even if an existing non-deleted record has the same key).
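The difference between the two versioning types under concurrent writes can be illustrated with a toy model of the journal's primary key. The sketch assumes that BIGINT versioning derives the next version from the one a writer last read (so two writers compute the same value), and that TIMESTAMP writers each stamp a distinct time; the class VersionConflictSketch and the numbers used are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the (key, version_number) primary key. Illustration only. */
final class VersionConflictSketch {

    private final Set<String> primaryKeys = new HashSet<>();

    /** Returns false where a real database would raise a duplicate key error. */
    boolean append(int deptno, long versionNumber) {
        return primaryKeys.add(deptno + "/" + versionNumber);
    }

    public static void main(String[] args) {
        // BIGINT (assumed next = last seen + 1): both writers saw version 1 and try to write 2.
        VersionConflictSketch bigint = new VersionConflictSketch();
        bigint.append(666, 1);
        long lastSeen = 1;
        boolean writerA = bigint.append(666, lastSeen + 1);
        boolean writerB = bigint.append(666, lastSeen + 1);
        // prints "BIGINT: A=true, B=false"
        System.out.println("BIGINT: A=" + writerA + ", B=" + writerB);

        // TIMESTAMP: distinct stamps never collide, so both rows are accepted and the
        // later one silently hides the earlier writer's change.
        VersionConflictSketch timestamp = new VersionConflictSketch();
        timestamp.append(666, 1_000);
        boolean writerC = timestamp.append(666, 2_000);
        boolean writerD = timestamp.append(666, 2_001);
        // prints "TIMESTAMP: C=true, D=true"
        System.out.println("TIMESTAMP: C=" + writerC + ", D=" + writerD);
    }
}
```

With BIGINT the losing writer is told about the conflict and can retry; with TIMESTAMP neither writer gets an error.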
On the target Postgres/Greenplum or HAWQ database, create the test schema hr and the table depts_journal:
DROP SCHEMA IF EXISTS hr CASCADE;
CREATE SCHEMA hr;
CREATE TABLE hr.depts_journal (
  deptno SERIAL NOT NULL,
  version_number TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
  subsequent_version_number TIMESTAMP WITH TIME ZONE NULL DEFAULT NULL,
  department_name TEXT NOT NULL,
  PRIMARY KEY (deptno, version_number)
);
Start sqlline from the root folder:
./sqlline
Connect to the journalled-sql-rewriter-example model.
sqlline> !connect jdbc:calcite:lex=JAVA;model=journalled-sql-rewriter-example/src/main/resources/myTestModel.json
Hit enter for username and password.
Insert new rows:
0: jdbc:calcite:lex=JAVA> INSERT INTO hr.depts (deptno, department_name) VALUES (666, 'TEST1');
0: jdbc:calcite:lex=JAVA> INSERT INTO hr.depts (deptno, department_name) VALUES (999, 'TEST2');
Check content:
0: jdbc:calcite:lex=JAVA> select * from hr.depts;
+------------+-----------------+
| deptno     | department_name |
+------------+-----------------+
| 666        | TEST1           |
| 999        | TEST2           |
+------------+-----------------+
2 rows selected (0.035 seconds)
Update the row with deptno=666:
0: jdbc:calcite:lex=JAVA> UPDATE hr.depts SET department_name='NEW VALUE' WHERE deptno=666;
Delete the row with deptno=999:
0: jdbc:calcite:lex=JAVA> DELETE FROM hr.depts WHERE deptno=999;
Check table content:
0: jdbc:calcite:lex=JAVA> select * from hr.depts;
+------------+-----------------+
| deptno     | department_name |
+------------+-----------------+
| 666        | NEW VALUE       |
+------------+-----------------+
1 row selected (0.02 seconds)