Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process event creation through SQS queue #195

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 23 additions & 36 deletions app/models/affiliation_identifier.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class AffiliationIdentifier < Base
LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze

include Queueable

def self.import_by_month(options = {})
from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
Expand Down Expand Up @@ -102,43 +104,28 @@ def self.push_item(item)

# there can be one or more affiliation_identifier per DOI
Array.wrap(push_items).each do |iiitem|
# send to DataCite Event Data API
if ENV["STAFF_ADMIN_TOKEN"].present?
push_url = "#{ENV['LAGOTTINO_URL']}/events"

data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
}

response = Maremma.post(push_url, data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
end
},
}

send_event_import_message(data.to_json)

Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
end

push_items.length
Expand Down
33 changes: 33 additions & 0 deletions app/models/concerns/queueable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module Queueable
extend ActiveSupport::Concern

require "aws-sdk-sqs"

class_methods do
def send_event_import_message(data)
send_message(data, shoryuken_class: "EventImportWorker", queue_name: "events")
end

private

def send_message(body, options = {})
sqs = Aws::SQS::Client.new
queue_name_prefix = ENV["SQS_PREFIX"].present? ? ENV["SQS_PREFIX"] : Rails.env
queue_url =
sqs.get_queue_url(queue_name: "#{queue_name_prefix}_#{options[:queue_name]}").queue_url
options[:shoryuken_class] ||= "EventImportWorker"

options = {
queue_url: queue_url,
message_attributes: {
"shoryuken_class" => {
string_value: options[:shoryuken_class], data_type: "String"
},
},
message_body: body.to_json,
}

sqs.send_message(options)
end
end
end
41 changes: 41 additions & 0 deletions app/models/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
class Event < Base
def process_message(sqs_msg, data)
if data.blank?
Rails.logger.info("[Event Import] data object is blank.")
return
end

response = post_to_event_service(data.to_json)
handle_logging(response, log_prefix)
end

private

def post_to_event_service
Maremma.post(
"#{ENV['LAGOTTINO_URL']}/events",
data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")
end

def log_prefix
subj_id = data["data"]["attributes"]["subjId"]
relation_type_id = data["data"]["attributes"]["relationTypeId"]
obj_id = data["data"]["attributes"]["objId"]

"[EventImportWorker] #{subj_id} #{relation_type_id} #{obj_id}"
end

def handle_logging(response)
if response.status == 200 || response.status == 201
Rails.logger.info("#{log_prefix} pushed to the Event Data service.")
elsif response.status == 409
Rails.logger.info("#{log_prefix} already pushed to the Event Data service.")
elsif response.body["errors"].present?
Rails.logger.error("#{log_prefix} had an error: #{response.body["errors"]}")
Rails.logger.error(data.inspect)
end
end
end
59 changes: 23 additions & 36 deletions app/models/funder_identifier.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class FunderIdentifier < Base
LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze

include Queueable

def self.import_by_month(options = {})
from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
Expand Down Expand Up @@ -92,43 +94,28 @@ def self.push_item(item)

# there can be one or more funder_identifier per DOI
Array.wrap(push_items).each do |iiitem|
# send to DataCite Event Data Query API
if ENV["STAFF_ADMIN_TOKEN"].present?
push_url = "#{ENV['LAGOTTINO_URL']}/events"

data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
}

response = Maremma.post(push_url, data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
end
},
}

send_event_import_message(data.to_json)

Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
end

push_items.length
Expand Down
55 changes: 21 additions & 34 deletions app/models/name_identifier.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class NameIdentifier < Base
LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze

include Queueable

def self.import_by_month(options = {})
from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
Expand Down Expand Up @@ -113,43 +115,28 @@ def self.push_item(item)

# there can be one or more name_identifier per DOI
Array.wrap(push_items).each do |iiitem|
# send to DataCite Event Data API
if ENV["STAFF_ADMIN_TOKEN"].present?
push_url = "#{ENV['LAGOTTINO_URL']}/events"

data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
}
},
}

response = Maremma.post(push_url, data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")
send_event_import_message(data.to_json)

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
end
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."

# send to Profiles service, which then pushes to ORCID
if ENV["STAFF_PROFILES_ADMIN_TOKEN"].present?
Expand Down
59 changes: 23 additions & 36 deletions app/models/orcid_affiliation.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class OrcidAffiliation < Base
LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze

include Queueable

def self.import_by_month(options = {})
from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
Expand Down Expand Up @@ -107,43 +109,28 @@ def self.push_item(item)

# there can be one or more affiliation_identifier per DOI
Array.wrap(push_items).each do |iiitem|
# send to DataCite Event Data API
if ENV["STAFF_ADMIN_TOKEN"].present?
push_url = "#{ENV['LAGOTTINO_URL']}/events"

data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
}

response = Maremma.post(push_url, data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
end
},
}

send_event_import_message(data.to_json)

Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
end

total_push_items += push_items
Expand Down
Loading