Skip to content

Commit

Permalink
Add ItemSelector support
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Nov 12, 2024
1 parent 35b84b6 commit 2cd4ab2
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 2 deletions.
17 changes: 15 additions & 2 deletions lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def initialize(workflow, name, payload)
@item_processor = ItemProcessor.new(payload["ItemProcessor"], name)
@items_path = ReferencePath.new(payload.fetch("ItemsPath", "$"))
@item_reader = payload["ItemReader"]
@item_selector = payload["ItemSelector"]
@item_selector = PayloadTemplate.new(payload["ItemSelector"]) if payload["ItemSelector"]
@item_batcher = ItemBatcher.new(payload["ItemBatcher"], name + ["ItemBatcher"]) if payload["ItemBatcher"]
@result_writer = payload["ResultWriter"]
@max_concurrency = payload["MaxConcurrency"]&.to_i
Expand All @@ -53,7 +53,20 @@ def start(context)

input = process_input(context)

context.state["ItemProcessorContext"] = input.map { |item| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => item.to_json).to_h }
context.state["ItemProcessorContext"] = input.map.with_index do |item, index|
item_processor_context = {
"Execution" => {
"Id" => context.execution["Id"]
},
"Map" => {
"Item" => {"Index" => index, "Value" => item}
}
}

item_processor_input = item_selector ? item_selector.value(item_processor_context, context.state["Input"]) : item

Context.new(item_processor_context, :input => item_processor_input.to_json).to_h
end
end

def end?
Expand Down
34 changes: 34 additions & 0 deletions spec/workflow/states/map_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,40 @@
end
end

context "with an ItemSelector" do
let(:input) { {"delivery-partner" => "ACME", "colors" => ["red", "green", "blue"]} }
let(:workflow) do
payload = {
"Validate-All" => {
"Type" => "Map",
"ItemsPath" => "$.colors",
"ItemSelector" => {
"index.$" => "$$.Map.Item.Index",
"value.$" => "$$.Map.Item.Value",
"courier.$" => "$.delivery-partner"
},
"MaxConcurrency" => 1,
"ItemProcessor" => {
"StartAt" => "Validate",
"States" => {
"Validate" => {
"Type" => "Pass",
"End" => true
}
}
},
"End" => true,
}
}
make_workflow(ctx, payload)
end

it "sets the context output" do
loop while state.run_nonblock!(ctx) != 0
expect(ctx.output).to eq([{"index" => 0, "value" => "red", "courier" => "ACME"}, {"index" => 1, "value" => "green", "courier" => "ACME"}, {"index" => 2, "value" => "blue", "courier" => "ACME"}])
end
end

context "with an ItemBatcher" do
let(:input) { {"foo" => "bar", "colors" => ["red", "green", "blue"]} }
let(:workflow) do
Expand Down

0 comments on commit 2cd4ab2

Please sign in to comment.