Author Archives: Leo Cheung

Ethereum-compatible NFT On Avalanche

While blockchain has been steadily gaining attention from the general public over the past couple of years, it’s the NFT, short for non-fungible token, that has recently taken center stage. In particular, NFTs shine in the area of provenance of authenticity. By programmatically binding a given asset to a unique digital token that references immutable associated transactions on a blockchain, an NFT essentially serves as the “digital receipt” of the asset.

Ethereum is currently undergoing a major upgrade to cope with the future growth of the platform, which has been suffering from low transaction rates and high gas fees due to its unscalable Proof of Work consensus algorithm. As described in a previous blockchain overview blog post, off-chain solutions, including bridging the Ethereum main chain with layer-2 chains such as Polygon, help circumvent the performance issue.

Avalanche

Some layer-1 blockchains support Ethereum’s NFT standards (e.g. ERC-721, ERC-1155) in addition to providing their own native NFT specs. Among them is Avalanche which has been steadily growing its market share (in terms of TVL), trailing behind only a couple of prominent layer-1 blockchains such as Solana and Cardano.

With separation of concerns (SoC) being one of its underlying design principles, Avalanche uses a subnet model in which validators on a subnet only operate on the specific blockchains of their interest. Also in line with the SoC design principle, Avalanche comes with 3 built-in blockchains, each of which serves specific purposes with its own set of APIs:

  • Exchange Chain (X-Chain) – for creation & exchange of digital smart assets (including its native token AVAX) which are bound to programmatic governance rules
  • Platform Chain (P-Chain) – for creating & tracking subnets, each comprising a dynamic group of stakeholders responsible for consensually validating blockchains of interest
  • Contract Chain (C-Chain) – for developing smart contract applications

NFT on Avalanche

Avalanche allows creation of native NFTs as a kind of smart digital asset. Its website provides tutorials for creating such NFTs using its Go-based AvalancheGo API. But it’s perhaps its support of the Ethereum-compatible NFT standards, at a much higher transaction rate and lower cost than the existing Ethereum mainnet, that has helped popularize the platform.

In this blog post, we’re going to create ERC-721 compliant NFTs on the Avalanche platform, which requires programmatic implementation of their sale/transfer terms in smart contracts. C-Chain is therefore the targeted blockchain. And rather than deploying our NFTs on the Avalanche mainnet, we’ll use the Avalanche Fuji Testnet, which allows developers to pay for transactions with test-only AVAX tokens freely available from a designated crypto faucet.

Scaffold-ETH: an Ethereum development stack

Scaffold-ETH, a code repository of comprehensive Ethereum-based blockchain functions, offers a tech stack geared toward fast prototyping, along with sample code for various decentralized application use cases. The stack includes Solidity, Hardhat, Ethers.js and ReactJS.

At a minimum, the following software is required for installing Scaffold-ETH and for building and deploying the NFT smart contracts:

  • Node.js and the Yarn package manager
  • Git (for cloning the repo)
  • A web browser with the MetaMask extension installed

Launching NFTs on Avalanche using a customized Scaffold-ETH

For the impatient, the revised code repo is at this GitHub link. Key changes made to the original branch in Scaffold-ETH will be highlighted at the bottom of this post.

To get a copy of Scaffold-ETH repurposed for NFTs on Avalanche, first git-clone the repo:

git clone https://github.com/oel/avalanche-scaffold-eth-nft avax-scaffold-eth-nft

Next, open up a couple of shell command terminals and navigate to the project-root (e.g. avax-scaffold-eth-nft).

Step 1: From the 1st shell terminal, install the necessary dependent modules.

cd avax-scaffold-eth-nft/
yarn install

Step 2: From the 2nd terminal, specify an account as the deployer.

Choose an account that owns some AVAX tokens (otherwise, get free tokens from an AVAX faucet) on the Avalanche Fuji testnet and create file packages/hardhat/mnemonic.txt with the account’s 12-word mnemonic in it.
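
For example, the file could be created from the shell, replacing the placeholder with the account’s actual 12-word mnemonic:

echo "word1 word2 ... word12" > packages/hardhat/mnemonic.txt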

cd avax-scaffold-eth-nft/
yarn account
yarn deploy --network fujiAvalanche

For future reference, the “deployed at” smart contract address should be saved. Transactions around the smart contract can be reviewed at snowtrace.io.

Step 3: Back to the 1st terminal, start the Node.js server on port 3000.

yarn start

This will open a web page in the default browser (which should have the MetaMask extension installed).

Step 4: From the web browser, connect to the MetaMask account which will receive the NFTs.

Step 5: Back to the 2nd terminal, mint the NFTs.

yarn mint --network fujiAvalanche

You will be prompted to enter the address of the NFT recipient account connected to the browser app. Upon successful minting, images of the NFTs should be automatically displayed on the web page.

To transfer any of the NFTs to another account, enter the address of the receiving account and click “transfer”. Note that the account connected to the browser app needs to own some AVAX tokens (again, if not, get free tokens from an AVAX faucet).

The web page upon successful minting should look like below:

Avalanche NFTs using Scaffold-ETH (MetaMask connected)

Key changes made to the original Scaffold-ETH branch

It should be noted that Scaffold-ETH is a popular code repo under active development. The branch I experimented with a few months ago was already markedly different from the same branch I recently git-cloned for custom modification. That prompted me to maintain a separate repo serving as a “snapshot” of the branch, rather than just showing my modifications against an evolving code base.

Below are the main changes made to the Scaffold-ETH Simple NFT Example branch git-cloned on March 30:

Hardhat configuration script: packages/hardhat/hardhat.config.js

The defaultNetwork value in the original Hardhat configuration script is “localhost”, which assumes a local instance of the selected blockchain is running. The following change sets the default network to the Fuji testnet, whose network configuration parameters also need to be added as shown below.

const defaultNetwork = "fujiAvalanche";
// const defaultNetwork = "mainnetAvalanche";
...
module.exports = {
  ...
  networks: {
    ...
    fujiAvalanche: {
      url: "https://api.avax-test.network/ext/bc/C/rpc",
      gasPrice: 225000000000,
      chainId: 43113,
      accounts: {
        mnemonic: mnemonic(),
      },
    },
    ...

Note that with the explicit defaultNetwork value set to “fujiAvalanche”, one could skip the --network fujiAvalanche command line option in the smart contract deploy and mint commands.
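
For instance, with that default in place, the deploy and mint steps could simply be run as:

yarn deploy
yarn mint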

ReactJS main app: packages/react-app/src/App.jsx

To avoid a compilation error, the following imports need to be moved up above the variable declaration section in the main React app.

import { useContractConfig } from "./hooks"
import Portis from "@portis/web3";
import Fortmatic from "fortmatic";
import Authereum from "authereum";

...
const targetNetwork = NETWORKS.fujiAvalanche;
// const targetNetwork = NETWORKS.mainnetAvalanche;

Minting script: packages/hardhat/scripts/mint.js

A few notes:

  • The square-shaped animal icon images for the NFTs used in the minting script are from public domain sources. Here’s the link to the author’s website.
  • The Node module prompt-sync is used (and thus added to the main package.json dependency list) to avoid having to hardcode the NFT recipient address in the minting script.
  • The code below makes the variable toAddress a dynamic input value, replaces the original NFT images with the square-style icon images, and modularizes the minting logic into a mintItem function.
...
const prompt = require('prompt-sync')();

const delayMS = 5000  // Increase delay as needed!

const main = async () => {

  // ADDRESS TO MINT TO:
  // const toAddress = "0x36f90A958f94F77c26614DB170a5C8a7DF062A90"
  const toAddress = prompt("Enter the address to mint to: ");

  console.log("\n\n 🎫 Minting to "+toAddress+"...\n");

  const { deployer } = await getNamedAccounts();
  const yourCollectible = await ethers.getContract("YourCollectible", deployer);

  // Item #1

  const iconCrocodile = {
    "description": "Squared Croc Icon",
    "external_url": "https://blog.genuine.com/",
    "image": "https://blog.genuine.com/wp-content/uploads/2022/03/Crocodile-icon.png",
    "name": "Squared Crocodile",
    "attributes": [
       {
         "trait_type": "Color",
         "value": "Green"
       }
    ]
  }
  mintItem(iconCrocodile, yourCollectible, toAddress)

  await sleep(delayMS)

  // Item #2

  const iconDuck = {
    "description": "Squared Duck Icon",
    "external_url": "https://blog.genuine.com/",
    "image": "https://blog.genuine.com/wp-content/uploads/2022/03/Duck-icon.png",
    "name": "Squared Duck",
    "attributes": [
       {
         "trait_type": "Color",
         "value": "Yellow"
       }
    ]
  }
  mintItem(iconDuck, yourCollectible, toAddress)

  await sleep(delayMS)

  // Item #3

  const iconEagle = {
    "description": "Squared Eagle Icon",
    "external_url": "https://blog.genuine.com/",
    "image": "https://blog.genuine.com/wp-content/uploads/2022/03/Eagle-icon.png",
    "name": "Squared Eagle",
    "attributes": [
       {
         "trait_type": "Color",
         "value": "Dark Gray"
       }
    ]
  }
  mintItem(iconEagle, yourCollectible, toAddress)

  await sleep(delayMS)

  // Item #4

  const iconElephant = {
    "description": "Squared Elephant Icon",
    "external_url": "https://blog.genuine.com/",
    "image": "https://blog.genuine.com/wp-content/uploads/2022/03/Elephant-icon.png",
    "name": "Squared Elephant",
    "attributes": [
       {
         "trait_type": "Color",
         "value": "Light Gray"
       }
    ]
  }
  mintItem(iconElephant, yourCollectible, toAddress)

  await sleep(delayMS)

  // Item #5

  const iconFish = {
    "description": "Squared Fish Icon",
    "external_url": "https://blog.genuine.com/",
    "image": "https://blog.genuine.com/wp-content/uploads/2022/03/Fish-icon.png",
    "name": "Squared Fish",
    "attributes": [
       {
         "trait_type": "Color",
         "value": "Blue"
       }
    ]
  }
  mintItem(iconFish, yourCollectible, toAddress)

  await sleep(delayMS)

  console.log("Transferring Ownership of YourCollectible to "+toAddress+"...")

  await yourCollectible.transferOwnership(toAddress, { gasLimit: 8000000 });  // Increase limit as needed!

  await sleep(delayMS)

  ...
}

async function mintItem(item, contract, mintTo, limit = 8000000) {  // Increase limit as needed!
  console.log("Uploading `%s` ...", item.name)
  const uploaded = await ipfs.add(JSON.stringify(item))

  console.log("Minting `%s` with IPFS hash ("+uploaded.path+") ...", item.name)
  await contract.mintItem(mintTo,uploaded.path,{gasLimit:limit})
}
...

A Brief Overview Of Blockchains

It’s early 2022, and blockchain has amassed more attention than ever. Having initially emerged in the form of a cryptocurrency, followed by the rise of additional ones operating on open-source development platforms, and further catalyzed by the NFT frenzy, the word “blockchain” has effectively evolved from a geek keyword into a household term.

While a majority of the public is still skeptical about the legitimacy of the blockchain phenomenon, many bystanders are apparently beginning to feel a mix of curiosity and fear of being left out, especially with the recent NFT mania.

It should be noted that by “blockchains”, I’m generally referring to public permissionless blockchains for brevity.

Permissionless vs permissioned

Most of the blockchains commonly heard of, such as Bitcoin, Ethereum and Cardano, are permissionless blockchains in which anyone can participate anonymously. Many of these blockchains are open-source. On the other hand, permissioned blockchains built with products like Hyperledger Fabric require permissions and proof of identity (e.g. KYC/AML) from their operators for participation.

From a programming enthusiast’s perspective, there are interesting technological aspects of blockchain that warrant a deep dive. Rather than only authoring smart contracts at the user-application layer, exploring how a blockchain’s participating nodes compete to grow a decentralized immutable ledger via a consensus algorithm gives one a fuller picture of its technological merit. That was one of the motivations for me to develop a proof-of-concept crypto-mining blockchain system back in 2020.

Last I surveyed the blockchain landscape, the choice of a development platform was simple. That was about 3 years ago. Excluding permissioned-only blockchains, the only platform ready for “big time” development back then was Ethereum, with the Solidity scripting language and the Truffle framework for smart contracts. By “big time”, I mean ready for average programming enthusiasts to participate in the ecosystem through available tech docs and self-learning. Much has evolved since.

Trending blockchain development platforms

There is now a long list of open-source blockchains besides Bitcoin and Ethereum: Solana, Cardano, Polkadot, Avalanche, Polygon, Algorand, Tezos, Flow, …, to name a few. From a technological perspective, what’s more significant is the abundance of dApp (decentralized application) development platforms provided by many of these blockchains. These dApp platforms, built using contemporary programming languages like Go, Rust and Python, compete among themselves, widening choices and expediting improvements that ultimately benefit the rapidly growing dApp development community.

While there are relatively few business cases for building a custom blockchain system, a much greater demand has emerged over the past couple of years for developing smart contract applications particularly in industries such as supply chain, DeFi, collectibles, arts, gaming and metaverse. Many blockchain platforms provide programming SDKs (software development kits) for popular languages like JavaScript, Python, or DSLs (domain specific languages) such as Solidity that many Ethereum dApp developers are already familiar with.

Meanwhile, frameworks for smart contract development that help boost developers’ productivity have also prospered. Within the Ethereum ecosystem, the once predominant Truffle is now competing head to head with other frameworks like Hardhat.

Bitcoin versus Ethereum

Even though Bitcoin remains the blockchain network with the largest market cap, it primarily serves a single purpose as a decentralized cryptocurrency. It wasn’t designed as a platform for developing business/financial applications the way many other blockchain platforms were. Hence comparing Bitcoin with, say, Ethereum is like comparing apples to oranges. That said, Bitcoin’s price apparently still drives the ups and downs of almost every other blockchain network out there.

Despite all the relatively new blockchain development platforms that have emerged over the past few years, Ethereum remains the distinguished leader, with its TVL (total value locked) taking up well over half of the pie across all chains. To overcome the known scalability (and eco-friendliness) issues of its underlying PoW (Proof of Work) consensus, which result in slow transactions and high gas fees, Ethereum is undergoing a major transition to Ethereum 2.0 with a PoS (Proof of Stake) consensus that should allow it to cope with future growth.

Layer-2 blockchains on top of Ethereum

To circumvent Ethereum’s existing scalability problem, various kinds of off-chain solutions have been put in place. They are oftentimes broadly referred to as layer-2 blockchain solutions, supplementing the underlying layer-1 blockchain (in this case Ethereum). The common goal of layer-2 solutions is to offload work from the main chain (i.e. Ethereum) by delegating the bulk of compute-intensive tasks to auxiliary chains with more efficient ways of handling those tasks.

Here’s a quick rundown of the most common types of layer-2 blockchain solutions:

Rollup – an off-chain solution that executes transactions in rolled-up batches outside of the main chain while keeping transaction data and the proof mechanism on-chain. A zero-knowledge (ZK) rollup performs validity proofs, whereas an optimistic rollup assumes transactions are valid and runs fraud proofs upon challenges. For example, Loopring is a ZK rollup and Optimism is an optimistic rollup.

State Channel – another kind of solution that allows participants to bypass the node validation process on the main chain by locking a certain portion of the state via a multi-signature smart contract, performing transactions off-chain, and unlocking the state with the appended state changes back on the main chain. Examples of state channel operators are Raiden and Connext.

Plasma – a “nested” blockchain with its own security separate from the main chain, performing basic transactions and relying on fraud proofs upon validity challenges, e.g. the OMG Plasma project.

Sidechain – a more independent off-chain setup with its own security, consensus mechanism and block structure; Polygon is a popular layer-2 blockchain in this category.

For more details, visit Ethereum’s developer site re: off-chain scaling.

Other layer-1 blockchains

Meanwhile, a number of layer-1 blockchains such as Avalanche, Solana and Algorand have been steadily growing their market share (in terms of TVL). Many of these blockchains were built using leading-edge tech stacks like Rust, Go and Haskell, with improved architectures and more efficient, scalable and eco-friendly consensus mechanisms. By offering the dApp development community cutting-edge technology and versatile tools/SDKs, these blockchain operators strategically grow their market share in the highly competitive space.

While “Ethereum killers” sounds like a baiting term, it’s indeed possible for one or more of these blockchains to dethrone Ethereum before its v2.0 has a chance to succeed the underperforming v1.0 in mid/late 2022. With things evolving at a cut-throat pace in the blockchain world and Ethereum’s upgrade taking considerable time, wouldn’t it be logical to expect one of the newer blockchains with improved design to swiftly take over as the leader? Evidently, Ethereum’s huge lead in market share (again, in terms of TVL) and early-adopter buy-in has helped secure its leading position. Perhaps more important are the inherent limitations imposed on any given blockchain: improvements can’t be made simultaneously in all aspects.

Trilemma vs CAP theorem

Somewhat analogous to the CAP theorem (consistency/availability/partition tolerance) for distributed systems, the blockchain trilemma claims that security, scalability and decentralization cannot be simultaneously maximized without certain trade-off among them. For distributed systems, a decent degree of partition tolerance is a must. Thus they generally trade consistency for availability (e.g. Cassandra) or vice versa (e.g. HBase).

On the other hand, a blockchain by design should be highly decentralized. But it must also be highly secure, or else the immutability of the stored transactions can’t be guaranteed. For a given blockchain, there is much less room for trade-off given that security and decentralization are both integrally critical, in a way leaving scalability as the de facto sacrificial lamb.

This is where the trilemma differs from CAP. Under the well-formulated CAP theorem, a suitable trade-off among the key requirements can result in a practical distributed system. For instance, Cassandra relaxes consistency for better availability and remains a widely adopted distributed storage system.

Given the axiomatic importance of security and decentralization in a blockchain, developers essentially have to forgo the trade-off approach and think outside the box to boost scalability. Sharding helps address the issue to some extent. Some contemporary blockchains (e.g. Avalanche) boost scalability and operational efficiency by splitting governance, exchange and smart contract development into separate interrelated chains, each with an optimal consensus algorithm. Then there are always the various intermediary off-chain (layer-2) solutions described earlier.

What about layer-0?

While a layer-1 blockchain allows developers to create and run platform-specific dApps/smart contracts, a layer-0 blockchain network is one on which blockchain operators build independent blockchains with full sovereignty. To build a custom blockchain, developers may elect to repurpose an existing open-source layer-1 blockchain that has the specific features of interest. When high autonomy and custom features (e.g. a custom security mechanism) are required, it might make more sense to build on a layer-0 network, which provides an inter-network “backbone” with an underlying consensus algorithm, standard communication protocols among blockchains, and SDKs with comprehensive features.

One popular layer-0 network is Cosmos, on top of which some high-profile blockchains like Binance Chain were built. Another is Polkadot, which offers a “centralized” security model, in contrast to Cosmos leaving the responsibility of security setup to individual blockchain operators. For core feature comparisons between the two networks, here’s a nice blog post.

Blockchain oracles

Then there are blockchain projects whose main role is to connect a blockchain with data sources from the outside world. A blockchain can be viewed as an autonomous ecosystem operating in an isolated environment with its own governance model. Such isolation is by design, disallowing on-chain smart contracts from arbitrarily referencing external data, like currency rates or IoT sensor data, that can be prone to fraud.

An oracle provides a “gateway” for the smart contracts on a blockchain to connect to off-chain data sources in the real world. To ensure trustlessness, decentralized oracles have emerged, following the very principle embraced by general-purpose blockchains. Examples of such oracles are Chainlink and Band Protocol.

NFT – provenance of authenticity

NFT, short for non-fungible token, has taken the world by storm. It’s riding the blockchain trend in its early stage, when a majority of the general public is still wondering what to make of the phenomenon. At present, the most popular use case of NFTs is probably provenance of authenticity. By programmatically binding a given asset (e.g. a painting) to a unique digital token consisting of pointers to immutable associated transactions (e.g. transfers of ownership) on a blockchain, the NFT essentially represents the digital receipt of the asset.

A majority of the NFTs seen today are associated with digital assets such as digital art. A similar provenance mechanism can also be extended to cover physical assets like collectible art by having the corresponding NFTs reference immutable “digital specs” of the physical items. The “digital specs” of a given physical item could be a combination of the unique ID of an engraved tag, detailed imagery, etc.

Given the prominence of Ethereum, most NFTs follow its standards, such as ERC-721, which requires a “smart contract” that programmatically describes the key attributes of the token and implements how the token can be transferred among accounts. Once a smart contract is deployed on a blockchain, it cannot be changed; nor can any results from executing the contract that are stored in validated blocks.

Outside of the Ethereum arena, some blockchain projects come up with their own native NFT standards. For example, Algorand’s NFT is treated as one of its own built-in asset types and can be created by parametrically specifying certain attributes in the metadata of the digital asset, without the need for an explicit smart contract. There are also blockchains like Avalanche that provide both their native NFTs and Ethereum-compatible ones.

Final Thoughts

Over the years, the various blockchains that have emerged have interwoven into an inter-network that is increasingly structured and functionally rich. The blockchain-powered inter-network is touted by many early players as the next generation of the internet and is sometimes referred to as Web3. Layer-0 blockchain projects aim exactly to be an inter-network of independent blockchains. Among other blockchain projects, some aim to create a blockchain-powered internet extension (e.g. Internet Computer).

From a technological point of view, there are definite merits to using blockchain’s underlying technology. In particular, the trustless decentralized ledger on a blockchain that keeps an immutable log of transactions can serve many business use cases in various industry sectors like asset management. Some real-world applications using permissioned blockchains have demonstrated success in, for instance, improving traceability and transparency in the supply chain industry. The permission and proof-of-identity requirement does make fraud attempts significantly harder. Nonetheless, it inevitably breaks trustlessness, although such a requirement is generally acceptable in operating a private alliance network.

Viewed from a different angle, cryptocurrencies, except for stablecoins like Tether, are volatile and will likely remain so in the foreseeable future. Given that most prominent public blockchains operate in conjunction with a corresponding cryptocurrency, when evaluating a blockchain platform to land on, it warrants a good look at its cryptocurrency counterpart. Even though volatility may not be as critical for development as for investment, it does make assessing a given blockchain’s long-term prospects non-trivial.

Another obstacle to mainstream adoption of public blockchains is that many blockchain projects have emerged with superficial or even downright fraudulent plans for opportunistic reward in the prospering space. And the NFT frenzy, permeated with scams, hasn’t helped gain public trust either. In addition, people are overwhelmed by the numerous blockchain projects out there. As of this post, there are thousands of public blockchain projects, over 80 of which have a corresponding cryptocurrency market cap above $1 billion. It’s likely only a small percentage of them will prevail when the dust settles.

All those issues are in some way hindering blockchain technology from becoming a lasting mainstream computing class. On the other hand, for those who are adventurous and determined to make the most of this yet-to-mature but improving technology, now may be the right time to dive in.

ETL & Pipelining With Alpakka Kafka

With the increasing demand for big data transformations on distributed platforms, one approach is to put in place a streaming ETL system with built-in backpressure using the Alpakka Kafka API, which allows composable pipelines to be constructed on demand. This blog post extends the previous post, which showed a skeletal version of the system.

Let’s first review the diagram shown in the previous post of what we’re aiming to build — a streaming ETL system empowered by reactive streams and Apache Kafka’s publish-subscribe machinery for durable stream data to be produced or consumed by various data processing/storage systems:

Alpakka Kafka - Streaming ETL

In this blog post, we’re going to:

  1. enhance the data warehouse consumer to programmatically track the commit offset positions,
  2. plug into an existing consumer a data processing pipeline as a stream processing operator, and,
  3. add to the streaming ETL system a mix of heterogeneous producers and consumers

Action item #1 would address the requirement of at-least-once delivery in stream consumption. #2 illustrates how to add to the streaming ETL system a custom data pipeline as a composable stream flow, and #3 showcases how data in various storage systems can participate in the real-time stream to operate (serially or in parallel) as composable sources, flows and sinks. All relevant source code is available in this GitHub repo.

Real-time streaming ETL/pipelining of property listing data

For usage demonstration, the application runs ETL/pipelining of data with a simplified real estate property listing data model. It should be noted that expanding it, or even changing to a different data model altogether, should not affect how the core streaming ETL system operates.

Below are a couple of links related to library dependencies and configurations for the core application:

  • Library dependencies in build.sbt [separate tab]
  • Configurations for Akka, Kafka, PostgreSQL & Cassandra in application.conf [separate tab]

It’s also worth noting that the application can be scaled up with just configuration changes. For example, if the Kafka brokers and Cassandra database span multiple hosts, relevant configurations like Kafka’s bootstrap.servers could be "10.1.0.1:9092,10.1.0.2:9092,10.1.0.3:9092" and contact-points for Cassandra might look like ["10.2.0.1:9042","10.2.0.2:9042"].

Next, let’s get familiarized with the property listing data definitions in PostgreSQL and Cassandra, as well as the property listing classes that model the schemas.
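
As a point of reference, below is a minimal sketch of what such a property listing class might look like. The field names and Option types are inferred from the Cassandra statement binders shown later in this post, so the actual classes in the repo may differ in detail.

case class PropertyListing(  // hypothetical sketch; see the repo for the actual model
    propertyId: Int,
    dataSource: Option[String],
    bathrooms: Option[Double],
    bedrooms: Option[Int],
    listPrice: Option[Double],
    livingArea: Option[Int],
    propertyType: Option[String],
    yearBuilt: Option[String],
    lastUpdated: Option[String],
    streetAddress: Option[String],
    city: Option[String],
    state: Option[String],
    zip: Option[String],
    country: Option[String]
  )

The repo also defines a spray-json format for the class, which appears as the implicit jsonFormat: JsonFormat[PropertyListing] in the producer/consumer code below and backs the toJson/parseJson calls.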

A Kafka producer using Alpakka Csv

Alpakka comes with a simple API for CSV file parsing with method lineScanner() that takes parameters including the delimiter character and returns a Flow[ByteString, List[ByteString], NotUsed].

Below is the relevant code in CsvPlain.scala that highlights how the CSV file gets parsed and materialized into a stream of Map[String,String] via CsvParsing and CsvToMap, followed by transforming into a stream of PropertyListing objects.

    val source: Source[PropertyListing, NotUsed] =
      FileIO.fromPath(Paths.get(csvFilePath))
        .via(CsvParsing.lineScanner(CsvParsing.Tab))
        .viaMat(CsvToMap.toMapAsStrings())(Keep.right)
        .drop(offset).take(limit)
        .map(toClassPropertyListing(_))

    // ...

    source
      .map{ property =>
        val prodRec = new ProducerRecord[String, String](
          topic, property.propertyId.toString, property.toJson.compactPrint
        )
        println(s"[CSV] >>> Producer msg: $prodRec")
        prodRec
      }
      .runWith(Producer.plainSink(producerSettings))

Note that the drop(offset)/take(limit) code line, which can be useful for testing, is for taking a segmented range of the stream source and can be removed if preferred.

A subsequent map wraps each of the PropertyListing objects in a ProducerRecord[K,V] with the associated topic and key/value of type String/JSON before being streamed into Kafka via Alpakka Kafka’s Producer.plainSink().

A Kafka producer using Alpakka Slick

The PostgresPlain producer, which is pretty much identical to the one described in the previous blog post, creates a Kafka producer using Alpakka Slick which allows SQL queries into a PostgreSQL database to be coded in Slick’s functional programming style.

The partial code below shows how method Slick.source() takes a streaming query and returns a stream source of PropertyListing objects.

    val source: Source[PropertyListing, NotUsed] =
      Slick
        .source(TableQuery[PropertyListings].sortBy(_.propertyId).drop(offset).take(limit).result)

    // ...

    source
      .map{ property =>
        val prodRec = new ProducerRecord[String, String](
          topic, property.propertyId.toString, property.toJson.compactPrint
        )
        println(s"[POSTRES] >>> Producer msg: $prodRec")
        prodRec
      }
      .runWith(Producer.plainSink(producerSettings))

The high-level code logic in PostgresPlain is similar to that of the CsvPlain producer.

A Kafka consumer using Alpakka Cassandra

We created a Kafka consumer in the previous blog post using Alpakka Kafka’s Consumer.plainSource[K,V] for consuming data from a given Kafka topic into a Cassandra database.

The following partial code from the slightly refactored version of the consumer, CassandraPlain, shows how data associated with a given Kafka topic can be consumed via Alpakka Kafka’s Consumer.plainSource().

  def runPropertyListing(consumerGroup: String,
                         topic: String)(implicit
                                        cassandraSession: CassandraSession,
                                        jsonFormat: JsonFormat[PropertyListing],
                                        system: ActorSystem,
                                        ec: ExecutionContext): Future[Done] = {

    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val table = "propertydata.property_listing"
    val partitions = 10 // number of partitions

    val statementBinder: (ConsumerRecord[String, String], PreparedStatement) => BoundStatement = {
      case (msg, preparedStatement) =>
        val p = msg.value().parseJson.convertTo[PropertyListing]
        preparedStatement.bind(
          (p.propertyId % partitions).toString, Int.box(p.propertyId), p.dataSource.getOrElse("unknown"),
          Double.box(p.bathrooms.getOrElse(0)), Int.box(p.bedrooms.getOrElse(0)), Double.box(p.listPrice.getOrElse(0)), Int.box(p.livingArea.getOrElse(0)),
          p.propertyType.getOrElse(""), p.yearBuilt.getOrElse(""), p.lastUpdated.getOrElse(""), p.streetAddress.getOrElse(""), p.city.getOrElse(""), p.state.getOrElse(""), p.zip.getOrElse(""), p.country.getOrElse("")
        )
    }
    val cassandraFlow: Flow[ConsumerRecord[String, String], ConsumerRecord[String, String], NotUsed] =
      CassandraFlow.create(
        CassandraWriteSettings.defaults,
        s"""INSERT INTO $table (partition_key, property_id, data_source, bathrooms, bedrooms, list_price, living_area, property_type, year_built, last_updated, street_address, city, state, zip, country)
           |VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""".stripMargin,
        statementBinder
      )

    val control: DrainingControl[Done] =
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics(topic))
        .via(cassandraFlow)
        .toMat(Sink.ignore)(DrainingControl.apply)
        .run()

    Thread.sleep(2000)
    control.drainAndShutdown()
  }

Alpakka’s CassandraFlow.create() is the stream processing operator responsible for funneling data into the Cassandra database. Note that it takes a CQL PreparedStatement along with a “statement binder” that binds the incoming class variables to the corresponding Cassandra table columns before executing the CQL.

Enhancing the Kafka consumer for ‘at-least-once’ consumption

To enable at-least-once consumption by Cassandra, instead of Consumer.plainSource[K,V], we construct the stream graph via Alpakka Kafka’s Consumer.committableSource[K,V], which offers programmatic tracking of the commit offset positions. By keeping the commit offsets as an integral part of the streaming data, failed streams can be re-run from the saved offset positions.

The main stream composition code of the enhanced consumer, CassandraCommittable.scala, is shown below.

  def runPropertyListing(consumerGroup: String,
                         topic: String)(implicit
                                        cassandraSession: CassandraSession,
                                        jsonFormat: JsonFormat[PropertyListing],
                                        system: ActorSystem,
                                        ec: ExecutionContext): Future[Done] = {

    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val committerConfig = system.settings.config.getConfig("akka.kafka.committer")
    val committerSettings = CommitterSettings(committerConfig)

    val table = "propertydata.property_listing"
    val partitions = 10 // number of partitions

    val statementBinder: (CommittableMessage[String, String], PreparedStatement) => BoundStatement = {
      case (msg, preparedStatement) =>
        val p = msg.record.value().parseJson.convertTo[PropertyListing]
        preparedStatement.bind(
          (p.propertyId % partitions).toString, Int.box(p.propertyId), p.dataSource.getOrElse("unknown"),
          Double.box(p.bathrooms.getOrElse(0)), Int.box(p.bedrooms.getOrElse(0)), Double.box(p.listPrice.getOrElse(0)), Int.box(p.livingArea.getOrElse(0)),
          p.propertyType.getOrElse(""), p.yearBuilt.getOrElse(""), p.lastUpdated.getOrElse(""), p.streetAddress.getOrElse(""), p.city.getOrElse(""), p.state.getOrElse(""), p.zip.getOrElse(""), p.country.getOrElse("")
        )
    }
    val cassandraFlow: Flow[CommittableMessage[String, String], CommittableMessage[String, String], NotUsed] =
      CassandraFlow.create(
        CassandraWriteSettings.defaults,
        s"""INSERT INTO $table (partition_key, property_id, data_source, bathrooms, bedrooms, list_price, living_area, property_type, year_built, last_updated, street_address, city, state, zip, country)
           |VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""".stripMargin,
        statementBinder
      )

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics(topic))
        .via(cassandraFlow)
        .map(_.committableOffset)
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()

    Thread.sleep(2000)
    control.drainAndShutdown()
  }

A couple of notes:

  1. In order to be able to programmatically keep track of the commit offset positions, each of the stream elements emitted from Consumer.committableSource[K,V] is wrapped in a CommittableMessage[K,V] object, consisting of the CommittableOffset value in addition to the Kafka ConsumerRecord[K,V].
  2. Committing the offset should be done after the stream data is processed for at-least-once consumption, whereas committing prior to processing the stream data would only achieve at-most-once delivery (see the contrast sketch below).
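
For contrast, here is a minimal sketch (not part of the repo) of an at-most-once variant built on Alpakka Kafka’s Consumer.atMostOnceSource, which commits each message’s offset before emitting it downstream. It reuses the ConsumerRecord-based cassandraFlow from the CassandraPlain consumer shown earlier.

    // At-most-once sketch: offsets are committed before processing, so a record
    // that subsequently fails in the Cassandra flow is never re-delivered.
    val atMostOnceControl: DrainingControl[Done] =
      Consumer
        .atMostOnceSource(consumerSettings, Subscriptions.topics(topic))
        .via(cassandraFlow)  // the ConsumerRecord-based flow from CassandraPlain
        .toMat(Sink.ignore)(DrainingControl.apply)
        .run()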

Adding a property-rating pipeline to the Alpakka Kafka consumer

Next, we add a data processing pipeline to the consumer to perform a number of ratings of the individual property listings in the stream before delivering the rated property listing data to the Cassandra database, as illustrated in the following diagram.

Alpakka Kafka - Streaming ETL w/ custom pipelines

Since the CassandraFlow.create() stream operator will be executed after the rating pipeline, the corresponding “statement binder” necessary for class-variable/table-column binding now needs to also encapsulate PropertyRating along with CommittableMessage[K,V], as shown in the partial code of CassandraCommittableWithRatings.scala below.

  def runPropertyListing(consumerGroup: String,
                         topic: String)(implicit
                                         cassandraSession: CassandraSession,
                                         jsonFormat: JsonFormat[PropertyListing],
                                         system: ActorSystem,
                                         ec: ExecutionContext): Future[Done] = {

    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val committerConfig = system.settings.config.getConfig("akka.kafka.committer")
    val committerSettings = CommitterSettings(committerConfig)

    val table = "propertydata.rated_property_listing"
    val partitions = 10 // number of partitions

    val statementBinder: ((PropertyRating, CommittableMessage[String, String]), PreparedStatement) => BoundStatement = {
      case ((rating, msg), preparedStatement) =>
        val p = msg.record.value().parseJson.convertTo[PropertyListing]
        preparedStatement.bind(
          (p.propertyId % partitions).toString, Int.box(p.propertyId), p.dataSource.getOrElse("unknown"),
          Double.box(p.bathrooms.getOrElse(0)), Int.box(p.bedrooms.getOrElse(0)), Double.box(p.listPrice.getOrElse(0)), Int.box(p.livingArea.getOrElse(0)),
          p.propertyType.getOrElse(""), p.yearBuilt.getOrElse(""), p.lastUpdated.getOrElse(""), p.streetAddress.getOrElse(""), p.city.getOrElse(""), p.state.getOrElse(""), p.zip.getOrElse(""), p.country.getOrElse(""),
          rating.affordability.getOrElse(0), rating.comfort.getOrElse(0), rating.neighborhood.getOrElse(0), rating.schools.getOrElse(0)
        )
    }
    val cassandraFlow: Flow[(PropertyRating, CommittableMessage[String, String]), (PropertyRating, CommittableMessage[String, String]), NotUsed] =
      CassandraFlow.create(
        CassandraWriteSettings.defaults,
        s"""INSERT INTO $table (partition_key, property_id, data_source, bathrooms, bedrooms, list_price, living_area, property_type, year_built, last_updated, street_address, city, state, zip, country, rating_affordability, rating_comfort, rating_neighborhood, rating_schools)
           |VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""".stripMargin,
        statementBinder
      )

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics(topic))
        .via(PropertyRating.compute())
        .via(cassandraFlow)
        .map { case (_, msg) => msg.committableOffset }
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()

    Thread.sleep(2000)
    control.drainAndShutdown()
  }

For demonstration purposes, we create a dummy pipeline that rates individual real estate properties in areas such as affordability and neighborhood, with each rating returning just a Future of a random integer between 1 and 5 after a random time delay. The rating-related fields along with the computation logic are wrapped in class PropertyRating as shown below.

case class PropertyRating(
    propertyId: Int,
    affordability: Option[Int],
    comfort: Option[Int],
    neighborhood: Option[Int],
    schools: Option[Int]
  )

object PropertyRating {
  def rand = java.util.concurrent.ThreadLocalRandom.current

  def biasedRandNum(l: Int, u: Int, biasedNums: Set[Int], biasedFactor: Int = 1): Int = {
    Vector
      .iterate(rand.nextInt(l, u+1), biasedFactor)(_ => rand.nextInt(l, u+1))
      .dropWhile(!biasedNums.contains(_))
      .headOption match {
        case Some(n) => n
        case None => rand.nextInt(l, u+1)
      }
  }

  def fakeRating()(implicit ec: ExecutionContext): Future[Int] = Future{  // Fake rating computation
    Thread.sleep(biasedRandNum(1, 9, Set(3, 4, 5)))  // Sleep 1-9 ms
    biasedRandNum(1, 5, Set(2, 3, 4))  // Rating 1-5; mostly 2-4
  }

  def compute()(implicit ec: ExecutionContext): Flow[CommittableMessage[String, String], (PropertyRating, CommittableMessage[String, String]), NotUsed] =
    Flow[CommittableMessage[String, String]].mapAsync(1){ msg =>
      val propertyId = msg.record.key().toInt  // let it crash in case of bad PK data
      ( for {
            affordability <- PropertyRating.fakeRating()
            comfort <- PropertyRating.fakeRating()
            neighborhood <- PropertyRating.fakeRating()
            schools <- PropertyRating.fakeRating()
          }
          yield new PropertyRating(propertyId, Option(affordability), Option(comfort), Option(neighborhood), Option(schools))
        )
        .map(rating => (rating, msg)).recover{ case e => throw new Exception("ERROR in computeRatingFlow()!") }
    }
}

A Kafka consumer with a custom flow & stream destination

The application is also bundled with a consumer that runs the property rating pipeline followed by a custom flow, to showcase how one can compose an arbitrary side-effecting operator with a custom stream destination.

  def customBusinessLogic(key: String, value: String, rating: PropertyRating)(
    implicit ec: ExecutionContext): Future[Done] = Future {

    println(s"KEY: $key  VALUE: $value  RATING: $rating")
    // Perform custom business logic with key/value
    // and save to an external storage, etc.
    Done
  }

  def runPropertyListing(consumerGroup: String,
                         topic: String)(implicit
                                         jsonFormat: JsonFormat[PropertyListing],
                                         system: ActorSystem,
                                         ec: ExecutionContext): Future[Done] = {

    val consumerConfig = system.settings.config.getConfig("akkafka.consumer.with-brokers")
    val consumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withGroupId(consumerGroup)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val committerConfig = system.settings.config.getConfig("akka.kafka.committer")
    val committerSettings = CommitterSettings(committerConfig)

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics(topic))
        .via(PropertyRating.compute())
        .mapAsync(1) { case (rating, msg) =>
          customBusinessLogic(msg.record.key, msg.record.value, rating)
            .map(_ => msg.committableOffset)
        }
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()

    Thread.sleep(5000)
    control.drainAndShutdown()
  }

Note that mapAsync is used to allow the stream transformation by the custom business logic to be carried out asynchronously.

Running the streaming ETL/pipelining system

To run the application that comes with sample real estate property listing data on a computer, go to the GitHub repo and follow the README instructions to launch the producers and consumers on one or more command-line terminals.

Also included in the README are instructions on how to run a couple of preset queries to verify the data that gets ETL-ed into the Cassandra tables, using Alpakka Cassandra’s CassandraSource, which takes a CQL query as its argument.
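
As a rough illustration only (the table and column names are assumed to match the consumer code above; the repo’s actual verification queries may differ), such a query could be run as a stream like this:

    // Hedged sketch: stream rows back out of Cassandra for verification.
    // Requires the same implicit CassandraSession and ActorSystem as the consumers above.
    import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
    import akka.stream.scaladsl.Sink

    val rowCount: Future[Int] =
      CassandraSource("SELECT * FROM propertydata.rated_property_listing")
        .map { row => println(s"${row.getInt("property_id")}: ${row.getString("street_address")}"); row }
        .runWith(Sink.fold(0)((count, _) => count + 1))  // count the rows returned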

Further enhancements

Depending on specific business requirement, the streaming ETL system can be further enhanced in a number of areas.

  1. This streaming ETL system offers at-least-once delivery only in stream consumption. If an end-to-end version is necessary, one could enhance the producers by using Producer.committableSink() or Producer.flexiFlow() instead of Producer.plainSink().
  2. For exactly-once delivery, which is a generally much more stringent requirement, one approach to achieve that would be to atomically persist the in-flight data with the corresponding commit offset positions using a reliable storage system.
  3. In case tracking of Kafka’s topic partition assignment is required, one can use Consumer.committablePartitionedSource[K,V] instead of Consumer.committableSource[K,V]. More details can be found in the tech doc.
  4. To gracefully restart a stream on failure with a configurable backoff, Akka Stream provides the method RestartSource.onFailuresWithBackoff, as illustrated in an example in this tech doc and in the sketch below.
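
For item #4, here is a minimal sketch (with arbitrary backoff values, not taken from the repo) of wrapping the rating consumer’s source in a restart-on-failure backoff:

  import scala.concurrent.duration._
  import akka.stream.RestartSettings
  import akka.stream.scaladsl.RestartSource

  // Restart the wrapped source on failure with exponential backoff (values are arbitrary).
  val restartSettings = RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)

  val restartableOffsets = RestartSource.onFailuresWithBackoff(restartSettings) { () =>
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topic))
      .via(PropertyRating.compute())
      .via(cassandraFlow)
      .map { case (_, msg) => msg.committableOffset }
  }
  // The restartable source can then be run with Committer.sink(committerSettings) as before.

One caveat is that the consumer’s Control isn’t directly exposed when wrapped this way, so a clean drain-and-shutdown takes some extra wiring; the Alpakka Kafka error-handling docs cover this.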