Implementing CDC in Bento via ConduitIO #396
Replies: 5 comments 23 replies
Hey! Thanks for the detailed write-up here 😄 I'm not all too familiar with Conduit or the OpenCDC data format, so I've started to spike out an implementation for this, mapping Conduit's source/destination to Bento's input/output. Will update this discussion thread (and address more of your points) when I've gotten some more context on what this would entail 🍱
Hey @nickchomey. Here's a proof-of-concept for this on my personal bento fork: https://github.com/gregfurman/bento/tree/add/conduit/internal/impl/conduit

For backwards compatibility, you can define Conduit components as you would in the official Conduit spec, except in a Bento pipeline, i.e.:

```yaml
input:
  conduit:
    plugin: "github.com/conduitio/conduit-connector-generator"
    settings:
      format:
        type: "structured"
        options:
          id: "int"
          name: "string"
          company: "string"
          trial: "bool"
      recordCount: "2"

pipeline:
  processors:
    - conduit:
        plugin: field.set
        settings:
          field: '.Payload.After.test_field'
          value: '10'

output:
  conduit:
    plugin: "github.com/conduitio/conduit-connector-file"
    settings:
      path: ./example.out
```

This is by no means complete, but it illustrates how something like this would be done. A couple of considerations:

I think given how many extra Go dependencies this would end up adding, it'd probably make sense to have it as a standalone distro of Bento instead of being present in the main build. Would love to hear thoughts!
By the way, Conduit was originally conceived as a Go alternative to Confluent's Kafka Connect. Given that WarpStream is now owned by Confluent, and it seems relatively easy to integrate Conduit with Bento, surely there are tremendous synergies to be had here beyond just adding CDC connectors and LLM processors. Conduit has a connector that wraps Confluent Kafka Connect connectors, though the Go CDC connectors are surely a better pairing for Bento/Conduit. Likewise, Conduit's automatically extracted schemas use Avro and Confluent Schema Registry, which is what your schema_registry_encode/decode components and Avro scanner/processor use. Again, Conduit's CDC and schema capabilities plus Bento's stream processing capabilities are an enormously complementary pairing!
Hey @gregfurman, https://github.com/gregfurman/bento/tree/add/conduit/internal/impl/conduit looks like a good start. I can help with this and with getting all the Conduit connectors running. I guess it's not in the main branch yet because it needs dogfooding and more work?
I urgently need CDC functionality. What is the current progress, and where can I follow it?
Problem
Using CDC from database change/replication logs has long been an important component in data architectures, especially for realtime stream processing. Popular tools include Debezium, Kafka Connect, Fivetran, and more. None of these are written in Go, which makes them cumbersome and expensive to operate.
One of the major changes in Redpanda Connect since its conversion from Benthos was the addition of a handful of CDC input connectors (MySQL, PostgreSQL, MongoDB, GCP Spanner), as well as a variety of LLM processors (Ollama, OpenAI, etc.). It would be great if Bento could gain similar capabilities, but unfortunately all of those connectors carry an enterprise license, which is, of course, their prerogative.
Proposed Solution
Rather than write your own CDC connectors from scratch, I am proposing a much more seamless and powerful solution: create new Conduit input/output/processor components that wrap ConduitIO's 60+ CDC source/destination connectors. I have been happily using Conduit and think it is the ideal tool for CDC-specific tasks, but I sense that its development has come to an end due to business factors. Rather than fork it and try to maintain it all in futility, I think it would be best to replace the core Conduit pipeline with Bento, which would be a complementary and formidable pairing given Conduit's focus on CDC and Bento's focus on stream processing.
In fact, I previously created a Conduit processor that was simply a wrapper around an embedded Bento processing pipeline: it received Conduit's record stream, converted the records from their (fantastic) OpenCDC format to Bento's `service.Message`, ran them through the Bento pipeline, and converted the transformed records back to OpenCDC to carry on in the Conduit pipeline. I never actually "used" it, but I'm quite sure it would work well enough (despite surely needing improvements).

What I am proposing here is sort of the opposite: create Bento input/output components that simply wrap Conduit source/destination connectors so that they can be used in a Bento pipeline. The actual logic for implementing CDC is largely (completely?) independent of Conduit's and Bento's pipeline architectures: the source/input connectors just connect to some data source, take a snapshot, listen for realtime changes, and emit records into the pipeline. Destination/output connectors are the same in reverse: receive records, connect to a database, and write them to it.
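To illustrate the conversion direction described above, here's a minimal, self-contained sketch of flattening an OpenCDC-shaped record into a message body plus metadata map, roughly what a Bento input would hand to the pipeline. The `Record` struct and `toMessageBody` helper are hypothetical stand-ins, not the real `conduit-commons/opencdc` or Bento `service` APIs:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Record is a simplified, hypothetical stand-in for an OpenCDC record.
// The real format carries a position, an operation (create/update/
// delete/snapshot), metadata, a key, and before/after payloads.
type Record struct {
	Position  []byte            `json:"position"`
	Operation string            `json:"operation"`
	Metadata  map[string]string `json:"metadata"`
	Key       map[string]any    `json:"key"`
	Payload   struct {
		Before map[string]any `json:"before"`
		After  map[string]any `json:"after"`
	} `json:"payload"`
}

// toMessageBody serialises the post-change payload as the message body
// and lifts the CDC envelope (operation, position, source metadata) into
// a flat metadata map, analogous to service.NewMessage(body) followed by
// MetaSet calls on a Bento message.
func toMessageBody(r Record) ([]byte, map[string]string, error) {
	body, err := json.Marshal(r.Payload.After)
	if err != nil {
		return nil, nil, err
	}
	meta := map[string]string{
		"opencdc_operation": r.Operation,
		"opencdc_position":  string(r.Position),
	}
	for k, v := range r.Metadata {
		meta[k] = v
	}
	return body, meta, nil
}

func main() {
	var r Record
	r.Position = []byte("pos-1")
	r.Operation = "create"
	r.Metadata = map[string]string{"conduit.source.plugin": "generator"}
	r.Payload.After = map[string]any{"id": 1, "name": "alice"}

	body, meta, err := toMessageBody(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))              // the row as JSON
	fmt.Println(meta["opencdc_operation"]) // create
}
```

The reverse direction (Bento message back to OpenCDC for a destination connector) would read these metadata keys back out to rebuild the record envelope.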
A similar thing could probably be done for Conduit's new cohere, openai and ollama processors, so that Bento doesn't need to write its own.
It would be ideal if Bento could support the OpenCDC record format natively rather than needing to convert between it and `service.Message`. I'd even propose making it the new default format for Bento messages, since it makes it simple to interoperate across different connectors.

The primary obstacle with this plan would probably be Conduit's native support for Avro schemas: schemas are extracted from and attached to each record, and can then be used throughout the processing pipeline and in the destination connector. But I see that as an opportunity to improve Bento rather than a roadblock.
Alternatives considered
An alternative would be to simply port the connectors directly into fresh Bento connectors. I suppose this would ultimately be cleaner, but it would be much more effort than making three Conduit input/output/processor wrapper components (and would also cause more fragmentation/decoupling from the original connectors).
Let me know what you think!