mongodb-mysql-cdc

Python Kafka consumer - Change Data Capture from MongoDB to MySQL

1. Install Kafka services. I used the Confluent Community Platform; my guide: https://docs.confluent.io/current/quickstart/ce-quickstart.html
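
A quick sanity check that the broker is reachable, using the same kafka-python client as the consumer in step 5 (a minimal sketch, assuming the default localhost:9092 listener):

 from kafka import KafkaConsumer

 # Connect to the local broker and list the topics it knows about;
 # this fails fast if Kafka is not reachable on localhost:9092.
 consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
 print(consumer.topics())
 consumer.close()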

2. Install a MongoDB connector for Kafka Connect. Load the official MongoDB Kafka connector from Confluent Hub:

 > confluent-hub install mongodb/kafka-connect-mongodb:latest

or install the Debezium MongoDB connector:

 https://debezium.io/documentation/reference/0.10/install.html
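
After installing the connector and restarting the Connect worker, you can verify the plugin is loaded through the Kafka Connect REST API (a minimal sketch using the requests library, assuming Connect listens on localhost:8083 and the connector from the confluent-hub command above):

 import requests

 # List the connector plugins registered with the Connect worker and
 # check that the MongoDB source connector class is among them.
 plugins = requests.get("http://localhost:8083/connector-plugins").json()
 classes = [p["class"] for p in plugins]
 print("MongoSourceConnector loaded:",
       "com.mongodb.kafka.connect.MongoSourceConnector" in classes)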

3. Create a source or sink connector (a source connector produces to Kafka; a sink connector consumes from Kafka), either from Control Center (see pic5) or via the Kafka Connect REST API:

 > curl -X PUT http://localhost:8083/connectors/source-mongodb-inventory/config -H "Content-Type: application/json" -d '{
      "tasks.max":1,
      "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "connection.uri":"<>",
      "database":"BigBoxStore",
      "collection":"inventory",
      "pipeline":"[{\"$match\": { \"$and\": [ { \"updateDescription.updatedFields.quantity\" : { \"$lte\": 5 } },   {\"operationType\": \"update\"}]}}]", 
      "topic.prefix": ""  
  }'
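
To confirm the connector was created and its task is running, query its status endpoint (a sketch using the requests library, assuming the same Connect host and the connector name from the curl command above):

 import requests

 # Ask the Kafka Connect REST API for the connector's status.
 resp = requests.get(
     "http://localhost:8083/connectors/source-mongodb-inventory/status")
 resp.raise_for_status()
 status = resp.json()

 print("connector state:", status["connector"]["state"])  # e.g. RUNNING
 for task in status["tasks"]:
     print("task", task["id"], "state:", task["state"])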

4. Check that the topic exists and the producer is working, from Control Center (see pic4).
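
Alternatively, read a few records straight from the topic (a sketch; the topic name below assumes the connector's <prefix>.<database>.<collection> naming, which with the config above and an empty prefix would be BigBoxStore.inventory, so adjust it to your setup):

 from kafka import KafkaConsumer

 # Read records from the source connector's topic to confirm that
 # change events are being produced, then stop after 10s of silence.
 consumer = KafkaConsumer("BigBoxStore.inventory",
                          bootstrap_servers="localhost:9092",
                          auto_offset_reset="earliest",
                          consumer_timeout_ms=10000)
 for message in consumer:
     print(message.topic, message.offset, message.value.decode("utf-8"))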

5. Develop the Kafka consumer

I am using the kafka-python consumer module, as shown below:

https://github.com/dpkp/kafka-python

 from kafka import KafkaConsumer
 import json
 import mysql.connector

 # Subscribe to the topic written by the source connector.
 consumer = KafkaConsumer('nekso.rates')
 for message in consumer:
   json_map = json.loads(message.value.decode('utf-8'))
   mydb = mysql.connector.connect(host="", user="", passwd="", database="")
   mycursor = mydb.cursor()
   if json_map['operationType'] == "insert":
     _id = json_map['fullDocument']['_id']['$oid']
     # createdDate may be missing from the document.
     try:
       created_date = json_map['fullDocument']['createdDate']['$date']
     except KeyError:
       created_date = None
     sql = "insert into rates (_id, created_date) values (%s, %s)"
     values = (_id, created_date)
     mycursor.execute(sql, values)
     mydb.commit()
     mycursor.close()
     with open("<PATH>/consumer.log", "a") as logfile:
       logfile.write("topic=%s offset=%d \n" % (message.topic, message.offset))
   else:
     # Log non-insert events (updates, deletes, ...) with their payload.
     with open("<PATH>/consumer.log", "a") as logfile:
       logfile.write("topic=%s offset=%d value=%s \n" % (message.topic, message.offset, message.value.decode('utf-8')))

When the consumer receives new data, this code appends the topic and offset to a log file. HINT: the producer and consumer should show the same traffic (see pic3).

6. Use Supervisord to run the Python script. Add the following at the end of /etc/supervisor.conf:

  [program:consumer]
  command=python <PATH>/consumer.py
  process_name=%(program_name)s_%(process_num)02d
  numprocs=1
  priority=999 
  autostart=true
  autorestart=true 
  startsecs=1
  startretries=3
  user=root
  redirect_stderr=true
  stdout_logfile=<PATH>/consumer.log

Uncomment the web UI section for monitoring:

  [inet_http_server]
  port=0.0.0.0:9001
  username=username
  password=password

(see pic2)
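
With the inet_http_server enabled, Supervisor also exposes an XML-RPC API on the same port, so the consumer process can be checked from Python as well (a minimal sketch, assuming the username/password above and the [program:consumer] entry from step 6):

 from xmlrpc.client import ServerProxy

 # Supervisor serves an XML-RPC API at /RPC2 on the inet_http_server port.
 server = ServerProxy("http://username:password@localhost:9001/RPC2")

 # List every process Supervisor manages and its current state;
 # the consumer process should report RUNNING.
 for proc in server.supervisor.getAllProcessInfo():
     print(proc["group"], proc["name"], proc["statename"])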

Note: rename env_sample to .env_pro.
