Install Kafka Applier
Installation of the Kafka replication requires special configuration of the Extractor and Applier hosts so that each is configured for the correct datasource type.
Before installing the applier, add the following parameter to the extractor configuration.
Add the following to /etc/tungsten/tungsten.ini:

    [alpha]
    ...Existing Replicator Config...
    enable-heterogeneous-service=true

Then apply the change:

    shell> tpm update

Note: The above step is only applicable to standalone extractors. If you are configuring replication from an existing Tungsten Cluster (Cluster-Extractor), follow the steps outlined in "Replicating Data Out of a Cluster" to ensure the cluster is configured correctly.
Unpack the Tungsten Replicator distribution in the staging directory:

    shell> tar zxf tungsten-replicator-8.0.4-132.tar.gz

Change into the staging directory:
    shell> cd tungsten-replicator-8.0.4-132

Create the /etc/tungsten/tungsten.ini file, using the example below as a template.

Example tungsten.ini:

    [defaults]
    install-directory=/opt/continuent
    profile-script=~/.bash_profile
    rest-api-admin-user=apiuser
    rest-api-admin-password=secret
    replicator-rest-api-address=0.0.0.0

    [alpha]
    master=sourcehost
    members=localhost
    datasource-type=kafka
    replication-user=root
    replication-password=null
    replication-port=9092
    property=replicator.applier.dbms.zookeeperString=localhost:2181
    property=replicator.applier.dbms.requireacks=1
Argument definitions for the [defaults] section:

- install-directory=/opt/continuent: Installation directory.
- profile-script=~/.bash_profile: Append commands to include env.sh in this profile script.
- rest-api-admin-user=apiuser: Specify the initial Admin Username for API access. Available from v7.0.0.
- rest-api-admin-password=secret: Specify the initial Admin User Password for API access. Available from v7.0.0; the rest-api-admin-password alias is only available from version 7.1.2 onwards.
- replicator-rest-api-address=0.0.0.0: Address for the API to bind to. Available from v7.0.0.

If your MySQL source is a Tungsten Cluster, ensure the additional steps below are also included in your applier configuration.
First, prepare the required filter configuration file as follows on the Kafka applier host(s) only:
    shell> mkdir -p /opt/continuent/share/
    shell> cp tungsten-replicator/support/filters-config/convertstringfrommysql.json /opt/continuent/share/

Then, include the following parameters in the applier configuration:

    property=replicator.stage.remote-to-thl.filters=convertstringfrommysql
    property=replicator.filter.convertstringfrommysql.definitionsFile=/opt/continuent/share/convertstringfrommysql.json

Once the prerequisites and configuration are complete, install the software:

    shell> ./tools/tpm install
If you encounter problems during the installation, check the /tmp/tungsten-configure.log file for
more information about the root cause.
Once the service is configured and running, the service can be monitored as normal using the trepctl command. See "Management and Monitoring of Kafka Deployments" for
more information.
Optional Configuration Parameters for Kafka
A number of optional, configurable properties are available that control how Tungsten Replicator applies and populates information when
the data is written into Kafka. The following properties can be set during configuration using --property=PROPERTYNAME=value:
| Option | Description |
|---|---|
| replicator.applier.dbms.embedCommitTime.kafka | Sets whether the commit time for the source row is embedded into the message. |
| replicator.applier.dbms.embedSchemaTable | Embeds the source schema name and table name in the stored document. |
| replicator.applier.dbms.enabletxinfo.kafka | Embeds transaction information (generated by the rowaddtxninfo filter) into each Kafka message. |
| replicator.applier.dbms.enabletxninfoTopic | Embeds transaction information into a separate Kafka message broadcast on an independent channel from the one used by the actual database data. One message is sent per transaction or THL event. |
| replicator.applier.dbms.keyFormat | Determines the format of the message ID. |
| replicator.applier.dbms.requireacks | Sets the acknowledgement counter for sending messages into the Kafka queue. |
| replicator.applier.dbms.retrycount | The number of retries for sending each message. |
| replicator.applier.dbms.txninfoTopic | Sets the topic name for transaction messages. |
| replicator.applier.dbms.zookeeperString | Connection string for ZooKeeper, including hostname and port. |
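In an INI-based installation, the same properties are set with property= lines in the service section of /etc/tungsten/tungsten.ini, in the same way the example configuration above sets zookeeperString and requireacks. A minimal sketch, assuming the service is named alpha as in that example; the values shown are illustrative, not recommendations:

```ini
[alpha]
...Existing Replicator Config...
property=replicator.applier.dbms.keyFormat=tspkeyus
property=replicator.applier.dbms.retrycount=3
```

After editing the file on an existing installation, the change would normally be applied with tpm update, as with the extractor change at the start of this section.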
replicator.applier.dbms.embedCommitTime.kafka
| Default | true |
|---|---|
| Valid values | true, false |
| Product | TR |
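Sets whether the commit time for the source row is embedded into the message, as the _committime field. For example:
{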
"_seqno" : "4865",
"_source_table" : "msg",
"_committime" : "2017-07-13 15:30:37.0",
"_source_schema" : "test",
"record" : {
"msg" : "Hello Kafka",
"id" : "2384726"
},
"_optype" : "INSERT"
}
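If consumers do not need the source commit time in each message, the option can be switched off. A minimal sketch, assuming the alpha service from the example configuration and using the property name exactly as listed above:

```ini
[alpha]
...Existing Replicator Config...
property=replicator.applier.dbms.embedCommitTime.kafka=false
```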
replicator.applier.dbms.embedSchemaTable
| Default | true |
|---|---|
| Valid values | true, false |
| Product | TR |
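Embeds the source schema name and table name in the stored document, as the _source_schema and _source_table fields (see also replicator.applier.dbms.useSchemaAsIndex and replicator.applier.dbms.useTableAsType). For example:
{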
"_seqno" : "4865",
"_source_table" : "msg",
"_committime" : "2017-07-13 15:30:37.0",
"_source_schema" : "test",
"record" : {
"msg" : "Hello Kafka",
"id" : "2384726"
},
"_optype" : "INSERT"
}
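When the _source_schema and _source_table fields are not required by the consuming applications, the embedding can be disabled. A minimal sketch, assuming the alpha service from the example configuration:

```ini
[alpha]
...Existing Replicator Config...
property=replicator.applier.dbms.embedSchemaTable=false
```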
replicator.applier.dbms.enabletxinfo.kafka
| Default | false |
|---|---|
| Valid values | true, false |
| Product | TR |
Embeds the transaction information generated by the rowaddtxninfo filter, and other information embedded in each THL event, into each message sent. The transaction information describes the entire transaction (row counts, event ID and tables modified). Since one message is normally sent for each row of data, adding information about the full transaction to each message makes it possible to validate and identify which other messages belong to the same transaction when the messages are being re-assembled by a Kafka client. For example, a single message in Kafka includes a txnInfo section:
{
"_source_table" : "msg",
"_committime" : "2018-03-07 12:53:21.0",
"record" : {
"msg2" : "txinfo",
"id" : "109",
"msg" : "txinfo"
},
"_optype" : "INSERT",
"_seqno" : "164",
"txnInfo" : {
"schema" : [
{
"schemaName" : "msg",
"rowCount" : "1",
"tableName" : "msg"
},
{
"rowCount" : "2",
"schemaName" : "msg",
"tableName" : "msgsub"
}
],
"serviceName" : "alpha",
"totalCount" : "3",
"tungstenTransId" : "164",
"firstRecordInTransaction" : "true"
},
"_source_schema" : "msg"
}
Fields within the txnInfo section include:

- schema: An array of the row counts within this transaction, with a row count included for each schema and table.
- serviceName: The name of the Tungsten Replicator service that generated the message.
- totalCount: The total number of rows modified within the entire transaction.
- firstRecordInTransaction: If this field exists, it is always set to true and indicates that this message was generated by the first row inserted, updated or deleted in the overall transaction. This effectively marks the start of the overall transaction.
- lastRecordInTransaction: If this field exists, it is always set to true and indicates that this message was generated by the last row inserted, updated or deleted in the overall transaction. This effectively marks the end of the overall transaction.
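Enabling the embedded transaction information is a single property. A minimal sketch, assuming the alpha service from the example configuration; it also assumes the rowaddtxninfo filter, which generates this information, is already active in the replicator pipeline, since filter placement is not covered by this property:

```ini
[alpha]
...Existing Replicator Config...
property=replicator.applier.dbms.enabletxinfo.kafka=true
```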
Together, firstRecordInTransaction and lastRecordInTransaction can be used to identify the start and end of the overall transaction.

replicator.applier.dbms.enabletxninfoTopic
| Default | false |
|---|---|
| Valid values | true, false |
| Product | TR |
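Sends transaction information as a separate Kafka message, broadcast on a topic independent of the one used for the actual database data. One message is sent per transaction or THL event. The topic name is set by the replicator.applier.dbms.txninfoTopic property. The default message looks like the following example:
{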
"txnInfo" : {
"tungstenTransId" : "164",
"schema" : [
{
"schemaName" : "msg",
"rowCount" : "1",
"tableName" : "msg"
},
{
"schemaName" : "msg",
"rowCount" : "2",
"tableName" : "msgsub"
}
],
"totalCount" : "3",
"serviceName" : "alpha"
}
}
Fields within the txnInfo section include:

- schema: An array of the row counts within this transaction, with a row count included for each schema and table.
- serviceName: The name of the Tungsten Replicator service that generated the message.
- totalCount: The total number of rows modified within the entire transaction.
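A minimal sketch of enabling the separate transaction topic, assuming the alpha service from the example configuration. Setting txninfoTopic is optional because tungsten_transactions is the default; the name alpha_transactions is a hypothetical example of overriding it:

```ini
[alpha]
...Existing Replicator Config...
property=replicator.applier.dbms.enabletxninfoTopic=true
property=replicator.applier.dbms.txninfoTopic=alpha_transactions
```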
replicator.applier.dbms.keyFormat
| Default | pkey |
|---|---|
| Valid values | pkey, pkeyus, tspkeyus, tspkey |
| Product | TR |
Determines the format of the message ID. For example, when configured to tspkeyus, the message ID consists of the schema name, table name and primary key column information separated by underscores, such as SCHEMANAME_TABLENAME_234.

- pkey: Combine the primary key column values into a single string.
- pkeyus: Combine the primary key column values into a single string joined by an underscore character.
- tspkey: Combine the schema name, table name, and primary key column values into a single string.
- tspkeyus: Combine the schema name, table name, and primary key column values into a single string joined by an underscore character.
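A minimal sketch of selecting the underscore-separated schema, table and key format, assuming the alpha service from the example configuration. Per the description above, a row with primary key 234 in table TABLENAME of schema SCHEMANAME would then be sent with the ID SCHEMANAME_TABLENAME_234:

```ini
[alpha]
...Existing Replicator Config...
property=replicator.applier.dbms.keyFormat=tspkeyus
```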
replicator.applier.dbms.requireacks
| Default | all |
|---|---|
| Valid values | all, 1 |
| Product | TR |

Sets the acknowledgement counter for sending messages into the Kafka queue.
replicator.applier.dbms.retrycount
| Default | 0 |
|---|---|
| Valid values | |
| Product | TR |

The number of retries for sending each message.
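The example configuration earlier sets requireacks=1, while the default is all. A minimal sketch of favouring delivery guarantees instead, assuming the alpha service from that example; the retry count of 3 is illustrative. The property names suggest they correspond to the Kafka producer acks and retries settings (where acks=all waits for all in-sync replicas and acks=1 waits only for the partition leader), but that mapping is an assumption, not something stated here:

```ini
[alpha]
...Existing Replicator Config...
property=replicator.applier.dbms.requireacks=all
property=replicator.applier.dbms.retrycount=3
```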
replicator.applier.dbms.txninfoTopic
| Default | tungsten_transactions |
|---|---|
| Product | TR |
Sets the topic name for transaction messages. See also replicator.applier.dbms.addtxninfo.

replicator.applier.dbms.zookeeperString
| Default | ${replicator.global.db.host}:2181 |
|---|---|
| Product | TR |

Connection string for ZooKeeper, including hostname and port.
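The default points at port 2181 on the host given by replicator.global.db.host. When ZooKeeper runs on a different host, the connection string can be overridden. A minimal sketch, assuming the alpha service from the example configuration; zookeeper1.example.com is a hypothetical hostname:

```ini
[alpha]
...Existing Replicator Config...
property=replicator.applier.dbms.zookeeperString=zookeeper1.example.com:2181
```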