如何监听Fabric链码的事件

在本文中,我们将讨论Fabric链码中不同类型的基于通道的事件和事件监听器,何时使用它们,特别是我们将学习如何实现Fabric chaincode事件监听器。本文简化并详细解释了有关Fabric主题的官方教程的版本,因此我们建议你也阅读。

关于基于通道的事件和事件监听器

一般而言,事件是程序检测到的动作或事件。用户单击鼠标按钮是一个事件的示例。程序/App必须已经实现了某种事件监听器才能监听事件并在事件发生时采取某些操作。

基于通道的事件特定于单个通道,并且在将块添加到通道Fabric 分类账时会发生这些事件。客户端应用程序可以使用Fabric Node.js客户端注册监听器,以在将块添加到通道Fabric 分类账时接收块。Fabric Node.js客户端还可以通过处理传入的块并查找特定的交易或链代码事件来协助客户端应用程序。根据收到的信息,客户端应用程序可以采取某些操作。

在Hyperledger Fabric中,有三种类型的事件监听器:

  • 块监听器,当需要监视添加到Fabric 分类账的新块时,我们使用它。当新块被提交给节点上的Fabric 分类账时,将通知Fabric客户端Node.js.之后,应用程序可以采取某些行动。
  • 交易监听器,当需要监视组织节点上的交易完成时,我们使用它。当新块被提交给节点上的Fabric 分类账时,将通知客户端Node.js.然后,客户端Node.js将检查块以查找已注册的交易标识符。如果找到交易,则将通过交易ID,交易状态和块编号通知回调函数。
  • 链码监听器,我们在需要监控将从我们的链代码中发布的事件时使用它。当新块提交到Fabric 分类账时,将通知客户端Node.js.然后,客户端Node.js将检查链代码事件名称字段中的已注册链代码模式。

实现链码chaincode事件监听器

在CryptoKajmak应用程序中,我们实现了链代码事件,该事件每次由于到期日期从Fabric分类账中删除kajmak时向最终用户发送通知。通知具有以下形式:

拥有所有权的ID为kajmakID的Kajmak已被删除,因为它已于expirationDate日期过期。例如,

ID为4且其所有者为majra的Kajmak已被删除,因为它已于日期27.02.2019 上午11:30过期。

通知写在名为notifikacije.txt的本地文件中,该文件位于应用程序文件夹(crypto-kajmak文件夹)中用户计算机上。该通知可由客户端应用程序或某些其他外部应用程序使用。

链码事件实现的过程有两个主要步骤:

  • 1.在链代码中调用SetEvent方法。
  • 2.为应用程序客户端使用Node.js实现事件监听器。

注意:你需要熟悉JavaScript promises。

让我们从第一步开始。

第1步:在链代码中调用SetEvent方法

这一步非常简单。

首先,我们需要在我们想要发出事件的链代码文件中找到(或写入,如果函数尚不存在)。在我们的例子中,这是deleteKajmak函数。

kajmak-chaincode.go文件中没有SetEvent方法的deleteKajmak函数的代码:

1
2
3
4
5
6
7
8
9
10
11
12
//deleteKajmak method definition
func (s *SmartContract) deleteKajmak(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
if len(args) != 8 {
return shim.Error("Incorrect number of arguments. Expecting 8")
}
err := APIstub.DelState(args[0])
if (err != nil) {
return shim.Error(fmt.Sprintf("Failed to delete kajmak: %s", args[0]))
}
fmt.Printf("- deleteKajmak:\n%s\n", args[0])
return shim.Success(nil);
}

接下来,我们将调用SetEvent方法。那是shim.ChaincodeStubInterface API方法,看起来像这样:

1
SetEvent(name string, payload []byte) error

此方法通常位于PutStateDelState或与Fabric 分类账交互的类似方法之下。它有两个参数。首先是`haincode事件的名称。数据类型是字符串。第二个是我们想要在链码中发出的有效载荷(数据)。数据类型是字节。让我们创建我们将从链代码发出的有效负载数据(通知)(首先作为字符串,然后我们将其转换为字节):

1
2
eventPayload := "Kajmak with ID " + args[0] + " whose owner is " + args[2] + " was deleted because it has expired on date " + args[7]
payloadAsBytes := []byte(eventPayload)

现在,我们将使用适当的参数调用SetEvent方法。我们的活动名称将是deleteEvent。在SetEvent函数下面,我们将检查是否有错误。

1
2
3
4
eventErr := APIstub.SetEvent("deleteEvent",payloadAsBytes)
if (eventErr != nil) {
return shim.Error(fmt.Sprintf("Failed to emit event"))
}

这就是我们需要在链码中添加的全部内容。

kajmak-chaincode.go文件中的函数deleteKajmak现在看起来像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//deleteKajmak method definition
func (s *SmartContract) deleteKajmak(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
if len(args) != 8 {
return shim.Error("Incorrect number of arguments. Expecting 8")
}
err := APIstub.DelState(args[0])
if (err != nil) {
return shim.Error(fmt.Sprintf("Failed to delete kajmak: %s", args[0]))
}
eventPayload := "Kajmak with ID " + args[0] + " whose owner is " + args[2] + " was deleted because it has expired on date " + args[7]
payloadAsBytes := []byte(eventPayload)
eventErr := APIstub.SetEvent("deleteEvent",payloadAsBytes)
if (eventErr != nil) {
return shim.Error(fmt.Sprintf("Failed to emit event"))
}
fmt.Printf("- deleteKajmak:\n%s\n", args[0])
return shim.Success(nil);
}

第2步:为应用程序端客户端使用Node.js实现事件监听器

正如我们已经说过的,客户端应用程序可以使用Fabric Node.js客户端来注册事件监听器。这正是我们将在CryptoKajmak应用程序中执行的操作,以监听链代码中发出的事件。

Fabric Node.js客户端放置在helper.js文件中,该文件是客户端应用程序(CryptoKajmak)和区块链网络相互交互的点。我们想要在chaincode中监听来自deleteKajmak函数的事件,因此我们将在helper.js文件中的deleteKajmak函数中实现chaincode事件监听器。

在我们发送Fabric分类账更新提议之后,以及在将提交响应发送到helper.js文件中的Orderer之前,我们需要用于监听链代码事件的代码。

我们将在实现链代码事件监听器之前和之后在helper.js文件中显示deleteKajmak函数,之后我们将编写详细解释。

下面列出了在实现链代码监听器之前helper.js文件中的函数deleteKajmak

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
var deleteKajmak = async function(username,userOrg,kajmakData) {
var error_message = null;
try {
var array = kajmakData.split("-");
console.log(array);
var key = array[0]
var name = array[1]
var owner = array[2]
var animal = array[3]
var location = array[4]
var quantity = array[5]
var productionDate = array[6]
var expirationDate = array[7]
var client = await getClientForOrg(userOrg,username);
logger.debug('Successfully got the fabric client for the organization "%s"', userOrg);
var channel = client.getChannel('mychannel');
if(!channel) {
let message = util.format('Channel %s was not defined in the connection profile', channelName);
logger.error(message);
throw new Error(message);
}
var targets = null;
if(userOrg == "Org1") {
targets = ['peer0.org1.example.com'];
} else if(userOrg == "Org2") {
targets = ['peer0.org2.example.com'];
}
var tx_id = client.newTransactionID();
console.log("Assigning transaction_id: ", tx_id._transaction_id);
var tx_id_string = tx_id.getTransactionID();
var request = {
targets: targets,
chaincodeId: 'kajmak-app',
fcn: 'deleteKajmak',
args: [key, name, owner, animal, location, quantity, productionDate, expirationDate],
chainId: channel,
txId: tx_id
};
let results = await channel.sendTransactionProposal(request);
var proposalResponses = results[0];
var proposal = results[1];
let isProposalGood = false;
if (proposalResponses && proposalResponses[0].response && proposalResponses[0].response.status === 200) {
isProposalGood = true;
console.log('Transaction proposal was good');
} else {
console.error('Transaction proposal was bad');
}
if (isProposalGood) {
console.log(util.format('Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s"', proposalResponses[0].response.status, proposalResponses[0].response.message));
var promises = [];
var requestMain = {
txId: tx_id,
proposalResponses: proposalResponses,
proposal: proposal
};
var sendPromise = channel.sendTransaction(requestMain);
promises.push(sendPromise);
let results = await Promise.all(promises);
logger.debug(util.format('------->>> R E S P O N S E : %j', results));
let response = results.pop(); // orderer results are last in the results
if (response.status === 'SUCCESS') {
logger.info('Successfully sent transaction to the orderer.');
} else {
error_message = util.format('Failed to order the transaction. Error code: %s',response.status);
logger.debug(error_message);
}
} else {
error_message = util.format('Failed to send Proposal and receive all good ProposalResponse');
logger.debug(error_message);
}
} catch(error) {
logger.error('Failed to delete kajmak with error: %s', error.toString());
}
};

以上代码的简要说明:

使用Channel类中的sendTransactionProposal方法将Fabric分类账更新的提议发送给支持节点。如果提议是好的,则使用Channel类中的sendTransaction方法将提议响应发送到Orderer。

然后,Orderer将交易命令为块和广播块,以便将提交块到提交节点分配到Fabric分类账。

下面列出了实现链代码监听器后helper.js文件中的函数deleteKajmak:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
var deleteKajmak = async function(username,userOrg,kajmakData) {
var error_message = null;
try {
var array = kajmakData.split("-");
console.log(array);
var key = array[0]
var name = array[1]
var owner = array[2]
var animal = array[3]
var location = array[4]
var quantity = array[5]
var productionDate = array[6]
var expirationDate = array[7]
var client = await getClientForOrg(userOrg,username);
logger.debug('Successfully got the fabric client for the organization "%s"', userOrg);
var channel = client.getChannel('mychannel');
if(!channel) {
let message = util.format('Channel %s was not defined in the connection profile', channelName);
logger.error(message);
throw new Error(message);
}
var targets = null;
if(userOrg == "Org1") {
targets = ['peer0.org1.example.com'];
} else if(userOrg == "Org2") {
targets = ['peer0.org2.example.com'];
}
var tx_id = client.newTransactionID();
console.log("Assigning transaction_id: ", tx_id._transaction_id);
var tx_id_string = tx_id.getTransactionID();
var request = {
targets: targets,
chaincodeId: 'kajmak-app',
fcn: 'deleteKajmak',
args: [key, name, owner, animal, location, quantity, productionDate, expirationDate],
chainId: channel,
txId: tx_id
};
let results = await channel.sendTransactionProposal(request);
var proposalResponses = results[0];
var proposal = results[1];
let isProposalGood = false;
if (proposalResponses && proposalResponses[0].response && proposalResponses[0].response.status === 200) {
isProposalGood = true;
console.log('Transaction proposal was good');
} else {
console.error('Transaction proposal was bad');
}
if (isProposalGood) {
console.log(util.format('Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s"', proposalResponses[0].response.status, proposalResponses[0].response.message));
var promises = [];
let event_hubs = channel.getChannelEventHubsForOrg();
event_hubs.forEach((eh) => {
logger.debug('invokeDeleteKajmakEventPromise - setting up event');
console.log(eh);
let invokeEventPromise = new Promise((resolve, reject) => {
let regid = null;
let event_timeout = setTimeout(() => {
if(regid) {
let message = 'REQUEST_TIMEOUT:' + eh.getPeerAddr();
logger.error(message);
eh.unregisterChaincodeEvent(regid);
eh.disconnect();
}
reject();
}, 20000);
regid = eh.registerChaincodeEvent('kajmak-app', 'deleteEvent',(event, block_num, txnid, status) => {
console.log('Successfully got a chaincode event with transid:'+ txnid + ' with status:'+status);
let event_payload = event.payload.toString();
console.log(event_payload);
if(event_payload.indexOf(array[0]) > -1) {
clearTimeout(event_timeout);
//Chaincode event listeners are meant to run continuously
//Therefore the default to automatically unregister is false
//So in this case we want to shutdown the event listener once
// we see the event with the correct payload
eh.unregisterChaincodeEvent(regid);
console.log('Successfully received the chaincode event on block number '+ block_num);
resolve(event_payload);
} else {
console.log('Successfully got chaincode event ... just not the one we are looking for on block number '+ block_num);
}
}, (err) => {
clearTimeout(event_timeout);
logger.error(err);
reject(err);
}
//no options specified
//startBlock will default to latest
//endBlock will default to MAX
//unregister will default to false
//disconnect will default to false
);
eh.connect(true);
});
promises.push(invokeEventPromise);
console.log(eh.isconnected());
});

var requestMain = {
txId: tx_id,
proposalResponses: proposalResponses,
proposal: proposal
};
var sendPromise = channel.sendTransaction(requestMain);
promises.push(sendPromise);
let results = await Promise.all(promises);
logger.debug(util.format('------->>> R E S P O N S E : %j', results));
let response = results.pop(); // orderer results are last in the results
if (response.status === 'SUCCESS') {
logger.info('Successfully sent transaction to the orderer.');
} else {
error_message = util.format('Failed to order the transaction. Error code: %s',response.status);
logger.debug(error_message);
}

// now see what each of the event hubs reported
for(let i in results) {
let event_hub_result = results[i];
let event_hub = event_hubs[i];
logger.debug('Event results for event hub :%s',event_hub.getPeerAddr());
if(typeof event_hub_result === 'string') {
logger.debug(event_hub_result);
var rezultat = {event_payload : event_hub_result};
return rezultat;
} else {
if(!error_message) error_message = event_hub_result.toString();
logger.debug(event_hub_result.toString());
}
}
} else {
error_message = util.format('Failed to send Proposal and receive all good ProposalResponse');
logger.debug(error_message);
}
} catch(error) {
logger.error('Failed to delete kajmak with error: %s', error.toString());
}
};

上面代码中使用的主要方法的说明:

getChannelEventHubsForOrg(mspid)

Channel类的方法。返回ChannelEventHub实例的列表(数组),该实例基于组织中此通道中定义的节点项。ChannelEventHub对象有两个参数:

如果省略表示组织的mspid的可选mspid参数,则将使用当前设置的Fabric客户端的当前组织。

registerChaincodeEvent(ccid, eventname, onEvent, onError, options)

ChannelEventHub类的方法。注册监听器以接收链代码事件。返回一个对象,该对象应被视为一个opaque句柄,用于通过registerChaincodeEvent方法取消注册chaincode事件。参数是:

如果未指定最后一个参数选项,则使用默认设置:unregister:falsedisconnect:falsestartBlock将是最后一个块。

unregisterChaincodeEvent(listener_handle, throwError)

ChannelEventHub类的方法。取消注册由registerChaincodeEvent方法返回的listener_handle对象表示的chaincode事件监听器。参数是:

connect(full_block)

ChannelEventHub类的方法。建立与节点事件源的连接。参数full_block是布尔类型,它表示与节点的连接将向此ChannelEventHub发送完整块或已过滤块。我们想要读取从链代码中发送的有效负载,因此我们需要接收未经过滤的块,而full_block的值为true

disconnect()

断开事件中心与节点事件源的连接。将关闭所有事件监听器并向所有提供onError回调的监听器发送带有ChannelEventHub has been shutdown消息的Error对象。

setTimeout

此方法在指定的毫秒数后调用函数或计算表达式。例如,要在3秒(3000毫秒)后显示警告框:

1
setTimeout(function(){ alert("Hello"); }, 3000);

clearTimeout

防止使用setTimeout设置的函数执行。

实现链代码监听器后,helper.js文件中deleteKajmak函数的简要说明:

Fabric 分类账更新的提议将发送给支持节点。如果交易提议是好的,Fabric Node.js客户端会注册监听器以监听从链代码中发出的事件。事件超时设置为20秒。这意味着监听器将等待20秒,如果在该时间内未收到事件,则监听器将取消注册并断开事件中心与对等源的连接。

当新块添加到Fabric分类账时,Node.js客户端检查该块并搜索具有指定名称的事件(deleteEvent)。如果从指定的链代码中接收到具有指定名称的事件(kajmak-app),调用clearTimeout函数以停止执行setTimeout函数,取消注册监听器并返回事件有效负载(关于kajmak删除的通知)到服务器。然后,服务器将事件有效负载写入名为notifikacije.txt的本地文件。

======================================================================

如果你想学习区块链并在Blockchain Technologies建立职业生涯,那么请查看我们分享的一些以太坊、比特币、EOS、Fabric等区块链相关的交互式在线编程实战教程:

  • java以太坊开发教程,主要是针对java和android程序员进行区块链以太坊开发的web3j详解。
  • python以太坊,主要是针对python工程师使用web3.py进行区块链以太坊开发的详解。
  • php以太坊,主要是介绍使用php进行智能合约开发交互,进行账号创建、交易、转账、代币开发以及过滤器和交易等内容。
  • 以太坊入门教程,主要介绍智能合约与dapp应用开发,适合入门。
  • 以太坊开发进阶教程,主要是介绍使用node.js、mongodb、区块链、ipfs实现去中心化电商DApp实战,适合进阶。
  • ERC721以太坊通证实战,课程以一个数字艺术品创作与分享DApp的实战开发为主线,深入讲解以太坊非同质化通证的概念、标准与开发方案。内容包含ERC-721标准的自主实现,讲解OpenZeppelin合约代码库二次开发,实战项目采用Truffle,IPFS,实现了通证以及去中心化的通证交易所。
  • C#以太坊,主要讲解如何使用C#开发基于.Net的以太坊应用,包括账户管理、状态与交易、智能合约开发与交互、过滤器和交易等。
  • java比特币开发教程,本课程面向初学者,内容即涵盖比特币的核心概念,例如区块链存储、去中心化共识机制、密钥与脚本、交易与UTXO等,同时也详细讲解如何在Java代码中集成比特币支持功能,例如创建地址、管理钱包、构造裸交易等,是Java工程师不可多得的比特币开发学习课程。
  • php比特币开发教程,本课程面向初学者,内容即涵盖比特币的核心概念,例如区块链存储、去中心化共识机制、密钥与脚本、交易与UTXO等,同时也详细讲解如何在Php代码中集成比特币支持功能,例如创建地址、管理钱包、构造裸交易等,是Php工程师不可多得的比特币开发学习课程。
  • c#比特币开发教程,本课程面向初学者,内容即涵盖比特币的核心概念,例如区块链存储、去中心化共识机制、密钥与脚本、交易与UTXO等,同时也详细讲解如何在C#代码中集成比特币支持功能,例如创建地址、管理钱包、构造裸交易等,是C#工程师不可多得的比特币开发学习课程。
  • EOS入门教程,本课程帮助你快速入门EOS区块链去中心化应用的开发,内容涵盖EOS工具链、账户与钱包、发行代币、智能合约开发与部署、使用代码与智能合约交互等核心知识点,最后综合运用各知识点完成一个便签DApp的开发。
  • 深入浅出玩转EOS钱包开发,本课程以手机EOS钱包的完整开发过程为主线,深入学习EOS区块链应用开发,课程内容即涵盖账户、计算资源、智能合约、动作与交易等EOS区块链的核心概念,同时也讲解如何使用eosjs和eosjs-ecc开发包访问EOS区块链,以及如何在React前端应用中集成对EOS区块链的支持。课程内容深入浅出,非常适合前端工程师深入学习EOS区块链应用开发。
  • Hyperledger Fabric 区块链开发详解,本课程面向初学者,内容即包含Hyperledger Fabric的身份证书与MSP服务、权限策略、信道配置与启动、链代码通信接口等核心概念,也包含Fabric网络设计、nodejs链代码与应用开发的操作实践,是Nodejs工程师学习Fabric区块链开发的最佳选择。
  • Hyperledger Fabric java 区块链开发详解,课程面向初学者,内容即包含Hyperledger Fabric的身份证书与MSP服务、权限策略、信道配置与启动、链代码通信接口等核心概念,也包含Fabric网络设计、java链代码与应用开发的操作实践,是java工程师学习Fabric区块链开发的最佳选择。
  • tendermint区块链开发详解,本课程适合希望使用tendermint进行区块链开发的工程师,课程内容即包括tendermint应用开发模型中的核心概念,例如ABCI接口、默克尔树、多版本状态库等,也包括代币发行等丰富的实操代码,是go语言工程师快速入门区块链开发的最佳选择。

汇智网原创翻译,转载请标明出处。这里是如何监听Fabric链码的事件