From 12438af711d94a3d3611d743c330a5010b571e00 Mon Sep 17 00:00:00 2001 From: Daniel Mickens Date: Fri, 19 Jan 2024 10:19:21 -0500 Subject: [PATCH] Support for copy local from file (#128) * Copy from local file implementation and testing --- .../src/{messages.ts => backend-messages.ts} | 91 ++++---- packages/v-protocol/src/buffer-reader.ts | 7 + .../v-protocol/src/inbound-parser.test.ts | 53 +---- packages/v-protocol/src/index.ts | 2 +- packages/v-protocol/src/parser.ts | 138 ++++++------ packages/v-protocol/src/serializer.ts | 69 ++++-- .../v-protocol/src/testing/test-buffers.ts | 14 +- packages/vertica-nodejs/lib/client.js | 32 ++- packages/vertica-nodejs/lib/connection.js | 36 +++- packages/vertica-nodejs/lib/query.js | 67 +++++- packages/vertica-nodejs/lib/result.js | 10 + .../integration/client/copy-tests.js | 204 ++++++++++++++++++ .../test/integration/connection/copy-tests.js | 50 ----- .../integration/connection/query-tests.js | 2 +- packages/vertica-nodejs/test/test-buffers.js | 4 - .../test/unit/client/notification-tests.js | 10 - .../unit/connection/inbound-parser-tests.js | 16 -- 17 files changed, 546 insertions(+), 259 deletions(-) rename packages/v-protocol/src/{messages.ts => backend-messages.ts} (85%) create mode 100644 packages/vertica-nodejs/mochatest/integration/client/copy-tests.js delete mode 100644 packages/vertica-nodejs/test/integration/connection/copy-tests.js delete mode 100644 packages/vertica-nodejs/test/unit/client/notification-tests.js diff --git a/packages/v-protocol/src/messages.ts b/packages/v-protocol/src/backend-messages.ts similarity index 85% rename from packages/v-protocol/src/messages.ts rename to packages/v-protocol/src/backend-messages.ts index 0fc6dcac..2c027bdd 100644 --- a/packages/v-protocol/src/messages.ts +++ b/packages/v-protocol/src/backend-messages.ts @@ -24,27 +24,27 @@ export type MessageName = | 'closeComplete' | 'noData' | 'portalSuspended' - | 'replicationStart' | 'emptyQuery' | 'copyDone' - | 'copyData' | 'rowDescription' | 'parameterDescription' | 'parameterStatus' | 'commandDescription' | 'backendKeyData' - | 'notification' | 'readyForQuery' | 'commandComplete' | 'dataRow' | 'copyInResponse' - | 'copyOutResponse' + | 'loadFile' | 'authenticationOk' | 'authenticationMD5Password' | 'authenticationSHA512Password' | 'authenticationCleartextPassword' | 'error' | 'notice' + | 'verifyFiles' + | 'endOfBatchResponse' + | 'writeFile' export interface BackendMessage { name: MessageName @@ -76,19 +76,19 @@ export const portalSuspended: BackendMessage = { length: 5, } -export const replicationStart: BackendMessage = { - name: 'replicationStart', - length: 4, -} - export const emptyQuery: BackendMessage = { name: 'emptyQuery', - length: 4, + length: 5, } export const copyDone: BackendMessage = { name: 'copyDone', - length: 4, + length: 5, +} + +export const EndOfBatchResponse: BackendMessage = { + name: 'endOfBatchResponse', + length: 5 } interface NoticeOrError { @@ -133,23 +133,6 @@ export class DatabaseError extends Error implements NoticeOrError { } } -export class CopyDataMessage { - public readonly name = 'copyData' - constructor(public readonly length: number, public readonly chunk: Buffer) {} -} - -export class CopyResponse { - public readonly columnTypes: number[] - constructor( - public readonly length: number, - public readonly name: MessageName, - public readonly binary: boolean, - columnCount: number - ) { - this.columnTypes = new Array(columnCount) - } -} - export class Field { constructor( public readonly name: string, @@ -186,6 
+169,26 @@ export class Parameter { ) {} } +export class LoadFileMessage { + public readonly name: MessageName = 'loadFile' + constructor ( + public readonly length: number, + public readonly fileName: string + ) {} +} + +export class CopyInResponseMessage { + public readonly name: MessageName = 'copyInResponse' + public readonly columnFormats: number[] + constructor ( + public readonly length: number, + public readonly isBinary: boolean, + public readonly numColumns: number, + ) { + this.columnFormats = new Array(this.numColumns) + } +} + export class ParameterDescriptionMessage { public readonly name: MessageName = 'parameterDescription' //public readonly nonNativeTyeps: number //breadcrumb for non native types @@ -225,16 +228,6 @@ export class BackendKeyDataMessage { constructor(public readonly length: number, public readonly processID: number, public readonly secretKey: number) {} } -export class NotificationResponseMessage { - public readonly name: MessageName = 'notification' - constructor( - public readonly length: number, - public readonly processId: number, - public readonly channel: string, - public readonly payload: string - ) {} -} - export class ReadyForQueryMessage { public readonly name: MessageName = 'readyForQuery' constructor(public readonly length: number, public readonly status: string) {} @@ -273,3 +266,25 @@ export class NoticeMessage implements BackendMessage, NoticeOrError { public line: string | undefined public routine: string | undefined } + +export class VerifyFilesMessage { + public readonly name: MessageName = 'verifyFiles' + public readonly fileNames: string[] + constructor(public readonly length: number, + public numFiles: number, + public files: string[], + public readonly rejectFile: string, + public readonly exceptionFile: string) + { + this.fileNames = [...files] // shallow copy + } +} + +export class WriteFileMessage { + public readonly name: MessageName = 'writeFile' + constructor(public readonly length: number, + public fileName: string, + public fileLength: number, + public fileContents: string | bigint[] ) {} +} + diff --git a/packages/v-protocol/src/buffer-reader.ts b/packages/v-protocol/src/buffer-reader.ts index 909f0c23..24fabbe3 100644 --- a/packages/v-protocol/src/buffer-reader.ts +++ b/packages/v-protocol/src/buffer-reader.ts @@ -55,6 +55,13 @@ export class BufferReader { return result } + //signed 64 bit little endian + public int64LE(): bigint { + const result = this.buffer.readBigInt64LE(this.offset) + this.offset += 8 + return result + } + public string(length: number): string { const result = this.buffer.toString(this.encoding, this.offset, this.offset + length) this.offset += length diff --git a/packages/v-protocol/src/inbound-parser.test.ts b/packages/v-protocol/src/inbound-parser.test.ts index 214530ac..8feff442 100644 --- a/packages/v-protocol/src/inbound-parser.test.ts +++ b/packages/v-protocol/src/inbound-parser.test.ts @@ -21,7 +21,7 @@ import BufferList from './testing/buffer-list' import { parse } from '.' 
import assert from 'assert' import { PassThrough } from 'stream' -import { BackendMessage } from './messages' +import { BackendMessage } from './backend-messages' var authOkBuffer = buffers.authenticationOk() var paramStatusBuffer = buffers.parameterStatus('client_encoding', 'UTF8') @@ -193,7 +193,7 @@ var expectedTwoParameterMessage = { } var testForMessage = function (buffer: Buffer, expectedMessage: any) { - it('recieves and parses ' + expectedMessage.name, async () => { + it('receives and parses ' + expectedMessage.name, async () => { const messages = await parseBuffers([buffer]) const [lastMessage] = messages @@ -215,14 +215,6 @@ var expectedMD5PasswordMessage = { salt: Buffer.from([1, 2, 3, 4]), } -var notificationResponseBuffer = buffers.notification(4, 'hi', 'boom') -var expectedNotificationResponseMessage = { - name: 'notification', - processId: 4, - channel: 'hi', - payload: 'boom', -} - const parseBuffers = async (buffers: Buffer[]): Promise => { const stream = new PassThrough() for (const buffer of buffers) { @@ -242,10 +234,9 @@ describe('PgPacketStream', function () { testForMessage(backendKeyDataBuffer, expectedBackendKeyDataMessage) testForMessage(readyForQueryBuffer, expectedReadyForQueryMessage) testForMessage(commandCompleteBuffer, expectedCommandCompleteMessage) - testForMessage(notificationResponseBuffer, expectedNotificationResponseMessage) testForMessage(buffers.emptyQuery(), { name: 'emptyQuery', - length: 4, + length: 5, }) testForMessage(Buffer.from([0x6e, 0, 0, 0, 4]), { @@ -389,51 +380,29 @@ describe('PgPacketStream', function () { }) }) - describe('parses replication start message', function () { - testForMessage(Buffer.from([0x57, 0x00, 0x00, 0x00, 0x04]), { - name: 'replicationStart', - length: 4, - }) - }) - describe('copy', () => { testForMessage(buffers.copyIn(0), { name: 'copyInResponse', length: 7, - binary: false, - columnTypes: [], + isBinary: false, + columnFormats: [], }) testForMessage(buffers.copyIn(2), { name: 'copyInResponse', length: 11, - binary: false, - columnTypes: [0, 1], + isBinary: false, + columnFormats: [0, 1], }) - testForMessage(buffers.copyOut(0), { - name: 'copyOutResponse', - length: 7, - binary: false, - columnTypes: [], - }) - - testForMessage(buffers.copyOut(3), { - name: 'copyOutResponse', - length: 13, - binary: false, - columnTypes: [0, 1, 2], + testForMessage(buffers.loadFile('sampleFile'), { + name: 'loadFile', + length: 15, }) testForMessage(buffers.copyDone(), { name: 'copyDone', - length: 4, - }) - - testForMessage(buffers.copyData(Buffer.from([5, 6, 7])), { - name: 'copyData', - length: 7, - chunk: Buffer.from([5, 6, 7]), + length: 5, }) }) diff --git a/packages/v-protocol/src/index.ts b/packages/v-protocol/src/index.ts index ac3e5f68..7c39ad7e 100644 --- a/packages/v-protocol/src/index.ts +++ b/packages/v-protocol/src/index.ts @@ -16,7 +16,7 @@ * ============================================================================= */ -import { BackendMessage, DatabaseError } from './messages' +import { BackendMessage, DatabaseError } from './backend-messages' import { serialize } from './serializer' import { VerticaType } from './vertica-types' import { Parser, MessageCallback } from './parser' diff --git a/packages/v-protocol/src/parser.ts b/packages/v-protocol/src/parser.ts index d309a601..f894b480 100644 --- a/packages/v-protocol/src/parser.ts +++ b/packages/v-protocol/src/parser.ts @@ -25,13 +25,9 @@ import { noData, portalSuspended, copyDone, - replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, - 
CopyDataMessage, - CopyResponse, - NotificationResponseMessage, RowDescriptionMessage, ParameterDescriptionMessage, Parameter, @@ -46,7 +42,12 @@ import { AuthenticationMD5Password, NoticeMessage, AuthenticationSHA512Password, -} from './messages' + VerifyFilesMessage, + LoadFileMessage, + CopyInResponseMessage, + EndOfBatchResponse, + WriteFileMessage, +} from './backend-messages' import { BufferReader } from './buffer-reader' import assert from 'assert' @@ -70,29 +71,29 @@ type StreamOptions = TransformOptions & { } const enum MessageCodes { - DataRow = 0x44, // D - ParseComplete = 0x31, // 1 - BindComplete = 0x32, // 2 - CloseComplete = 0x33, // 3 - CommandComplete = 0x43, // C - ReadyForQuery = 0x5a, // Z - NoData = 0x6e, // n - NotificationResponse = 0x41, // A - AuthenticationResponse = 0x52, // R - ParameterStatus = 0x53, // S - BackendKeyData = 0x4b, // K - ErrorMessage = 0x45, // E - NoticeMessage = 0x4e, // N - RowDescriptionMessage = 0x54, // T + ParseComplete = 0x31, // 1 + BindComplete = 0x32, // 2 + CloseComplete = 0x33, // 3 + CommandComplete = 0x43, // C + DataRow = 0x44, // D + ErrorMessage = 0x45, // E + VerifyFiles = 0x46, // F + CopyInResponse = 0x47, // G + LoadFile = 0x48, // H + EmptyQuery = 0x49, // I + EndOfBatchResponse = 0x4a, // J + BackendKeyData = 0x4b, // K + NoticeMessage = 0x4e, // N + WriteFile = 0x4f, // O + AuthenticationResponse = 0x52, // R + ParameterStatus = 0x53, // S + RowDescriptionMessage = 0x54, // T + ReadyForQuery = 0x5a, // Z + CopyDoneResponse = 0x63, // c + CommandDescriptionMessage = 0x6d, // m + NoData = 0x6e, // n + PortalSuspended = 0x73, // s ParameterDescriptionMessage = 0x74, // t - CommandDescriptionMessage = 0x6d, // m - PortalSuspended = 0x73, // s - ReplicationStart = 0x57, // W - EmptyQuery = 0x49, // I - CopyIn = 0x47, // G - CopyOut = 0x48, // H - CopyDone = 0x63, // c - CopyData = 0x64, // d } export type MessageCallback = (msg: BackendMessage) => void @@ -186,20 +187,18 @@ export class Parser { return noData case MessageCodes.PortalSuspended: return portalSuspended - case MessageCodes.CopyDone: + case MessageCodes.CopyDoneResponse: return copyDone - case MessageCodes.ReplicationStart: - return replicationStart case MessageCodes.EmptyQuery: return emptyQuery + case MessageCodes.EndOfBatchResponse: + return EndOfBatchResponse case MessageCodes.DataRow: return this.parseDataRowMessage(offset, length, bytes) case MessageCodes.CommandComplete: return this.parseCommandCompleteMessage(offset, length, bytes) case MessageCodes.ReadyForQuery: return this.parseReadyForQueryMessage(offset, length, bytes) - case MessageCodes.NotificationResponse: - return this.parseNotificationMessage(offset, length, bytes) case MessageCodes.AuthenticationResponse: return this.parseAuthenticationResponse(offset, length, bytes) case MessageCodes.ParameterStatus: @@ -216,12 +215,14 @@ export class Parser { return this.parseParameterDescriptionMessage(offset, length, bytes) case MessageCodes.CommandDescriptionMessage: return this.parseCommandDescriptionMessage(offset, length, bytes) - case MessageCodes.CopyIn: - return this.parseCopyInMessage(offset, length, bytes) - case MessageCodes.CopyOut: - return this.parseCopyOutMessage(offset, length, bytes) - case MessageCodes.CopyData: - return this.parseCopyData(offset, length, bytes) + case MessageCodes.CopyInResponse: + return this.parseCopyInResponseMessage(offset, length, bytes) + case MessageCodes.LoadFile: + return this.parseLoadFileMessage(offset, length, bytes) + case MessageCodes.VerifyFiles: + return 
this.parseVerifyFilesMessage(offset, length, bytes)
+      case MessageCodes.WriteFile:
+        return this.parseWriteFileMessage(offset, length, bytes)
       default:
         assert.fail(`unknown message code: ${code.toString(16)}`)
     }
   }
@@ -239,36 +240,51 @@
     return new CommandCompleteMessage(length, text)
   }
 
-  private parseCopyData(offset: number, length: number, bytes: Buffer) {
-    const chunk = bytes.slice(offset, offset + (length - 4))
-    return new CopyDataMessage(length, chunk)
-  }
-
-  private parseCopyInMessage(offset: number, length: number, bytes: Buffer) {
-    return this.parseCopyMessage(offset, length, bytes, 'copyInResponse')
-  }
-
-  private parseCopyOutMessage(offset: number, length: number, bytes: Buffer) {
-    return this.parseCopyMessage(offset, length, bytes, 'copyOutResponse')
+  private parseVerifyFilesMessage(offset: number, length: number, bytes: Buffer) {
+    this.reader.setBuffer(offset, bytes)
+    const numFiles = this.reader.int16() //int16 number of files, n
+    const fileNames: string[] = new Array(numFiles)
+    for (let i = 0; i < numFiles; i++) {
+      fileNames[i] = this.reader.cstring() //string[n], name of each file
+    }
+    const rejectFile = this.reader.cstring() //string reject file name
+    const exceptionFile = this.reader.cstring() //string exceptions file name
+    return new VerifyFilesMessage(length, numFiles, fileNames, rejectFile, exceptionFile)
   }
 
-  private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: MessageName) {
+  private parseWriteFileMessage(offset: number, length: number, bytes: Buffer) {
     this.reader.setBuffer(offset, bytes)
-    const isBinary = this.reader.byte() !== 0
-    const columnCount = this.reader.int16()
-    const message = new CopyResponse(length, messageName, isBinary, columnCount)
-    for (let i = 0; i < columnCount; i++) {
-      message.columnTypes[i] = this.reader.int16()
+    const fileName = this.reader.cstring()
+    const fileLength = this.reader.int32()
+    let fileContents: string | bigint[]
+    // if the file name is empty, RETURNREJECTED was used instead of a rejections file; the payload is then
+    // the rejected row numbers, each a 64-bit little-endian integer (fileLength is a multiple of 8 bytes)
+    if (fileName.length === 0) {
+      fileContents = []
+      for (let i = 0; i < fileLength; i += 8) {
+        fileContents.push(this.reader.int64LE())
+      }
+    } else {
+      fileContents = this.reader.string(fileLength)
     }
-    return message
+    return new WriteFileMessage(length, fileName, fileLength, fileContents)
+  }
+
+  private parseCopyInResponseMessage(offset: number, length: number, bytes: Buffer) {
+    this.reader.setBuffer(offset, bytes)
+    const isBinary = this.reader.byte() !== 0
+    const columnCount = this.reader.int16()
+    const message = new CopyInResponseMessage(length, isBinary, columnCount)
+    for (let i = 0; i < columnCount; i++) {
+      message.columnFormats[i] = this.reader.int16()
+    }
+    return message
   }
 
-  private parseNotificationMessage(offset: number, length: number, bytes: Buffer) {
+  private parseLoadFileMessage(offset: number, length: number, bytes: Buffer) {
     this.reader.setBuffer(offset, bytes)
-    const processId = this.reader.int32()
-    const channel = this.reader.cstring()
-    const payload = this.reader.cstring()
-    return new NotificationResponseMessage(length, processId, channel, payload)
+    const fileName = this.reader.cstring()
+    return new LoadFileMessage(length, fileName)
   }
 
   private parseRowDescriptionMessage(offset: number, length: number, bytes: Buffer) {
diff --git a/packages/v-protocol/src/serializer.ts b/packages/v-protocol/src/serializer.ts
index 
f6353e51..978ab9f8 100644 --- a/packages/v-protocol/src/serializer.ts +++ b/packages/v-protocol/src/serializer.ts @@ -17,21 +17,25 @@ */ import { Writer } from './buffer-writer' +import { statSync } from 'fs'; const enum code { - startup = 0x70, - query = 0x51, - parse = 0x50, - bind = 0x42, - execute = 0x45, - flush = 0x48, - sync = 0x53, - end = 0x58, - close = 0x43, - describe = 0x44, - copyFromChunk = 0x64, - copyDone = 0x63, - copyFail = 0x66, + bind = 0x42, // B + close = 0x43, // C + describe = 0x44, // D + execute = 0x45, // E + verifiedFiles = 0x46, // F + flush = 0x48, // H + parse = 0x50, // P + query = 0x51, // Q + sync = 0x53, // S + end = 0x58, // X aka Terminate + copyDone = 0x63, // c + copyData = 0x64, // d + copyError = 0x65, // e + copyFail = 0x66, // f + endOfBatchRequest = 0x6A, // j + startup = 0x70 // p } const writer = new Writer() @@ -251,19 +255,53 @@ const close = (msg: PortalOpts): Buffer => { } const copyData = (chunk: Buffer): Buffer => { - return writer.add(chunk).flush(code.copyFromChunk) + return writer.add(chunk).flush(code.copyData) +} + +const copyError = (fileName: string, lineNumber: number, methodName: string, errorMsg: string): Buffer => { + writer.addCString(fileName) + writer.addInt32(lineNumber) + writer.addCString(methodName) + writer.addCString(errorMsg) + return writer.flush(code.copyError) } const copyFail = (message: string): Buffer => { return cstringMessage(code.copyFail, message) } +type genericConfig = { + [key: string]: any; +} + +function getFileSize(filePath: string): number { + try { + const stats = statSync(filePath); + return stats.size; + } catch (error) { + return -1; // or throw an exception if you prefer + } +} + + +//numFiles: number, fileNames: string[], fileLengths: number[] +const verifiedFiles = (config: genericConfig): Buffer => { + writer.addInt16(config.numFiles) // In 3.15 this will be 'writer.addInt32(config.numFiles) + for(let i = 0; i < config.numFiles; i++) { + writer.addCString(config.fileNames[i]) + writer.addInt32(0) + writer.addInt32(getFileSize(config.fileNames[i])) + } + return writer.flush(code.verifiedFiles) +} + const codeOnlyBuffer = (code: code): Buffer => Buffer.from([code, 0x00, 0x00, 0x00, 0x04]) const flushBuffer = codeOnlyBuffer(code.flush) const syncBuffer = codeOnlyBuffer(code.sync) const endBuffer = codeOnlyBuffer(code.end) const copyDoneBuffer = codeOnlyBuffer(code.copyDone) +const endOfBatchRequestBuffer = codeOnlyBuffer(code.endOfBatchRequest) const serialize = { startup, @@ -280,8 +318,11 @@ const serialize = { end: () => endBuffer, copyData, copyDone: () => copyDoneBuffer, + copyError, copyFail, + EndOfBatchRequest: () => endOfBatchRequestBuffer, cancel, + verifiedFiles, } export { serialize } diff --git a/packages/v-protocol/src/testing/test-buffers.ts b/packages/v-protocol/src/testing/test-buffers.ts index c0bdf188..6e07cdcf 100644 --- a/packages/v-protocol/src/testing/test-buffers.ts +++ b/packages/v-protocol/src/testing/test-buffers.ts @@ -122,10 +122,6 @@ const buffers = { return new BufferList().join(true, '2') }, - notification: function (id: number, channel: string, payload: string) { - return new BufferList().addInt32(id).addCString(channel).addCString(payload).join(true, 'A') - }, - emptyQuery: function () { return new BufferList().join(true, 'I') }, @@ -150,15 +146,9 @@ const buffers = { return list.join(true, 'G') }, - copyOut: function (cols: number) { + loadFile: function (fileName: string) { const list = new BufferList() - // text mode - .addByte(0) - // column count - 
.addInt16(cols)
-    for (let i = 0; i < cols; i++) {
-      list.addInt16(i)
-    }
+    list.addCString(fileName)
     return list.join(true, 'H')
   },
 
diff --git a/packages/vertica-nodejs/lib/client.js b/packages/vertica-nodejs/lib/client.js
index 2c1c946c..1bdc3510 100644
--- a/packages/vertica-nodejs/lib/client.js
+++ b/packages/vertica-nodejs/lib/client.js
@@ -272,12 +272,15 @@ class Client extends EventEmitter {
     con.on('emptyQuery', this._handleEmptyQuery.bind(this))
     con.on('commandComplete', this._handleCommandComplete.bind(this))
     con.on('parseComplete', this._handleParseComplete.bind(this))
-    con.on('copyInResponse', this._handleCopyInResponse.bind(this))
-    con.on('copyData', this._handleCopyData.bind(this))
-    con.on('notification', this._handleNotification.bind(this))
     con.on('parameterDescription', this._handleParameterDescription.bind(this))
     con.on('parameterStatus', this._handleParameterStatus.bind(this))
     con.on('bindComplete', this._handleBindComplete.bind(this))
+    con.on('copyInResponse', this._handleCopyInResponse.bind(this))
+    con.on('copyDoneResponse', this._handleCopyDoneResponse.bind(this))
+    con.on('loadFile', this._handleLoadFile.bind(this))
+    con.on('writeFile', this._handleWriteFile.bind(this))
+    con.on('verifyFiles', this._handleVerifyFiles.bind(this))
+    con.on('endOfBatchResponse', this._handleEndOfBatchResponse.bind(this))
   }
 
   // TODO(bmc): deprecate pgpass "built in" integration since this.password can be a function
@@ -419,7 +422,6 @@
     this.emit('error', err)
   }
 
-  // handle error messages from the postgres backend
   _handleErrorMessage(msg) {
     if (this._connecting) {
       return this._handleErrorWhileConnecting(msg)
@@ -478,12 +480,26 @@
     this.activeQuery.handleCopyInResponse(this.connection)
   }
 
-  _handleCopyData(msg) {
-    this.activeQuery.handleCopyData(msg, this.connection)
+  _handleCopyDoneResponse(msg) {
+    this.activeQuery.handleCopyDoneResponse(msg, this.connection)
+  }
+
+  _handleLoadFile(msg) {
+    // initiate the copy data transfer; the file is streamed to the server in
+    // copyData messages sized by the connection's 64KB read buffer
+    this.activeQuery.handleLoadFile(msg, this.connection)
+  }
+
+  _handleWriteFile(msg) {
+    this.activeQuery.handleWriteFile(msg, this.connection)
+  }
+
+  _handleVerifyFiles(msg) {
+    this.activeQuery.handleVerifyFiles(msg, this.connection)
   }
 
-  _handleNotification(msg) {
-    this.emit('notification', msg)
+  _handleEndOfBatchResponse() {
+    //noop
   }
 
   _handleNotice(msg) {
diff --git a/packages/vertica-nodejs/lib/connection.js b/packages/vertica-nodejs/lib/connection.js
index eddaf804..5043613e 100644
--- a/packages/vertica-nodejs/lib/connection.js
+++ b/packages/vertica-nodejs/lib/connection.js
@@ -24,6 +24,8 @@ const flushBuffer = serialize.flush()
 const syncBuffer = serialize.sync()
 const endBuffer = serialize.end()
 
+const bufferSize = 65536 // 64KB
+
 // TODO(bmc) support binary mode at some point
 class Connection extends EventEmitter {
   constructor(config) {
@@ -288,14 +290,46 @@
     this._send(serialize.copyData(chunk))
   }
 
-  endCopyFrom() {
+  sendCopyDone() {
     this._send(serialize.copyDone())
   }
 
+  sendCopyError(fileName, lineNumber, methodName, errorMsg) {
+    this._send(serialize.copyError(fileName, lineNumber, methodName, errorMsg))
+  }
+
   sendCopyFail(msg) {
     this._send(serialize.copyFail(msg))
   }
 
+  sendVerifiedFiles(msg) {
+    this._send(serialize.verifiedFiles(msg))
+  }
+
+  sendCopyData(msg) {
+    this._send(serialize.copyData(msg))
+  }
+
+  sendEndOfBatchRequest() {
+    this._send(serialize.EndOfBatchRequest())
+  }
+
+  sendCopyDataStream(msg) {
+    const buffer = Buffer.alloc(bufferSize);
+    const fd = fs.openSync(msg.fileName, 'r');
+    let bytesRead = 0;
+    do {
+      // read up to bufferSize bytes from the file into our buffer, starting at the current position in the file
+      bytesRead = fs.readSync(fd, buffer, 0, bufferSize, null);
+      if (bytesRead > 0) {
+        // send this chunk (buffer.subarray(0, bytesRead)) to the server as a copyData message
+        this.sendCopyData(buffer.subarray(0, bytesRead))
+      }
+    } while (bytesRead > 0);
+    fs.closeSync(fd);
+    this.sendEndOfBatchRequest()
+  }
+
   makeStatementName() {
     return "s" + Atomics.add(this.statementCounter, 0, 1)
   }
diff --git a/packages/vertica-nodejs/lib/query.js b/packages/vertica-nodejs/lib/query.js
index 928d31d8..60d131d1 100644
--- a/packages/vertica-nodejs/lib/query.js
+++ b/packages/vertica-nodejs/lib/query.js
@@ -18,6 +18,8 @@
 const { EventEmitter } = require('events')
 const Result = require('./result')
 const utils = require('./utils')
+const fs = require('fs')
+const fsPromises = require('fs').promises
 
 class Query extends EventEmitter {
   constructor(config, values, callback) {
@@ -250,8 +252,70 @@
     connection.sendCopyFail('No source stream defined')
   }
 
+  async handleVerifyFiles(msg, connection) {
+    try { // Check if the data file can be read
+      await fsPromises.access(msg.files[0], fs.constants.R_OK);
+    } catch (readInputFileErr) { // Can't open input file for reading, send CopyError
+      connection.sendCopyError(msg.files[0], 0, '', "Unable to open input file for reading")
+      return;
+    }
+    if (msg.rejectFile) {
+      try { // Check if the rejections file can be written to, if specified
+        await fsPromises.access(msg.rejectFile, fs.constants.W_OK);
+      } catch (writeRejectsFileErr) {
+        if (writeRejectsFileErr.code === 'ENOENT') { // file doesn't exist, see if we can create it
+          try {
+            const rejectHandle = await fsPromises.open(msg.rejectFile, 'w');
+            await rejectHandle.close()
+          } catch (createErr) { // can't open or create output file for writing, send CopyError
+            connection.sendCopyError(msg.rejectFile, 0, 
'', "Unable to open or create rejects file for writing") + return + } + } else { // file exists but we can't open, likely permissions issue + connection.sendCopyError(msg.rejectFile, 0, '', "Reject file exists but could not be opened for writing") + return + } + } + } + if (msg.exceptionFile) { + try { // Check if the exceptions file can be written to, if specified + await fsPromises.access(msg.exceptionFile, fs.constants.W_OK); + } catch (writeExceptionsFileErr) { // Can't open exceptions output file for writing, send CopyError + if (writeExceptionsFileErr.code === 'ENOENT') { // file doesn't exist, see if we can create it + try { + const exceptionHandle = await fsPromises.open(msg.exceptionFile, 'w'); + await exceptionHandle.close() + } catch (createErr) { // can't open or create output file for writing, send CopyError + connection.sendCopyError(msg.exceptionFile, 0, '', "Unable to open or create exception file for writing") + return + } + } else { // file exists but we can't open, likely permissions issue + connection.sendCopyError(msg.rejectFile, 0, '', "Exception file exists but could not be opened for writing") + return + } + } + } + connection.sendVerifiedFiles(msg); // All files are verified + } + + handleLoadFile(msg, connection) { + connection.sendCopyDataStream(msg) + } + + handleWriteFile(msg, connection) { + if (msg.fileName.length === 0) { //using returnrejected, fileContents is an array of row numbers, not a string + this._result._setRejectedRows(msg.fileContents) + } else { // future enhancement, move file IO to util + fs.appendFile(msg.fileName, msg.fileContents, (err) => { + if (err) { + console.error('Error writing to file:', err); + } + }); + } + } + // eslint-disable-next-line no-unused-vars - handleCopyData(msg, connection) { + handleCopyDoneResponse(msg, connection) { // noop } } diff --git a/packages/vertica-nodejs/lib/result.js b/packages/vertica-nodejs/lib/result.js index febacb18..40be51ff 100644 --- a/packages/vertica-nodejs/lib/result.js +++ b/packages/vertica-nodejs/lib/result.js @@ -36,6 +36,16 @@ class Result { } } + #rejectedRows = [] + + getRejectedRows() { + return this.#rejectedRows + } + + _setRejectedRows(rows) { + this.#rejectedRows = rows + } + // adds a command complete message addCommandComplete(msg) { var match diff --git a/packages/vertica-nodejs/mochatest/integration/client/copy-tests.js b/packages/vertica-nodejs/mochatest/integration/client/copy-tests.js new file mode 100644 index 00000000..27667776 --- /dev/null +++ b/packages/vertica-nodejs/mochatest/integration/client/copy-tests.js @@ -0,0 +1,204 @@ +'use strict' +const vertica = require('../../../lib') +const assert = require('assert') +const path = require('path') +const fs = require('fs') + +describe('Running Copy Commands', function () { + // global pool to use for queries + const pool = new vertica.Pool() + + // global file names and paths + const goodFileName = "copy-good.dat" + const badFileName = "copy-bad.dat" + const goodFilePath = path.join(process.cwd(), goodFileName); + const badFilePath = path.join(process.cwd(), badFileName) + const goodFileContents = "1|'a'\n2|'b'\n3|'c'\n4|'d'\n5|'e'" // 5 correctly formatted rows + const badFileContents = "1|'a'\n'b'|2\n3|'c'\n'd'|4\n5|'e'" // rows 2 and 4 malformed + + // generate temporary test files, create table before tests begin + before((done) => { + fs.writeFile(goodFilePath, goodFileContents, () => { + fs.writeFile(badFilePath, badFileContents, () => { + pool.query("CREATE TABLE copyTable (num int, let char)", (done)) + }) + }) + 
})
+
+  // delete temporary test files, drop table after tests are complete
+  after((done) => {
+    fs.unlink(goodFilePath, () => {
+      fs.unlink(badFilePath, () => {
+        pool.query("DROP TABLE IF EXISTS copyTable", () => {
+          pool.end(done)
+        })
+      })
+    })
+  })
+
+  // remove data from table between tests
+  afterEach((done) => {
+    pool.query("DELETE FROM copyTable", (done))
+  })
+
+  it('succeeds with basic copy from file command', function(done) {
+    pool.query("COPY copyTable FROM LOCAL 'copy-good.dat' RETURNREJECTED", (err, res) => {
+      assert.equal(err, undefined)
+      assert.equal(res.rows[0]['Rows Loaded'], 5) // 5 good rows in goodFileContents
+      assert.deepEqual(res.getRejectedRows(), [])
+      done()
+    })
+  })
+
+  it('returns rejected rows with RETURNREJECTED specified', function(done) {
+    pool.query("COPY copyTable FROM LOCAL 'copy-bad.dat' RETURNREJECTED", (err, res) => {
+      assert.equal(err, undefined)
+      assert.equal(res.rows[0]['Rows Loaded'], 3) // 3 good rows in badFileContents
+      assert.deepEqual(res.getRejectedRows(), [2, 4]) // rows 2 and 4 are malformed
+      done()
+    })
+  })
+
+  it('writes rejects to file with REJECTED DATA specified', function (done) {
+    pool.query("COPY copyTable FROM LOCAL 'copy-bad.dat' REJECTED DATA 'rejects.txt'", (err, res) => {
+      try {
+        assert.equal(err, undefined)
+        assert.equal(res.rows[0]['Rows Loaded'], 3) // 3 good rows in badFileContents
+        fs.readFile('rejects.txt', 'utf8', (err, data) => {
+          assert.equal(err, undefined)
+          assert.equal(data, "'b'|2\n'd'|4\n") // rows 2 and 4 are malformed
+        })
+      } finally {
+        fs.unlink('rejects.txt', done)
+      }
+    })
+  })
+
+  it('succeeds when data file is larger than buffer size requiring multiple copyData messages', function(done) {
+    const largeFilePath = path.join(process.cwd(), "large-copy.dat")
+    const writableStream = fs.createWriteStream(largeFilePath, { encoding: 'utf8' });
+    const bytesPerLine = 6 // single quote + letter + single quote + bar + integer + newline = 6 bytes
+    const desiredFileSize = 66000 // 65536 is our max buffer size. 
This will force multiple copyData messages + const requiredLines = desiredFileSize / bytesPerLine + + for (let i = 1; i <= requiredLines; i++) { + const char = String.fromCharCode('a'.charCodeAt(0) + (i % 26)); // a - z + const line = `${i}|'${char}'\n` + writableStream.write(line) + } + writableStream.end() + + pool.query("COPY copyTable FROM LOCAL 'large-copy.dat' RETURNREJECTED", (err, res) => { + try { + assert.equal(err, undefined) + assert.equal(res.rows[0]['Rows Loaded'], requiredLines) + assert.deepEqual(res.getRejectedRows(), []) + } finally { + fs.unlink(largeFilePath, done) + } + }) + }) + + it('succeeds with binary copy local files', function(done) { + const binaryFileContents = Buffer.from(goodFileContents, 'utf-8') + const binaryFilePath = path.join(process.cwd(), 'binary-copy.bin') + fs.writeFile(binaryFilePath, binaryFileContents, () => { + pool.query("COPY copyTable FROM LOCAL 'binary-copy.bin' RETURNREJECTED", (err, res) => { + try { + assert.equal(err, undefined) + assert.equal(res.rows[0]['Rows Loaded'], 5) + assert.deepEqual(res.getRejectedRows(), []) + } finally { + fs.unlink(binaryFilePath, done) + } + }) + }) + }) + + it('behaves properly when input file does not exist', function(done) { + pool.query("COPY copyTable FROM LOCAL 'nonexistant.dat' RETURNREJECTED", (err) => { + assert.ok(err.message.includes("Unable to open input file for reading ")) + done() + }) + }) + + it ('behaves properly when rejects file cannot be written to', function(done) { + const readOnlyFilePath = path.join(process.cwd(), 'readOnlyRejects.txt') + fs.writeFile(readOnlyFilePath, '', () => { + fs.chmod(readOnlyFilePath, 0o444, () => { + pool.query("COPY copyTable FROM LOCAL 'copy-good.dat' REJECTED DATA 'readOnlyRejects.txt'", (err) => { + try { + assert.ok(err.message.includes("Reject file exists but could not be opened for writing")) + } finally { + fs.unlink(readOnlyFilePath, done) + } + }) + }) + }) + }) + + it ('behaves properly when exceptions file cannot be written to', function(done) { + const readOnlyFilePath = path.join(process.cwd(), 'readOnlyExceptions.txt') + fs.writeFile(readOnlyFilePath, '', () => { + fs.chmod(readOnlyFilePath, 0o444, () => { + pool.query("COPY copyTable FROM LOCAL 'copy-good.dat' EXCEPTIONS 'readOnlyExceptions.txt'", (err) => { + try { + assert.ok(err.message.includes("Exception file exists but could not be opened for writing")) + } finally { + fs.unlink(readOnlyFilePath, done) + } + }) + }) + }) + }) + + it ('succeeds with rejects file larger than buffer size', function(done) { + // file logic copied from good large copy file test, but with the columns switched so they are all bad instead + const largeFilePath = path.join(process.cwd(), "large-copy-bad.dat") + const writableStream = fs.createWriteStream(largeFilePath, { encoding: 'utf8' }); + const bytesPerLine = 6 // single quote + letter + single quote + bar + integer + newline = 6 bytes + const desiredFileSize = 66000 // 65536 is our max buffer size. 
This will force multiple copyData messages + const requiredLines = desiredFileSize / bytesPerLine + + for (let i = 1; i <= requiredLines; i++) { + const char = String.fromCharCode('a'.charCodeAt(0) + (i % 26)); // a - z + const line = `'${char}'|${i}\n` + writableStream.write(line) + } + writableStream.end() + + pool.query("COPY copyTable FROM LOCAL 'large-copy-bad.dat' REJECTED DATA 'rejects-large.txt'", (err, res) => { + try { + assert.equal(err, undefined); + assert.equal(res.rows[0]['Rows Loaded'], 0); + assert.deepEqual(res.getRejectedRows(), []); + + fs.stat('rejects-large.txt', (statErr, stats) => { + assert.equal(statErr, null); + assert.equal(stats.size, 98894); + }); + } finally { + fs.unlink('large-copy-bad.dat', () => { + fs.unlink('rejects-large.txt', done) + }); + } + }); + + }) + it ('behaves properly with ABORT ON ERROR', function(done) { + done() + }) + + it('succeeds using glob patterns', function(done) { + done() + }) + + it('succeeds with multiple input files', function(done) { + done() + }) + + it('succeeds with basic copy from stdin command', function(done) { + //todo + done() + }) +}) diff --git a/packages/vertica-nodejs/test/integration/connection/copy-tests.js b/packages/vertica-nodejs/test/integration/connection/copy-tests.js deleted file mode 100644 index 7f1a3c66..00000000 --- a/packages/vertica-nodejs/test/integration/connection/copy-tests.js +++ /dev/null @@ -1,50 +0,0 @@ -'use strict' -var helper = require('./test-helper') -var assert = require('assert') - -// COPY IS NOT YET SUPPORTED -test('COPY', function () { - console.log(" -- COPY functionality not yet supported") -}) -/* -test('COPY FROM events check', function () { - helper.connect(function (con) { - var stdinStream = con.query('COPY person FROM STDIN') - con.on('copyInResponse', function () { - con.endCopyFrom() - }) - assert.emits( - con, - 'copyInResponse', - function () { - con.endCopyFrom() - }, - 'backend should emit copyInResponse after COPY FROM query' - ) - assert.emits( - con, - 'commandComplete', - function () { - con.end() - }, - 'backend should emit commandComplete after COPY FROM stream ends' - ) - }) -}) - -test('COPY TO events check', function () { - helper.connect(function (con) { - var stdoutStream = con.query('COPY person TO STDOUT') - assert.emits(con, 'copyOutResponse', function () {}, 'backend should emit copyOutResponse after COPY TO query') - assert.emits(con, 'copyData', function () {}, 'backend should emit copyData on every data row') - assert.emits( - con, - 'copyDone', - function () { - con.end() - }, - 'backend should emit copyDone after all data rows' - ) - }) -}) -*/ \ No newline at end of file diff --git a/packages/vertica-nodejs/test/integration/connection/query-tests.js b/packages/vertica-nodejs/test/integration/connection/query-tests.js index 4105bb71..26450cea 100644 --- a/packages/vertica-nodejs/test/integration/connection/query-tests.js +++ b/packages/vertica-nodejs/test/integration/connection/query-tests.js @@ -7,7 +7,7 @@ var rows = [] // it's cumbersome to use the api this way test('simple query', function () { helper.connect(function (con) { - con.query('select * from ids') + con.query('select id from ids order by id asc') assert.emits(con, 'dataRow') con.on('dataRow', function (msg) { rows.push(msg.fields) diff --git a/packages/vertica-nodejs/test/test-buffers.js b/packages/vertica-nodejs/test/test-buffers.js index fc73b7fb..692125bd 100644 --- a/packages/vertica-nodejs/test/test-buffers.js +++ b/packages/vertica-nodejs/test/test-buffers.js @@ -130,10 +130,6 
@@ buffers.bindComplete = function () { return new BufferList().join(true, '2') } -buffers.notification = function (id, channel, payload) { - return new BufferList().addInt32(id).addCString(channel).addCString(payload).join(true, 'A') -} - buffers.emptyQuery = function () { return new BufferList().join(true, 'I') } diff --git a/packages/vertica-nodejs/test/unit/client/notification-tests.js b/packages/vertica-nodejs/test/unit/client/notification-tests.js deleted file mode 100644 index 4661b8d0..00000000 --- a/packages/vertica-nodejs/test/unit/client/notification-tests.js +++ /dev/null @@ -1,10 +0,0 @@ -'use strict' -var helper = require('./test-helper') - -test('passes connection notification', async function () { - var client = await helper.client() - assert.emits(client, 'notice', function (msg) { - assert.equal(msg, 'HAY!!') - }) - client.connection.emit('notice', 'HAY!!') -}) diff --git a/packages/vertica-nodejs/test/unit/connection/inbound-parser-tests.js b/packages/vertica-nodejs/test/unit/connection/inbound-parser-tests.js index f396b7d9..931a9967 100644 --- a/packages/vertica-nodejs/test/unit/connection/inbound-parser-tests.js +++ b/packages/vertica-nodejs/test/unit/connection/inbound-parser-tests.js @@ -143,14 +143,6 @@ var expectedMD5PasswordMessage = { name: 'authenticationMD5Password', } -var notificationResponseBuffer = buffers.notification(4, 'hi', 'boom') -var expectedNotificationResponseMessage = { - name: 'notification', - processId: 4, - channel: 'hi', - payload: 'boom', -} - test('Connection', function () { testForMessage(authOkBuffer, expectedAuthenticationOkayMessage) testForMessage(plainPasswordBuffer, expectedPlainPasswordMessage) @@ -163,7 +155,6 @@ test('Connection', function () { testForMessage(backendKeyDataBuffer, expectedBackendKeyDataMessage) testForMessage(readyForQueryBuffer, expectedReadyForQueryMessage) testForMessage(commandCompleteBuffer, expectedCommandCompleteMessage) - testForMessage(notificationResponseBuffer, expectedNotificationResponseMessage) test('empty row message', function () { var message = testForMessage(emptyRowDescriptionBuffer, expectedEmptyRowDescriptionMessage) test('has no fields', function () { @@ -372,13 +363,6 @@ test('Connection', function () { name: 'portalSuspended', }) }) - - test('parses replication start message', function () { - testForMessage(Buffer.from([0x57, 0x00, 0x00, 0x00, 0x04]), { - name: 'replicationStart', - length: 4, - }) - }) }) // since the data message on a stream can randomly divide the incomming