Skip to content

Commit

Permalink
Separate raw_insert method from data_insert method to simplify compression handling (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
genzgd authored Jul 18, 2023
1 parent 657f226 commit 8621aea
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 42 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ In any case, this should not affect the basic usage of Superset with ClickHouse.
your Superset installation, the ClickHouse datasource will be available with either the enhanced connection dialog
or a standard SqlAlchemy DSN in the form of `clickhousedb://{username}:{password}@{host}:{port}`.

## 0.6.8, 2023-07-18
### Bug Fix
- Fixed client `raw_insert` method when a compression method is specified. https://github.com/ClickHouse/clickhouse-connect/issues/223

### Improvement
- Add compression parameter to the ClickHouse `tools.insert_file` method. '.gz' and '.gzip' extensions are automatically
recognized.

## 0.6.7, 2023-07-18
### Bug Fixes
- Fixed an issue for older versions of ClickHouse where the server would send an initial block of 0 rows for larger queries.
Expand Down
2 changes: 1 addition & 1 deletion clickhouse_connect/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.6.7
0.6.8
4 changes: 3 additions & 1 deletion clickhouse_connect/driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,13 +697,15 @@ def raw_insert(self, table: str,
column_names: Optional[Sequence[str]] = None,
insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
settings: Optional[Dict] = None,
fmt: Optional[str] = None) -> QuerySummary:
fmt: Optional[str] = None,
compression: Optional[str] = None) -> QuerySummary:
"""
Insert data already formatted in a bytes object
:param table: Table name (whether qualified with the database name or not)
:param column_names: Sequence of column names
:param insert_block: Binary or string data already in a recognized ClickHouse format
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param compression: Recognized ClickHouse `Accept-Encoding` header compression value
:param fmt: Valid clickhouse format
"""

Expand Down
43 changes: 23 additions & 20 deletions clickhouse_connect/driver/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,57 +226,60 @@ def data_insert(self, context: InsertContext) -> QuerySummary:
if context.empty:
logger.debug('No data included in insert, skipping')
return QuerySummary()
if context.compression is None:
context.compression = self.write_compression
block_gen = self._transform.build_insert(context)

def error_handler(response: HTTPResponse):
def error_handler(resp: HTTPResponse):
# If we actually had a local exception when building the insert, throw that instead
if context.insert_exception:
ex = context.insert_exception
context.insert_exception = None
raise ProgrammingError('Internal serialization error. This usually indicates invalid data types ' +
'in an inserted row or column') from ex # type: ignore
self._error_handler(response)
self._error_handler(resp)

headers = {'Content-Type': 'application/octet-stream'}
if context.compression is None:
context.compression = self.write_compression
if context.compression:
headers['Content-Encoding'] = context.compression
block_gen = self._transform.build_insert(context)

resp = self.raw_insert(insert_block=block_gen,
settings=context.settings,
compression=context.compression,
status_handler=error_handler)
params = {}
if self.database:
params['database'] = self.database
params.update(self._validate_settings(context.settings))

response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False)
logger.debug('Context insert response code: %d, content: %s', response.status, response.data)
context.data = None
return resp
return QuerySummary(self._summary(response))

def raw_insert(self, table: str = None,
column_names: Optional[Sequence[str]] = None,
insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
settings: Optional[Dict] = None,
fmt: Optional[str] = None,
compression: Optional[str] = None,
status_handler: Optional[Callable] = None) -> QuerySummary:
compression: Optional[str] = None) -> QuerySummary:
"""
See BaseClient doc_string for this method
"""
write_format = fmt if fmt else self._write_format
params = {}
headers = {'Content-Type': 'application/octet-stream'}
if compression:
headers['Content-Encoding'] = compression
if table:
cols = f" ({', '.join([quote_identifier(x) for x in column_names])})" if column_names is not None else ''
query = f'INSERT INTO {table}{cols} FORMAT {write_format}'
if isinstance(insert_block, str):
query = f'INSERT INTO {table}{cols} FORMAT {fmt if fmt else self._write_format}'
if not compression and isinstance(insert_block, str):
insert_block = query + '\n' + insert_block
elif isinstance(insert_block, (bytes, bytearray, BinaryIO)):
elif not compression and isinstance(insert_block, (bytes, bytearray, BinaryIO)):
insert_block = (query + '\n').encode() + insert_block
else:
params['query'] = query
if self.database:
params['database'] = self.database
params.update(self._validate_settings(settings or {}))
response = self._raw_request(insert_block, params, headers,
error_handler=status_handler,
server_wait=False)
logger.debug('Insert response code: %d, content: %s', response.status, response.data)
response = self._raw_request(insert_block, params, headers, server_wait=False)
logger.debug('Raw insert response code: %d, content: %s', response.status, response.data)
return QuerySummary(self._summary(response))

@staticmethod
Expand Down
13 changes: 11 additions & 2 deletions clickhouse_connect/driver/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,18 @@ def insert_file(client: Client,
fmt: Optional[str] = None,
column_names: Optional[Sequence[str]] = None,
database: Optional[str] = None,
settings: Optional[Dict[str, Any]] = None) -> QuerySummary:
settings: Optional[Dict[str, Any]] = None,
compression: Optional[str] = None) -> QuerySummary:
full_table = f'{quote_identifier(database)}.{quote_identifier(table)}' if database else quote_identifier(table)
if not fmt:
fmt = 'CSV' if column_names else 'CSVWithNames'
if compression is None:
if file_path.endswith('.gzip') or file_path.endswith('.gz'):
compression = 'gzip'
with open(file_path, 'rb') as file:
return client.raw_insert(full_table, column_names=column_names, insert_block=file, fmt=fmt, settings=settings)
return client.raw_insert(full_table,
column_names=column_names,
insert_block=file,
fmt=fmt,
settings=settings,
compression=compression)
Binary file added tests/integration_tests/movies.csv.gz
Binary file not shown.
15 changes: 0 additions & 15 deletions tests/integration_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,6 @@ def test_insert(test_client: Client, test_table_engine: str):
test_client.command('DROP TABLE IF EXISTS test_system_insert')


def test_raw_insert(test_client: Client, table_context: Callable):
with table_context('test_raw_insert', ["`weir'd` String", 'value String']):
csv = 'value1\nvalue2'
test_client.raw_insert('test_raw_insert', ['"weir\'d"'], csv.encode(), fmt='CSV')
result = test_client.query('SELECT * FROM test_raw_insert')
assert result.result_set[1][0] == 'value2'

test_client.command('TRUNCATE TABLE test_raw_insert')
tsv = 'weird1\tvalue__`2\nweird2\tvalue77'
test_client.raw_insert('test_raw_insert', ["`weir'd`", 'value'], tsv, fmt='TSV')
result = test_client.query('SELECT * FROM test_raw_insert')
assert result.result_set[0][1] == 'value__`2'
assert result.result_set[1][1] == 'value77'


def test_decimal_conv(test_client: Client, table_context: Callable):
with table_context('test_num_conv', ['col1 UInt64', 'col2 Int32', 'f1 Float64']):
data = [[Decimal(5), Decimal(-182), Decimal(55.2)], [Decimal(57238478234), Decimal(77), Decimal(-29.5773)]]
Expand Down
35 changes: 35 additions & 0 deletions tests/integration_tests/test_raw_insert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pathlib import Path
from typing import Callable

from clickhouse_connect.driver import Client


def test_raw_insert(test_client: Client, table_context: Callable):
    """Verify raw_insert accepts pre-formatted payloads (CSV bytes and a TSV string)."""
    with table_context('test_raw_insert', ["`weir'd` String", 'value String']):
        # Insert CSV bytes into a single, double-quote-escaped column
        csv_payload = 'value1\nvalue2'
        test_client.raw_insert('test_raw_insert', ['"weir\'d"'], csv_payload.encode(), fmt='CSV')
        rows = test_client.query('SELECT * FROM test_raw_insert')
        assert rows.result_set[1][0] == 'value2'

        # Start clean, then insert a TSV string covering both columns
        test_client.command('TRUNCATE TABLE test_raw_insert')
        tsv_payload = 'weird1\tvalue__`2\nweird2\tvalue77'
        test_client.raw_insert('test_raw_insert', ["`weir'd`", 'value'], tsv_payload, fmt='TSV')
        rows = test_client.query('SELECT * FROM test_raw_insert')
        assert rows.result_set[0][1] == 'value__`2'
        assert rows.result_set[1][1] == 'value77'


def test_raw_insert_compression(test_client: Client, table_context: Callable):
    """Verify raw_insert forwards a gzip-compressed CSV payload with compression='gzip'."""
    gzip_bytes = (Path(__file__).parent / 'movies.csv.gz').read_bytes()
    with table_context('test_gzip_movies', ['movie String', 'year UInt16', 'rating Decimal32(3)']):
        # Tolerate a small fraction of malformed rows in the sample data file
        lenient_settings = {'input_format_allow_errors_ratio': .2,
                            'input_format_allow_errors_num': 5}
        summary = test_client.raw_insert('test_gzip_movies', None, gzip_bytes,
                                         fmt='CSV', compression='gzip',
                                         settings=lenient_settings)
        assert 248 == summary.written_rows
        totals = test_client.query(
            'SELECT count() as count, sum(rating) as rating, max(year) as year FROM test_gzip_movies').first_item
        assert totals['count'] == 248
        assert totals['year'] == 2022
6 changes: 3 additions & 3 deletions tests/integration_tests/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@


def test_csv_upload(test_client: Client, table_context: Callable):
data_file = f'{Path(__file__).parent}/movies.csv'
data_file = f'{Path(__file__).parent}/movies.csv.gz'
with table_context('test_csv_upload', ['movie String', 'year UInt16', 'rating Decimal32(3)']):
insert_result = insert_file(test_client, 'test_csv_upload', data_file,
settings={'input_format_allow_errors_ratio': .2,
'input_format_allow_errors_num': 5})
settings={'input_format_allow_errors_ratio': .2,
'input_format_allow_errors_num': 5})
assert 248 == insert_result.written_rows
res = test_client.query(
'SELECT count() as count, sum(rating) as rating, max(year) as year FROM test_csv_upload').first_item
Expand Down

0 comments on commit 8621aea

Please sign in to comment.