Stop writing fragile cron scripts. Learn to build stateful, incremental data streams from any REST API using Python and a pull-based CDC model.

Stateful API-to-Database Synchronization: Implementing Incremental Data Ingestion from REST APIs wit

In distributed systems, keeping external HTTP APIs and relational database targets in sync is a persistent engineering challenge, particularly when API responses lack native transactional semantics or expose non-normalized, deeply nested JSON payloads (comment threads, organizational hierarchies, multi-level BOM structures). CocoIndex's Custom Source abstraction provides a declarative framework for converting arbitrary HTTP/gRPC endpoints into stateful, incrementally synced data streams with built-in checkpointing, content fingerprinting (SHA256-based deduplication), and differential update propagation.

Unlike imperative polling scripts (cron jobs invoking curl + jq + psql INSERT), Custom Sources implement a **pull-based CDC (Change Data Capture) model** via ordinal watermarking. This is analogous to Kafka Connect source connectors, but optimized for HTTP APIs without native offset management. The framework maintains persistent state (last-seen timestamp vectors, content hashes) in a local metadata store (SQLite by default, configurable to Postgres/Redis for multi-process deployments), enabling exactly-once delivery semantics and automatic retry logic with exponential backoff.

This tutorial demonstrates building a production-ready Custom Source targeting the HackerNews Algolia Search API (hn.algolia.com/api/v1), implementing:

• Stateless API→Stateful Stream transformation: Converting REST GET requests into append-log semantics

• Recursive tree traversal with cycle detection: Handling unbounded-depth comment DAGs (Directed Acyclic Graphs) without stack overflow

• Schema evolution support: Python dataclass-based typing with forward compatibility via structural subtyping

• Postgres sink connector integration: Leveraging psycopg3 async connection pooling + COPY protocol for bulk inserts

• GIN-indexed full-text search configuration: Materializing tsvector columns with English dictionary stemming + ranking functions

In this example, we build a custom connector for HackerNews. It fetches recent stories + nested comments, indexes them, and exposes a simple search interface powered by Postgres full-text search.

Why Use a Custom Source?

In many scenarios, pipelines don't just read from clean tables. They depend on:

  • Internal REST services
  • Partner APIs
  • Legacy systems
  • Non-standard data models that don’t fit traditional connectors

CocoIndex’s Custom Source API makes these integrations declarative, incremental, and safe by default. Instead of writing ad-hoc scripts, you wrap your API as a “source component,” and CocoIndex takes it from there.


Project Walkthrough — Building a HackerNews Index

Goals

  1. Call HackerNews Search API
  2. Fetch nested comments
  3. Update only modified threads
  4. Store content in Postgres
  5. Expose a text search interface

CocoIndex handles change detection, idempotency, lineage, and state sync automatically.

Overview

The pipeline consists of three major parts:

  1. Define a custom source (HackerNewsConnector)
     • Calls HackerNews API
     • Emits rows for changed/updated threads
     • Pulls full thread + comment tree
  2. Build an index with CocoIndex Flow
     • Collect thread content
     • Collect all comments recursively
     • Export to a Postgres table (hn_messages)
  3. Add a lightweight query handler
     • Uses PostgreSQL full-text search
     • Returns ranked matches for a keyword query

Each cocoindex update only processes changed HN threads and keeps everything in sync.

The project is open source and available on GitHub.


Prerequisites

  • Install Postgres if you don't have one.

Defining the Data Model

Every custom source defines two lightweight data types:

  • Key Type → uniquely identifies an item
  • Value Type → the full content for that item

On HackerNews, each story is a thread, and each thread can have multiple comments.

For HackerNews, let’s define keys like this:

    from typing import NamedTuple


    class _HackerNewsThreadKey(NamedTuple):
        """Row key type for HackerNews source."""

        thread_id: str

Keys must be:

  • hashable
  • serializable
  • stable (doesn’t change over time)
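
As a quick check of these three properties, the sketch below uses only the standard library. `ThreadKey` is a stand-in for `_HackerNewsThreadKey`, and the JSON round trip is just one illustrative way to test serializability; it is an assumption for demonstration, not necessarily how CocoIndex serializes keys internally.

```python
import json
from typing import NamedTuple


class ThreadKey(NamedTuple):
    """Stand-in for _HackerNewsThreadKey (illustrative only)."""

    thread_id: str


a = ThreadKey(thread_id="8863")
b = ThreadKey(thread_id="8863")

# Hashable: equal keys hash the same, so either one finds the dict entry.
seen = {a: "first fetch"}
same_slot = b in seen

# Serializable: a NamedTuple converts cleanly to a plain dict and back.
encoded = json.dumps(a._asdict())
decoded = ThreadKey(**json.loads(encoded))

# Stable: the key is built only from the thread ID, never from mutable
# fields such as the title or the comment count.
round_trips = decoded == a
```

A key built from a mutable field (for example the title) would make an edited story look like a delete plus an insert, which is why the thread ID alone is used.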

Values hold the actual dataset:

    import dataclasses
    from datetime import datetime


    @dataclasses.dataclass
    class _HackerNewsComment:
        id: str
        author: str | None
        text: str | None
        created_at: datetime | None


    @dataclasses.dataclass
    class _HackerNewsThread:
        """Value type for HackerNews source."""

        author: str | None
        text: str
        url: str | None
        created_at: datetime | None
        comments: list[_HackerNewsComment]

This tells CocoIndex exactly what every HackerNews “item” looks like when fully fetched. _HackerNewsThread holds a post and all its comments, while _HackerNewsComment represents individual comments.

Building a Custom Source Connector

A Custom Source has two parts:

  1. SourceSpec — declarative configuration
  2. SourceConnector — operational logic for reading data

Writing the SourceSpec

SourceSpec in CocoIndex is a declarative configuration that tells the system what data to fetch and how to connect to a source. It doesn’t fetch data itself — that’s handled by the source connector.

    class HackerNewsSource(SourceSpec):
        """Source spec for HackerNews API."""

        tag: str | None = None
        max_results: int = 100

Fields:

  • tag
    • Optional filter for the type of HackerNews content.
    • Examples: "story", "job", "poll".
    • If None, it fetches all types.
  • max_results
    • Maximum number of threads to fetch from HackerNews at a time.
    • Helps limit the size of the index for performance or testing.

Defining the connector

This step sets up the connector's configuration and HTTP session so it can fetch HackerNews data efficiently.

    @source_connector(
        spec_cls=HackerNewsSource,
        key_type=_HackerNewsThreadKey,
        value_type=_HackerNewsThread,
    )
    class HackerNewsConnector:
        """Custom source connector for HackerNews API."""

        _spec: HackerNewsSource
        _session: aiohttp.ClientSession

        def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession):
            self._spec = spec
            self._session = session

        @staticmethod
        async def create(spec: HackerNewsSource) -> "HackerNewsConnector":
            """Create a HackerNews connector from the spec."""
            return HackerNewsConnector(spec, aiohttp.ClientSession())

  • source_connector tells CocoIndex that this class is a custom source connector. It specifies:
    • spec_cls: the configuration class (HackerNewsSource)
    • key_type: how individual items are identified (_HackerNewsThreadKey)
    • value_type: the structure of the data returned (_HackerNewsThread)
  • create() is called by CocoIndex to initialize the connector, and it sets up a fresh aiohttp.ClientSession for making HTTP requests.

Listing Available Threads

The list() method in HackerNewsConnector is responsible for discovering all available HackerNews threads that match the given criteria (tag, max results) and returning metadata about them. CocoIndex uses this to know which threads exist and which may have changed.

    async def list(
        self,
    ) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]:
        """List HackerNews threads using the search API."""
        # Use HackerNews search API
        search_url = "https://hn.algolia.com/api/v1/search_by_date"
        params: dict[str, Any] = {"hitsPerPage": self._spec.max_results}
        if self._spec.tag:
            params["tags"] = self._spec.tag
        async with self._session.get(search_url, params=params) as response:
            response.raise_for_status()
            data = await response.json()
            for hit in data.get("hits", []):
                if thread_id := hit.get("objectID", None):
                    # The last-updated timestamp becomes the row's ordinal
                    utime = hit.get("updated_at")
                    ordinal = (
                        int(datetime.fromisoformat(utime).timestamp())
                        if utime
                        else NO_ORDINAL
                    )
                    yield PartialSourceRow(
                        key=_HackerNewsThreadKey(thread_id=thread_id),
                        data=PartialSourceRowData(ordinal=ordinal),
                    )

list() fetches metadata for all recent HackerNews threads.

  • For each thread, it yields a PartialSourceRow with:
    • key: the thread ID
    • ordinal: the last-updated timestamp
  • Purpose: allows CocoIndex to track which threads exist and which have changed without fetching full thread content.
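
To make the role of ordinals concrete, here is a deliberately simplified sketch of how a listing can be compared against previously stored state. The function `plan_refresh` and both dictionaries are hypothetical; this illustrates the idea of ordinal comparison and is not CocoIndex's actual internal algorithm.

```python
def plan_refresh(
    stored: dict[str, int], listed: dict[str, int]
) -> tuple[list[str], list[str]]:
    """Return (keys to re-fetch, keys to delete) by comparing ordinals."""
    to_fetch = [
        key
        for key, ordinal in listed.items()
        # Re-fetch when the key is new or the source reports a newer ordinal.
        if key not in stored or ordinal > stored[key]
    ]
    # Keys that disappeared from the listing are candidates for deletion.
    to_delete = [key for key in stored if key not in listed]
    return to_fetch, to_delete


# Ordinals from the previous run vs. the ones returned by the latest listing.
stored = {"101": 1700000000, "102": 1700000500, "103": 1700000900}
listed = {"101": 1700000000, "102": 1700003600, "104": 1700004000}

to_fetch, to_delete = plan_refresh(stored, listed)
```

In this example only threads 102 (updated) and 104 (new) would need a full fetch, while 101 is skipped because its ordinal did not move.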


Fetching Full Thread Content

This async method fetches a single HackerNews thread (including its comments) from the API, and wraps the result in a PartialSourceRowData object — the structure CocoIndex uses for row-level ingestion.

    async def get_value(
        self, key: _HackerNewsThreadKey
    ) -> PartialSourceRowData[_HackerNewsThread]:
        """Get a specific HackerNews thread by ID using the items API."""
        # Use HackerNews items API to get full thread with comments
        item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}"
        async with self._session.get(item_url) as response:
            response.raise_for_status()
            data = await response.json()
            if not data:
                # An empty response means the thread no longer exists
                return PartialSourceRowData(
                    value=NON_EXISTENCE,
                    ordinal=NO_ORDINAL,
                    content_version_fp=None,
                )
            return PartialSourceRowData(
                value=HackerNewsConnector._parse_hackernews_thread(data)
            )

  • get_value() fetches the full content of a specific thread, including comments.
  • Parses the raw JSON into structured Python objects (_HackerNewsThread + _HackerNewsComment).
  • Returns a PartialSourceRowData containing the full thread.
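
The `content_version_fp` field in the snippet above hints at a second change signal besides ordinals: a fingerprint of the content itself. As a hedged sketch, such a fingerprint could be computed as shown below. The helper name `fingerprint` and the choice of SHA-256 over canonical JSON are assumptions for illustration, not CocoIndex's documented behavior.

```python
import hashlib
import json
from typing import Any


def fingerprint(payload: dict[str, Any]) -> bytes:
    """Hash a JSON-compatible payload so that key order does not matter."""
    # sort_keys gives a canonical form: equal content yields equal bytes.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).digest()


v1 = fingerprint({"id": 1, "title": "Show HN", "children": []})
v2 = fingerprint({"title": "Show HN", "children": [], "id": 1})  # reordered keys
v3 = fingerprint({"id": 1, "title": "Show HN (edited)", "children": []})

unchanged = v1 == v2  # same content, so reprocessing could be skipped
edited = v1 != v3     # content differs, so the row would be reprocessed
```

A fingerprint like this is useful when an API does not expose a reliable last-updated timestamp, because identical polling results can be recognized without comparing full payloads.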


Ordinal Support

provides_ordinal() tells CocoIndex that this source provides timestamps (ordinals).

    def provides_ordinal(self) -> bool:
        return True

CocoIndex uses ordinals to incrementally update only changed threads, improving efficiency.
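
The connector derives its ordinals by calling `datetime.fromisoformat` on the API's timestamp strings. One caveat worth knowing: Python versions before 3.11 reject a trailing "Z" in that call. The helper below is a minimal sketch of a more defensive conversion; the name `iso_to_ordinal` is hypothetical and not part of the original connector.

```python
from datetime import datetime, timezone


def iso_to_ordinal(value: str) -> int:
    """Convert an ISO-8601 timestamp to whole seconds since the Unix epoch."""
    # Python < 3.11 rejects a trailing "Z", so rewrite it as an explicit offset.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    # Treat naive timestamps as UTC so the result does not depend on local time.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


earlier = iso_to_ordinal("2024-01-01T00:00:00Z")
later = iso_to_ordinal("2024-01-01T00:05:00+00:00")
is_newer = later > earlier  # a larger ordinal means a more recent update
```

Because ordinals are only compared with each other, any monotonically increasing integer works; epoch seconds are simply the most convenient choice for timestamped APIs.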


Parsing JSON into Structured Data

This static method takes the raw JSON response from the API and turns it into a normalized _HackerNewsThread object containing:

  • The post (title, text, metadata)
  • All nested comments, flattened into a single list
  • Proper Python datetime objects

It performs a recursive traversal of the comment tree.

    @staticmethod
    def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread:
        comments: list[_HackerNewsComment] = []

        def _add_comments(parent: dict[str, Any]) -> None:
            # Walk the "children" lists depth-first, appending each comment
            children = parent.get("children", None)
            if not children:
                return
            for child in children:
                ctime = child.get("created_at")
                if comment_id := child.get("id", None):
                    comments.append(
                        _HackerNewsComment(
                            id=str(comment_id),
                            author=child.get("author", ""),
                            text=child.get("text", ""),
                            created_at=datetime.fromisoformat(ctime) if ctime else None,
                        )
                    )
                _add_comments(child)

        _add_comments(data)

        # Combine the title and the optional body text into one field
        ctime = data.get("created_at")
        text = data.get("title", "")
        if more_text := data.get("text", None):
            text += "\n\n" + more_text
        return _HackerNewsThread(
            author=data.get("author"),
            text=text,
            url=data.get("url"),
            created_at=datetime.fromisoformat(ctime) if ctime else None,
            comments=comments,
        )

  • Converts raw HackerNews API response into _HackerNewsThread and _HackerNewsComment.
  • _add_comments() recursively parses nested comments.
  • Combines title + text into the main thread content.
  • Produces a fully structured object ready for indexing.
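
Recursion is fine for typical HackerNews threads, but Python's default recursion limit can become a concern for unusually deep trees. As an alternative sketch, the same pre-order flattening can be done with an explicit stack. `flatten_comment_ids` is a hypothetical helper that collects only IDs to keep the example short; it is not the connector's own code.

```python
from typing import Any


def flatten_comment_ids(root: dict[str, Any]) -> list[str]:
    """Collect comment IDs in the same pre-order as a recursive walk."""
    ids: list[str] = []
    # Start from the root's children: the root is the thread, not a comment.
    stack = list(reversed(root.get("children") or []))
    while stack:
        node = stack.pop()
        if (comment_id := node.get("id")) is not None:
            ids.append(str(comment_id))
        # Push children reversed so the leftmost child is visited first.
        stack.extend(reversed(node.get("children") or []))
    return ids


thread = {
    "id": 1,
    "children": [
        {"id": 2, "children": [{"id": 3, "children": []}, {"id": 4}]},
        {"id": 5, "children": []},
    ],
}
flat = flatten_comment_ids(thread)
```

The explicit stack lives on the heap, so depth is bounded by available memory instead of the interpreter's call-stack limit.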


Putting It All Together in a Flow

Your flow now reads exactly like a React component.

Define the flow and connect source

    @cocoindex.flow_def(name="HackerNewsIndex")
    def hackernews_flow(
        flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
    ) -> None:
        # Add the custom source to the flow
        data_scope["threads"] = flow_builder.add_source(
            HackerNewsSource(tag="story", max_results=500),
            refresh_interval=timedelta(minutes=1),
        )

        # Create collectors for different types of searchable content
        message_index = data_scope.add_collector()

Process each thread and collect structured information

        # (continues inside hackernews_flow)
        with data_scope["threads"].row() as thread:
            # Index the main thread content
            message_index.collect(
                id=thread["thread_id"],
                thread_id=thread["thread_id"],
                content_type="thread",
                author=thread["author"],
                text=thread["text"],
                url=thread["url"],
                created_at=thread["created_at"],
            )

Process each comment of a thread and collect structured information

            # (continues inside the `with ... as thread` block)
            with thread["comments"].row() as comment:
                message_index.collect(
                    id=comment["id"],
                    thread_id=thread["thread_id"],
                    content_type="comment",
                    author=comment["author"],
                    text=comment["text"],
                    created_at=comment["created_at"],
                )

Export to database tables

        # (back at the top level of hackernews_flow)
        message_index.export(
            "hn_messages",
            cocoindex.targets.Postgres(),
            primary_key_fields=["id"],
        )

CocoIndex now:

  • polls the HackerNews API
  • tracks changes incrementally
  • flattens nested comments
  • exports to Postgres
  • supports live mode

Your app can now query it as a real-time search index.


Querying & Searching the HackerNews Index

At this point you are done with the index flow. As the next step, you could define query handlers — so you can run queries in CocoInsight. You can use any library or framework of your choice to perform queries. You can read more in the documentation about Query Handler.

    @hackernews_flow.query_handler()
    def search_text(query: str) -> cocoindex.QueryOutput:
        """Search HackerNews threads by title and content."""
        table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages")

        # connection_pool() is assumed to be a helper defined elsewhere in the
        # project that returns a Postgres connection pool.
        with connection_pool().connection() as conn:
            with conn.cursor() as cur:
                # Simple text search using PostgreSQL's text search capabilities
                cur.execute(
                    f"""
                    SELECT id, thread_id, author, content_type, text, created_at,
                           ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank
                    FROM {table_name}
                    WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s)
                    ORDER BY rank DESC, created_at DESC
                    """,
                    (query, query),
                )

                results = []
                for row in cur.fetchall():
                    results.append(
                        {
                            "id": row[0],
                            "thread_id": row[1],
                            "author": row[2],
                            "content_type": row[3],
                            "text": row[4],
                            # created_at is nullable in the data model, so guard against None
                            "created_at": row[5].isoformat() if row[5] else None,
                        }
                    )

                return cocoindex.QueryOutput(results=results)

This code defines a query handler that searches HackerNews threads and comments indexed in CocoIndex. It determines the database table storing the messages, then uses PostgreSQL full-text search (to_tsvector and plainto_tsquery) to find rows matching the query.

Results are ranked by relevance (ts_rank) and creation time, formatted into dictionaries, and returned as a structured cocoindex.QueryOutput. Essentially, it performs a full-text search over the indexed content and delivers ranked, structured results.
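
The query above evaluates to_tsvector for every row at query time. On larger tables, a GIN expression index over the same expression usually helps, provided the index expression matches the one used in the WHERE clause. The sketch below only builds the DDL string. The helper `fts_index_ddl`, the index naming scheme, and the table name in the example call are all hypothetical, and you would still need to execute the statement yourself against your database.

```python
import re

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def fts_index_ddl(table_name: str, column: str = "text") -> str:
    """Build a CREATE INDEX statement for a GIN full-text expression index."""
    # Identifiers cannot be sent as bound parameters, so validate them strictly.
    for name in (table_name, column):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe SQL identifier: {name!r}")
    index_name = f"{table_name}_{column}_fts_idx"
    return (
        f"CREATE INDEX IF NOT EXISTS {index_name} "
        f"ON {table_name} USING GIN (to_tsvector('english', {column}))"
    )


# "hn_messages" is a placeholder; use the table name resolved by your flow.
ddl = fts_index_ddl("hn_messages")
```

Keeping the 'english' configuration explicit in both the index and the query matters, since an index built with one text search configuration is not used for queries written with another.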


Running Your HackerNews Custom Source

Once your custom source and flow are ready, running it with CocoIndex is straightforward. You can either update the index on-demand or keep it continuously in sync with HackerNews.


1. Install Dependencies

Make sure you have Python installed and then install your project in editable mode:

pip install -e .

This installs CocoIndex along with all required dependencies, letting you develop and update the connector without reinstalling.


2. Update the Target (On-Demand)

To populate your target (e.g., Postgres) with the latest HackerNews threads:

cocoindex update main

  • Only threads that have changed will be re-processed.
  • Your target remains in sync with the most recent 500 HackerNews threads.
  • Efficient incremental updates save time and compute resources.

You can also run the update command in live mode, which keeps the target continuously in sync with the source:

cocoindex update -L main

  • Runs the flow in live mode, polling HackerNews periodically.
  • CocoIndex automatically handles incremental changes and keeps the target synchronized.
  • Ideal for dashboards, search, or AI pipelines that require real-time data.

3. Troubleshoot & Inspect with CocoInsight

CocoInsight lets you visualize and debug your flow, see the lineage of your data, and understand what’s happening under the hood.

Start the server:

cocoindex server -ci main

Then open the UI in your browser: https://cocoindex.io/cocoinsight

Note that this requires the query handler set up in the previous step.

What You Can Build Next

This simple example opens the door to a lot more:

  • Build a trending-topic detector
  • Run LLM summarization pipelines on top of indexed threads
  • Add embeddings + vector search
  • Mirror HN into your internal data warehouse
  • Build a real-time HN dashboard
  • Extend to other news sources (Reddit, Lobsters, etc.)

Because the whole pipeline is declarative and incremental, extending it is straightforward.

Since Custom Sources allow you to wrap any Python logic into an incremental data stream, the best use cases are usually "Hard-to-Reach" data—systems that don't have standard database connectors, have complex nesting, or require heavy pre-processing.

The Knowledge Aggregator for LLM Context

Building a context engine for an AI bot often requires pulling from non-standard documentation sources.

The "Composite" Entity (Data Stitching)

Most companies have user data fragmented across multiple microservices. You can build a Custom Source that acts as a "virtual join" before the data ever hits your index. For example, the source:

  1. Fetches a User ID from an Auth Service (Okta/Auth0).
  2. Uses that ID to fetch billing status from Stripe API.
  3. Uses that ID to fetch usage logs from an Internal Redis.

Instead of managing complex ETL joins downstream, the Custom Source yields a single User360 object. CocoIndex tracks the state of this composite object; if the user upgrades in Stripe or changes their email in Auth0, the index updates automatically.
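
A rough sketch of that stitching step is shown below. Everything in it is hypothetical: the `User360` fields, the `build_user360` helper, and the three fetcher callables, which stand in for real calls to the auth, billing, and usage-log services.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class User360:
    """Hypothetical composite value stitched from three services."""

    user_id: str
    email: str
    billing_status: str
    recent_events: tuple[str, ...]


def build_user360(
    user_id: str,
    fetch_email: Callable[[str], str],
    fetch_billing: Callable[[str], str],
    fetch_events: Callable[[str], list[str]],
) -> User360:
    """Call each upstream fetcher once and merge the results into one value."""
    return User360(
        user_id=user_id,
        email=fetch_email(user_id),
        billing_status=fetch_billing(user_id),
        recent_events=tuple(fetch_events(user_id)),
    )


# Stub fetchers standing in for the auth, billing, and usage-log services.
before = build_user360("u1", lambda _: "a@example.com", lambda _: "free", lambda _: ["login"])
after = build_user360("u1", lambda _: "a@example.com", lambda _: "pro", lambda _: ["login"])

# A change in any one upstream service changes the composite value.
plan_changed = before != after
```

In a real connector, a function like this would run inside the method that fetches a row's value, so the composite is rebuilt whenever the source is asked for that user.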

The "Legacy Wrapper" (Modernization Layer)

Enterprises often have valuable data locked in systems that are painful to query (SOAP, XML, Mainframes). You get a modern, queryable SQL interface (via the CocoIndex target) on top of a 20-year-old system without rewriting the legacy system itself.

Public Data Monitor (Competitive Intelligence)

Tracking changes on public websites or APIs that don't offer webhooks.

  • The Source:
    • Competitor Pricing: Scraping e-commerce product pages.
    • Regulatory Feeds: Polling a government RSS feed or FDA drug approval database.
    • Crypto/Stocks: Hitting a CoinGecko or Yahoo Finance API.

The CocoIndex Value: Using the diff capabilities, you can trigger downstream alerts only when a price changes by >5% or a new regulation is posted, rather than spamming your database with identical polling results.
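
The 5% rule mentioned above can be expressed as a small predicate applied to the old and new values of a changed row. This is a sketch under stated assumptions: `significant_change` is a hypothetical helper, and where you call it (for example in a downstream alerting step) depends on your own pipeline.

```python
def significant_change(old: float, new: float, threshold: float = 0.05) -> bool:
    """Return True when the relative change from old to new exceeds threshold."""
    if old == 0:
        # Relative change is undefined at zero, so treat any move as significant.
        return new != 0
    return abs(new - old) / abs(old) > threshold


small_move = significant_change(100.0, 104.0)  # 4% change, no alert
large_move = significant_change(100.0, 106.0)  # 6% change, alert
price_drop = significant_change(100.0, 94.0)   # drops count as well
```

Using a relative threshold keeps the rule meaningful across products with very different price levels.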


Why This Matters

Custom Sources extend CocoIndex's incremental sync model to any API: internal, external, legacy, or real-time.

This unlocks a simple but powerful pattern:

Whether you’re indexing HackerNews or orchestrating dozens of enterprise services, the framework gives you a stable backbone with:

  • persistent state
  • deterministic updates
  • automatic lineage
  • flexible target exports
  • minimal infrastructure overhead

⭐ Try It, Fork It, Star It

If you found this useful, a star on GitHub means a lot — it helps others discover CocoIndex and supports further development.
