diff --git a/archives/messages.go b/archives/messages.go index 634b209..3a8c859 100644 --- a/archives/messages.go +++ b/archives/messages.go @@ -18,7 +18,7 @@ const ( visibilityDeletedBySender = "X" ) -const lookupMsgs = ` +const sqlLookupMsgs = ` SELECT rec.visibility, row_to_json(rec) FROM ( SELECT mm.id, @@ -58,8 +58,7 @@ SELECT rec.visibility, row_to_json(rec) FROM ( LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 -ORDER BY created_on ASC, id ASC) rec; -` +ORDER BY created_on ASC, id ASC) rec;` // writeMessageRecords writes the messages in the archive's date range to the passed in writer func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { @@ -69,7 +68,7 @@ func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, wri // first write our normal records var record, visibility string - rows, err := db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate()) + rows, err := db.QueryxContext(ctx, sqlLookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate()) if err != nil { return 0, errors.Wrapf(err, "error querying messages for org: %d", archive.Org.ID) } @@ -93,28 +92,21 @@ func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, wri return recordCount, nil } -const selectOrgMessagesInRange = ` -SELECT mm.id, mm.visibility -FROM msgs_msg mm +const sqlSelectOrgMessagesInRange = ` + SELECT mm.id, mm.visibility + FROM msgs_msg mm LEFT JOIN contacts_contact cc ON cc.id = mm.contact_id -WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 -ORDER BY mm.created_on ASC, mm.id ASC -` + WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 + ORDER BY mm.created_on ASC, mm.id ASC` -const deleteMessageLogs = ` -DELETE FROM channels_channellog -WHERE msg_id IN(?) -` +const sqlDeleteChannelLogs = ` +DELETE FROM channels_channellog WHERE msg_id IN(?)` -const deleteMessageLabels = ` -DELETE FROM msgs_msg_labels -WHERE msg_id IN(?) -` +const sqlDeleteMessageLabels = ` +DELETE FROM msgs_msg_labels WHERE msg_id IN(?)` -const deleteMessages = ` -DELETE FROM msgs_msg -WHERE id IN(?) -` +const sqlDeleteMessages = ` +DELETE FROM msgs_msg WHERE id IN(?)` // DeleteArchivedMessages takes the passed in archive, verifies the S3 file is still present (and correct), then selects // all the messages in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time @@ -147,7 +139,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 } // ok, archive file looks good, let's build up our list of message ids, this may be big but we are int64s so shouldn't be too big - rows, err := db.QueryxContext(outer, selectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate()) + rows, err := db.QueryxContext(outer, sqlSelectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate()) if err != nil { return err } @@ -193,19 +185,19 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 } // first delete any channel logs - err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch) + err = executeInQuery(ctx, tx, sqlDeleteChannelLogs, idBatch) if err != nil { return errors.Wrap(err, "error removing channel logs") } // then any labels - err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch) + err = executeInQuery(ctx, tx, sqlDeleteMessageLabels, idBatch) if err != nil { return errors.Wrap(err, "error removing message labels") } // finally, delete our messages - err = executeInQuery(ctx, tx, deleteMessages, idBatch) + err = executeInQuery(ctx, tx, sqlDeleteMessages, idBatch) if err != nil { return errors.Wrap(err, "error deleting messages") } @@ -239,27 +231,19 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 return nil } -const selectOldOrgBroadcasts = ` -SELECT - id -FROM - msgs_broadcast -WHERE - org_id = $1 AND - created_on < $2 AND - schedule_id IS NULL -ORDER BY - created_on ASC, - id ASC -LIMIT 1000000; -` +const sqlSelectOldOrgBroadcasts = ` + SELECT id + FROM msgs_broadcast + WHERE org_id = $1 AND created_on < $2 AND schedule_id IS NULL +ORDER BY created_on ASC, id ASC + LIMIT 1000000;` // DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error { start := dates.Now() threshhold := now.AddDate(0, 0, -org.RetentionPeriod) - rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshhold) + rows, err := db.QueryxContext(ctx, sqlSelectOldOrgBroadcasts, org.ID, threshhold) if err != nil { return err } @@ -344,11 +328,7 @@ func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sq } if count > 0 { - logrus.WithFields(logrus.Fields{ - "elapsed": dates.Since(start), - "count": count, - "org_id": org.ID, - }).Info("completed deleting broadcasts") + logrus.WithFields(logrus.Fields{"elapsed": dates.Since(start), "count": count, "org_id": org.ID}).Info("completed deleting broadcasts") } return nil diff --git a/archives/runs.go b/archives/runs.go index 6cab301..1c29676 100644 --- a/archives/runs.go +++ b/archives/runs.go @@ -22,7 +22,7 @@ const ( RunStatusFailed = "F" ) -const lookupFlowRuns = ` +const sqlLookupRuns = ` SELECT rec.uuid, rec.exited_on, row_to_json(rec) FROM ( SELECT @@ -58,13 +58,12 @@ FROM ( WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3 ORDER BY fr.modified_on ASC, id ASC -) as rec; -` +) as rec;` // writeRunRecords writes the runs in the archive's date range to the passed in writer func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { var rows *sqlx.Rows - rows, err := db.QueryxContext(ctx, lookupFlowRuns, archive.Org.ID, archive.StartDate, archive.endDate()) + rows, err := db.QueryxContext(ctx, sqlLookupRuns, archive.Org.ID, archive.StartDate, archive.endDate()) if err != nil { return 0, errors.Wrapf(err, "error querying run records for org: %d", archive.Org.ID) } @@ -96,18 +95,15 @@ func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer return recordCount, nil } -const selectOrgRunsInRange = ` -SELECT fr.id, fr.status -FROM flows_flowrun fr +const sqlSelectOrgRunsInRange = ` + SELECT fr.id, fr.status + FROM flows_flowrun fr LEFT JOIN contacts_contact cc ON cc.id = fr.contact_id -WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3 -ORDER BY fr.modified_on ASC, fr.id ASC -` + WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3 + ORDER BY fr.modified_on ASC, fr.id ASC` -const deleteRuns = ` -DELETE FROM flows_flowrun -WHERE id IN(?) -` +const sqlDeleteRuns = ` +DELETE FROM flows_flowrun WHERE id IN(?)` // DeleteArchivedRuns takes the passed in archive, verifies the S3 file is still present (and correct), then selects // all the runs in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time @@ -140,7 +136,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie } // ok, archive file looks good, let's build up our list of run ids, this may be big but we are int64s so shouldn't be too big - rows, err := db.QueryxContext(outer, selectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate()) + rows, err := db.QueryxContext(outer, sqlSelectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate()) if err != nil { return err } @@ -189,7 +185,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie } // delete our runs - err = executeInQuery(ctx, tx, deleteRuns, idBatch) + err = executeInQuery(ctx, tx, sqlDeleteRuns, idBatch) if err != nil { return errors.Wrap(err, "error deleting runs") } diff --git a/testdb.sql b/testdb.sql index 4bacc63..7ac5212 100644 --- a/testdb.sql +++ b/testdb.sql @@ -1,6 +1,25 @@ CREATE EXTENSION IF NOT EXISTS HSTORE; +DROP TABLE IF EXISTS archives_archive CASCADE; +DROP TABLE IF EXISTS channels_channellog CASCADE; +DROP TABLE IF EXISTS channels_channel CASCADE; +DROP TABLE IF EXISTS flows_flowrun CASCADE; +DROP TABLE IF EXISTS flows_flow CASCADE; +DROP TABLE IF EXISTS msgs_broadcast_contacts CASCADE; +DROP TABLE IF EXISTS msgs_broadcast_groups CASCADE; +DROP TABLE IF EXISTS msgs_broadcast_urns CASCADE; +DROP TABLE IF EXISTS msgs_broadcastmsgcount CASCADE; +DROP TABLE IF EXISTS msgs_broadcast CASCADE; +DROP TABLE IF EXISTS msgs_label CASCADE; +DROP TABLE IF EXISTS msgs_msg_labels CASCADE; +DROP TABLE IF EXISTS msgs_msg CASCADE; +DROP TABLE IF EXISTS contacts_contacturn CASCADE; +DROP TABLE IF EXISTS contacts_contactgroup_contacts CASCADE; +DROP TABLE IF EXISTS contacts_contactgroup CASCADE; +DROP TABLE IF EXISTS contacts_contact CASCADE; +DROP TABLE IF EXISTS auth_user CASCADE; DROP TABLE IF EXISTS orgs_org CASCADE; + CREATE TABLE orgs_org ( id serial primary key, name character varying(255) NOT NULL, @@ -9,7 +28,6 @@ CREATE TABLE orgs_org ( created_on timestamp with time zone NOT NULL ); -DROP TABLE IF EXISTS channels_channel CASCADE; CREATE TABLE channels_channel ( id serial primary key, name character varying(255) NOT NULL, @@ -17,7 +35,6 @@ CREATE TABLE channels_channel ( org_id integer references orgs_org(id) on delete cascade ); -DROP TABLE IF EXISTS contacts_contact CASCADE; CREATE TABLE contacts_contact ( id serial primary key, is_active boolean NOT NULL, @@ -26,15 +43,12 @@ CREATE TABLE contacts_contact ( modified_by_id integer NOT NULL, modified_on timestamp with time zone NOT NULL, org_id integer NOT NULL references orgs_org(id) on delete cascade, - is_blocked boolean NOT NULL, name character varying(128), language character varying(3), uuid character varying(36) NOT NULL, - is_stopped boolean NOT NULL, fields jsonb ); -DROP TABLE IF EXISTS contacts_contacturn CASCADE; CREATE TABLE contacts_contacturn ( id serial primary key, contact_id integer, @@ -48,34 +62,28 @@ CREATE TABLE contacts_contacturn ( identity character varying(255) NOT NULL ); -DROP TABLE IF EXISTS contacts_contactgroup CASCADE; CREATE TABLE contacts_contactgroup ( id serial primary key, - uuid character varying(36) NOT NULL, + uuid uuid NOT NULL, name character varying(128) NOT NULL ); -DROP TABLE IF EXISTS contacts_contactgroup_contacts CASCADE; CREATE TABLE contacts_contactgroup_contacts ( id serial primary key, contactgroup_id integer NOT NULL, contact_id integer NOT NULL ); -DROP TABLE IF EXISTS flows_flow CASCADE; CREATE TABLE flows_flow ( id serial primary key, uuid character varying(36) NOT NULL, name character varying(128) NOT NULL ); -DROP TABLE IF EXISTS channels_channellog CASCADE; -DROP TABLE IF EXISTS msgs_msg_labels CASCADE; -DROP TABLE IF EXISTS msgs_msg CASCADE; CREATE TABLE msgs_msg ( id serial primary key, broadcast_id integer NULL, - uuid character varying(36) NULL, + uuid uuid NULL, text text NOT NULL, high_priority boolean NULL, created_on timestamp with time zone NOT NULL, @@ -96,50 +104,41 @@ CREATE TABLE msgs_msg ( contact_urn_id integer NULL references contacts_contacturn(id) on delete cascade, org_id integer NOT NULL references orgs_org(id) on delete cascade, flow_id integer NULL references flows_flow(id) on delete cascade, - metadata text, - topup_id integer + metadata text ); -DROP TABLE IF EXISTS msgs_broadcast_recipients; -DROP TABLE IF EXISTS msgs_broadcast; CREATE TABLE msgs_broadcast ( id serial primary key, - "text" hstore NOT NULL, - purged BOOLEAN NOT NULL, + text hstore NOT NULL, created_on timestamp with time zone NOT NULL, schedule_id int NULL, org_id integer NOT NULL references orgs_org(id) on delete cascade ); -DROP TABLE IF EXISTS msgs_broadcast_contacts; CREATE TABLE msgs_broadcast_contacts ( id serial primary key, broadcast_id integer NOT NULL, contact_id integer NOT NULL ); -DROP TABLE IF EXISTS msgs_broadcast_groups; CREATE TABLE msgs_broadcast_groups ( id serial primary key, broadcast_id integer NOT NULL, group_id integer NOT NULL ); -DROP TABLE IF EXISTS msgs_broadcast_urns; CREATE TABLE msgs_broadcast_urns ( id serial primary key, broadcast_id integer NOT NULL, contacturn_id integer NOT NULL ); -DROP TABLE IF EXISTS msgs_broadcastmsgcount; CREATE TABLE msgs_broadcastmsgcount ( id serial primary key, - "count" integer NOT NULL, + count integer NOT NULL, broadcast_id integer NOT NULL ); -DROP TABLE IF EXISTS msgs_label CASCADE; CREATE TABLE msgs_label ( id serial primary key, uuid character varying(36) NULL, @@ -152,18 +151,14 @@ CREATE TABLE msgs_msg_labels ( label_id integer NOT NULL ); -DROP TABLE IF EXISTS auth_user CASCADE; CREATE TABLE auth_user ( id serial primary key, username character varying(128) NOT NULL ); -DROP TABLE IF EXISTS api_webhookevent CASCADE; -DROP TABLE IF EXISTS flows_actionlog CASCADE; -DROP TABLE IF EXISTS flows_flowrun CASCADE; CREATE TABLE flows_flowrun ( id serial primary key, - uuid character varying(36) NOT NULL UNIQUE, + uuid uuid NOT NULL UNIQUE, responded boolean NOT NULL, contact_id integer NOT NULL references contacts_contact(id), flow_id integer NOT NULL references flows_flow(id), @@ -178,7 +173,6 @@ CREATE TABLE flows_flowrun ( delete_from_results boolean ); -DROP TABLE IF EXISTS archives_archive CASCADE; CREATE TABLE archives_archive ( id serial primary key, archive_type varchar(16) NOT NULL, @@ -218,16 +212,16 @@ INSERT INTO archives_archive(id, archive_type, created_on, start_date, period, r (NEXTVAL('archives_archive_id_seq'), 'message', '2017-09-02 00:00:00.000000+00', '2017-09-01 00:00:00.000000+00', 'M', 0, 0, '', '', TRUE, 0, 3), (NEXTVAL('archives_archive_id_seq'), 'message', '2017-10-08 00:00:00.000000+00', '2017-10-08 00:00:00.000000+00', 'D', 0, 0, '', '', TRUE, 0, 2); -INSERT INTO contacts_contact(id, is_active, created_by_id, created_on, modified_by_id, modified_on, org_id, is_blocked, name, language, uuid, is_stopped) VALUES -(1, TRUE, -1, '2017-11-10 21:11:59.890662+00', -1, '2017-11-10 21:11:59.890662+00', 1, FALSE, NULL, 'eng', 'c7a2dd87-a80e-420b-8431-ca48d422e924', FALSE), -(3, TRUE, -1, '2015-03-26 10:07:14.054521+00', -1, '2015-03-26 10:07:14.054521+00', 1, FALSE, NULL, NULL, '7a6606c7-ff41-4203-aa98-454a10d37209', TRUE), -(4, TRUE, -1, '2015-03-26 13:04:58.699648+00', -1, '2015-03-26 13:04:58.699648+00', 1, TRUE, NULL, NULL, '29b45297-15ad-4061-a7d4-e0b33d121541', FALSE), -(5, TRUE, -1, '2015-03-27 07:39:28.955051+00', -1, '2015-03-27 07:39:28.955051+00', 1, FALSE, 'John Doe', NULL, '51762bba-01a2-4c4e-b5cd-b182d0405cd4', FALSE), -(6, TRUE, -1, '2015-10-30 19:42:27.001837+00', -1, '2015-10-30 19:42:27.001837+00', 2, FALSE, 'Ajodinabiff Dane', NULL, '3e814add-e614-41f7-8b5d-a07f670a698f', FALSE), -(7, TRUE, -1, '2017-11-10 21:11:59.890662+00', -1, '2017-11-10 21:11:59.890662+00', 3, FALSE, 'Joanne Stone', NULL, '7051dff0-0a27-49d7-af1f-4494239139e6', FALSE), -(8, TRUE, -1, '2015-03-27 13:39:43.995812+00', -1, '2015-03-27 13:39:43.995812+00', 2, FALSE, NULL, 'fre', 'b46f6e18-95b4-4984-9926-dded047f4eb3', FALSE), -(9, TRUE, -1, '2017-11-10 21:11:59.890662+00', -1, '2017-11-10 21:11:59.890662+00', 2, FALSE, NULL, NULL, '9195c8b7-6138-4d84-ac56-5192cc3d8ceb', FALSE), -(10, TRUE, -1, '2016-08-22 14:20:05.690311+00', -1, '2016-08-22 14:20:05.690311+00', 2, FALSE, 'John Arbies', NULL, '2b8bd28d-43e0-4c34-a4bb-0f10b11fdb8a', FALSE); +INSERT INTO contacts_contact(id, is_active, created_by_id, created_on, modified_by_id, modified_on, org_id, name, language, uuid) VALUES +(1, TRUE, -1, '2017-11-10 21:11:59.890662+00', -1, '2017-11-10 21:11:59.890662+00', 1, NULL, 'eng', 'c7a2dd87-a80e-420b-8431-ca48d422e924'), +(3, TRUE, -1, '2015-03-26 10:07:14.054521+00', -1, '2015-03-26 10:07:14.054521+00', 1, NULL, NULL, '7a6606c7-ff41-4203-aa98-454a10d37209'), +(4, TRUE, -1, '2015-03-26 13:04:58.699648+00', -1, '2015-03-26 13:04:58.699648+00', 1, NULL, NULL, '29b45297-15ad-4061-a7d4-e0b33d121541'), +(5, TRUE, -1, '2015-03-27 07:39:28.955051+00', -1, '2015-03-27 07:39:28.955051+00', 1, 'John Doe', NULL, '51762bba-01a2-4c4e-b5cd-b182d0405cd4'), +(6, TRUE, -1, '2015-10-30 19:42:27.001837+00', -1, '2015-10-30 19:42:27.001837+00', 2, 'Ajodinabiff Dane', NULL, '3e814add-e614-41f7-8b5d-a07f670a698f'), +(7, TRUE, -1, '2017-11-10 21:11:59.890662+00', -1, '2017-11-10 21:11:59.890662+00', 3, 'Joanne Stone', NULL, '7051dff0-0a27-49d7-af1f-4494239139e6'), +(8, TRUE, -1, '2015-03-27 13:39:43.995812+00', -1, '2015-03-27 13:39:43.995812+00', 2, NULL, 'fre', 'b46f6e18-95b4-4984-9926-dded047f4eb3'), +(9, TRUE, -1, '2017-11-10 21:11:59.890662+00', -1, '2017-11-10 21:11:59.890662+00', 2, NULL, NULL, '9195c8b7-6138-4d84-ac56-5192cc3d8ceb'), +(10, TRUE, -1, '2016-08-22 14:20:05.690311+00', -1, '2016-08-22 14:20:05.690311+00', 2, 'John Arbies', NULL, '2b8bd28d-43e0-4c34-a4bb-0f10b11fdb8a'); INSERT INTO contacts_contacturn(id, contact_id, scheme, org_id, priority, path, display, identity) VALUES (1, 1, 'tel', 1, 50, '+12067791111', NULL, 'tel:+12067791111'), @@ -258,11 +252,11 @@ INSERT INTO flows_flow(id, uuid, name) VALUES (3, '3914b88e-625b-4603-bd9f-9319dc331c6b', 'Flow 3'), (4, 'cfa2371d-2f06-481d-84b2-d974f3803bb0', 'Flow 4'); -INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id, schedule_id) VALUES -(1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, 1), -(2, 'base=>"hola"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, NULL), -(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2, NULL), -(4, 'base=>"new"'::hstore, '2019-08-12 19:11:59.890662+02:00', FALSE, 2, NULL); +INSERT INTO msgs_broadcast(id, text, created_on, org_id, schedule_id) VALUES +(1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', 2, 1), +(2, 'base=>"hola"'::hstore, '2017-08-12 22:11:59.890662+02:00', 2, NULL), +(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', 2, NULL), +(4, 'base=>"new"'::hstore, '2019-08-12 19:11:59.890662+02:00', 2, NULL); INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, flow_id, msg_count, error_count, next_attempt) VALUES (1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, NULL, 1, 0, '2017-08-12 21:11:59.890662+00'),