Syndicate’s mirrors allow you to send data from blockchains or HTTP resources to target blockchains. This becomes useful when creating cross-chain applications that could benefit from data that lives outside of a single blockchain.

Dependencies

Our data mirror service is a simple HTTP server that listens to events on desired blockchains or polls HTTP resources and allows you to broadcast transactions to target blockchains using Syndicate’s API.

Before getting started, ensure you have created an account with Syndicate. Additionally, make sure you have bun installed on your system.

Next, clone the mirror repository.

Install dependencies

bun i

Run the server

bun dev

Upon starting the server, two listeners are initialized: a ChainListener and a PollListener.

These two listeners implement the logic necessary for retrieving data from their sources:

  • ChainListener: queries events emitted on a blockchain
  • PollListener: fetches from async resources

Listeners

ChainListener

On start, chain listeners will immediately query all events emitted from the fromBlock to the most recent block and will continue listening to all future events. An example is shown below for listening to all purchases of a CryptoPunk and minting an NFT for the purchaser on Base.
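
The backfill-then-follow behaviour described above can be pictured with a small helper that decides which block range to query on each poll. This is a minimal sketch and not the repository's ChainListener implementation: nextRange and BlockRange are hypothetical names, and block numbers are plain numbers here for brevity, whereas the ChainListener example uses BigInt.

```typescript
// Hypothetical helper for illustration, not part of the mirror repository.
interface BlockRange {
	fromBlock: number
	toBlock: number
}

// Returns the next inclusive range of blocks to query, or null when
// everything up to the latest block has already been processed.
function nextRange(
	fromBlock: number,
	lastProcessed: number | null,
	latestBlock: number,
): BlockRange | null {
	// First run: start at the configured fromBlock (the backfill).
	// Later runs: resume right after the last processed block.
	const start = lastProcessed === null ? fromBlock : lastProcessed + 1
	if (start > latestBlock) {
		return null
	}
	return { fromBlock: start, toBlock: latestBlock }
}

// Backfill from block 100 while the chain head is at 105, then follow.
const backfill = nextRange(100, null, 105) // { fromBlock: 100, toBlock: 105 }
const caughtUp = nextRange(100, 105, 105) // null, nothing new yet
const follow = nextRange(100, 105, 108) // { fromBlock: 106, toBlock: 108 }
console.log(backfill, caughtUp, follow)
```

Tracking the last processed block means each poll only asks for new blocks, so events from the initial backfill are not requested again.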

The following parameters can be passed to define a ChainListener:

  • rpcUrl: the RPC URL of the source chain you are listening to events on
  • fromBlock: the block from which you want to start listening
  • event: the ABI of the event emitted from contractAddress
  • contractAddress: the source contract address that emits an event
  • onData: callback that receives all data emitted in the event
  • pollingInterval: (optional) interval in seconds at which to poll for events
  • id: (optional) identifier for the listener
import { SyndicateClient } from "@syndicateio/syndicate-node"
import { waitForHash } from "@syndicateio/syndicate-node/utils/waitForHash"
import { v5 } from "uuid"
import { parseAbiItem } from "viem"
import { ChainListener } from "../chain"

const ETH_MAINNET_RPC_URL = "https://eth.drpc.org"
const PUNK_LISTEN_FROM_BLOCK = BigInt(19963995)
const PUNK_ADDRESS = "0xb47e3cd837dDF8e4c57F05d70Ab865de6e193BBB"
const PUNK_BOUGHT_EVENT = parseAbiItem(
	"event PunkBought(uint indexed punkIndex, uint value, address indexed fromAddress, address indexed toAddress)",
)

const syndicate = new SyndicateClient({ token: "SYNDICATE_API_KEY" })
const projectId = "<your-project-ID>"
const chainId = 5432 // replace with target chain ID

export const chainListener = new ChainListener({
	id: "punk-mirror",
	rpcUrl: ETH_MAINNET_RPC_URL,
	fromBlock: PUNK_LISTEN_FROM_BLOCK,
	event: PUNK_BOUGHT_EVENT,
	contractAddress: PUNK_ADDRESS,
	onData: async ({ logs }) => {
		const promises = logs.map(
			async ({
				args: { punkIndex, fromAddress, toAddress },
				transactionHash,
			}) => {
				console.debug(`[punk-mirror]: punk ${punkIndex} bought`)

				if (!toAddress) {
					console.error(`toAddress is not defined for punkIndex: ${punkIndex}`)
					return
				}

				// We generate a unique uuid to prevent duplicate transactions
				const requestId = v5(
					`${transactionHash}-${punkIndex}-${fromAddress}-${toAddress}`,
					v5.URL,
				)

				// Check if the request has already been processed
				const req = await syndicate.wallet
					.getTransactionRequest(projectId, requestId)
					.catch((e) => {
						console.debug(`[punk-mirror] beginning reflection ${requestId}`)
					})

				if (req) {
					const attempt = req.transactionAttempts?.find(
						(a) => a.transactionId === requestId,
					)
					console.debug(
						`[punk-mirror] tx ${requestId} already reflected: (${attempt?.status}) ${attempt?.hash}`,
					)
					return
				}

				// Mint an NFT on base to the purchaser of a cryptopunk
				const { transactionId } = await syndicate.transact
					.sendTransaction({
						requestId,
						chainId,
						projectId,
						contractAddress: "<your-contract-address>",
						functionSignature: "<your-function-signature>",
						args: {
							to: toAddress,
						},
					})
					.catch((_) => {
						throw new Error(
							`request ID: ${requestId} has already been processed`,
						)
					})
				// waitForHash returns a promise, so await it before logging the hash
				const hash = await waitForHash(syndicate, { transactionId, projectId })
				console.log(`got hash: ${hash} for transaction ID: ${transactionId}`)
			},
		)

		const settled = await Promise.allSettled(promises)
		const rejected = settled.filter((p) => p.status === "rejected")
		if (rejected.length > 0) {
			console.error(`rejected: ${rejected.length}`)
			for (const promise of rejected) {
				console.error(promise)
			}
		}
	},
})

chainListener.init()
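
The example above derives its requestId with v5 from the uuid package so that the same event always maps to the same ID. To show why that makes retries safe, here is a minimal sketch of a name-based (version 5) UUID built on Node's crypto module. The helper nameBasedUuid is a hypothetical name used only for illustration; in practice the uuid package does this for you.

```typescript
import { createHash } from "node:crypto"

// RFC 4122 namespace for URLs, the namespace that v5.URL refers to.
const URL_NAMESPACE = "6ba7b811-9dad-11d1-80b4-00c04fd430c8"

// Hypothetical helper: SHA-1 over namespace bytes + name, then stamp the
// version and variant bits as RFC 4122 describes for version 5 UUIDs.
function nameBasedUuid(name: string, namespace: string = URL_NAMESPACE): string {
	const namespaceBytes = Buffer.from(namespace.replace(/-/g, ""), "hex")
	const digest = createHash("sha1").update(namespaceBytes).update(name).digest()
	const bytes = digest.subarray(0, 16)
	bytes[6] = (bytes[6] & 0x0f) | 0x50 // version 5
	bytes[8] = (bytes[8] & 0x3f) | 0x80 // RFC 4122 variant
	const hex = bytes.toString("hex")
	return [
		hex.slice(0, 8),
		hex.slice(8, 12),
		hex.slice(12, 16),
		hex.slice(16, 20),
		hex.slice(20),
	].join("-")
}

// Placeholder values standing in for transactionHash, punkIndex and addresses.
const eventKey = "0xabc-42-0x111-0x222"
const firstAttempt = nameBasedUuid(eventKey)
const retryAttempt = nameBasedUuid(eventKey)
console.log(firstAttempt === retryAttempt) // true
```

Because the ID depends only on the event's fields, a listener that restarts and replays the same log produces the same requestId, which is what lets the getTransactionRequest lookup detect work that was already done.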

PollListener

A PollListener allows you to query multiple async resources and broadcast data to a target blockchain.

The following parameters can be passed to define a PollListener:

  • dataFetchers: array of functions that return promises fetching data
  • onData: callback that receives data from the fetchers
  • pollingInterval: (optional) interval in seconds between calls to the data fetchers
  • id: (optional) identifier for the listener
import { SyndicateClient } from "@syndicateio/syndicate-node"
import { waitForHash } from "@syndicateio/syndicate-node/utils/waitForHash"
import { PollListener } from "../poll"

const syndicate = new SyndicateClient({ token: "SYNDICATE_API_KEY" })
const projectId = "<your-project-ID>"
const chainId = 5432 // replace with target chain ID

async function getPrice() {
	const res = await fetch(
		"https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd",
	)
	if (!res.ok) {
		throw new Error("Failed to fetch price")
	}
	const data = (await res.json()) as { ethereum: { usd: number } }
	return data.ethereum.usd
}

export const pollListener = new PollListener<number>({
	id: "price-mirror",
	pollingInterval: 90,
	dataFetchers: [() => getPrice()],
	onData: async ({ fulfilled, rejected }) => {
		const price = fulfilled[0]
		if (!price) {
			console.debug("[price-mirror]: no price data")
			return
		}

		console.debug(`[price-mirror]: got eth price: ${price}`)

		const { transactionId } = await syndicate.transact
			.sendTransaction({
				chainId,
				projectId,
				contractAddress: "<your-contract-address>",
				functionSignature: "updatePrice(uint256 price)",
				args: { price },
			})
			.catch((_) => {
				throw new Error("Could not get price")
			})
		const hash = await waitForHash(syndicate, {
			transactionId,
			projectId,
		})
		console.log(`got hash: ${hash} for transaction ID: ${transactionId}`)

		if (rejected.length > 0) {
			console.error(`rejected: ${rejected.length}`)
			for (const err of rejected) {
				console.error(err)
			}
		}
	},
})

pollListener.init()
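
The onData callback above receives fulfilled and rejected arrays. One plausible way to produce that split is Promise.allSettled, which the ChainListener example also uses. The sketch below is an assumption about the mechanism, not the repository's PollListener code; runFetchers, Fetcher, and PollResult are hypothetical names.

```typescript
// Hypothetical types and helper for illustration only.
type Fetcher<T> = () => Promise<T>

interface PollResult<T> {
	fulfilled: T[]
	rejected: unknown[]
}

// Calls every fetcher once and separates successes from failures,
// so one failing resource does not discard the others' data.
async function runFetchers<T>(fetchers: Fetcher<T>[]): Promise<PollResult<T>> {
	const settled = await Promise.allSettled(fetchers.map((fetcher) => fetcher()))
	const fulfilled: T[] = []
	const rejected: unknown[] = []
	for (const result of settled) {
		if (result.status === "fulfilled") {
			fulfilled.push(result.value)
		} else {
			rejected.push(result.reason)
		}
	}
	return { fulfilled, rejected }
}

// One fetcher succeeds and one fails.
void runFetchers<number>([
	() => Promise.resolve(3000),
	() => Promise.reject(new Error("upstream unavailable")),
]).then(({ fulfilled, rejected }) => {
	console.log(fulfilled.length, rejected.length) // 1 1
})
```

Note that in this sketch a rejected fetcher shifts the positions of later results in fulfilled. Whether the repository's PollListener preserves positions is not stated here, so check its source before relying on fulfilled[0] with more than one fetcher.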

Examples

Farcaster FID Mapping on HAM

Syndicate currently mirrors address verifications for all Farcaster users utilizing data provided by Indexing Co.

You can access the data onchain on HAM at 0x709aaa2232A62992a3CD7C777F85DA972B5627BE (explorer).

Function signatures:

  • getVerifiedAddressByFID(uint256): get an address given an FID
  • getFIDByVerifiedAddress(address): get an FID given an address
  • getBulkVerifiedAddresses(uint256[] fids): get many addresses given many FIDs
  • getBulkFIDsFromVerifiedAddresses(address[] addresses): get many FIDs given many addresses

import type { QueryRowsResponse } from "@google-cloud/bigquery"
import { SyndicateClient } from "@syndicateio/syndicate-node"
import { waitForHash } from "@syndicateio/syndicate-node/utils/waitForHash"
import { getAddress, isAddress } from "viem"
import { PollListener } from "~/listeners/poll"
import config from "~/config"
import bq, { DeltaOperation } from "./bq"

const syndicate = new SyndicateClient({ token: "SYNDICATE_API_KEY" })
const projectId = "<project-ID>"
const functionSignature =
	"bulkOverwriteAndRemove((uint256 fid, address owner)[] fidsToOverwrite, uint256[] fidsToRemove)"

const hamListener = new PollListener<QueryRowsResponse>({
	pollingInterval: 25,
	dataFetchers: [() => bq.read()],
	// async, because the handler awaits the transaction request below
	onData: async ({ fulfilled, rejected }) => {
		if (rejected.length > 0) {
			console.error("[ham] rejected promises", rejected)
		}

		const rows = fulfilled?.[0]?.[0]
		if (!rows) {
			console.warn("[ham] no rows found, returning")
			return
		}

		// Assumption: the original snippet never defines ethRows. Here it is
		// taken to mean the rows whose address is a valid EVM address.
		const ethRows = rows.filter((row) => isAddress(row.address))

		const additions = ethRows.filter(
			(row) => row.operation === DeltaOperation.Added,
		)
		const deletions = ethRows.filter(
			(row) => row.operation === DeltaOperation.Deleted,
		)

		try {
			const { transactionId } = await syndicate.transact
				.sendTransaction({
					functionSignature,
					projectId,
					contractAddress: config.ham.targetContractAddress,
					chainId: config.ham.chainId,
					args: {
						fidsToOverwrite: additions.map((row) => ({
							fid: row.fid,
							owner: getAddress(row.address),
						})),
						fidsToRemove: deletions.map((row) => row.fid),
					},
				})
				.catch((e) => {
					console.error("[ham] got error sending transaction", e)
					throw e
				})

			console.log(
				`[ham] mirrored ${additions.length} additions and ${deletions.length} deletions to ham with transaction ID ${transactionId}`,
			)

			// Await the promise so the log below prints the hash, not a Promise
			const hash = await waitForHash(syndicate, {
				transactionId,
				projectId,
			}).catch((e) => {
				console.error(
					`[ham] got error waiting for hash for transaction ID: ${transactionId}`,
					e,
				)
				throw e
			})

			console.log(`[ham] transaction ${transactionId} mined with hash ${hash}`)

			await bq.write(
				transactionId,
				functionSignature,
				ethRows.map((row) => ({
					fid: row.fid,
					address: row.address,
				})),
			)
		} catch (e) {
			const firstRow = rows[0]
			const lastRow = rows[rows.length - 1]
			console.error(
				`[ham] got error for batch (${firstRow.fid}, ${firstRow.address})-(${lastRow.fid}-${lastRow.address})`,
				e,
			)
		}
	},
})

hamListener.init()

Hosting

Our data mirror service is open source and should be self-hosted. It does not use external dependencies such as a database or in-memory cache, and it can be deployed to any cloud provider, such as Render, fly.io, or Heroku. When hosting, ensure the instance is always on so the service can poll resources continuously. If you run the service in a container, a Dockerfile is included in the repository.