GoldenGate for Big Data and Kafka Handlers hands-on – part 2

Preamble

GoldenGate for Big Data integrates with Kafka through three different Kafka Handlers, also called connectors:

  • Kafka Generic Handler (Pub/Sub)
  • Kafka Connect Handler
  • Kafka REST Proxy Handler

Only the first two are available with the open source, Apache-licensed version of Kafka, so we will review only those two. Oracle has written a few articles on the differences (see references section), but these short statements sum it up well:

Kafka Handler

Can send raw bytes messages in four formats: JSON, Avro, XML, delimited text

Kafka Connect Handler

Generates in-memory Kafka Connect schemas and messages. Passes the messages to Kafka Connect converter to convert to bytes to send to Kafka.
There are currently only two converters: JSON and Avro. Only Confluent currently has Avro. But using the Kafka Connect interface allows the user to integrate with the open source Kafka Connect connectors.

This picture from Oracle Corporation (see references section for the complete article) summarizes it well:

kafka04

I initially thought the Kafka Connect Handler was provided as a plugin by Confluent (https://www.confluent.io/product/connectors/) but the Kafka Connect framework it relies on is included by default in standard Apache Kafka:

The Kafka Connect framework is also included in the Apache versions as well as Confluent version.

So it is possible to run the OGG Kafka Connect Handler with Apache Kafka. And it is possible to run open source Kafka Connect connectors with Apache Kafka.

One thing Confluent Kafka has that Apache Kafka does not is the Avro schema registry and the Avro Converter.

Oracle test case creation

My simple test case, created in my pdb1 pluggable database, is as follows:

SQL> CREATE USER appuser IDENTIFIED BY secure_password;
 
USER created.
 
SQL> GRANT CONNECT, RESOURCE TO appuser;
 
GRANT succeeded.
 
SQL> ALTER USER appuser quota unlimited ON users;
 
USER altered.
 
SQL> CONNECT appuser/secure_password@pdb1
Connected.
 
SQL> CREATE TABLE test01(id NUMBER, descr VARCHAR2(50), CONSTRAINT TEST01_PK PRIMARY KEY (id) ENABLE);
 
TABLE created.
 
SQL> DESC test01
 Name                                      NULL?    TYPE
 ----------------------------------------- -------- ----------------------------
 ID                                        NOT NULL NUMBER
 DESCR                                              VARCHAR2(50)

GoldenGate extract configuration

In this chapter I create an extract (capture) process to capture changes from my appuser.test01 test table:

GGSCI (server01) 1> add credentialstore
 
Credential store created.
 
GGSCI (server01) 2> alter credentialstore add user c##ggadmin@orcl alias c##ggadmin
Password:
 
Credential store altered.
 
GGSCI (server01) 3> info credentialstore
 
Reading from credential store:
 
Default domain: OracleGoldenGate
 
  Alias: ggadmin
  Userid: c##ggadmin@orcl

Use ‘alter credentialstore delete user’ to remove an alias…

GGSCI (server01) 11> dblogin useridalias c##ggadmin
Successfully logged into database CDB$ROOT.
 
GGSCI (server01 as c##ggadmin@orcl/CDB$ROOT) 10> add trandata pdb1.appuser.test01
 
2021-01-22 10:55:21  INFO    OGG-15131  Logging of supplemental redo log data is already enabled for table PDB1.APPUSER.TEST01.
 
2021-01-22 10:55:21  INFO    OGG-15135  TRANDATA for instantiation CSN has been added on table PDB1.APPUSER.TEST01.
 
2021-01-22 10:55:21  INFO    OGG-10471  ***** Oracle Goldengate support information on table APPUSER.TEST01 *****
Oracle Goldengate support native capture on table APPUSER.TEST01.
Oracle Goldengate marked following column as key columns on table APPUSER.TEST01: ID.

Configure the extract process (keep the group name to 8 characters at most, otherwise you get ‘ERROR: Invalid group name (must be at most 8 characters).’):

GGSCI (server01) 10> dblogin useridalias c##ggadmin
Successfully logged into database CDB$ROOT.
 
GGSCI (server01 as c##ggadmin@orcl/CDB$ROOT) 11> edit params ext01
 
 
 
GGSCI (server01 as c##ggadmin@orcl/CDB$ROOT) 12> view params ext01
 
extract ext01
useridalias c##ggadmin
ddl include mapped
exttrail ./dirdat/ex
sourcecatalog pdb1
table appuser.test01

Add and register the extract, then add the EXTTRAIL (the trail name must be 2 characters or less!):

GGSCI (server01 as c##ggadmin@orcl/CDB$ROOT) 19> add extract ext01, integrated tranlog, begin now
EXTRACT (Integrated) added.
 
GGSCI (server01 as c##ggadmin@orcl/CDB$ROOT) 20> register extract ext01 database container (pdb1)
 
2021-01-22 11:06:30  INFO    OGG-02003  Extract EXT01 successfully registered with database at SCN 7103554.
 
 
GGSCI (server01 as c##ggadmin@orcl/CDB$ROOT) 21> add exttrail ./dirdat/ex, extract ext01
EXTTRAIL added.
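
The two length limits mentioned above (8 characters for the group name, 2 characters for the trail name) are easy to trip over when scripting a setup. As a minimal sketch, assuming those are the only two rules you want to check before calling GGSCI, a pre-flight check could look like this. The function name check_names and the constants are hypothetical helpers of mine, not part of GoldenGate:

```python
import posixpath

MAX_GROUP_LEN = 8   # limit quoted in the GGSCI error message
MAX_TRAIL_LEN = 2   # limit for the trail name, e.g. "ex" in ./dirdat/ex


def check_names(group: str, trail_path: str) -> list:
    """Return a list of problems; an empty list means both names fit the limits."""
    problems = []
    if len(group) > MAX_GROUP_LEN:
        problems.append(f"group name '{group}' exceeds {MAX_GROUP_LEN} characters")
    # The trail name is the last component of the path given to EXTTRAIL.
    trail_name = posixpath.basename(trail_path)
    if len(trail_name) > MAX_TRAIL_LEN:
        problems.append(f"trail name '{trail_name}' exceeds {MAX_TRAIL_LEN} characters")
    return problems


print(check_names("ext01", "./dirdat/ex"))        # names used in this post
print(check_names("extract01", "./dirdat/ext"))   # both names are too long
```

This only checks lengths; GGSCI remains the authority on which names are valid.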

Finally start it (you can also use ‘view report ext01’ to get more detailed information):

GGSCI (server01 as c##ggadmin@orcl/CDB$ROOT) 22> start ext01
 
Sending START request to MANAGER ...
EXTRACT EXT01 starting
 
 
GGSCI (server01 as c##ggadmin@orcl/CDB$ROOT) 23> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
MANAGER     RUNNING
EXTRACT     RUNNING     EXT01       00:00:00      19:13:08

Test that it works by inserting a row in your test table:

SQL> INSERT INTO test01 VALUES(10,'Ten');
 
1 ROW created.
 
SQL> COMMIT;
 
COMMIT complete.

And check that your trail files get created in the chosen directory:

[oracle@server01 oggcore_1]$ ll dirdat
total 2
-rw-r----- 1 oracle dba 1294 Jan 22 11:24 ex000000000
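
In the listing above the trail file is named ex000000000, that is the two-character trail name followed by a numeric sequence. A small sketch to split such a file name, assuming this prefix-plus-digits layout holds for all your trail files (the helper name split_trail_name is mine):

```python
import re

# Two-character trail name followed by a numeric sequence, as in "ex000000000".
TRAIL_RE = re.compile(r"^(?P<prefix>[A-Za-z0-9]{2})(?P<seq>\d+)$")


def split_trail_name(filename: str) -> tuple:
    """Split a trail file name into (prefix, sequence number)."""
    match = TRAIL_RE.match(filename)
    if match is None:
        raise ValueError(f"not a trail file name: {filename!r}")
    return match["prefix"], int(match["seq"])


prefix, seq = split_trail_name("ex000000000")
print(prefix, seq)
```

This can help when a script needs to find the latest trail file in dirdat.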

GoldenGate for Big Data and Kafka Handler configuration

One useful directory to look at is the AdapterExamples directory located inside your GoldenGate for Big Data installation, and in my case the AdapterExamples/big-data/kafka* subdirectories:

[oracle@server01 big-data]$ pwd
/u01/app/oracle/product/19.1.0/oggbigdata_1/AdapterExamples/big-data
[oracle@server01 big-data]$ ll -d kafka*
drwxr-x--- 2 oracle dba 96 Sep 25  2019 kafka
drwxr-x--- 2 oracle dba 96 Sep 25  2019 kafka_connect
drwxr-x--- 2 oracle dba 96 Sep 25  2019 kafka_REST_proxy
[oracle@server01 big-data]$ ll kafka
total 4
-rw-r----- 1 oracle dba  261 Sep  3  2019 custom_kafka_producer.properties
-rw-r----- 1 oracle dba 1082 Sep 25  2019 kafka.props
-rw-r----- 1 oracle dba  332 Sep  3  2019 rkafka.prm

So in the kafka directory we have three files that you can copy to the dirprm directory of your GoldenGate for Big Data installation. You must then customize them to match your configuration.

In custom_kafka_producer.properties I have just changed the bootstrap.servers variable to match my Kafka server:

bootstrap.servers=localhost:9092
acks=1
reconnect.backoff.ms=1000
 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 16 KB (16384 bytes) per partition
batch.size=16384
linger.ms=0

In kafka.props I have changed gg.classpath to point to the libs of an extracted Kafka distribution (this directory must be owned or readable by the oracle account). This means that if Kafka is installed on another server (the normal configuration) you must copy the libs to your GoldenGate for Big Data server. The payload format chosen in the example is avro_op (Avro in the more verbose operation format). It can be one of: xml, delimitedtext, json, json_row, avro_row, avro_op:

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.MetaHeaderTemplate=${alltokens}
 
 
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
 
gg.log=log4j
gg.log.level=INFO
 
gg.report.time=30sec
 
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/u01/kafka_2.13-2.7.0/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
 
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

In rkafka.prm I have changed the MAP | TARGET parameter:

REPLICAT rkafka
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat rkafka, exttrail AdapterExamples/trail/tr
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP pdb1.appuser.*, TARGET appuser.*;

As explained in the rkafka.prm file, I add the replicat process (dump) and point it to the trail directory (it must be the trail of your core GoldenGate installation, the one written by the extract):

GGSCI (server01) 1> add replicat rkafka, exttrail /u01/app/oracle/product/19.1.0/oggcore_1/dirdat/ex
REPLICAT added.
 
 
GGSCI (server01) 2> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
MANAGER     RUNNING
REPLICAT    STOPPED     RKAFKA      00:00:00      00:00:03
 
 
GGSCI (server01) 3> start rkafka
 
Sending START request to MANAGER ...
REPLICAT RKAFKA starting
 
 
GGSCI (server01) 4> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
MANAGER     RUNNING
REPLICAT    RUNNING     RKAFKA      00:00:00      00:12:08
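
If you want to check process status from a script rather than by eye, the ‘info all’ output is regular enough to parse. A minimal sketch, assuming the column layout shown above and that you have already captured the output as text (how you capture it is up to you; the function names are mine):

```python
PROGRAMS = {"MANAGER", "EXTRACT", "REPLICAT"}


def parse_info_all(output: str) -> list:
    """Turn 'info all' output into a list of dicts, one per process."""
    rows = []
    for line in output.splitlines():
        fields = line.split()
        if len(fields) < 2 or fields[0] not in PROGRAMS:
            continue  # header, prompt or blank line
        row = {"program": fields[0], "status": fields[1]}
        if len(fields) >= 5:  # the MANAGER line has no group or lag columns
            row.update(group=fields[2], lag=fields[3], since_chkpt=fields[4])
        rows.append(row)
    return rows


def not_running(rows: list) -> list:
    """Return the rows whose status is anything other than RUNNING."""
    return [row for row in rows if row["status"] != "RUNNING"]


SAMPLE = """\
Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
REPLICAT    STOPPED     RKAFKA      00:00:00      00:00:03
"""

rows = parse_info_all(SAMPLE)
print(not_running(rows))
```

Here the sample is the output taken right after adding the replicat, so RKAFKA is reported as not running.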

As a test I insert a row in my test table:

SQL> INSERT INTO test01 VALUES(1,'One');
 
1 ROW created.
 
SQL> COMMIT;
 
COMMIT complete.

I can read the events on the topic that has my table name (the first event is the test I did when I configured the GoldenGate extract):

[kafka@server01 kafka_2.13-2.7.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
TEST01
__consumer_offsets
mySchemaTopic
quickstart-events
[kafka@server01 kafka_2.13-2.7.0]$ bin/kafka-console-consumer.sh --topic TEST01 --from-beginning --bootstrap-server localhost:9092
APPUSER.TEST01I42021-01-22 11:32:20.00000042021-01-22T15:20:51.081000(00000000000000001729ID$@Ten
APPUSER.TEST01I42021-01-22 15:24:55.00000042021-01-22T15:25:00.665000(00000000000000001872ID▒?One

You can clean the installation with:

stop rkafka
delete replicat rkafka

GoldenGate for Big Data and Kafka Connect Handler configuration

As in the previous chapter, I copy the demo configuration files with:

[oracle@server01 kafka_connect]$ pwd
/u01/app/oracle/product/19.1.0/oggbigdata_1/AdapterExamples/big-data/kafka_connect
[oracle@server01 kafka_connect]$ ll
total 4
-rw-r----- 1 oracle dba  592 Sep  3  2019 kafkaconnect.properties
-rw-r----- 1 oracle dba  337 Sep  3  2019 kc.prm
-rw-r----- 1 oracle dba 1733 Sep 25  2019 kc.props
[oracle@server01 kafka_connect]$ cp * ../../../dirprm/

As I’m using the same server for all components, the kafkaconnect.properties file is already correct. I have just added converter.type to work around a bug. We see that the example is configured to use JSON for the payload:

bootstrap.servers=localhost:9092
acks=1
 
#JSON Converter Settings
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
 
#Avro Converter Settings
#key.converter=io.confluent.connect.avro.AvroConverter
#value.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter.schema.registry.url=http://localhost:8081
 
 
#Adjust for performance
buffer.memory=33554432
batch.size=16384
linger.ms=0
 
converter.type=key
converter.type=value
converter.type=header

In the kc.props file I only change the gg.classpath parameter, with the exact same remark as for the Kafka Handler configuration:

gg.handlerlist=kafkaconnect
 
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.MetaHeaderTemplate=${alltokens}
 
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
gg.handler.kafkaconnect.includeTableName=true
gg.handler.kafkaconnect.includeOpType=true
gg.handler.kafkaconnect.includeOpTimestamp=true
gg.handler.kafkaconnect.includeCurrentTimestamp=true
gg.handler.kafkaconnect.includePosition=true
gg.handler.kafkaconnect.includePrimaryKeys=false
gg.handler.kafkaconnect.includeTokens=false
 
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
 
gg.log=log4j
gg.log.level=INFO
 
gg.report.time=30sec
 
#Apache Kafka Classpath
gg.classpath=/u01/kafka_2.13-2.7.0/libs/*
#Confluent IO classpath
#gg.classpath={Confluent install dir}/share/java/kafka-serde-tools/*:{Confluent install dir}/share/java/kafka/*:{Confluent install dir}/share/java/confluent-common/*
 
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm

In the kc.prm file I only change the MAP | TARGET configuration as follows:

REPLICAT kc
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat conf, exttrail AdapterExamples/trail/tr NODBCHECKPOINT
TARGETDB LIBFILE libggjava.so SET property=dirprm/kc.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 1000
MAP pdb1.appuser.*, TARGET appuser.*;

Add the Kafka Connect Handler replicat with:

GGSCI (server01) 1> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
MANAGER     RUNNING
 
 
GGSCI (server01) 2> add replicat kc, exttrail /u01/app/oracle/product/19.1.0/oggcore_1/dirdat/ex
REPLICAT added.
 
 
GGSCI (server01) 3> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
MANAGER     RUNNING
REPLICAT    STOPPED     KC          00:00:00      00:00:05
 
 
GGSCI (server01) 3> start kc
 
Sending START request to MANAGER ...
REPLICAT KC starting
 
 
GGSCI (server01) 4> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
MANAGER     RUNNING
REPLICAT    RUNNING     KC          00:00:00      00:00:04

Add a new row in the test table (and commit):

SQL> CONNECT appuser/secure_password@pdb1
Connected.
SQL> SELECT * FROM test01;
 
        ID DESCR
---------- --------------------------------------------------
        10 Ten
         1 One
 
SQL> INSERT INTO test01 VALUES(2,'Two');
 
1 ROW created.
 
SQL> COMMIT;
 
COMMIT complete.

Reading the new topic you should see new lines coming:

[kafka@server01 kafka_2.13-2.7.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
APPUSER.TEST01
TEST01
__consumer_offsets
mySchemaTopic
quickstart-events
[kafka@server01 kafka_2.13-2.7.0]$ bin/kafka-console-consumer.sh --topic APPUSER.TEST01 --from-beginning --bootstrap-server localhost:9092
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"op_type"},{"type":"string","optional":true,"field":"op_ts"},{"type":"string","optional":true,"field":"current_ts"},{"type":"string","optional":true,"field":"pos"},{"type":"double","optional":true,"field":"ID"},{"type":"string","optional":true,"field":"DESCR"}],"optional":false,"name":"APPUSER.TEST01"},"payload":{"table":"APPUSER.TEST01","op_type":"I","op_ts":"2021-01-22 11:32:20.000000","current_ts":"2021-01-22 17:36:00.285000","pos":"00000000000000001729","ID":10.0,"DESCR":"Ten"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"op_type"},{"type":"string","optional":true,"field":"op_ts"},{"type":"string","optional":true,"field":"current_ts"},{"type":"string","optional":true,"field":"pos"},{"type":"double","optional":true,"field":"ID"},{"type":"string","optional":true,"field":"DESCR"}],"optional":false,"name":"APPUSER.TEST01"},"payload":{"table":"APPUSER.TEST01","op_type":"I","op_ts":"2021-01-22 15:24:55.000000","current_ts":"2021-01-22 17:36:00.727000","pos":"00000000000000001872","ID":1.0,"DESCR":"One"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"op_type"},{"type":"string","optional":true,"field":"op_ts"},{"type":"string","optional":true,"field":"current_ts"},{"type":"string","optional":true,"field":"pos"},{"type":"double","optional":true,"field":"ID"},{"type":"string","optional":true,"field":"DESCR"}],"optional":false,"name":"APPUSER.TEST01"},"payload":{"table":"APPUSER.TEST01","op_type":"I","op_ts":"2021-01-22 17:38:23.000000","current_ts":"2021-01-22 17:38:27.800000","pos":"00000000000000002013","ID":2.0,"DESCR":"Two"}}
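
Because value.converter.schemas.enable=true, each message is a JSON envelope with a schema part and a payload part. A consumer usually only needs the payload, plus the declared field types if it wants to cast values. A minimal sketch using a trimmed copy of the last message above (I removed some fields for readability, and the function name split_envelope is mine):

```python
import json

# Trimmed version of the last message read from the APPUSER.TEST01 topic.
MESSAGE = (
    '{"schema":{"type":"struct","fields":['
    '{"type":"string","optional":true,"field":"op_type"},'
    '{"type":"double","optional":true,"field":"ID"},'
    '{"type":"string","optional":true,"field":"DESCR"}],'
    '"optional":false,"name":"APPUSER.TEST01"},'
    '"payload":{"op_type":"I","ID":2.0,"DESCR":"Two"}}'
)


def split_envelope(message: str) -> tuple:
    """Return (payload, field types) from a JSON converter envelope."""
    envelope = json.loads(message)
    types = {field["field"]: field["type"] for field in envelope["schema"]["fields"]}
    return envelope["payload"], types


payload, types = split_envelope(MESSAGE)
# The NUMBER column arrives as a double, so cast it if you expect an integer.
row_id = int(payload["ID"])
print(payload["op_type"], row_id, payload["DESCR"], types["ID"])
```

Note that the ID column, a NUMBER in Oracle, is declared as double in the schema, which is why the values show up as 10.0, 1.0 and 2.0.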

References

Yannick Jaquier