Skip to content

Commit

Permalink
Support for copy local from file (vertica#128)
Browse files Browse the repository at this point in the history
* Copy from local file implementation and testing
  • Loading branch information
DMickens authored Jan 19, 2024
1 parent ac13032 commit 12438af
Show file tree
Hide file tree
Showing 17 changed files with 546 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,27 @@ export type MessageName =
| 'closeComplete'
| 'noData'
| 'portalSuspended'
| 'replicationStart'
| 'emptyQuery'
| 'copyDone'
| 'copyData'
| 'rowDescription'
| 'parameterDescription'
| 'parameterStatus'
| 'commandDescription'
| 'backendKeyData'
| 'notification'
| 'readyForQuery'
| 'commandComplete'
| 'dataRow'
| 'copyInResponse'
| 'copyOutResponse'
| 'loadFile'
| 'authenticationOk'
| 'authenticationMD5Password'
| 'authenticationSHA512Password'
| 'authenticationCleartextPassword'
| 'error'
| 'notice'
| 'verifyFiles'
| 'endOfBatchResponse'
| 'writeFile'

export interface BackendMessage {
name: MessageName
Expand Down Expand Up @@ -76,19 +76,19 @@ export const portalSuspended: BackendMessage = {
length: 5,
}

export const replicationStart: BackendMessage = {
name: 'replicationStart',
length: 4,
}

export const emptyQuery: BackendMessage = {
name: 'emptyQuery',
length: 4,
length: 5,
}

export const copyDone: BackendMessage = {
name: 'copyDone',
length: 4,
length: 5,
}

export const EndOfBatchResponse: BackendMessage = {
name: 'endOfBatchResponse',
length: 5
}

interface NoticeOrError {
Expand Down Expand Up @@ -133,23 +133,6 @@ export class DatabaseError extends Error implements NoticeOrError {
}
}

export class CopyDataMessage {
public readonly name = 'copyData'
constructor(public readonly length: number, public readonly chunk: Buffer) {}
}

export class CopyResponse {
public readonly columnTypes: number[]
constructor(
public readonly length: number,
public readonly name: MessageName,
public readonly binary: boolean,
columnCount: number
) {
this.columnTypes = new Array(columnCount)
}
}

export class Field {
constructor(
public readonly name: string,
Expand Down Expand Up @@ -186,6 +169,26 @@ export class Parameter {
) {}
}

export class LoadFileMessage {
public readonly name: MessageName = 'loadFile'
constructor (
public readonly length: number,
public readonly fileName: string
) {}
}

export class CopyInResponseMessage {
public readonly name: MessageName = 'copyInResponse'
public readonly columnFormats: number[]
constructor (
public readonly length: number,
public readonly isBinary: boolean,
public readonly numColumns: number,
) {
this.columnFormats = new Array(this.numColumns)
}
}

export class ParameterDescriptionMessage {
public readonly name: MessageName = 'parameterDescription'
//public readonly nonNativeTyeps: number //breadcrumb for non native types
Expand Down Expand Up @@ -225,16 +228,6 @@ export class BackendKeyDataMessage {
constructor(public readonly length: number, public readonly processID: number, public readonly secretKey: number) {}
}

export class NotificationResponseMessage {
public readonly name: MessageName = 'notification'
constructor(
public readonly length: number,
public readonly processId: number,
public readonly channel: string,
public readonly payload: string
) {}
}

export class ReadyForQueryMessage {
public readonly name: MessageName = 'readyForQuery'
constructor(public readonly length: number, public readonly status: string) {}
Expand Down Expand Up @@ -273,3 +266,25 @@ export class NoticeMessage implements BackendMessage, NoticeOrError {
public line: string | undefined
public routine: string | undefined
}

export class VerifyFilesMessage {
public readonly name: MessageName = 'verifyFiles'
public readonly fileNames: string[]
constructor(public readonly length: number,
public numFiles: number,
public files: string[],
public readonly rejectFile: string,
public readonly exceptionFile: string)
{
this.fileNames = [...files] // shallow copy
}
}

export class WriteFileMessage {
public readonly name: MessageName = 'writeFile'
constructor(public readonly length: number,
public fileName: string,
public fileLength: number,
public fileContents: string | bigint[] ) {}
}

7 changes: 7 additions & 0 deletions packages/v-protocol/src/buffer-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ export class BufferReader {
return result
}

//signed 64 bit little endian
public int64LE(): bigint {
const result = this.buffer.readBigInt64LE(this.offset)
this.offset += 8
return result
}

public string(length: number): string {
const result = this.buffer.toString(this.encoding, this.offset, this.offset + length)
this.offset += length
Expand Down
53 changes: 11 additions & 42 deletions packages/v-protocol/src/inbound-parser.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import BufferList from './testing/buffer-list'
import { parse } from '.'
import assert from 'assert'
import { PassThrough } from 'stream'
import { BackendMessage } from './messages'
import { BackendMessage } from './backend-messages'

var authOkBuffer = buffers.authenticationOk()
var paramStatusBuffer = buffers.parameterStatus('client_encoding', 'UTF8')
Expand Down Expand Up @@ -193,7 +193,7 @@ var expectedTwoParameterMessage = {
}

var testForMessage = function (buffer: Buffer, expectedMessage: any) {
it('recieves and parses ' + expectedMessage.name, async () => {
it('receives and parses ' + expectedMessage.name, async () => {
const messages = await parseBuffers([buffer])
const [lastMessage] = messages

Expand All @@ -215,14 +215,6 @@ var expectedMD5PasswordMessage = {
salt: Buffer.from([1, 2, 3, 4]),
}

var notificationResponseBuffer = buffers.notification(4, 'hi', 'boom')
var expectedNotificationResponseMessage = {
name: 'notification',
processId: 4,
channel: 'hi',
payload: 'boom',
}

const parseBuffers = async (buffers: Buffer[]): Promise<BackendMessage[]> => {
const stream = new PassThrough()
for (const buffer of buffers) {
Expand All @@ -242,10 +234,9 @@ describe('PgPacketStream', function () {
testForMessage(backendKeyDataBuffer, expectedBackendKeyDataMessage)
testForMessage(readyForQueryBuffer, expectedReadyForQueryMessage)
testForMessage(commandCompleteBuffer, expectedCommandCompleteMessage)
testForMessage(notificationResponseBuffer, expectedNotificationResponseMessage)
testForMessage(buffers.emptyQuery(), {
name: 'emptyQuery',
length: 4,
length: 5,
})

testForMessage(Buffer.from([0x6e, 0, 0, 0, 4]), {
Expand Down Expand Up @@ -389,51 +380,29 @@ describe('PgPacketStream', function () {
})
})

describe('parses replication start message', function () {
testForMessage(Buffer.from([0x57, 0x00, 0x00, 0x00, 0x04]), {
name: 'replicationStart',
length: 4,
})
})

describe('copy', () => {
testForMessage(buffers.copyIn(0), {
name: 'copyInResponse',
length: 7,
binary: false,
columnTypes: [],
isBinary: false,
columnFormats: [],
})

testForMessage(buffers.copyIn(2), {
name: 'copyInResponse',
length: 11,
binary: false,
columnTypes: [0, 1],
isBinary: false,
columnFormats: [0, 1],
})

testForMessage(buffers.copyOut(0), {
name: 'copyOutResponse',
length: 7,
binary: false,
columnTypes: [],
})

testForMessage(buffers.copyOut(3), {
name: 'copyOutResponse',
length: 13,
binary: false,
columnTypes: [0, 1, 2],
testForMessage(buffers.loadFile('sampleFile'), {
name: 'loadFile',
length: 15,
})

testForMessage(buffers.copyDone(), {
name: 'copyDone',
length: 4,
})

testForMessage(buffers.copyData(Buffer.from([5, 6, 7])), {
name: 'copyData',
length: 7,
chunk: Buffer.from([5, 6, 7]),
length: 5,
})
})

Expand Down
2 changes: 1 addition & 1 deletion packages/v-protocol/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* =============================================================================
*/

import { BackendMessage, DatabaseError } from './messages'
import { BackendMessage, DatabaseError } from './backend-messages'
import { serialize } from './serializer'
import { VerticaType } from './vertica-types'
import { Parser, MessageCallback } from './parser'
Expand Down
Loading

0 comments on commit 12438af

Please sign in to comment.