Skip to main content

Example Pt. 2: Ethereum Event Listener

In this tutorial, we will build a simple event listener that listens for EVM events and credits an account whenever a matching log is emitted. The listener will be configured to watch a specific contract address for logs with the EVM event signature Credit(address,uint256).

This tutorial uses concepts and code from the resolution extension introduced in Part 1: Example Kwil Credits.

This tutorial is for example purposes only, and should not be used in production. Production implementations should account for transient network failures, subscription disconnections, Ethereum reorgs, and other edge cases. For a production-ready implementation of this event listener, please refer to the Kwil EVM Deposits Listener that is included in all Kwil nodes by default.

Prerequisites

This tutorial assumes that the user is familiar with the following concepts:

Step 1. Querying Ethereum

To get started, we will first need a way to query the Ethereum blockchain for new events. We will use the popular go-ethereum library, the official Go implementation of the Ethereum protocol. We will use this library to connect to an Ethereum node, subscribe to new blocks, and query for logs.

The code below relies on the AccountCreditResolution struct and the CreditAccountEventType constant from part 1.

credit_listener.go
package main

import (
"context"
"math/big"
"strings"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/accounts/abi"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/kwilteam/kwil-db/extensions/listeners"
"github.com/kwilteam/kwil-db/extensions/resolutions/credit"
)

// contractABIStr is the ABI of the smart contract the listener listens to.
// It follows the Ethereum ABI JSON format, and matches the `Credit(address,uint256)` event signature.
const contractABIStr = `[{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"_from","type":"address"},{"indexed":false,"internalType":"uint256","name":"_amount","type":"uint256"}],"name":"Credit","type":"event"}]`

// creditEventSignature is the EVM event signature the listener listens to.
var creditEventSignature ethcommon.Hash = crypto.Keccak256Hash([]byte("Credit(address,uint256)"))

// processBlock fetches every Credit event emitted by the given contract in a
// single block and broadcasts one account-credit resolution per event to the
// Kwil network.
//
// blockNumber is used as both the lower and upper bound of the log filter, so
// exactly one block is scanned per call. Any error from the log query, ABI
// decoding, resolution encoding or broadcast is returned as-is, and events
// after the failing one are not processed.
func processBlock(ctx context.Context, eventstore listeners.EventStore, client *ethclient.Client, blockNumber int64, contract ethcommon.Address) error {
	// query the logs for the credit event, restricted to this one block
	logs, err := client.FilterLogs(ctx, ethereum.FilterQuery{
		FromBlock: big.NewInt(blockNumber),
		ToBlock:   big.NewInt(blockNumber),
		Addresses: []ethcommon.Address{contract},
		Topics:    [][]ethcommon.Hash{{creditEventSignature}},
	})
	if err != nil {
		return err
	}

	// nothing to decode, so skip parsing the ABI for this block
	if len(logs) == 0 {
		return nil
	}

	// get the abi, so that we can decode the logs
	eventABI, err := abi.JSON(strings.NewReader(contractABIStr))
	if err != nil {
		return err
	}

	for _, creditLog := range logs {
		data, err := eventABI.Unpack("Credit", creditLog.Data)
		if err != nil {
			return err
		}

		// data[0] is the address of the account to credit
		// data[1] is the amount to credit the account
		// (the asserted types follow the address/uint256 inputs declared in contractABIStr)
		account := data[0].(ethcommon.Address)
		amount := data[1].(*big.Int)

		// build the resolution defined by the credit resolution extension
		resolution := AccountCreditResolution{
			Account: account.Bytes(),
			Amount:  amount,
			TxHash:  creditLog.TxHash.Bytes(),
		}

		bts, err := resolution.MarshalBinary()
		if err != nil {
			return err
		}

		// broadcast the resolution to the network
		if err := eventstore.Broadcast(ctx, CreditAccountEventType, bts); err != nil {
			return err
		}
	}

	return nil
}

Step 2. Listening to Ethereum

In order to listen to Ethereum, we will need to read in our local node's configuration, create a client, and listen for new blocks. If the configuration is not set, or if the configuration is invalid, we will immediately shut down the listener. We will also stop the listener if the context is cancelled.

credit_listener.go
package main

import (
	"context"
	"fmt"

	ethcommon "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/kwilteam/kwil-db/common"
	"github.com/kwilteam/kwil-db/extensions/listeners"
)

// EthListener is an event listener that listens to ethereum events and broadcasts corresponding
// account credits to the Kwil network.
//
// It reads the "eth_listener" entry of the node's extension configs, which must
// provide "rpc_provider" and "contract_address". It returns nil when no
// "eth_listener" config exists or when ctx is cancelled, and a non-nil error when
// a required key is missing, the contract address is not valid hex, the client
// or subscription cannot be set up, the subscription fails, or a block cannot
// be processed.
func EthListener(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// we will first get the configs that the local node has for the eth_listener extension
	config, ok := service.ExtensionConfigs["eth_listener"]
	if !ok {
		service.Logger.Info("eth_listener: no config found. skipping...")
		return nil
	}

	rpcProvider, ok := config["rpc_provider"]
	if !ok {
		return fmt.Errorf("eth_listener: rpc_provider not found in config")
	}

	contractAddressStr, ok := config["contract_address"]
	if !ok {
		return fmt.Errorf("eth_listener: contract_address not found in config")
	}
	// HexToAddress does not report malformed input, so validate the string
	// explicitly instead of listening on an address the operator never configured.
	if !ethcommon.IsHexAddress(contractAddressStr) {
		return fmt.Errorf("eth_listener: contract_address %q is not a valid hex address", contractAddressStr)
	}
	contractAddress := ethcommon.HexToAddress(contractAddressStr)

	// we will now use go-ethereum to subscribe to new blocks
	client, err := ethclient.Dial(rpcProvider)
	if err != nil {
		return fmt.Errorf("eth_listener: failed to connect to ethereum client: %w", err)
	}
	defer client.Close()

	headers := make(chan *types.Header)
	sub, err := client.SubscribeNewHead(ctx, headers)
	if err != nil {
		return fmt.Errorf("eth_listener: failed to subscribe to new block headers: %w", err)
	}
	defer sub.Unsubscribe()

	// we will now listen for new blocks and process them
	// if the context is cancelled, we will stop listening
	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-sub.Err():
			return fmt.Errorf("eth_listener: subscription error: %w", err)
		case header := <-headers:
			err := processBlock(ctx, eventstore, client, header.Number.Int64(), contractAddress)
			if err != nil {
				return fmt.Errorf("eth_listener: failed to process block: %w", err)
			}
		}
	}
}

Step 3. Configuration

In order to run the listener, each Kwil node will need to configure an Ethereum RPC provider and smart contract address to listen to. This can be done in each node's local config.toml file:

config.toml
# ...
[app.extensions.eth_listener]
rpc_provider = "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY"
contract_address = "0xYOUR_CONTRACT_ADDRESS"
# ...

The listener will read in the values set in config.toml, using them to connect to the Ethereum network and listen to the specified contract address.

Step 4. Register the Event Listener

Register the event listener with the Kwil node. This is done by calling the RegisterListener function in the package's init function.

credit_listener.go
package main

import (
"github.com/kwilteam/kwil-db/extensions/listeners"
)

// init registers EthListener under the name "eth_listener".
// It panics if the registration returns an error.
func init() {
	if err := listeners.RegisterListener("eth_listener", EthListener); err != nil {
		panic(err)
	}
}

Full Example

The full example of the event listener extension implemented in this tutorial is shown below:

credit_listener.go
credit_listener.go
package main

import (
"context"
"fmt"
"math/big"
"strings"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/kwilteam/kwil-db/common"
"github.com/kwilteam/kwil-db/extensions/listeners"
"github.com/kwilteam/kwil-db/extensions/resolutions/credit"
)

// EthListener is an event listener that listens to ethereum events and broadcasts corresponding
// account credits to the Kwil network.
//
// It reads the "eth_listener" entry of the node's extension configs, which must
// provide "rpc_provider" and "contract_address". It returns nil when no
// "eth_listener" config exists or when ctx is cancelled, and a non-nil error when
// a required key is missing, the contract address is not valid hex, the client
// or subscription cannot be set up, the subscription fails, or a block cannot
// be processed.
func EthListener(ctx context.Context, service *common.Service, eventstore listeners.EventStore) error {
	// we will first get the configs that the local node has for the eth_listener extension
	config, ok := service.ExtensionConfigs["eth_listener"]
	if !ok {
		service.Logger.Info("eth_listener: no config found. skipping...")
		return nil
	}

	rpcProvider, ok := config["rpc_provider"]
	if !ok {
		return fmt.Errorf("eth_listener: rpc_provider not found in config")
	}

	contractAddressStr, ok := config["contract_address"]
	if !ok {
		return fmt.Errorf("eth_listener: contract_address not found in config")
	}
	// HexToAddress does not report malformed input, so validate the string
	// explicitly instead of listening on an address the operator never configured.
	if !ethcommon.IsHexAddress(contractAddressStr) {
		return fmt.Errorf("eth_listener: contract_address %q is not a valid hex address", contractAddressStr)
	}
	contractAddress := ethcommon.HexToAddress(contractAddressStr)

	// we will now use go-ethereum to subscribe to new blocks
	client, err := ethclient.Dial(rpcProvider)
	if err != nil {
		return fmt.Errorf("eth_listener: failed to connect to ethereum client: %w", err)
	}
	defer client.Close()

	headers := make(chan *types.Header)
	sub, err := client.SubscribeNewHead(ctx, headers)
	if err != nil {
		return fmt.Errorf("eth_listener: failed to subscribe to new block headers: %w", err)
	}
	defer sub.Unsubscribe()

	// we will now listen for new blocks and process them
	// if the context is cancelled, we will stop listening
	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-sub.Err():
			return fmt.Errorf("eth_listener: subscription error: %w", err)
		case header := <-headers:
			err := processBlock(ctx, eventstore, client, header.Number.Int64(), contractAddress)
			if err != nil {
				return fmt.Errorf("eth_listener: failed to process block: %w", err)
			}
		}
	}
}

// contractABIStr is the ABI of the smart contract the listener listens to.
// It follows the Ethereum ABI JSON format, and matches the `Credit(address,uint256)` event signature.
const contractABIStr = `[{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"_from","type":"address"},{"indexed":false,"internalType":"uint256","name":"_amount","type":"uint256"}],"name":"Credit","type":"event"}]`

// creditEventSignature is the EVM event signature the listener listens to.
var creditEventSignature ethcommon.Hash = crypto.Keccak256Hash([]byte("Credit(address,uint256)"))

// processBlock fetches every Credit event emitted by the given contract in a
// single block and broadcasts one account-credit resolution per event to the
// Kwil network.
//
// blockNumber is used as both the lower and upper bound of the log filter, so
// exactly one block is scanned per call. Any error from the log query, ABI
// decoding, resolution encoding or broadcast is returned as-is, and events
// after the failing one are not processed.
func processBlock(ctx context.Context, eventstore listeners.EventStore, client *ethclient.Client, blockNumber int64, contract ethcommon.Address) error {
	// query the logs for the credit event, restricted to this one block
	logs, err := client.FilterLogs(ctx, ethereum.FilterQuery{
		FromBlock: big.NewInt(blockNumber),
		ToBlock:   big.NewInt(blockNumber),
		Addresses: []ethcommon.Address{contract},
		Topics:    [][]ethcommon.Hash{{creditEventSignature}},
	})
	if err != nil {
		return err
	}

	// nothing to decode, so skip parsing the ABI for this block
	if len(logs) == 0 {
		return nil
	}

	// get the abi, so that we can decode the logs
	eventABI, err := abi.JSON(strings.NewReader(contractABIStr))
	if err != nil {
		return err
	}

	for _, creditLog := range logs {
		data, err := eventABI.Unpack("Credit", creditLog.Data)
		if err != nil {
			return err
		}

		// data[0] is the address of the account to credit
		// data[1] is the amount to credit the account
		// (the asserted types follow the address/uint256 inputs declared in contractABIStr)
		account := data[0].(ethcommon.Address)
		amount := data[1].(*big.Int)

		// build the resolution defined by the credit resolution extension
		resolution := AccountCreditResolution{
			Account: account.Bytes(),
			Amount:  amount,
			TxHash:  creditLog.TxHash.Bytes(),
		}

		bts, err := resolution.MarshalBinary()
		if err != nil {
			return err
		}

		// broadcast the resolution to the network
		if err := eventstore.Broadcast(ctx, CreditAccountEventType, bts); err != nil {
			return err
		}
	}

	return nil
}

// init registers EthListener under the name "eth_listener".
// It panics if the registration returns an error.
func init() {
	if err := listeners.RegisterListener("eth_listener", EthListener); err != nil {
		panic(err)
	}
}

To build and run the event listener extension example, follow the standard kwild build and run process.