Skip to content

Commit

Permalink
Blocks (#17)
Browse files Browse the repository at this point in the history
* - Rename `Message` to `Block` to better align with IOTA 2.0

* - Updated inx and iota.go
  • Loading branch information
alexsporn authored May 19, 2022
1 parent f40e16a commit bba54ea
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 154 deletions.
124 changes: 60 additions & 64 deletions core/mqtt/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,32 +55,32 @@ func (s *Server) PublishReceipt(r *inx.RawReceipt) {
s.PublishOnTopicIfSubscribed(topicReceipts, receipt)
}

func (s *Server) PublishMessage(msg *inx.RawMessage) {
func (s *Server) PublishBlock(msg *inx.RawBlock) {

message, err := msg.UnwrapMessage(serializer.DeSeriModeNoValidation, nil)
block, err := msg.UnwrapBlock(serializer.DeSeriModeNoValidation, nil)
if err != nil {
return
}

s.PublishRawOnTopicIfSubscribed(topicMessages, msg.GetData())
s.PublishRawOnTopicIfSubscribed(topicBlocks, msg.GetData())

switch payload := message.Payload.(type) {
switch payload := block.Payload.(type) {
case *iotago.Transaction:
s.PublishRawOnTopicIfSubscribed(topicMessagesTransaction, msg.GetData())
s.PublishRawOnTopicIfSubscribed(topicBlocksTransaction, msg.GetData())

switch p := payload.Essence.Payload.(type) {
case *iotago.TaggedData:
s.PublishRawOnTopicIfSubscribed(topicMessagesTransactionTaggedData, msg.GetData())
s.PublishRawOnTopicIfSubscribed(topicBlocksTransactionTaggedData, msg.GetData())
if len(p.Tag) > 0 {
txTaggedDataTagTopic := strings.ReplaceAll(topicMessagesTransactionTaggedDataTag, parameterTag, iotago.EncodeHex(p.Tag))
txTaggedDataTagTopic := strings.ReplaceAll(topicBlocksTransactionTaggedDataTag, parameterTag, iotago.EncodeHex(p.Tag))
s.PublishRawOnTopicIfSubscribed(txTaggedDataTagTopic, msg.GetData())
}
}

case *iotago.TaggedData:
s.PublishRawOnTopicIfSubscribed(topicMessagesTaggedData, msg.GetData())
s.PublishRawOnTopicIfSubscribed(topicBlocksTaggedData, msg.GetData())
if len(payload.Tag) > 0 {
taggedDataTagTopic := strings.ReplaceAll(topicMessagesTaggedDataTag, parameterTag, iotago.EncodeHex(payload.Tag))
taggedDataTagTopic := strings.ReplaceAll(topicBlocksTaggedDataTag, parameterTag, iotago.EncodeHex(payload.Tag))
s.PublishRawOnTopicIfSubscribed(taggedDataTagTopic, msg.GetData())
}

Expand All @@ -93,40 +93,40 @@ func (s *Server) PublishMessage(msg *inx.RawMessage) {
}
}

func (s *Server) hasSubscriberForTransactionIncludedMessage(transactionID *iotago.TransactionID) bool {
transactionTopic := strings.ReplaceAll(topicTransactionsIncludedMessage, parameterTransactionID, transactionID.ToHex())
func (s *Server) hasSubscriberForTransactionIncludedBlock(transactionID iotago.TransactionID) bool {
transactionTopic := strings.ReplaceAll(topicTransactionsIncludedBlock, parameterTransactionID, transactionID.ToHex())
return s.MQTTBroker.HasSubscribers(transactionTopic)
}

func (s *Server) PublishTransactionIncludedMessage(transactionID *iotago.TransactionID, message *inx.RawMessage) {
transactionTopic := strings.ReplaceAll(topicTransactionsIncludedMessage, parameterTransactionID, transactionID.ToHex())
s.PublishRawOnTopicIfSubscribed(transactionTopic, message.GetData())
func (s *Server) PublishTransactionIncludedBlock(transactionID iotago.TransactionID, block *inx.RawBlock) {
transactionTopic := strings.ReplaceAll(topicTransactionsIncludedBlock, parameterTransactionID, transactionID.ToHex())
s.PublishRawOnTopicIfSubscribed(transactionTopic, block.GetData())
}

func hexEncodedMessageIDsFromINXMessageIDs(s []*inx.MessageId) []string {
func hexEncodedBlockIDsFromINXBlockIDs(s []*inx.BlockId) []string {
results := make([]string, len(s))
for i, msgID := range s {
messageID := msgID.Unwrap()
results[i] = iotago.EncodeHex(messageID[:])
for i, blkID := range s {
blockID := blkID.Unwrap()
results[i] = iotago.EncodeHex(blockID[:])
}
return results
}

func (s *Server) PublishMessageMetadata(metadata *inx.MessageMetadata) {
func (s *Server) PublishBlockMetadata(metadata *inx.BlockMetadata) {

messageID := iotago.MessageIDToHexString(metadata.UnwrapMessageID())
singleMessageTopic := strings.ReplaceAll(topicMessageMetadata, parameterMessageID, messageID)
hasSingleMessageTopicSubscriber := s.MQTTBroker.HasSubscribers(singleMessageTopic)
hasAllMessagesTopicSubscriber := s.MQTTBroker.HasSubscribers(topicMessageMetadataReferenced)
blockID := metadata.UnwrapBlockID().ToHex()
singleBlockTopic := strings.ReplaceAll(topicBlockMetadata, parameterBlockID, blockID)
hasSingleBlockTopicSubscriber := s.MQTTBroker.HasSubscribers(singleBlockTopic)
hasAllBlocksTopicSubscriber := s.MQTTBroker.HasSubscribers(topicBlockMetadataReferenced)

if !hasSingleMessageTopicSubscriber && !hasAllMessagesTopicSubscriber {
if !hasSingleBlockTopicSubscriber && !hasAllBlocksTopicSubscriber {
return
}

response := &messageMetadataPayload{
MessageID: messageID,
Parents: hexEncodedMessageIDsFromINXMessageIDs(metadata.GetParents()),
Solid: metadata.GetSolid(),
response := &blockMetadataPayload{
BlockID: blockID,
Parents: hexEncodedBlockIDsFromINXBlockIDs(metadata.GetParents()),
Solid: metadata.GetSolid(),
}

referencedByIndex := metadata.GetReferencedByMilestoneIndex()
Expand All @@ -138,13 +138,13 @@ func (s *Server) PublishMessageMetadata(metadata *inx.MessageMetadata) {

var inclusionState string
switch metadata.GetLedgerInclusionState() {
case inx.MessageMetadata_NO_TRANSACTION:
case inx.BlockMetadata_NO_TRANSACTION:
inclusionState = "noTransaction"
case inx.MessageMetadata_CONFLICTING:
case inx.BlockMetadata_CONFLICTING:
inclusionState = "conflicting"
conflict := metadata.GetConflictReason()
response.ConflictReason = &conflict
case inx.MessageMetadata_INCLUDED:
case inx.BlockMetadata_INCLUDED:
inclusionState = "included"
}
response.LedgerInclusionState = &inclusionState
Expand All @@ -166,11 +166,11 @@ func (s *Server) PublishMessageMetadata(metadata *inx.MessageMetadata) {
return
}

if hasSingleMessageTopicSubscriber {
s.MQTTBroker.Send(singleMessageTopic, jsonPayload)
if hasSingleBlockTopicSubscriber {
s.MQTTBroker.Send(singleBlockTopic, jsonPayload)
}
if referenced && hasAllMessagesTopicSubscriber {
s.MQTTBroker.Send(topicMessageMetadataReferenced, jsonPayload)
if referenced && hasAllBlocksTopicSubscriber {
s.MQTTBroker.Send(topicBlockMetadataReferenced, jsonPayload)
}
}

Expand All @@ -182,12 +182,11 @@ func payloadForOutput(ledgerIndex uint32, output *inx.LedgerOutput, iotaOutput i

outputID := output.GetOutputId().Unwrap()
rawRawOutputJSON := json.RawMessage(rawOutputJSON)
transactionID := outputID.TransactionID()

return &outputPayload{
Metadata: &outputMetadataPayload{
MessageID: iotago.MessageIDToHexString(output.GetMessageId().Unwrap()),
TransactionID: transactionID.ToHex(),
BlockID: output.GetBlockId().Unwrap().ToHex(),
TransactionID: outputID.TransactionID().ToHex(),
Spent: false,
OutputIndex: outputID.Index(),
MilestoneIndexBooked: output.GetMilestoneIndexBooked(),
Expand Down Expand Up @@ -216,10 +215,7 @@ func (s *Server) PublishOnUnlockConditionTopics(baseTopic string, output iotago.
return strings.ReplaceAll(topic, parameterAddress, addressString)
}

unlockConditions, err := output.UnlockConditions().Set()
if err != nil {
return
}
unlockConditions := output.UnlockConditionsSet()

// this tracks the addresses used by any unlock condition
// so that after checking all conditions we can see if anyone is subscribed to the wildcard
Expand Down Expand Up @@ -272,14 +268,14 @@ func (s *Server) PublishOnUnlockConditionTopics(baseTopic string, output iotago.
}
}

func (s *Server) PublishOnOutputChainTopics(outputID *iotago.OutputID, output iotago.Output, payloadFunc func() interface{}) {
func (s *Server) PublishOnOutputChainTopics(outputID iotago.OutputID, output iotago.Output, payloadFunc func() interface{}) {

switch o := output.(type) {
case *iotago.NFTOutput:
nftID := o.NFTID
if nftID.Empty() {
// Use implicit NFTID
nftAddr := iotago.NFTAddressFromOutputID(*outputID)
nftAddr := iotago.NFTAddressFromOutputID(outputID)
nftID = nftAddr.NFTID()
}
topic := strings.ReplaceAll(topicNFTOutputs, parameterNFTID, nftID.String())
Expand All @@ -289,7 +285,7 @@ func (s *Server) PublishOnOutputChainTopics(outputID *iotago.OutputID, output io
aliasID := o.AliasID
if aliasID.Empty() {
// Use implicit AliasID
aliasID = iotago.AliasIDFromOutputID(*outputID)
aliasID = iotago.AliasIDFromOutputID(outputID)
}
topic := strings.ReplaceAll(topicAliasOutputs, parameterAliasID, aliasID.String())
s.PublishPayloadFuncOnTopicIfSubscribed(topic, payloadFunc)
Expand Down Expand Up @@ -328,8 +324,8 @@ func (s *Server) PublishOutput(ledgerIndex uint32, output *inx.LedgerOutput) {
// If this is the first output in a transaction (index 0), then check if someone is observing the transaction that generated this output
if outputID.Index() == 0 {
transactionID := outputID.TransactionID()
if s.hasSubscriberForTransactionIncludedMessage(&transactionID) {
s.fetchAndPublishTransactionInclusionWithMessage(context.Background(), &transactionID, output.GetMessageId().Unwrap())
if s.hasSubscriberForTransactionIncludedBlock(transactionID) {
s.fetchAndPublishTransactionInclusionWithBlock(context.Background(), transactionID, output.GetBlockId().Unwrap())
}
}

Expand Down Expand Up @@ -358,42 +354,42 @@ func (s *Server) PublishSpent(ledgerIndex uint32, spent *inx.LedgerSpent) {
s.PublishOnUnlockConditionTopics(topicSpentOutputsByUnlockConditionAndAddress, iotaOutput, payloadFunc)
}

func messageIDFromMessageMetadataTopic(topicName string) *iotago.MessageID {
if strings.HasPrefix(topicName, "message-metadata/") && !strings.HasSuffix(topicName, "/referenced") {
messageIDHex := strings.Replace(topicName, "message-metadata/", "", 1)
messageID, err := iotago.MessageIDFromHexString(messageIDHex)
func blockIDFromBlockMetadataTopic(topicName string) iotago.BlockID {
if strings.HasPrefix(topicName, "block-metadata/") && !strings.HasSuffix(topicName, "/referenced") {
blockIDHex := strings.Replace(topicName, "block-metadata/", "", 1)
blockID, err := iotago.BlockIDFromHexString(blockIDHex)
if err != nil {
return nil
return iotago.EmptyBlockID()
}
return &messageID
return blockID
}
return nil
return iotago.EmptyBlockID()
}

func transactionIDFromTransactionsIncludedMessageTopic(topicName string) *iotago.TransactionID {
if strings.HasPrefix(topicName, "transactions/") && strings.HasSuffix(topicName, "/included-message") {
func transactionIDFromTransactionsIncludedBlockTopic(topicName string) iotago.TransactionID {
if strings.HasPrefix(topicName, "transactions/") && strings.HasSuffix(topicName, "/included-block") {
transactionIDHex := strings.Replace(topicName, "transactions/", "", 1)
transactionIDHex = strings.Replace(transactionIDHex, "/included-message", "", 1)
transactionIDHex = strings.Replace(transactionIDHex, "/included-block", "", 1)

decoded, err := iotago.DecodeHex(transactionIDHex)
if err != nil || len(decoded) != iotago.TransactionIDLength {
return nil
return emptyTransactionID
}
transactionID := &iotago.TransactionID{}
transactionID := iotago.TransactionID{}
copy(transactionID[:], decoded)
return transactionID
}
return nil
return emptyTransactionID
}

func outputIDFromOutputsTopic(topicName string) *iotago.OutputID {
func outputIDFromOutputsTopic(topicName string) iotago.OutputID {
if strings.HasPrefix(topicName, "outputs/") && !strings.HasPrefix(topicName, "outputs/unlock") {
outputIDHex := strings.Replace(topicName, "outputs/", "", 1)
outputID, err := iotago.OutputIDFromHex(outputIDHex)
if err != nil {
return nil
return emptyOutputID
}
return &outputID
return outputID
}
return nil
return emptyOutputID
}
Loading

0 comments on commit bba54ea

Please sign in to comment.